"""WebSocket client for a `simplex-chat -p ` process (Pattern 1 control). Protocol (simplex-chat/bots/README.md): - send {"corrId": "", "cmd": ""} - response{"corrId": "", "resp": {"type": ..., ...}} (matched by corrId) - event {"resp": {"type": ..., ...}} (no corrId) We keep one long-lived connection per profile. Responses resolve futures keyed by corrId; events (no corrId) are pushed to an async callback. """ import asyncio import itertools import json from collections.abc import Awaitable, Callable import websockets EventHandler = Callable[[dict], Awaitable[None]] class SimplexWSClient: def __init__(self, port: int, on_event: EventHandler | None = None): self.url = f"ws://localhost:{port}/" self._on_event = on_event self._ws: websockets.WebSocketClientProtocol | None = None self._corr = itertools.count(1) self._pending: dict[str, asyncio.Future] = {} self._reader: asyncio.Task | None = None async def connect(self) -> None: # max_size=None: chat responses (chat lists, profiles w/ avatars) can be large. self._ws = await websockets.connect(self.url, max_size=None) self._reader = asyncio.create_task(self._read_loop()) async def _read_loop(self) -> None: assert self._ws is not None async for raw in self._ws: try: msg = json.loads(raw) except Exception: continue corr = msg.get("corrId") resp = msg.get("resp") fut = self._pending.pop(corr, None) if corr else None if fut is not None: if not fut.done(): fut.set_result(resp) elif resp is not None and self._on_event is not None: # Forward-compat: never fail on unknown event types (bots/README.md) try: await self._on_event(resp) except Exception: pass async def send_cmd(self, cmd: str, timeout: float = 30.0) -> dict: """Send a chat command string, await its matching response record.""" if self._ws is None: raise RuntimeError("not connected") corr = str(next(self._corr)) fut: asyncio.Future = asyncio.get_event_loop().create_future() self._pending[corr] = fut await self._ws.send(json.dumps({"corrId": corr, "cmd": cmd})) try: return await asyncio.wait_for(fut, timeout) finally: self._pending.pop(corr, None) async def close(self) -> None: if self._reader: self._reader.cancel() if self._ws: await self._ws.close()