"""Bot lifecycle management — start, stop, status, message sending.""" import asyncio import json import logging from dataclasses import dataclass, field from typing import Any log = logging.getLogger(__name__) BOT_TYPES = ["echo", "broadcast", "support", "directory", "deadmans"] @dataclass class RunningBot: profile_id: int name: str bot_type: str task: asyncio.Task address: str = "" contacts: list[dict] = field(default_factory=list) groups: list[dict] = field(default_factory=list) log_lines: list[str] = field(default_factory=list) chat: Any = None # simplex_chat ChatApi instance # profile_id → RunningBot _running: dict[int, RunningBot] = {} def is_running(profile_id: int) -> bool: b = _running.get(profile_id) return b is not None and not b.task.done() def get_running(profile_id: int) -> RunningBot | None: b = _running.get(profile_id) if b and not b.task.done(): return b return None def all_statuses() -> dict[int, bool]: return {pid: is_running(pid) for pid in _running} async def start_bot(profile: dict, on_address: callable) -> None: """Start a bot for the given profile dict. Idempotent.""" pid = profile["id"] if is_running(pid): return config = json.loads(profile.get("config") or "{}") bot_type = profile["bot_type"] db_prefix = profile["db_prefix"] task = asyncio.create_task( _run_bot(pid, profile["name"], bot_type, db_prefix, config, on_address), name=f"bot-{pid}-{profile['name']}", ) _running[pid] = RunningBot( profile_id=pid, name=profile["name"], bot_type=bot_type, task=task, ) log.info("Started bot %d (%s / %s)", pid, profile["name"], bot_type) async def stop_bot(profile_id: int) -> None: b = _running.get(profile_id) if b and not b.task.done(): b.task.cancel() try: await b.task except asyncio.CancelledError: pass log.info("Stopped bot %d", profile_id) async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool: """Send a text message from a running bot. Returns True on success.""" b = get_running(profile_id) if not b or not b.chat: return False try: contacts = await b.chat.api_list_contacts(1) for c in contacts: if c["localDisplayName"] == contact_or_group: await b.chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) return True groups = await b.chat.api_list_groups(1) for g in groups: if g["groupInfo"]["groupProfile"]["displayName"] == contact_or_group: await b.chat.api_send_text_message( {"chatType": "group", "chatId": g["groupInfo"]["groupId"]}, text ) return True except Exception as e: log.error("send_message error: %s", e) return False async def _run_bot( profile_id: int, name: str, bot_type: str, db_prefix: str, config: dict, on_address: callable, ) -> None: """Inner coroutine — runs the simplex-chat event loop for one profile.""" try: from simplex_chat import ChatApi, SqliteDb except ImportError: log.error("simplex-chat Python package not installed. Run: pip install simplex-chat") return b = _running[profile_id] try: chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix)) b.chat = chat await chat.start_chat() # Create or fetch address user = await chat.api_get_active_user() if not user: user = await chat.api_create_active_user( {"displayName": name, "fullName": ""} ) user_id = user["userId"] addr = await chat.api_get_user_address(user_id) if not addr: addr = await chat.api_create_user_address(user_id) address = addr.get("connShortLink") or addr.get("connFullLink", "") b.address = address await on_address(profile_id, address) # Configure address settings based on bot type settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}} if bot_type == "support": settings["businessAddress"] = True welcome = config.get("welcome_message", f"Welcome to {name} support.") settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "broadcast", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} await chat.api_set_address_settings(user_id, settings) # Refresh contacts/groups async def refresh() -> None: try: b.contacts = await chat.api_list_contacts(user_id) b.groups = await chat.api_list_groups(user_id) except Exception: pass await refresh() # Event loop while True: evt = await chat.recv_chat_event(500_000) if evt is None: continue tag = evt.get("type", "") b.log_lines.append(f"[{tag}]") if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] if tag == "contactConnected": await refresh() ct = evt.get("contact", {}) ct_name = ct.get("localDisplayName", "?") _append_log(b, f"Contact connected: {ct_name}") if bot_type == "echo": pass # echo handled on message elif bot_type == "broadcast": welcome = config.get("welcome_message", "You are subscribed.") try: await chat.api_send_text_message( {"chatType": "direct", "chatId": ct["contactId"]}, welcome ) except Exception: pass elif tag == "newChatItems": items = evt.get("chatItems", []) for item in items: ci = item.get("chatItem", {}) direction = ci.get("meta", {}).get("itemStatus", {}).get("type", "") if direction != "sndSent": content = ci.get("content", {}) mc = content.get("msgContent", {}) text = mc.get("text", "") chat_info = item.get("chatInfo", {}) _append_log(b, f"Message: {text[:80]}") if bot_type == "echo" and text: try: await chat.api_send_text_reply(item, f"Echo: {text}") except Exception: pass elif bot_type == "broadcast": publishers = config.get("publishers", []) sender = chat_info.get("contact", {}).get("localDisplayName", "") if sender in publishers and text: # broadcast to all contacts contacts = await chat.api_list_contacts(user_id) for c in contacts: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) except Exception: pass except asyncio.CancelledError: pass except Exception as e: log.exception("Bot %d crashed: %s", profile_id, e) _append_log(b, f"ERROR: {e}") finally: if b.chat: try: await b.chat.close() except Exception: pass def _append_log(b: RunningBot, line: str) -> None: b.log_lines.append(line) if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:]