"""Bot lifecycle management — start, stop, status, message sending.""" import asyncio import json import logging from dataclasses import dataclass, field from typing import Any log = logging.getLogger(__name__) # api_list_groups returns BARE GroupInfo dicts (verified against the live API): # g["groupId"], g["groupProfile"]["displayName"], # g["groupSummary"]["currentMembers"], g["membership"]["memberRole"] # There is no "groupInfo" wrapper and no "members" list in this response. # # A "channel" is a group whose join link has acceptMemberRole == "observer" # (joiners are read-only; only the owner broadcasts). A regular group's link # has role "member" (2-way). This is the only thing that distinguishes them. def group_name(g: dict) -> str: return g["groupProfile"]["displayName"] def group_id(g: dict) -> int: return g["groupId"] def group_member_count(g: dict) -> int: return g.get("groupSummary", {}).get("currentMembers", 0) BOT_TYPES = ["echo", "broadcast", "support", "directory", "deadmans"] USER_TYPES = ["user"] ALL_TYPES = BOT_TYPES + USER_TYPES @dataclass class RunningBot: profile_id: int name: str bot_type: str task: asyncio.Task address: str = "" contacts: list[dict] = field(default_factory=list) groups: list[dict] = field(default_factory=list) log_lines: list[str] = field(default_factory=list) chat: Any = None # simplex_chat ChatApi instance # profile_id → RunningBot _running: dict[int, RunningBot] = {} def is_running(profile_id: int) -> bool: b = _running.get(profile_id) return b is not None and not b.task.done() def get_running(profile_id: int) -> RunningBot | None: b = _running.get(profile_id) if b and not b.task.done(): return b return None def all_statuses() -> dict[int, bool]: return {pid: is_running(pid) for pid in _running} async def start_bot(profile: dict, on_address: callable) -> None: """Start a bot for the given profile dict. Idempotent.""" pid = profile["id"] if is_running(pid): return config = json.loads(profile.get("config") or "{}") bot_type = profile["bot_type"] db_prefix = profile["db_prefix"] task = asyncio.create_task( _run_bot(pid, profile["name"], bot_type, db_prefix, config, on_address), name=f"bot-{pid}-{profile['name']}", ) _running[pid] = RunningBot( profile_id=pid, name=profile["name"], bot_type=bot_type, task=task, ) log.info("Started bot %d (%s / %s)", pid, profile["name"], bot_type) async def stop_bot(profile_id: int) -> None: b = _running.get(profile_id) if b and not b.task.done(): b.task.cancel() try: await b.task except asyncio.CancelledError: pass log.info("Stopped bot %d", profile_id) async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool: """Send a text message from a running bot. Returns True on success.""" b = get_running(profile_id) if not b or not b.chat: return False try: for c in b.contacts: if c["localDisplayName"] == contact_or_group: await b.chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) return True for g in b.groups: if group_name(g) == contact_or_group: await b.chat.api_send_text_message( {"chatType": "group", "chatId": group_id(g)}, text ) return True except Exception as e: log.error("send_message error: %s", e) return False async def send_to_chat(profile_id: int, chat_type: str, chat_id: int, text: str) -> bool: """Send a message directly to a chat by its (type, id) ref. Returns True on success.""" b = get_running(profile_id) if not b or not b.chat: return False try: await b.chat.api_send_text_message({"chatType": chat_type, "chatId": chat_id}, text) return True except Exception as e: log.error("send_to_chat error: %s", e) return False def _normalize_item(ci: dict) -> dict: """Flatten a ChatItem into {id, ts, text, outgoing, sender} for the UI.""" meta = ci.get("meta", {}) chat_dir = ci.get("chatDir", {}) dir_type = chat_dir.get("type", "") outgoing = dir_type.endswith("Snd") # Prefer meta.itemText; fall back to content.msgContent.text text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "") # Sender name: group messages carry the member; direct/own use a generic label sender = "" if dir_type == "groupRcv": sender = chat_dir.get("groupMember", {}).get("localDisplayName", "") return { "id": meta.get("itemId"), "ts": meta.get("itemTs", ""), "text": text, "outgoing": outgoing, "sender": sender, "deleted": "itemDeleted" in meta, } async def get_chat_history( profile_id: int, chat_type: str, chat_id: int, count: int = 50 ) -> list[dict]: """Return the last `count` messages of a chat, oldest-first, normalized for the UI.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") chat = await b.chat.api_get_chat(chat_type, chat_id, count) items = chat.get("chatItems", []) return [_normalize_item(ci) for ci in items] async def _run_bot( profile_id: int, name: str, bot_type: str, db_prefix: str, config: dict, on_address: callable, ) -> None: """Inner coroutine — runs the simplex-chat event loop for one profile.""" try: from simplex_chat import ChatApi, SqliteDb except ImportError: log.error("simplex-chat Python package not installed. Run: pip install simplex-chat") return b = _running[profile_id] try: chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix)) b.chat = chat # libsimplex /_start requires an active user to exist first user = await chat.api_get_active_user() if not user: user = await chat.api_create_active_user( {"displayName": name, "fullName": ""} ) await chat.start_chat() user_id = user["userId"] existing = await chat.api_get_user_address(user_id) if existing: # api_get_user_address returns UserContactLink; link is nested under connLinkContact link = existing["connLinkContact"] else: # api_create_user_address returns CreatedConnLink directly link = await chat.api_create_user_address(user_id) address = link.get("connShortLink") or link.get("connFullLink", "") b.address = address await on_address(profile_id, address) # Configure address settings based on profile type settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}} if bot_type == "user": pass # plain user: auto-accept on, no auto-reply elif bot_type == "support": settings["businessAddress"] = True welcome = config.get("welcome_message", f"Welcome to {name} support.") settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "broadcast", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} await chat.api_set_address_settings(user_id, settings) # Refresh contacts/groups async def refresh() -> None: try: b.contacts = await chat.api_list_contacts(user_id) groups = await chat.api_list_groups(user_id) # Classify each group as channel (observer link) vs group (member link) for g in groups: await _classify_group(chat, g) b.groups = groups except Exception: log.exception("refresh failed for bot %d", profile_id) await refresh() # Event loop while True: evt = await chat.recv_chat_event(500_000) if evt is None: continue tag = evt.get("type", "") b.log_lines.append(f"[{tag}]") if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] if tag == "contactConnected": await refresh() ct = evt.get("contact", {}) ct_name = ct.get("localDisplayName", "?") _append_log(b, f"Contact connected: {ct_name}") if bot_type == "echo": pass # echo handled on message elif bot_type == "broadcast": welcome = config.get("welcome_message", "You are subscribed.") try: await chat.api_send_text_message( {"chatType": "direct", "chatId": ct["contactId"]}, welcome ) except Exception: pass elif tag == "newChatItems": items = evt.get("chatItems", []) for item in items: ci = item.get("chatItem", {}) direction = ci.get("meta", {}).get("itemStatus", {}).get("type", "") if direction != "sndSent": content = ci.get("content", {}) mc = content.get("msgContent", {}) text = mc.get("text", "") chat_info = item.get("chatInfo", {}) _append_log(b, f"Message: {text[:80]}") if bot_type == "echo" and text: try: await chat.api_send_text_reply(item, f"Echo: {text}") except Exception: pass elif bot_type == "broadcast": publishers = config.get("publishers", []) sender = chat_info.get("contact", {}).get("localDisplayName", "") if sender in publishers and text: # broadcast to all contacts contacts = await chat.api_list_contacts(user_id) for c in contacts: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) except Exception: pass except asyncio.CancelledError: pass except Exception as e: log.exception("Bot %d crashed: %s", profile_id, e) _append_log(b, f"ERROR: {e}") finally: if b.chat: try: await b.chat.close() except Exception: pass def _append_log(b: RunningBot, line: str) -> None: b.log_lines.append(line) if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] async def _classify_group(chat: Any, g: dict) -> None: """Annotate a GroupInfo in place with link info: is_channel, link. A channel is a group whose join-link role is "observer". Groups with a "member"+ link (or no link at all) are regular 2-way groups. """ g["is_channel"] = False g["link"] = "" try: link_obj = await chat.api_get_group_link(g["groupId"]) except Exception: return # no link → plain private group g["is_channel"] = link_obj.get("acceptMemberRole") == "observer" conn = link_obj.get("connLinkContact", {}) g["link"] = conn.get("connShortLink") or conn.get("connFullLink", "") async def create_channel(profile_id: int, name: str) -> str: """Create a group with an observer join link (a SimpleX channel). Returns the link.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") user = await b.chat.api_get_active_user() if not user: raise RuntimeError("No active user for this profile") info = await b.chat.api_new_group(user["userId"], {"displayName": name, "fullName": ""}) link = await b.chat.api_create_group_link(info["groupId"], "observer") # Refresh cached group list (re-classifies all groups including the new channel) try: groups = await b.chat.api_list_groups(user["userId"]) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("group refresh after create_channel failed") return link async def create_group(profile_id: int, name: str) -> str: """Create a regular 2-way group with a member join link. Returns the link.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") user = await b.chat.api_get_active_user() if not user: raise RuntimeError("No active user for this profile") info = await b.chat.api_new_group(user["userId"], {"displayName": name, "fullName": ""}) link = await b.chat.api_create_group_link(info["groupId"], "member") try: groups = await b.chat.api_list_groups(user["userId"]) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("group refresh after create_group failed") return link async def get_group_members(profile_id: int, gid: int) -> list[dict]: """Return the member list for a group/channel (excludes the owner themselves).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") members = await b.chat.api_list_members(gid) return [ {"name": m["localDisplayName"], "role": m["memberRole"], "status": m["memberStatus"]} for m in members ] async def get_group_link(profile_id: int, gid: int) -> str: """Return the existing join link for a group/channel (empty if none).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") try: return await b.chat.api_get_group_link_str(gid) except Exception: return ""