"""Process supervisor: spawns and tracks the official SimpleX binaries. Two kinds of managed process (see ../CLAUDE.md): - cli : `simplex-chat -p PORT -d PREFIX` → we hold a WebSocket and stream commands - directory : `simplex-directory-service ...` → autonomous; lifecycle + read its web-folder - broadcast : `simplex-broadcast-bot ...` → autonomous; lifecycle only The DB belongs to each process; we never open it ourselves while the process runs. NOTE: exact flag spellings for the autonomous bots should be confirmed with ` --help` once binaries are in ./bin — the directory flags here follow apps/simplex-directory-service/src/Directory/Options.hs. """ from __future__ import annotations import asyncio import contextlib import json from collections.abc import Awaitable, Callable from dataclasses import dataclass from pathlib import Path from .binaries import binary_path from .ws_client import SimplexWSClient # (profile_name, event_record) -> None EventSink = Callable[[str, dict], Awaitable[None]] @dataclass class Managed: name: str kind: str # 'cli' | 'directory' | 'broadcast' proc: asyncio.subprocess.Process port: int | None = None client: SimplexWSClient | None = None uid: int | None = None # cached active-user id for cli profiles class Supervisor: def __init__(self, data_dir: str = "data", base_port: int = 5300, on_event: EventSink | None = None): self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self._next_port = base_port self._procs: dict[str, Managed] = {} self._on_event = on_event def _alloc_port(self) -> int: port = self._next_port self._next_port += 1 return port def list(self) -> list[dict]: return [ {"name": m.name, "kind": m.kind, "port": m.port, "running": m.proc.returncode is None} for m in self._procs.values() ] # ── Pattern 1: interactive CLI (driven over WebSocket) ────────────────────── async def start_cli(self, name: str, display_name: str | None = None, allow_files: bool = True, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] port = self._alloc_port() prefix = str(self.data_dir / name) # On a fresh DB the CLI prompts for a display name on stdin and won't start # the WS server. --create-bot-display-name creates the profile non-interactively # (no-op once the DB exists, so it's safe to always pass). args = ["-p", str(port), "-d", prefix, "--create-bot-display-name", display_name or name] if allow_files: args.append("--create-bot-allow-files") args += list(extra_args) proc = await asyncio.create_subprocess_exec( binary_path("simplex-chat"), *args, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) client = SimplexWSClient(port, on_event=lambda r: self._emit(name, r)) await self._connect_with_retry(client) m = Managed(name=name, kind="cli", proc=proc, port=port, client=client) self._procs[name] = m return m async def _connect_with_retry(self, client: SimplexWSClient, attempts: int = 40, delay: float = 0.25) -> None: for _ in range(attempts): try: await client.connect() return except OSError: await asyncio.sleep(delay) # WS server not up yet raise RuntimeError("simplex-chat websocket did not come up") async def send(self, name: str, cmd: str) -> dict: m = self._procs.get(name) if not m or not m.client: raise RuntimeError(f"{name!r} is not a running cli profile") return await m.client.send_cmd(cmd) # ── High-level helpers (normalize the binary's responses for the GUI) ─────── def _cli(self, name: str) -> "Managed": m = self._procs.get(name) if not m or not m.client: raise RuntimeError(f"{name!r} is not a running cli profile") return m async def _uid(self, name: str) -> int: m = self._cli(name) if m.uid is None: r = await m.client.send_cmd("/user") m.uid = r["user"]["userId"] return m.uid async def get_profile(self, name: str) -> dict: m = self._cli(name) u = (await m.client.send_cmd("/user")).get("user", {}) p = u.get("profile", {}) return { "displayName": p.get("displayName", name), "fullName": p.get("fullName", ""), "bio": p.get("shortDescr", ""), "image": p.get("image", ""), "address": await self.get_address(name), } async def get_address(self, name: str) -> str: m = self._cli(name) uid = await self._uid(name) r = await m.client.send_cmd(f"/_show_address {uid}") if r.get("type") == "userContactLink": link = r["contactLink"]["connLinkContact"] else: rc = await m.client.send_cmd(f"/_address {uid}") link = rc.get("connLinkContact", {}) if rc.get("type") == "userContactLinkCreated" else {} # auto-accept incoming contact requests so the address is usable settings = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}} await m.client.send_cmd(f"/_address_settings {uid} " + json.dumps(settings)) return link.get("connShortLink") or link.get("connFullLink", "") async def update_profile(self, name: str, display_name: str, full_name: str, bio: str) -> bool: m = self._cli(name) uid = await self._uid(name) profile = {"displayName": display_name, "fullName": full_name} if bio: profile["shortDescr"] = bio await m.client.send_cmd(f"/_profile {uid} " + json.dumps(profile)) return True async def get_contacts(self, name: str) -> list[dict]: m = self._cli(name) uid = await self._uid(name) r = await m.client.send_cmd(f"/_contacts {uid}") return [ {"contactId": c["contactId"], "name": c["localDisplayName"]} for c in r.get("contacts", []) ] async def get_groups(self, name: str) -> list[dict]: m = self._cli(name) uid = await self._uid(name) r = await m.client.send_cmd(f"/_groups {uid}") out = [] for g in r.get("groups", []): mem = g.get("membership", {}) out.append({ "groupId": g["groupId"], "name": g["groupProfile"]["displayName"], "members": g.get("groupSummary", {}).get("currentMembers", 0), "role": mem.get("memberRole", ""), "status": mem.get("memberStatus", ""), }) return out async def get_history(self, name: str, chat_type: str, chat_id: int, count: int = 50) -> list[dict]: m = self._cli(name) ref = ("@" if chat_type == "direct" else "#") + str(chat_id) r = await m.client.send_cmd(f"/_get chat {ref} count={count}") items = r.get("chat", {}).get("chatItems", []) if r.get("type") == "apiChat" else [] out = [] for ci in items: meta = ci.get("meta", {}) d = ci.get("chatDir", {}).get("type", "") text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "") sender = ci.get("chatDir", {}).get("groupMember", {}).get("localDisplayName", "") if d == "groupRcv" else "" out.append({"id": meta.get("itemId"), "ts": meta.get("itemTs", ""), "text": text, "outgoing": d.endswith("Snd"), "sender": sender}) return out async def send_message(self, name: str, chat_type: str, chat_id: int, text: str) -> bool: m = self._cli(name) ref = ("@" if chat_type == "direct" else "#") + str(chat_id) msgs = [{"msgContent": {"type": "text", "text": text}, "mentions": {}}] await m.client.send_cmd(f"/_send {ref} json " + json.dumps(msgs)) return True async def create_group(self, name: str, group_name: str, observer: bool = False) -> dict: m = self._cli(name) uid = await self._uid(name) gi = await m.client.send_cmd(f"/_group {uid} " + json.dumps({"displayName": group_name, "fullName": ""})) gid = gi["groupInfo"]["groupId"] role = "observer" if observer else "member" lk = await m.client.send_cmd(f"/_create link #{gid} {role}") link = lk.get("groupLink", {}).get("connLinkContact", {}) if lk.get("type") == "groupLinkCreated" else {} return {"groupId": gid, "link": link.get("connShortLink") or link.get("connFullLink", "")} async def group_link(self, name: str, gid: int) -> str: m = self._cli(name) r = await m.client.send_cmd(f"/_get link #{gid}") link = r.get("groupLink", {}).get("connLinkContact", {}) if r.get("type") == "groupLink" else {} return link.get("connShortLink") or link.get("connFullLink", "") async def leave_group(self, name: str, gid: int) -> bool: await self._cli(name).client.send_cmd(f"/_leave #{gid}") return True async def delete_group(self, name: str, gid: int) -> bool: await self._cli(name).client.send_cmd(f"/_delete #{gid} full") return True async def join_group(self, name: str, gid: int) -> bool: await self._cli(name).client.send_cmd(f"/_join #{gid}") return True async def clear_chat(self, name: str, chat_type: str, chat_id: int) -> bool: ref = ("@" if chat_type == "direct" else "#") + str(chat_id) await self._cli(name).client.send_cmd(f"/_delete {ref} messages") return True async def delete_contact(self, name: str, contact_id: int) -> bool: await self._cli(name).client.send_cmd(f"/_delete @{contact_id} full") return True # ── Pattern 2: autonomous official bots (lifecycle + config-at-launch) ────── async def start_directory(self, name: str, super_users: str, web_folder: str, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] prefix = str(self.data_dir / name) proc = await asyncio.create_subprocess_exec( binary_path("simplex-directory-service"), "-d", prefix, "--super-users", super_users, # CONTACT_ID:NAME,... "--directory-file", f"{prefix}_directory.log", # append-only state log "--web-folder", web_folder, # bot writes listing files here *extra_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) m = Managed(name=name, kind="directory", proc=proc) # no WS — autonomous self._procs[name] = m return m async def start_broadcast(self, name: str, display_name: str, publishers: str, extra_args: tuple[str, ...] = ()) -> Managed: if name in self._procs: return self._procs[name] prefix = str(self.data_dir / name) proc = await asyncio.create_subprocess_exec( binary_path("simplex-broadcast-bot"), "-d", prefix, "--display-name", display_name, "--publishers", publishers, # CONTACT_ID:NAME,... *extra_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) m = Managed(name=name, kind="broadcast", proc=proc) self._procs[name] = m return m # ── Lifecycle ─────────────────────────────────────────────────────────────── async def stop(self, name: str) -> None: m = self._procs.pop(name, None) if not m: return if m.client: await m.client.close() with contextlib.suppress(ProcessLookupError): m.proc.terminate() with contextlib.suppress(asyncio.TimeoutError): await asyncio.wait_for(m.proc.wait(), 5) with contextlib.suppress(ProcessLookupError): if m.proc.returncode is None: m.proc.kill() async def stop_all(self) -> None: for name in list(self._procs): await self.stop(name) async def _emit(self, name: str, resp: dict) -> None: if self._on_event: await self._on_event(name, resp)