Files
simplex-manager/manager/profiles.py
Jon 11e799188d Add Python manager: FastAPI backend + web UI
- main.py: FastAPI app with profile CRUD, start/stop, send message endpoints
- profiles.py: asyncio bot lifecycle using simplex-chat Python SDK
- db.py: SQLite registry tracking profiles, types, config, addresses
- templates/: Jinja2 + HTMX web UI
  - login.html: token-based auth
  - index.html: profile list with live status polling, create dialog
  - profile.html: per-bot dashboard with QR code, contacts/groups, event log, send form
- requirements.txt: fastapi, uvicorn, jinja2, simplex-chat
- start.sh: one-command startup with venv bootstrap

Bot types: echo, broadcast, support (business address), directory, deadmans
Run: cd manager && MANAGER_TOKEN=secret ./start.sh

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 00:53:41 +01:00

244 lines
8.1 KiB
Python

"""Bot lifecycle management — start, stop, status, message sending."""
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any
log = logging.getLogger(__name__)
BOT_TYPES = ["echo", "broadcast", "support", "directory", "deadmans"]
@dataclass
class RunningBot:
profile_id: int
name: str
bot_type: str
task: asyncio.Task
address: str = ""
contacts: list[dict] = field(default_factory=list)
groups: list[dict] = field(default_factory=list)
log_lines: list[str] = field(default_factory=list)
chat: Any = None # simplex_chat ChatApi instance
# profile_id → RunningBot
_running: dict[int, RunningBot] = {}
def is_running(profile_id: int) -> bool:
b = _running.get(profile_id)
return b is not None and not b.task.done()
def get_running(profile_id: int) -> RunningBot | None:
b = _running.get(profile_id)
if b and not b.task.done():
return b
return None
def all_statuses() -> dict[int, bool]:
return {pid: is_running(pid) for pid in _running}
async def start_bot(profile: dict, on_address: callable) -> None:
"""Start a bot for the given profile dict. Idempotent."""
pid = profile["id"]
if is_running(pid):
return
config = json.loads(profile.get("config") or "{}")
bot_type = profile["bot_type"]
db_prefix = profile["db_prefix"]
task = asyncio.create_task(
_run_bot(pid, profile["name"], bot_type, db_prefix, config, on_address),
name=f"bot-{pid}-{profile['name']}",
)
_running[pid] = RunningBot(
profile_id=pid,
name=profile["name"],
bot_type=bot_type,
task=task,
)
log.info("Started bot %d (%s / %s)", pid, profile["name"], bot_type)
async def stop_bot(profile_id: int) -> None:
b = _running.get(profile_id)
if b and not b.task.done():
b.task.cancel()
try:
await b.task
except asyncio.CancelledError:
pass
log.info("Stopped bot %d", profile_id)
async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool:
"""Send a text message from a running bot. Returns True on success."""
b = get_running(profile_id)
if not b or not b.chat:
return False
try:
contacts = await b.chat.api_list_contacts(1)
for c in contacts:
if c["localDisplayName"] == contact_or_group:
await b.chat.api_send_text_message(
{"chatType": "direct", "chatId": c["contactId"]}, text
)
return True
groups = await b.chat.api_list_groups(1)
for g in groups:
if g["groupInfo"]["groupProfile"]["displayName"] == contact_or_group:
await b.chat.api_send_text_message(
{"chatType": "group", "chatId": g["groupInfo"]["groupId"]}, text
)
return True
except Exception as e:
log.error("send_message error: %s", e)
return False
async def _run_bot(
profile_id: int,
name: str,
bot_type: str,
db_prefix: str,
config: dict,
on_address: callable,
) -> None:
"""Inner coroutine — runs the simplex-chat event loop for one profile."""
try:
from simplex_chat import ChatApi, SqliteDb
except ImportError:
log.error("simplex-chat Python package not installed. Run: pip install simplex-chat")
return
b = _running[profile_id]
try:
chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix))
b.chat = chat
await chat.start_chat()
# Create or fetch address
user = await chat.api_get_active_user()
if not user:
user = await chat.api_create_active_user(
{"displayName": name, "fullName": ""}
)
user_id = user["userId"]
addr = await chat.api_get_user_address(user_id)
if not addr:
addr = await chat.api_create_user_address(user_id)
address = addr.get("connShortLink") or addr.get("connFullLink", "")
b.address = address
await on_address(profile_id, address)
# Configure address settings based on bot type
settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}}
if bot_type == "support":
settings["businessAddress"] = True
welcome = config.get("welcome_message", f"Welcome to {name} support.")
settings["autoReply"] = {"type": "text", "text": welcome}
elif bot_type in ("echo", "broadcast", "directory", "deadmans"):
welcome = config.get("welcome_message", f"Connected to {name}.")
settings["autoReply"] = {"type": "text", "text": welcome}
await chat.api_set_address_settings(user_id, settings)
# Refresh contacts/groups
async def refresh() -> None:
try:
b.contacts = await chat.api_list_contacts(user_id)
b.groups = await chat.api_list_groups(user_id)
except Exception:
pass
await refresh()
# Event loop
while True:
evt = await chat.recv_chat_event(500_000)
if evt is None:
continue
tag = evt.get("type", "")
b.log_lines.append(f"[{tag}]")
if len(b.log_lines) > 200:
b.log_lines = b.log_lines[-200:]
if tag == "contactConnected":
await refresh()
ct = evt.get("contact", {})
ct_name = ct.get("localDisplayName", "?")
_append_log(b, f"Contact connected: {ct_name}")
if bot_type == "echo":
pass # echo handled on message
elif bot_type == "broadcast":
welcome = config.get("welcome_message", "You are subscribed.")
try:
await chat.api_send_text_message(
{"chatType": "direct", "chatId": ct["contactId"]}, welcome
)
except Exception:
pass
elif tag == "newChatItems":
items = evt.get("chatItems", [])
for item in items:
ci = item.get("chatItem", {})
direction = ci.get("meta", {}).get("itemStatus", {}).get("type", "")
if direction != "sndSent":
content = ci.get("content", {})
mc = content.get("msgContent", {})
text = mc.get("text", "")
chat_info = item.get("chatInfo", {})
_append_log(b, f"Message: {text[:80]}")
if bot_type == "echo" and text:
try:
await chat.api_send_text_reply(item, f"Echo: {text}")
except Exception:
pass
elif bot_type == "broadcast":
publishers = config.get("publishers", [])
sender = chat_info.get("contact", {}).get("localDisplayName", "")
if sender in publishers and text:
# broadcast to all contacts
contacts = await chat.api_list_contacts(user_id)
for c in contacts:
try:
await chat.api_send_text_message(
{"chatType": "direct", "chatId": c["contactId"]}, text
)
except Exception:
pass
except asyncio.CancelledError:
pass
except Exception as e:
log.exception("Bot %d crashed: %s", profile_id, e)
_append_log(b, f"ERROR: {e}")
finally:
if b.chat:
try:
await b.chat.close()
except Exception:
pass
def _append_log(b: RunningBot, line: str) -> None:
b.log_lines.append(line)
if len(b.log_lines) > 200:
b.log_lines = b.log_lines[-200:]