"""End-to-end test of the broadcast bot (Pattern 3, in-process FFI). Runs the real bot via profiles.start_bot, connects a publisher ("pub") and a non-publisher ("sub") to it, then checks: - a publisher's message is broadcast to all contacts (sub receives it) - a non-publisher's message gets the prohibited reply and is deleted Uses three libsimplex controllers in one process (bot + pub + sub) — exactly the multi-controller model the manager relies on. Needs network (SMP). Run: .venv/bin/python broadcast_test.py """ import asyncio import json import sys import time from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) import profiles as pm # noqa: E402 from simplex_chat import ChatApi, SqliteDb # noqa: E402 DATA = Path("data") BOT_PREFIX = str(DATA / "bctest_bot") PUB_PREFIX = str(DATA / "bctest_pub") SUB_PREFIX = str(DATA / "bctest_sub") BOT_PID = 99001 def cleanup(): for pat in ("bctest_bot_*", "bctest_pub_*", "bctest_sub_*"): for p in DATA.glob(pat): p.unlink() async def make_account(prefix: str, display: str) -> ChatApi: chat = await ChatApi.init(SqliteDb(file_prefix=prefix)) user = await chat.api_get_active_user() if not user: await chat.api_create_active_user({"displayName": display, "fullName": ""}) await chat.start_chat() return chat async def wait_until(fn, timeout=120, every=1): start = time.time() while time.time() - start < timeout: v = await fn() if v: return v await asyncio.sleep(every) return None async def first_contact_id(chat: ChatApi) -> int | None: u = await chat.api_get_active_user() cs = await chat.api_list_contacts(u["userId"]) return cs[0]["contactId"] if cs else None async def incoming_texts(chat: ChatApi, contact_id: int) -> list[str]: c = await chat.api_get_chat("direct", contact_id, 50) out = [] for ci in c.get("chatItems", []): d = ci.get("chatDir", {}).get("type", "") if d.endswith("Rcv"): out.append(ci.get("content", {}).get("msgContent", {}).get("text", "")) return out async def main() -> int: cleanup() addr_box = {} async def on_address(pid, addr): addr_box["addr"] = addr profile = { "id": BOT_PID, "name": "bctestbot", "bot_type": "broadcast", "db_prefix": BOT_PREFIX, "config": json.dumps({"publishers": ["pub"]}), } pub = sub = None ok = True try: await pm.start_bot(profile, on_address) addr = await wait_until(lambda: asyncio.sleep(0, addr_box.get("addr")), timeout=90) print("bot address:", bool(addr)) assert addr, "bot never published an address" pub = await make_account(PUB_PREFIX, "pub") sub = await make_account(SUB_PREFIX, "sub") await pub.send_chat_cmd(f"/connect {addr}") await sub.send_chat_cmd(f"/connect {addr}") pub_cid = await wait_until(lambda: first_contact_id(pub)) sub_cid = await wait_until(lambda: first_contact_id(sub)) print("pub connected:", bool(pub_cid), "| sub connected:", bool(sub_cid)) assert pub_cid and sub_cid, "publisher/subscriber did not connect" # wait until the BOT itself has both contacts, else /feed would miss sub both = await wait_until( lambda: asyncio.sleep(0, len(pm.get_running(BOT_PID).contacts) >= 2), timeout=60 ) print("bot sees both contacts:", bool(both)) # 1) publisher broadcasts → sub should receive it await pub.api_send_text_message({"chatType": "direct", "chatId": pub_cid}, "hello all") got_bcast = await wait_until( lambda: _contains(incoming_texts(sub, sub_cid), "hello all"), timeout=60, every=2 ) print("broadcast delivered to sub:", bool(got_bcast)) ok = ok and bool(got_bcast) # 2) non-publisher (sub) sends → should get prohibited reply await sub.api_send_text_message({"chatType": "direct", "chatId": sub_cid}, "spam please") got_prohibited = await wait_until( lambda: _contains(incoming_texts(sub, sub_cid), "deleted"), timeout=60, every=2 ) print("non-publisher got prohibited reply:", bool(got_prohibited)) ok = ok and bool(got_prohibited) except AssertionError as e: ok = False print("ASSERT FAIL:", e) finally: await pm.stop_bot(BOT_PID) for c in (pub, sub): if c: try: await c.close() except Exception: pass cleanup() print("\nRESULT:", "PASS" if ok else "FAIL") return 0 if ok else 1 async def _contains(coro, needle): texts = await coro return any(needle.lower() in (t or "").lower() for t in texts) if __name__ == "__main__": raise SystemExit(asyncio.run(main()))