"""End-to-end test of a 'business' profile (Pattern 3, in-process FFI).

A business profile sets businessAddress=True, so a connecting customer gets
their own GROUP chat (business chat) rather than a plain direct contact.

This starts a business profile via profiles.start_bot, connects a customer,
and asserts the customer ends up in a group (the distinguishing
business-address behavior).

Run:  .venv/bin/python business_test.py
"""
import asyncio
import json
import sys
import time
from pathlib import Path

# Make the sibling modules importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent))

import profiles as pm  # noqa: E402
from simplex_chat import ChatApi, SqliteDb  # noqa: E402

DATA = Path("data")
BIZ_PREFIX = str(DATA / "biztest_biz")
CUST_PREFIX = str(DATA / "biztest_cust")
BIZ_PID = 99002


def cleanup() -> None:
    """Delete every file under DATA created for the two test databases."""
    for pat in ("biztest_biz_*", "biztest_cust_*"):
        for p in DATA.glob(pat):
            try:
                p.unlink()
            except FileNotFoundError:
                # The file vanished between the glob and the unlink; the goal
                # (file absent) is already met, so this is not an error.
                pass


async def wait_until(fn, timeout: float = 120, every: float = 1):
    """Poll the coroutine function ``fn`` until it yields a truthy value.

    Args:
        fn: zero-argument callable returning an awaitable.
        timeout: maximum number of seconds to keep polling.
        every: pause in seconds between two polls.

    Returns:
        The first truthy result of ``await fn()``, or ``None`` if the timeout
        elapsed first.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        v = await fn()
        if v:
            return v
        await asyncio.sleep(every)
    return None


async def main() -> int:
    """Run the business-profile scenario and return a process exit code.

    Returns:
        0 when the customer ended up in at least one group, 1 when one of the
        checks failed. Exceptions other than AssertionError propagate after
        teardown has run.
    """
    cleanup()
    addr_box = {}

    async def on_address(pid, addr):
        # Callback handed to profiles.start_bot; stores the address it is
        # given so the polling below can pick it up.
        addr_box["addr"] = addr

    async def published_address():
        return addr_box.get("addr")

    profile = {
        "id": BIZ_PID,
        "name": "biztestco",
        "bot_type": "business",
        "db_prefix": BIZ_PREFIX,
        "config": json.dumps({"welcome_message": "Thanks for contacting us!"}),
    }
    cust = None
    ok = True
    try:
        await pm.start_bot(profile, on_address)
        addr = await wait_until(published_address, timeout=90)
        print("business address:", bool(addr))
        # Raised explicitly (not via `assert`) so the check survives `python -O`.
        if not addr:
            raise AssertionError("business profile never published an address")

        cust = await ChatApi.init(SqliteDb(file_prefix=CUST_PREFIX))
        if not await cust.api_get_active_user():
            await cust.api_create_active_user({"displayName": "customer", "fullName": ""})
        await cust.start_chat()
        await cust.send_chat_cmd(f"/connect {addr}")

        u = await cust.api_get_active_user()
        uid = u["userId"]

        async def customer_groups():
            return await cust.api_list_groups(uid)

        groups = await wait_until(customer_groups, timeout=90, every=2)
        contacts = await cust.api_list_contacts(uid)
        print("customer groups:",
              [g.get("groupProfile", {}).get("displayName") for g in (groups or [])])
        print("customer direct contacts:",
              [c.get("localDisplayName") for c in contacts])
        # business address ⇒ a business GROUP chat, not a plain direct contact
        if not groups:
            raise AssertionError(
                "customer did not land in a business group (businessAddress not in effect?)"
            )
    except AssertionError as e:
        ok = False
        print("ASSERT FAIL:", e)
    finally:
        # Nested try/finally: a failure while stopping the bot must not skip
        # closing the customer client or removing the test databases.
        try:
            await pm.stop_bot(BIZ_PID)
        finally:
            if cust:
                try:
                    await cust.close()
                except Exception as e:
                    # Best-effort close: report the problem but keep tearing down.
                    print("warning: closing customer client failed:", e, file=sys.stderr)
            cleanup()

    print("\nRESULT:",
          "PASS — business profile creates per-customer group chats" if ok else "FAIL")
    return 0 if ok else 1


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))