"""Bot lifecycle management — start, stop, status, message sending."""
import asyncio
import json
import logging
import time
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

# Directory bot websites live in web/<safe_name>/, each self-contained
# (its own index.html + data/), generated from the web/index.html template.
WEB_DIR = Path(__file__).parent.parent / "web"


def safe_name(name: str) -> str:
    """Return the folder/key form of a bot name: lower-cased, spaces replaced by underscores."""
    return name.lower().replace(" ", "_")


def generate_directory_site(name: str) -> str:
    """Generate a per-bot directory website from the web/index.html template.

    Substitutes the placeholder name (SimpleXXX -> bot name) and seeds an empty
    listing so the page loads cleanly. Each directory bot gets its own folder, so
    multiple can coexist (not a singleton).

    Returns:
        The site path relative to WEB_DIR ("<safe_name>/index.html").

    Raises:
        ValueError: if the name does not map to a folder strictly inside WEB_DIR
            (e.g. an empty name, "." or "../x"). Without this check such a name
            would overwrite the shared template or write outside the web root.
    """
    slug = safe_name(name)
    root = WEB_DIR.resolve()
    site = (root / slug).resolve()
    # `root` must be a proper ancestor of the site folder; this also rejects site == root.
    if root not in site.parents:
        raise ValueError(f"Invalid directory bot name: {name!r}")

    template = (root / "index.html").read_text(encoding="utf-8")
    page = template.replace("SimpleXXX", name)

    (site / "data").mkdir(parents=True, exist_ok=True)
    (site / "index.html").write_text(page, encoding="utf-8")

    # Seed the listing only once so an existing directory is never wiped on regeneration.
    listing = site / "data" / "listing.json"
    if not listing.exists():
        listing.write_text('{"entries": []}', encoding="utf-8")
    return f"{slug}/index.html"


# ── Notifications ───────────────────────────────────────────────────────────────
# In-memory, cross-account feed of received messages. Ephemeral (clears on restart).
_notifications: list[dict] = []
_notif_seq = 0
_NOTIF_MAX = 200


def record_notification(
    profile_id: int, profile_name: str, chat_type: str, chat_id: int, sender: str, text: str
) -> None:
    """Append a received-message notification to the in-memory feed.

    Each entry gets the next sequence number as ``id``, a UTC ISO-8601 timestamp
    and starts unread. The text is truncated to 140 characters, and only the
    newest ``_NOTIF_MAX`` entries are kept.
    """
    global _notif_seq
    _notif_seq += 1
    _notifications.append({
        "id": _notif_seq,
        "profile_id": profile_id,
        "profile_name": profile_name,
        "chat_type": chat_type,
        "chat_id": chat_id,
        "sender": sender,
        "text": text[:140],
        "ts": datetime.now(timezone.utc).isoformat(),
        "read": False,
    })
    if len(_notifications) > _NOTIF_MAX:
        del _notifications[:-_NOTIF_MAX]


def get_notifications(limit: int = 50) -> list[dict]:
    """Return up to `limit` of the newest notifications, most-recent-first.

    A non-positive `limit` yields an empty list. (A plain ``[-limit:]`` slice would
    return the whole feed for 0 and drop the wrong end for negative values.)
    """
    if limit <= 0:
        return []
    return _notifications[-limit:][::-1]


def unread_count() -> int:
    """Return the number of notifications not yet marked read."""
    return sum(1 for n in _notifications if not n["read"])


def mark_all_read() -> None:
    """Mark every notification currently in the feed as read."""
    for n in _notifications:
        n["read"] = True


# Default system prompts when none is configured.
DEFAULT_SUPPORT_PROMPT = (
    "You are a helpful customer-support assistant. Answer concisely and politely. "
    "If you don't know something, say so rather than guessing."
)
DEFAULT_LLM_PROMPT = (
    "You are a helpful assistant. Answer concisely. "
    "If you don't know something, say so rather than guessing."
)


async def llm_chat(
    api_base: str, api_key: str, model: str, messages: list[dict], timeout: float = 60.0
) -> str:
    """Call an OpenAI-compatible /chat/completions endpoint and return the reply text.

    Works with any provider that follows the OpenAI standard — Grok (api.x.ai/v1),
    Ollama (localhost:11434/v1), OpenAI (api.openai.com/v1), etc. Only the base URL,
    key and model differ. `api_base` should include the version path (e.g. .../v1).

    The blocking HTTP request runs in a worker thread via ``asyncio.to_thread``.
    The Authorization header is only sent when `api_key` is non-empty.

    Raises:
        Whatever urllib / json raise on network or decoding failure, and
        KeyError/IndexError when the response lacks ``choices[0].message.content``.
    """
    url = api_base.rstrip("/") + "/chat/completions"
    body = json.dumps({"model": model, "messages": messages}).encode("utf-8")

    def _call() -> str:
        req = urllib.request.Request(url, data=body, method="POST")
        req.add_header("Content-Type", "application/json")
        if api_key:
            req.add_header("Authorization", f"Bearer {api_key}")
        with urllib.request.urlopen(req, timeout=timeout) as resp:  # noqa: S310 - user-configured endpoint
            return resp.read().decode("utf-8")

    raw = await asyncio.to_thread(_call)
    data = json.loads(raw)
    content = data["choices"][0]["message"]["content"]
    # `content` can be JSON null; return "" so callers always get a str as annotated.
    return content if isinstance(content, str) else ""


