"""Pattern-1 smoke test: spawn a real `simplex-chat -p` process, drive it over the WebSocket corrId/cmd↔resp handshake, and confirm a structured response comes back. Validates the supervisor + ws_client against a real binary. Local commands only (/user, /_create user, /users) — no network needed. Run (with ./bin/simplex-chat present): python smoke_test.py """ import asyncio import json import shutil import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from supervisor.supervisor import Supervisor # noqa: E402 async def main() -> int: events: list[tuple[str, str]] = [] async def on_event(name: str, resp: dict) -> None: events.append((name, resp.get("type", "?"))) sup = Supervisor(data_dir="data", base_port=5400, on_event=on_event) ok = False try: print("→ spawning simplex-chat -p (profile 'smoke') …") m = await sup.start_cli("smoke") print(f" process pid={m.proc.pid} port={m.port}, WebSocket connected") print("→ /user (show active user)") r = await sup.send("smoke", "/user") print(" resp.type =", r.get("type")) if r.get("type") != "activeUser": new_user = { "profile": {"displayName": "smoke", "fullName": ""}, "pastTimestamp": False, "userChatRelay": False, } print("→ /_create user") rc = await sup.send("smoke", "/_create user " + json.dumps(new_user)) print(" resp.type =", rc.get("type")) r = await sup.send("smoke", "/user") print(" /user resp.type =", r.get("type")) user = r.get("user") or {} print(" active user displayName =", user.get("localDisplayName")) rl = await sup.send("smoke", "/users") print("→ /users resp.type =", rl.get("type"), "count =", len(rl.get("users", []) if isinstance(rl, dict) else [])) ok = r.get("type") == "activeUser" finally: await sup.stop("smoke") print("→ stopped process") print(" events observed:", events[:5]) print("\nRESULT:", "PASS — corrId/cmd↔resp handshake works" if ok else "FAIL") return 0 if ok else 1 if __name__ == "__main__": raise SystemExit(asyncio.run(main()))