Supervisor gains normalized helpers (get_profile/address, update_profile, contacts, groups, history, send, create/leave/delete/join group, clear chat, delete contact) exposed as REST. Front end rebuilt to mirror simplex-manager: - sidebar layout; profiles list + per-profile detail page - Profile card + Edit dialog (display name / full name / bio) - Address card: SMP link + copy + QR (qrcode) + scan caption - Contacts (Chat / Clear / Delete), Groups (Create, Chat / Link / Leave / Delete, Join when invited), Create Channel (observer link) - in-GUI chat view: history + composer with live polling - live Event Log per profile over the /events WebSocket Validated via running server: address shown, profile edit persists, group create returns link. (json/JSONResponse imports fixed.) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
291 lines
13 KiB
Python
291 lines
13 KiB
Python
"""Process supervisor: spawns and tracks the official SimpleX binaries.
|
|
|
|
Two kinds of managed process (see ../CLAUDE.md):
|
|
- cli : `simplex-chat -p PORT -d PREFIX` → we hold a WebSocket and stream commands
|
|
- directory : `simplex-directory-service ...` → autonomous; lifecycle + read its web-folder
|
|
- broadcast : `simplex-broadcast-bot ...` → autonomous; lifecycle only
|
|
|
|
The DB belongs to each process; we never open it ourselves while the process runs.
|
|
|
|
NOTE: exact flag spellings for the autonomous bots should be confirmed with
|
|
`<binary> --help` once binaries are in ./bin — the directory flags here follow
|
|
apps/simplex-directory-service/src/Directory/Options.hs.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import json
|
|
from collections.abc import Awaitable, Callable
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from .binaries import binary_path
|
|
from .ws_client import SimplexWSClient
|
|
|
|
# (profile_name, event_record) -> None
|
|
EventSink = Callable[[str, dict], Awaitable[None]]
|
|
|
|
|
|
@dataclass
|
|
class Managed:
|
|
name: str
|
|
kind: str # 'cli' | 'directory' | 'broadcast'
|
|
proc: asyncio.subprocess.Process
|
|
port: int | None = None
|
|
client: SimplexWSClient | None = None
|
|
uid: int | None = None # cached active-user id for cli profiles
|
|
|
|
|
|
class Supervisor:
|
|
def __init__(self, data_dir: str = "data", base_port: int = 5300, on_event: EventSink | None = None):
|
|
self.data_dir = Path(data_dir)
|
|
self.data_dir.mkdir(parents=True, exist_ok=True)
|
|
self._next_port = base_port
|
|
self._procs: dict[str, Managed] = {}
|
|
self._on_event = on_event
|
|
|
|
def _alloc_port(self) -> int:
|
|
port = self._next_port
|
|
self._next_port += 1
|
|
return port
|
|
|
|
def list(self) -> list[dict]:
|
|
return [
|
|
{"name": m.name, "kind": m.kind, "port": m.port, "running": m.proc.returncode is None}
|
|
for m in self._procs.values()
|
|
]
|
|
|
|
# ── Pattern 1: interactive CLI (driven over WebSocket) ──────────────────────
|
|
async def start_cli(self, name: str, display_name: str | None = None,
|
|
allow_files: bool = True, extra_args: tuple[str, ...] = ()) -> Managed:
|
|
if name in self._procs:
|
|
return self._procs[name]
|
|
port = self._alloc_port()
|
|
prefix = str(self.data_dir / name)
|
|
# On a fresh DB the CLI prompts for a display name on stdin and won't start
|
|
# the WS server. --create-bot-display-name creates the profile non-interactively
|
|
# (no-op once the DB exists, so it's safe to always pass).
|
|
args = ["-p", str(port), "-d", prefix, "--create-bot-display-name", display_name or name]
|
|
if allow_files:
|
|
args.append("--create-bot-allow-files")
|
|
args += list(extra_args)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
binary_path("simplex-chat"), *args,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
|
|
)
|
|
client = SimplexWSClient(port, on_event=lambda r: self._emit(name, r))
|
|
await self._connect_with_retry(client)
|
|
m = Managed(name=name, kind="cli", proc=proc, port=port, client=client)
|
|
self._procs[name] = m
|
|
return m
|
|
|
|
async def _connect_with_retry(self, client: SimplexWSClient, attempts: int = 40, delay: float = 0.25) -> None:
|
|
for _ in range(attempts):
|
|
try:
|
|
await client.connect()
|
|
return
|
|
except OSError:
|
|
await asyncio.sleep(delay) # WS server not up yet
|
|
raise RuntimeError("simplex-chat websocket did not come up")
|
|
|
|
async def send(self, name: str, cmd: str) -> dict:
|
|
m = self._procs.get(name)
|
|
if not m or not m.client:
|
|
raise RuntimeError(f"{name!r} is not a running cli profile")
|
|
return await m.client.send_cmd(cmd)
|
|
|
|
# ── High-level helpers (normalize the binary's responses for the GUI) ───────
|
|
def _cli(self, name: str) -> "Managed":
|
|
m = self._procs.get(name)
|
|
if not m or not m.client:
|
|
raise RuntimeError(f"{name!r} is not a running cli profile")
|
|
return m
|
|
|
|
async def _uid(self, name: str) -> int:
|
|
m = self._cli(name)
|
|
if m.uid is None:
|
|
r = await m.client.send_cmd("/user")
|
|
m.uid = r["user"]["userId"]
|
|
return m.uid
|
|
|
|
async def get_profile(self, name: str) -> dict:
|
|
m = self._cli(name)
|
|
u = (await m.client.send_cmd("/user")).get("user", {})
|
|
p = u.get("profile", {})
|
|
return {
|
|
"displayName": p.get("displayName", name),
|
|
"fullName": p.get("fullName", ""),
|
|
"bio": p.get("shortDescr", ""),
|
|
"image": p.get("image", ""),
|
|
"address": await self.get_address(name),
|
|
}
|
|
|
|
async def get_address(self, name: str) -> str:
|
|
m = self._cli(name)
|
|
uid = await self._uid(name)
|
|
r = await m.client.send_cmd(f"/_show_address {uid}")
|
|
if r.get("type") == "userContactLink":
|
|
link = r["contactLink"]["connLinkContact"]
|
|
else:
|
|
rc = await m.client.send_cmd(f"/_address {uid}")
|
|
link = rc.get("connLinkContact", {}) if rc.get("type") == "userContactLinkCreated" else {}
|
|
# auto-accept incoming contact requests so the address is usable
|
|
settings = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}}
|
|
await m.client.send_cmd(f"/_address_settings {uid} " + json.dumps(settings))
|
|
return link.get("connShortLink") or link.get("connFullLink", "")
|
|
|
|
async def update_profile(self, name: str, display_name: str, full_name: str, bio: str) -> bool:
|
|
m = self._cli(name)
|
|
uid = await self._uid(name)
|
|
profile = {"displayName": display_name, "fullName": full_name}
|
|
if bio:
|
|
profile["shortDescr"] = bio
|
|
await m.client.send_cmd(f"/_profile {uid} " + json.dumps(profile))
|
|
return True
|
|
|
|
async def get_contacts(self, name: str) -> list[dict]:
|
|
m = self._cli(name)
|
|
uid = await self._uid(name)
|
|
r = await m.client.send_cmd(f"/_contacts {uid}")
|
|
return [
|
|
{"contactId": c["contactId"], "name": c["localDisplayName"]}
|
|
for c in r.get("contacts", [])
|
|
]
|
|
|
|
async def get_groups(self, name: str) -> list[dict]:
|
|
m = self._cli(name)
|
|
uid = await self._uid(name)
|
|
r = await m.client.send_cmd(f"/_groups {uid}")
|
|
out = []
|
|
for g in r.get("groups", []):
|
|
mem = g.get("membership", {})
|
|
out.append({
|
|
"groupId": g["groupId"],
|
|
"name": g["groupProfile"]["displayName"],
|
|
"members": g.get("groupSummary", {}).get("currentMembers", 0),
|
|
"role": mem.get("memberRole", ""),
|
|
"status": mem.get("memberStatus", ""),
|
|
})
|
|
return out
|
|
|
|
async def get_history(self, name: str, chat_type: str, chat_id: int, count: int = 50) -> list[dict]:
|
|
m = self._cli(name)
|
|
ref = ("@" if chat_type == "direct" else "#") + str(chat_id)
|
|
r = await m.client.send_cmd(f"/_get chat {ref} count={count}")
|
|
items = r.get("chat", {}).get("chatItems", []) if r.get("type") == "apiChat" else []
|
|
out = []
|
|
for ci in items:
|
|
meta = ci.get("meta", {})
|
|
d = ci.get("chatDir", {}).get("type", "")
|
|
text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "")
|
|
sender = ci.get("chatDir", {}).get("groupMember", {}).get("localDisplayName", "") if d == "groupRcv" else ""
|
|
out.append({"id": meta.get("itemId"), "ts": meta.get("itemTs", ""),
|
|
"text": text, "outgoing": d.endswith("Snd"), "sender": sender})
|
|
return out
|
|
|
|
async def send_message(self, name: str, chat_type: str, chat_id: int, text: str) -> bool:
|
|
m = self._cli(name)
|
|
ref = ("@" if chat_type == "direct" else "#") + str(chat_id)
|
|
msgs = [{"msgContent": {"type": "text", "text": text}, "mentions": {}}]
|
|
await m.client.send_cmd(f"/_send {ref} json " + json.dumps(msgs))
|
|
return True
|
|
|
|
async def create_group(self, name: str, group_name: str, observer: bool = False) -> dict:
|
|
m = self._cli(name)
|
|
uid = await self._uid(name)
|
|
gi = await m.client.send_cmd(f"/_group {uid} " + json.dumps({"displayName": group_name, "fullName": ""}))
|
|
gid = gi["groupInfo"]["groupId"]
|
|
role = "observer" if observer else "member"
|
|
lk = await m.client.send_cmd(f"/_create link #{gid} {role}")
|
|
link = lk.get("groupLink", {}).get("connLinkContact", {}) if lk.get("type") == "groupLinkCreated" else {}
|
|
return {"groupId": gid, "link": link.get("connShortLink") or link.get("connFullLink", "")}
|
|
|
|
async def group_link(self, name: str, gid: int) -> str:
|
|
m = self._cli(name)
|
|
r = await m.client.send_cmd(f"/_get link #{gid}")
|
|
link = r.get("groupLink", {}).get("connLinkContact", {}) if r.get("type") == "groupLink" else {}
|
|
return link.get("connShortLink") or link.get("connFullLink", "")
|
|
|
|
async def leave_group(self, name: str, gid: int) -> bool:
|
|
await self._cli(name).client.send_cmd(f"/_leave #{gid}")
|
|
return True
|
|
|
|
async def delete_group(self, name: str, gid: int) -> bool:
|
|
await self._cli(name).client.send_cmd(f"/_delete #{gid} full")
|
|
return True
|
|
|
|
async def join_group(self, name: str, gid: int) -> bool:
|
|
await self._cli(name).client.send_cmd(f"/_join #{gid}")
|
|
return True
|
|
|
|
async def clear_chat(self, name: str, chat_type: str, chat_id: int) -> bool:
|
|
ref = ("@" if chat_type == "direct" else "#") + str(chat_id)
|
|
await self._cli(name).client.send_cmd(f"/_delete {ref} messages")
|
|
return True
|
|
|
|
async def delete_contact(self, name: str, contact_id: int) -> bool:
|
|
await self._cli(name).client.send_cmd(f"/_delete @{contact_id} full")
|
|
return True
|
|
|
|
# ── Pattern 2: autonomous official bots (lifecycle + config-at-launch) ──────
|
|
async def start_directory(self, name: str, super_users: str, web_folder: str,
|
|
extra_args: tuple[str, ...] = ()) -> Managed:
|
|
if name in self._procs:
|
|
return self._procs[name]
|
|
prefix = str(self.data_dir / name)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
binary_path("simplex-directory-service"),
|
|
"-d", prefix,
|
|
"--super-users", super_users, # CONTACT_ID:NAME,...
|
|
"--directory-file", f"{prefix}_directory.log", # append-only state log
|
|
"--web-folder", web_folder, # bot writes listing files here
|
|
*extra_args,
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
|
|
)
|
|
m = Managed(name=name, kind="directory", proc=proc) # no WS — autonomous
|
|
self._procs[name] = m
|
|
return m
|
|
|
|
async def start_broadcast(self, name: str, display_name: str, publishers: str,
|
|
extra_args: tuple[str, ...] = ()) -> Managed:
|
|
if name in self._procs:
|
|
return self._procs[name]
|
|
prefix = str(self.data_dir / name)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
binary_path("simplex-broadcast-bot"),
|
|
"-d", prefix,
|
|
"--display-name", display_name,
|
|
"--publishers", publishers, # CONTACT_ID:NAME,...
|
|
*extra_args,
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
|
|
)
|
|
m = Managed(name=name, kind="broadcast", proc=proc)
|
|
self._procs[name] = m
|
|
return m
|
|
|
|
# ── Lifecycle ───────────────────────────────────────────────────────────────
|
|
async def stop(self, name: str) -> None:
|
|
m = self._procs.pop(name, None)
|
|
if not m:
|
|
return
|
|
if m.client:
|
|
await m.client.close()
|
|
with contextlib.suppress(ProcessLookupError):
|
|
m.proc.terminate()
|
|
with contextlib.suppress(asyncio.TimeoutError):
|
|
await asyncio.wait_for(m.proc.wait(), 5)
|
|
with contextlib.suppress(ProcessLookupError):
|
|
if m.proc.returncode is None:
|
|
m.proc.kill()
|
|
|
|
async def stop_all(self) -> None:
|
|
for name in list(self._procs):
|
|
await self.stop(name)
|
|
|
|
async def _emit(self, name: str, resp: dict) -> None:
|
|
if self._on_event:
|
|
await self._on_event(name, resp)
|