# api_list_groups returns BARE GroupInfo dicts (verified against the live API):
#   g["groupId"], g["groupProfile"]["displayName"],
#   g["groupSummary"]["currentMembers"], g["membership"]["memberRole"]
# There is no "groupInfo" wrapper and no "members" list in this response.
#
# A "channel" is a group whose join link has acceptMemberRole == "observer"
# (joiners are read-only; only the owner broadcasts). A regular group's link
# has role "member" (2-way). This is the only thing that distinguishes them.
def group_name(g: dict) -> str: return g["groupProfile"]["displayName"] def group_id(g: dict) -> int: return g["groupId"] def group_member_count(g: dict) -> int: return g.get("groupSummary", {}).get("currentMembers", 0) BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"] USER_TYPES = ["user"] BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats) ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES @dataclass class RunningBot: profile_id: int name: str bot_type: str task: asyncio.Task address: str = "" contacts: list[dict] = field(default_factory=list) groups: list[dict] = field(default_factory=list) log_lines: list[str] = field(default_factory=list) chat: Any = None # simplex_chat ChatApi instance # Per-contact LLM conversation history (contactId → [{role, content}, ...]) histories: dict[int, list[dict]] = field(default_factory=dict) # RSS bot state rss_seen: set = field(default_factory=set) # entry ids already posted rss_items: list = field(default_factory=list) # latest fetched entries (newest first) rss_next_poll: float = 0.0 rss_gid: int | None = None # broadcast channel group id # profile_id → RunningBot _running: dict[int, RunningBot] = {} def is_running(profile_id: int) -> bool: b = _running.get(profile_id) return b is not None and not b.task.done() def get_running(profile_id: int) -> RunningBot | None: b = _running.get(profile_id) if b and not b.task.done(): return b return None def all_statuses() -> dict[int, bool]: return {pid: is_running(pid) for pid in _running} async def start_bot(profile: dict, on_address: callable) -> None: """Start a bot for the given profile dict. 
Idempotent.""" pid = profile["id"] if is_running(pid): return config = json.loads(profile.get("config") or "{}") bot_type = profile["bot_type"] db_prefix = profile["db_prefix"] task = asyncio.create_task( _run_bot(pid, profile["name"], bot_type, db_prefix, config, on_address), name=f"bot-{pid}-{profile['name']}", ) _running[pid] = RunningBot( profile_id=pid, name=profile["name"], bot_type=bot_type, task=task, ) log.info("Started bot %d (%s / %s)", pid, profile["name"], bot_type) async def stop_bot(profile_id: int) -> None: b = _running.get(profile_id) if b and not b.task.done(): b.task.cancel() try: await b.task except asyncio.CancelledError: pass log.info("Stopped bot %d", profile_id) async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool: """Send a text message from a running bot. Returns True on success.""" b = get_running(profile_id) if not b or not b.chat: return False try: for c in b.contacts: if c["localDisplayName"] == contact_or_group: await b.chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) return True for g in b.groups: if group_name(g) == contact_or_group: await b.chat.api_send_text_message( {"chatType": "group", "chatId": group_id(g)}, text ) return True except Exception as e: log.error("send_message error: %s", e) return False async def send_to_chat(profile_id: int, chat_type: str, chat_id: int, text: str) -> bool: """Send a message directly to a chat by its (type, id) ref. Raises on failure.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_send_text_message({"chatType": chat_type, "chatId": chat_id}, text) return True async def refresh_lists(profile_id: int) -> None: """Re-fetch contacts and groups (with channel classification) for a running profile. Called when rendering the profile page so member counts and lists are current — group joins don't emit contactConnected, so the cached lists would otherwise stale. 
""" b = get_running(profile_id) if not b or not b.chat: return try: user = await b.chat.api_get_active_user() if not user: return uid = user["userId"] b.contacts = await b.chat.api_list_contacts(uid) groups = await b.chat.api_list_groups(uid) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("refresh_lists failed for %d", profile_id) def _normalize_item(ci: dict) -> dict: """Flatten a ChatItem into {id, ts, text, outgoing, sender} for the UI.""" meta = ci.get("meta", {}) chat_dir = ci.get("chatDir", {}) dir_type = chat_dir.get("type", "") outgoing = dir_type.endswith("Snd") # Prefer meta.itemText; fall back to content.msgContent.text text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "") # Sender name: group messages carry the member; direct/own use a generic label sender = "" if dir_type == "groupRcv": sender = chat_dir.get("groupMember", {}).get("localDisplayName", "") return { "id": meta.get("itemId"), "ts": meta.get("itemTs", ""), "text": text, "outgoing": outgoing, "sender": sender, "deleted": "itemDeleted" in meta, } async def get_chat_history( profile_id: int, chat_type: str, chat_id: int, count: int = 50 ) -> list[dict]: """Return the last `count` messages of a chat, oldest-first, normalized for the UI.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") chat = await b.chat.api_get_chat(chat_type, chat_id, count) items = chat.get("chatItems", []) return [_normalize_item(ci) for ci in items] async def clear_chat(profile_id: int, chat_type: str, chat_id: int) -> bool: """Clear a conversation's messages but keep the contact/group (delete mode 'messages').""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") ref = ("@" if chat_type == "direct" else "#") + str(chat_id) r = await b.chat.send_chat_cmd(f"/_delete {ref} messages") if isinstance(r, dict) and r.get("type") == 
"chatCmdError": raise RuntimeError(f"clear failed: {r}") return True async def delete_contact(profile_id: int, contact_id: int) -> bool: """Delete a contact entirely (delete mode 'full', notifies the contact).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_delete_chat("direct", contact_id, {"type": "full", "notify": True}) # Refresh the cached contact list so the row disappears try: user = await b.chat.api_get_active_user() if user: b.contacts = await b.chat.api_list_contacts(user["userId"]) except Exception: pass return True # ── Broadcast bot helpers (mirror the official simplex-broadcast-bot) ──────────── # Publishers are configured as a list of "Name" or "ID:Name" strings. The official # bot matches a KnownContact by contactId AND display name; we match on either, so # names alone (what the UI collects) keep working. def _parse_publishers(pubs: list) -> tuple[set[int], set[str]]: ids: set[int] = set() names: set[str] = set() for p in pubs or []: s = str(p).strip() if not s: continue if ":" in s: left, right = s.split(":", 1) if left.strip().isdigit(): ids.add(int(left.strip())) names.add(right.strip()) continue names.add(s) return ids, names def _publisher_names(pubs: list) -> str: _, names = _parse_publishers(pubs) return ", ".join(sorted(names)) if names else "(no publishers configured)" def _bc_welcome(config: dict, name: str) -> str: w = (config.get("welcome_message") or "").strip() if w: return w return ( "Hello! I am a broadcast bot.\n" f"I broadcast messages to all connected users from {_publisher_names(config.get('publishers', []))}." ) def _bc_prohibited(config: dict) -> str: p = (config.get("prohibited_message") or "").strip() if p: return p return ( f"Sorry, only these users can broadcast messages: {_publisher_names(config.get('publishers', []))}. " "Your message is deleted." ) # Content types the broadcast bot will relay (matches the official allowlist). 
_BC_ALLOWED_CONTENT = {"text", "link"} async def _handle_broadcast_message( b: "RunningBot", chat: Any, config: dict, item: dict, chat_info: dict, mc: dict, text: str ) -> None: """Mirror simplex-broadcast-bot: publishers' messages go to all contacts; everyone else gets the prohibited reply and their message is deleted.""" ids, names = _parse_publishers(config.get("publishers", [])) ct = chat_info.get("contact", {}) sender_id = ct.get("contactId") sender_name = ct.get("localDisplayName", "") is_publisher = (sender_id in ids) or (sender_name in names) async def reply(msg: str) -> None: try: await chat.api_send_text_reply(item, msg) except Exception: pass if not is_publisher: await reply(_bc_prohibited(config)) item_id = item.get("chatItem", {}).get("meta", {}).get("itemId") if sender_id is not None and item_id is not None: try: # internal delete (a received message can't be deleted for the sender) await chat.api_delete_chat_items("direct", sender_id, [item_id], "internal") except Exception: log.exception("broadcast: failed to delete non-publisher message") return if mc.get("type") not in _BC_ALLOWED_CONTENT or not text: await reply("Message is not supported (text and links only).") return # Native broadcast: one /feed command fans out to every contact and reports counts. try: r = await chat.send_chat_cmd(f"/feed {text}") except Exception as e: log.error("broadcast /feed error: %s", e) await reply("Could not broadcast right now, please try again.") return if isinstance(r, dict) and r.get("type") == "broadcastSent": s, f = r.get("successes", 0), r.get("failures", 0) _append_log(b, f"Broadcast → {s} ok, {f} errors") await reply(f"Forwarded to {s} contact(s), {f} errors") else: log.error("broadcast unexpected response: %s", r) await reply("Broadcast failed.") # ── RSS bot helpers ────────────────────────────────────────────────────────────── def _fetch_feed(url: str) -> list[dict]: """Fetch + parse an RSS 2.0 or Atom feed → newest-first list of {id,title,link}. 
Uses only the stdlib (urllib + ElementTree), no extra dependencies.""" req = urllib.request.Request(url, headers={"User-Agent": "simplex-rss-bot/1.0"}) with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - user-configured feed data = r.read() root = ET.fromstring(data) out: list[dict] = [] items = root.findall(".//item") # RSS 2.0 if items: for it in items: title = (it.findtext("title") or "").strip() link = (it.findtext("link") or "").strip() eid = (it.findtext("guid") or link or title).strip() out.append({"id": eid, "title": title, "link": link}) else: # Atom ns = "{http://www.w3.org/2005/Atom}" for e in root.findall(f".//{ns}entry"): title = (e.findtext(f"{ns}title") or "").strip() le = e.find(f"{ns}link") link = (le.get("href") if le is not None else "") or "" eid = (e.findtext(f"{ns}id") or link or title).strip() out.append({"id": eid, "title": title, "link": link}) return out def _rss_format(e: dict) -> str: title = e.get("title") or "(untitled)" return f"{title}\n{e['link']}" if e.get("link") else title async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int, name: str, config: dict) -> int | None: """Find or create the broadcast channel (observer group) for this feed; persist its id.""" import db as _db gid = config.get("channel_gid") if gid: try: groups = await chat.api_list_groups(user_id) if any(group_id(g) == gid for g in groups): return gid except Exception: return gid # assume still valid if the lookup failed try: info = await chat.api_new_group(user_id, { "displayName": f"{name} Feed", "fullName": "", "groupPreferences": {"history": {"enable": "on"}}, # new subscribers see recent posts }) gid = info["groupId"] await chat.api_create_group_link(gid, "observer") # channel = observer link config["channel_gid"] = gid _db.update_config(profile_id, config) _append_log(b, f"RSS channel created (group {gid})") return gid except Exception: log.exception("rss: failed to create channel") return None async def 
_rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict, seed: bool) -> None: """Fetch the feed; on the seed run just record existing ids, otherwise broadcast new items.""" url = (config.get("feed_url") or "").strip() if not url: return try: entries = await asyncio.to_thread(_fetch_feed, url) except Exception as e: log.error("rss fetch error: %s", e) _append_log(b, f"RSS fetch error: {e}") return b.rss_items = entries new = [e for e in entries if e["id"] not in b.rss_seen] for e in new: b.rss_seen.add(e["id"]) if seed: _append_log(b, f"RSS seeded {len(entries)} existing item(s)") return if not gid or not new: return for e in reversed(new): # post oldest → newest try: await chat.api_send_text_message({"chatType": "group", "chatId": gid}, _rss_format(e)) except Exception: log.exception("rss: failed to post to channel") _append_log(b, f"RSS posted {len(new)} new item(s)") async def _rss_send_latest(chat: Any, item: dict, b: "RunningBot", n: int = 3) -> None: """Reply to a direct request (e.g. /new) with the latest feed items.""" items = b.rss_items[:n] if not items: try: await chat.api_send_text_reply(item, "No items yet — check back soon.") except Exception: pass return for e in items: try: await chat.api_send_text_reply(item, _rss_format(e)) except Exception: pass async def _run_bot( profile_id: int, name: str, bot_type: str, db_prefix: str, config: dict, on_address: callable, ) -> None: """Inner coroutine — runs the simplex-chat event loop for one profile.""" try: from simplex_chat import ChatApi, SqliteDb except ImportError: log.error("simplex-chat Python package not installed. 
Run: pip install simplex-chat") return b = _running[profile_id] try: chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix)) b.chat = chat # libsimplex /_start requires an active user to exist first user = await chat.api_get_active_user() if not user: user = await chat.api_create_active_user(_profile_dict(name, config)) await chat.start_chat() user_id = user["userId"] # Sync profile from config so edits made while stopped take effect on start await _sync_profile(chat, user_id, name, config) existing = await chat.api_get_user_address(user_id) if existing: # api_get_user_address returns UserContactLink; link is nested under connLinkContact link = existing["connLinkContact"] else: # api_create_user_address returns CreatedConnLink directly link = await chat.api_create_user_address(user_id) address = link.get("connShortLink") or link.get("connFullLink", "") b.address = address await on_address(profile_id, address) # Configure address settings based on profile type settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}} if bot_type == "user": pass # plain user: auto-accept on, no auto-reply elif bot_type == "business": # business account: each connecting customer becomes a group chat the # operator handles in the UI. Optional welcome auto-reply if configured. settings["businessAddress"] = True welcome = (config.get("welcome_message") or "").strip() if welcome: settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type == "support": settings["businessAddress"] = True welcome = config.get("welcome_message", f"Welcome to {name} support.") settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type == "broadcast": # auto-reply greets each new contact (default lists allowed publishers) settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)} elif bot_type == "rss": welcome = config.get("welcome_message") or f"Subscribed to {name}. Send /new for the latest posts." 
settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "llm", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} await chat.api_set_address_settings(user_id, settings) # Refresh contacts/groups async def refresh() -> None: try: b.contacts = await chat.api_list_contacts(user_id) groups = await chat.api_list_groups(user_id) # Classify each group as channel (observer link) vs group (member link) for g in groups: await _classify_group(chat, g) b.groups = groups except Exception: log.exception("refresh failed for bot %d", profile_id) await refresh() # Dead man's switch state: fire `message` if no check-in within checkin_hours. dms_interval_s = float(config.get("checkin_hours", 24)) * 3600 dms_last_checkin = datetime.now(timezone.utc) dms_fired = False if bot_type == "deadmans": _append_log(b, f"Dead man's switch armed — deadline in {config.get('checkin_hours', 24)}h") dir_tick = 0 # directory bots periodically scan for newly-added groups # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the # whole feed on startup; then poll on an interval. 
rss_poll_s = float(config.get("poll_seconds", 300)) if bot_type == "rss": b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config) await _rss_poll(b, chat, b.rss_gid, config, seed=True) b.rss_next_poll = time.time() + rss_poll_s await refresh() # Event loop while True: evt = await chat.recv_chat_event(500_000) # RSS bot: poll the feed on its interval and broadcast new items to the channel if bot_type == "rss" and time.time() >= b.rss_next_poll: await _rss_poll(b, chat, b.rss_gid, config, seed=False) b.rss_next_poll = time.time() + rss_poll_s # Directory bot: ~every 30s, refresh groups and register/auto-join new ones if bot_type == "directory": dir_tick += 1 if dir_tick >= 60: dir_tick = 0 await refresh() await _directory_register_new_groups(b, chat, config, name) # Dead man's switch: fire once if the check-in window has elapsed if bot_type == "deadmans" and not dms_fired: elapsed = (datetime.now(timezone.utc) - dms_last_checkin).total_seconds() if elapsed > dms_interval_s: dms_fired = True n = await _fire_deadmans(chat, user_id, config) _append_log(b, f"Dead man's switch FIRED → notified {n} contact(s)") if evt is None: continue tag = evt.get("type", "") b.log_lines.append(f"[{tag}]") if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] if tag == "contactConnected": await refresh() ct = evt.get("contact", {}) ct_name = ct.get("localDisplayName", "?") _append_log(b, f"Contact connected: {ct_name}") # echo replies on message; broadcast/others greet via the auto-reply # configured in address settings. RSS also sends the latest items on connect. 
if bot_type == "rss": cid = ct.get("contactId") if cid is not None: # send the latest items directly to the new contact for e in b.rss_items[:3]: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": cid}, _rss_format(e)) except Exception: pass elif tag == "newChatItems": items = evt.get("chatItems", []) for item in items: ci = item.get("chatItem", {}) # Robust incoming-vs-outgoing: chatDir ".../Rcv" = received, # ".../Snd" = sent by us. Avoids replying to our own messages. chat_dir = ci.get("chatDir", {}).get("type", "") incoming = chat_dir.endswith("Rcv") if incoming: content = ci.get("content", {}) mc = content.get("msgContent", {}) text = mc.get("text", "") chat_info = item.get("chatInfo", {}) _append_log(b, f"Message: {text[:80]}") # Cross-account notification for any received message if text: ci_type = chat_info.get("type") if ci_type == "direct": ct = chat_info.get("contact", {}) _notify_ref = ("direct", ct.get("contactId"), ct.get("localDisplayName", "")) elif ci_type == "group": gm = ci.get("chatDir", {}).get("groupMember", {}) or {} _notify_ref = ( "group", chat_info.get("groupInfo", {}).get("groupId"), gm.get("localDisplayName", ""), ) else: _notify_ref = (None, None, "") if _notify_ref[1] is not None: record_notification( profile_id, name, _notify_ref[0], _notify_ref[1], _notify_ref[2], text, ) if bot_type == "echo" and text: try: await chat.api_send_text_reply(item, f"Echo: {text}") except Exception: pass elif bot_type == "support" and text: await _handle_llm_message( b, chat, config, item, chat_info, text, DEFAULT_SUPPORT_PROMPT ) elif bot_type == "llm" and text: await _handle_llm_message( b, chat, config, item, chat_info, text, DEFAULT_LLM_PROMPT ) elif bot_type == "rss" and text: if text.strip().lower() == "/new": await _rss_send_latest(chat, item, b) elif bot_type == "directory" and text: await _handle_directory_message( b, chat, config, name, item, chat_info, text ) elif bot_type == "deadmans" and text: owner = 
config.get("owner") sender = chat_info.get("contact", {}).get("localDisplayName", "") # Any contact checks in, unless an owner is configured if not owner or sender == owner: dms_last_checkin = datetime.now(timezone.utc) dms_fired = False hrs = config.get("checkin_hours", 24) try: await chat.api_send_text_reply( item, f"✓ Check-in received. Switch reset — next deadline in {hrs}h." ) except Exception: pass elif bot_type == "broadcast": await _handle_broadcast_message( b, chat, config, item, chat_info, mc, text ) except asyncio.CancelledError: pass except Exception as e: log.exception("Bot %d crashed: %s", profile_id, e) _append_log(b, f"ERROR: {e}") finally: if b.chat: try: await b.chat.close() except Exception: pass def _append_log(b: RunningBot, line: str) -> None: b.log_lines.append(line) if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] async def _handle_directory_message( b: RunningBot, chat: Any, config: dict, name: str, item: dict, chat_info: dict, text: str ) -> None: """Directory bot: super-user approval commands, and search for everyone else.""" import directory as _dir safe = safe_name(name) superusers = config.get("superusers", []) or [] sender = chat_info.get("contact", {}).get("localDisplayName", "") is_super = sender in superusers t = text.strip() async def reply(msg: str) -> None: try: await chat.api_send_text_reply(item, msg) except Exception: pass if t.startswith("/"): parts = t.split() cmd = parts[0].lower() if cmd == "/help": await reply("Send a search term to find groups. 
Admins: /list, /approve , /reject .") elif cmd == "/list": if not is_super: await reply("Only admins can list pending submissions.") return pend = _dir.entries_by_status(safe, "pending") if not pend: await reply("No pending submissions.") else: await reply("Pending:\n" + "\n".join( f"#{e['id']} {e['displayName']} ({'channel' if e['is_channel'] else 'group'})" for e in pend )) elif cmd in ("/approve", "/reject"): if not is_super: await reply("Only admins can approve or reject.") return if len(parts) < 2 or not parts[1].isdigit(): await reply(f"Usage: {cmd} ") return eid = int(parts[1]) status = "approved" if cmd == "/approve" else "rejected" e = _dir.set_status(safe, eid, status) if not e: await reply(f"No submission #{eid}.") return if status == "approved": _dir.publish(safe) await reply(f"✓ Approved #{eid} '{e['displayName']}' — now listed on the directory site.") else: await reply(f"Rejected #{eid} '{e['displayName']}'.") else: await reply("Unknown command. Send /help for options.") else: results = _dir.search(safe, t) if not results: await reply(f"No groups found for '{t}'. 
Send /help for options.") else: await reply("Found:\n" + "\n".join(f"{e['displayName']}: {e['link'] or '(no link)'}" for e in results[:10])) async def _directory_register_new_groups( b: RunningBot, chat: Any, config: dict, name: str ) -> None: """Auto-join groups the directory bot was added to, and register them as pending.""" import directory as _dir safe = safe_name(name) superusers = config.get("superusers", []) or [] for g in b.groups: gid = group_id(g) membership = g.get("membership") or {} status = membership.get("memberStatus", "") role = membership.get("memberRole", "") if status == "invited": try: await chat.api_join_group(gid) # someone added the bot; accept except Exception: pass continue # registered on a later tick once it syncs if role != "owner" and not _dir.find_by_group(_dir.load_state(safe), gid): link = "" try: link = await chat.api_get_group_link_str(gid) except Exception: pass entry, is_new = _dir.add_pending( safe, gid, group_name(g), link, bool(g.get("is_channel")), g.get("groupSummary") or {}, (g.get("groupProfile") or {}).get("shortDescr"), "group", ) if is_new: _append_log(b, f"Directory: registered pending #{entry['id']} '{entry['displayName']}'") for c in b.contacts: if c["localDisplayName"] in superusers: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, f"New directory submission #{entry['id']}: '{entry['displayName']}'. " f"/approve {entry['id']} or /reject {entry['id']}.", ) except Exception: pass async def _fire_deadmans(chat: Any, user_id: int, config: dict) -> int: """Send the dead-man message to recipients (named, or all contacts). Returns count sent.""" message = config.get("message") or "Dead man's switch triggered — no check-in was received." 
recipients = config.get("recipients") or [] # list of display names; empty = all contacts try: contacts = await chat.api_list_contacts(user_id) except Exception: return 0 targets = contacts if not recipients else [c for c in contacts if c["localDisplayName"] in recipients] sent = 0 for c in targets: try: await chat.api_send_text_message({"chatType": "direct", "chatId": c["contactId"]}, message) sent += 1 except Exception: pass return sent


async def _handle_llm_message(
    b: RunningBot,
    chat: Any,
    config: dict,
    item: dict,
    chat_info: dict,
    text: str,
    default_prompt: str = DEFAULT_LLM_PROMPT,
) -> None:
    """Reply to an incoming message via the configured OpenAI-compatible LLM.

    Works with Ollama's `ollama serve` at http://localhost:11434/v1, OpenAI, Grok…

    `system_prompt` (the startup context), `api_base` (the API URL), `model` and an
    optional `api_key` come from `config`. If no api_base is set the bot stays silent.

    A rolling history (trimmed to the last 20 entries each time a user message
    is appended) is kept per contact in `b.histories` and sent along as context.
    If the LLM call fails, the error is logged, a short apology is sent instead,
    and no assistant entry is added to the history.
    """
    api_base = (config.get("api_base") or "").strip()
    if not api_base:
        return  # no LLM configured for this bot

    # `or {}` also covers a "contact" key that is present but set to None.
    contact_id = (chat_info.get("contact") or {}).get("contactId")
    system_prompt = config.get("system_prompt") or default_prompt
    model = config.get("model") or "grok-2"
    api_key = config.get("api_key") or ""

    # Maintain a short rolling history per contact for conversational context.
    hist = b.histories.setdefault(contact_id, [])
    hist.append({"role": "user", "content": text})
    if len(hist) > 20:
        del hist[:-20]
    messages = [{"role": "system", "content": system_prompt}, *hist]

    try:
        reply = await llm_chat(api_base, api_key, model, messages)
    except Exception as e:
        log.error("LLM error: %s", e)
        _append_log(b, f"LLM error: {e}")
        try:
            await chat.api_send_text_reply(
                item, "Sorry, I'm having trouble responding right now. Please try again shortly."
            )
        except Exception:
            # Best effort only: the apology itself may fail, but leave a trace.
            log.exception("failed to send LLM fallback reply")
        return

    hist.append({"role": "assistant", "content": reply})
    _append_log(b, f"LLM reply: {reply[:80]}")
    try:
        await chat.api_send_text_reply(item, reply)
    except Exception:
        log.exception("failed to send LLM reply")


def _count_active_servers(servers: list[dict]) -> int:
    """Count server entries that are enabled and not marked deleted."""
    return sum(1 for s in servers if s.get("enabled") and not s.get("deleted"))


async def global_status() -> dict:
    """Aggregate manager-wide status: running profiles, totals, and server config.

    Server/network info (SMP+XFTP counts, operators) is read from the first running
    profile that answers — these are the shared SimpleX presets, so one profile
    represents all.

    Returns a dict with keys `profiles_running`, `contacts`, `groups`,
    `smp_servers`, `xftp_servers`, `operators` and `proxy_mode`. Server fields
    keep their zero/empty defaults when no profile could be queried.
    """
    running = [b for b in _running.values() if not b.task.done()]
    status = {
        "profiles_running": len(running),
        "contacts": sum(len(b.contacts) for b in running),
        "groups": sum(len(b.groups) for b in running),
        "smp_servers": 0,
        "xftp_servers": 0,
        "operators": [],
        "proxy_mode": "",
    }
    for b in running:
        if not b.chat:
            continue
        try:
            user = await b.chat.api_get_active_user()
            r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}")
            operators: list[str] = []
            smp = xftp = 0
            for op in r.get("userServers", []):
                o = op.get("operator") or {}
                if o.get("enabled") and o.get("tradeName"):
                    operators.append(o["tradeName"])
                smp += _count_active_servers(op.get("smpServers", []))
                xftp += _count_active_servers(op.get("xftpServers", []))
            # Overwrite rather than accumulate: if this profile fails later
            # (e.g. on /network) and the loop moves on, the next profile must
            # not be added on top of these numbers.
            status["operators"] = operators
            status["smp_servers"] = smp
            status["xftp_servers"] = xftp
            nc = await b.chat.send_chat_cmd("/network")
            status["proxy_mode"] = nc.get("networkConfig", {}).get("smpProxyMode", "")
            break  # one running profile is representative
        except Exception:
            log.exception("global_status server read failed")
            continue
    return status


def _server_host(s: str) -> str:
    """Extract the readable host from a server URI like smp://key@host1,host2.onion.

    Strips the scheme and the `key@` part, then returns the first of the
    comma-separated hosts. A non-string argument is returned unchanged.
    """
    if not isinstance(s, str):
        return s
    _, sep, rest = s.partition("://")
    after = rest if sep else s
    if "@" in after:
        after = after.split("@", 1)[1]
    return after.split(",")[0]


def _server_row(s: dict) -> dict:
    """Reduce a server entry to its display fields: host plus three boolean flags."""
    return {
        "host": _server_host(s.get("server", "")),
        "enabled": bool(s.get("enabled")),
        "preset": bool(s.get("preset")),
        "deleted": bool(s.get("deleted")),
    }


def _first_running_chat() -> "RunningBot | None":
    """Return the first bot whose task is still running and that has a chat, else None.

    Despite the name, the bot object is returned, not its `chat` attribute.
    """
    for b in _running.values():
        if not b.task.done() and b.chat:
            return b
    return None


async def get_servers_detail() -> dict:
    """Full per-operator SMP/XFTP server breakdown + network config (first running profile).

    Returns `{"profile_name", "operators", "network"}`. When no profile is
    running, or the query fails (the failure is logged), the same shape is
    returned with `profile_name` None and empty `operators` / `network`.
    """
    empty = {"profile_name": None, "operators": [], "network": {}}
    b = _first_running_chat()
    if not b:
        return empty
    try:
        user = await b.chat.api_get_active_user()
        r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}")
        nc = await b.chat.send_chat_cmd("/network")
        operators = []
        for op in r.get("userServers", []):
            o = op.get("operator") or {}
            operators.append({
                # Entries without an operator trade name are shown as "Custom".
                "name": o.get("tradeName") or "Custom",
                "enabled": bool(o.get("enabled")),
                "smp": [_server_row(s) for s in op.get("smpServers", [])],
                "xftp": [_server_row(s) for s in op.get("xftpServers", [])],
            })
        return {
            "profile_name": b.name,
            "operators": operators,
            "network": nc.get("networkConfig", {}),
        }
    except Exception:
        log.exception("get_servers_detail failed")
        return empty


async def get_network_config() -> dict:
    """Just the networkConfig (proxy/host/session modes) from the first running profile.

    Returns an empty dict when no profile is running or the query fails
    (the failure is logged).
    """
    b = _first_running_chat()
    if not b:
        return {}
    try:
        nc = await b.chat.send_chat_cmd("/network")
        return nc.get("networkConfig", {})
    except Exception:
        log.exception("get_network_config failed")
        return {}


def _profile_dict(name: str, config: dict) -> dict:
    """Build a SimpleX Profile dict (displayName/fullName/shortDescr/image) from config.

    `shortDescr` and `image` are included only when `bio` / `avatar` are set
    to a non-empty value in `config`.
    """
    profile: dict = {"displayName": name, "fullName": config.get("full_name", "")}
    if config.get("bio"):
        profile["shortDescr"] = config["bio"]
    if config.get("avatar"):
        profile["image"] = config["avatar"]  # base64 data URI
    return profile


async def _sync_profile(chat: Any, user_id: int, name: str, config: dict) -> None:
    """Push the config-derived profile to the live account.

    Failures are logged and swallowed, so callers never see an exception.
    Whether an unchanged profile is skipped is decided by `api_update_profile`
    (presumably a no-op in that case — confirm).
    """
    try:
        await chat.api_update_profile(user_id, _profile_dict(name, config))
    except Exception:
        log.exception("profile sync failed")


async def update_profile(
    profile_id: int, full_name: str | None, bio: str | None, avatar: str | None
) -> dict:
    """Persist profile edits to the manager DB and apply them live if running.

    `full_name` and `bio` are left unchanged when None. `avatar` is a base64
    data URI; pass None to leave the existing avatar unchanged, or an empty
    string to remove it.

    Returns the updated config dict.
    Raises RuntimeError if the profile does not exist.
    """
    import db as _db

    prof = _db.get_profile(profile_id)
    if not prof:
        raise RuntimeError("Profile not found")

    config = json.loads(prof.get("config") or "{}")
    if full_name is not None:
        config["full_name"] = full_name
    if bio is not None:
        config["bio"] = bio
    if avatar is not None:
        if avatar:
            config["avatar"] = avatar
        else:
            config.pop("avatar", None)
    _db.update_config(profile_id, config)

    # Apply live only when the profile is currently running with a chat.
    b = get_running(profile_id)
    if b and b.chat:
        user = await b.chat.api_get_active_user()
        if user:
            await _sync_profile(b.chat, user["userId"], prof["name"], config)
    return config


async def _classify_group(chat: Any, g: dict) -> None:
    """Annotate a GroupInfo in place with link info: is_channel, link.

    A channel is a group whose join-link role is "observer". Groups with a
    "member"+ link (or no link at all) are regular 2-way groups.
    """
    g["is_channel"] = False
    g["link"] = ""
    try:
        link_obj = await chat.api_get_group_link(g["groupId"])
    except Exception:
        return  # no link → plain private group
    g["is_channel"] = link_obj.get("acceptMemberRole") == "observer"
    # `or {}` also covers a "connLinkContact" key that is present but None.
    conn = link_obj.get("connLinkContact") or {}
    g["link"] = conn.get("connShortLink") or conn.get("connFullLink", "")


def _running_bot_or_raise(profile_id: int) -> RunningBot:
    """Return the running bot for `profile_id`; raise RuntimeError if it has no live chat."""
    b = get_running(profile_id)
    if not b or not b.chat:
        raise RuntimeError("Profile is not running")
    return b


async def _create_linked_group(profile_id: int, name: str, role: str, caller: str) -> str:
    """Create a group, attach a join link with the given member role, refresh the cache.

    `caller` is only used in the log message when the cache refresh fails.
    Returns the value of `api_create_group_link`.
    Raises RuntimeError if the profile is not running or has no active user.
    """
    b = _running_bot_or_raise(profile_id)
    user = await b.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")
    info = await b.chat.api_new_group(user["userId"], {"displayName": name, "fullName": ""})
    link = await b.chat.api_create_group_link(info["groupId"], role)
    # Refresh cached group list (re-classifies all groups including the new one).
    # A refresh failure must not hide the link that was already created.
    try:
        groups = await b.chat.api_list_groups(user["userId"])
        for g in groups:
            await _classify_group(b.chat, g)
        b.groups = groups
    except Exception:
        log.exception("group refresh after %s failed", caller)
    return link


async def create_channel(profile_id: int, name: str) -> str:
    """Create a group with an observer join link (a SimpleX channel). Returns the link.

    Raises RuntimeError if the profile is not running or has no active user.
    """
    return await _create_linked_group(profile_id, name, "observer", "create_channel")


async def create_group(profile_id: int, name: str) -> str:
    """Create a regular 2-way group with a member join link. Returns the link.

    Raises RuntimeError if the profile is not running or has no active user.
    """
    return await _create_linked_group(profile_id, name, "member", "create_group")


async def get_group_members(profile_id: int, gid: int) -> list[dict]:
    """Return the member list for a group/channel.

    api_list_members excludes the user themselves, so we prepend our own
    membership — this makes the dialog count match groupSummary.currentMembers.

    Each entry has `name`, `role` and `status`.
    Raises RuntimeError if the profile is not running.
    """
    b = _running_bot_or_raise(profile_id)
    members = await b.chat.api_list_members(gid)
    result = [
        {"name": m["localDisplayName"], "role": m["memberRole"], "status": m["memberStatus"]}
        for m in members
    ]
    # Prepend ourselves (the owner), pulled from the cached group's membership.
    for g in b.groups:
        if group_id(g) != gid:
            continue
        me = g.get("membership") or {}
        if me:
            result.insert(0, {
                "name": me.get("localDisplayName", "you") + " (you)",
                "role": me.get("memberRole", "owner"),
                "status": me.get("memberStatus", ""),
            })
        break  # only the first cached group with this id is considered
    return result


async def join_group(profile_id: int, gid: int) -> bool:
    """Accept a pending group invitation (memberStatus 'invited' -> joined).

    Refreshes the cached lists afterwards and returns True.
    Raises RuntimeError if the profile is not running.
    """
    b = _running_bot_or_raise(profile_id)
    await b.chat.api_join_group(gid)
    await refresh_lists(profile_id)
    return True


async def leave_group(profile_id: int, gid: int) -> bool:
    """Leave a group/channel (api_leave_group), then refresh the cached lists.

    Returns True. Raises RuntimeError if the profile is not running.
    """
    b = _running_bot_or_raise(profile_id)
    await b.chat.api_leave_group(gid)
    await refresh_lists(profile_id)
    return True


async def delete_group(profile_id: int, gid: int) -> bool:
    """Delete a group/channel entirely (owner action; delete mode 'full', notifies members).

    Refreshes the cached lists afterwards and returns True.
    Raises RuntimeError if the profile is not running.
    """
    b = _running_bot_or_raise(profile_id)
    await b.chat.api_delete_chat("group", gid, {"type": "full", "notify": True})
    await refresh_lists(profile_id)
    return True


async def get_group_link(profile_id: int, gid: int) -> str:
    """Return the existing join link for a group/channel (empty if none).

    Any error from the lookup is treated as "no link". A profile that is not
    running still raises RuntimeError.
    """
    b = _running_bot_or_raise(profile_id)
    try:
        return await b.chat.api_get_group_link_str(gid)
    except Exception:
        return ""