"""Process supervisor: spawns and tracks the official SimpleX binaries. Two kinds of managed process (see ../CLAUDE.md): - cli : `simplex-chat -p PORT -d PREFIX` → we hold a WebSocket and stream commands - directory : `simplex-directory-service ...` → autonomous; lifecycle + read its web-folder - broadcast : `simplex-broadcast-bot ...` → autonomous; lifecycle only The DB belongs to each process; we never open it ourselves while the process runs. NOTE: exact flag spellings for the autonomous bots should be confirmed with ` --help` once binaries are in ./bin — the directory flags here follow apps/simplex-directory-service/src/Directory/Options.hs. """ import asyncio import contextlib from collections.abc import Awaitable, Callable from dataclasses import dataclass from pathlib import Path from .binaries import binary_path from .ws_client import SimplexWSClient # (profile_name, event_record) -> None EventSink = Callable[[str, dict], Awaitable[None]] @dataclass class Managed: name: str kind: str # 'cli' | 'directory' | 'broadcast' proc: asyncio.subprocess.Process port: int | None = None client: SimplexWSClient | None = None class Supervisor: def __init__(self, data_dir: str = "data", base_port: int = 5300, on_event: EventSink | None = None): self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self._next_port = base_port self._procs: dict[str, Managed] = {} self._on_event = on_event def _alloc_port(self) -> int: port = self._next_port self._next_port += 1 return port def list(self) -> list[dict]: return [ {"name": m.name, "kind": m.kind, "port": m.port, "running": m.proc.returncode is None} for m in self._procs.values() ] # ── Pattern 1: interactive CLI (driven over WebSocket) ────────────────────── async def start_cli(self, name: str, display_name: str | None = None, allow_files: bool = True, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] port = self._alloc_port() prefix = str(self.data_dir / name) # On a fresh DB the CLI prompts for a display name on stdin and won't start # the WS server. --create-bot-display-name creates the profile non-interactively # (no-op once the DB exists, so it's safe to always pass). args = ["-p", str(port), "-d", prefix, "--create-bot-display-name", display_name or name] if allow_files: args.append("--create-bot-allow-files") args += list(extra_args) proc = await asyncio.create_subprocess_exec( binary_path("simplex-chat"), *args, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) client = SimplexWSClient(port, on_event=lambda r: self._emit(name, r)) await self._connect_with_retry(client) m = Managed(name=name, kind="cli", proc=proc, port=port, client=client) self._procs[name] = m return m async def _connect_with_retry(self, client: SimplexWSClient, attempts: int = 40, delay: float = 0.25) -> None: for _ in range(attempts): try: await client.connect() return except OSError: await asyncio.sleep(delay) # WS server not up yet raise RuntimeError("simplex-chat websocket did not come up") async def send(self, name: str, cmd: str) -> dict: m = self._procs.get(name) if not m or not m.client: raise RuntimeError(f"{name!r} is not a running cli profile") return await m.client.send_cmd(cmd) # ── Pattern 2: autonomous official bots (lifecycle + config-at-launch) ────── async def start_directory(self, name: str, super_users: str, web_folder: str, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] prefix = str(self.data_dir / name) proc = await asyncio.create_subprocess_exec( binary_path("simplex-directory-service"), "-d", prefix, "--super-users", super_users, # CONTACT_ID:NAME,... "--directory-file", f"{prefix}_directory.log", # append-only state log "--web-folder", web_folder, # bot writes listing files here *extra_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) m = Managed(name=name, kind="directory", proc=proc) # no WS — autonomous self._procs[name] = m return m async def start_broadcast(self, name: str, display_name: str, publishers: str, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] prefix = str(self.data_dir / name) proc = await asyncio.create_subprocess_exec( binary_path("simplex-broadcast-bot"), "-d", prefix, "--display-name", display_name, "--publishers", publishers, # CONTACT_ID:NAME,... *extra_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) m = Managed(name=name, kind="broadcast", proc=proc) self._procs[name] = m return m # ── Lifecycle ─────────────────────────────────────────────────────────────── async def stop(self, name: str) -> None: m = self._procs.pop(name, None) if not m: return if m.client: await m.client.close() with contextlib.suppress(ProcessLookupError): m.proc.terminate() with contextlib.suppress(asyncio.TimeoutError): await asyncio.wait_for(m.proc.wait(), 5) with contextlib.suppress(ProcessLookupError): if m.proc.returncode is None: m.proc.kill() async def stop_all(self) -> None: for name in list(self._procs): await self.stop(name) async def _emit(self, name: str, resp: dict) -> None: if self._on_event: await self._on_event(name, resp)