"""Bot lifecycle management — start, stop, status, message sending.""" import asyncio import json import logging import urllib.request from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any log = logging.getLogger(__name__) # Directory bot websites live in /web//, each self-contained # (its own index.html + data/), generated from the web/index.html template. WEB_DIR = Path(__file__).parent.parent / "web" def safe_name(name: str) -> str: return name.lower().replace(" ", "_") def generate_directory_site(name: str) -> str: """Generate a per-bot directory website from the web/index.html template. Substitutes the placeholder name (SimpleXXX -> bot name) and seeds an empty listing so the page loads cleanly. Returns the relative site path. Each directory bot gets its own folder, so multiple can coexist (not a singleton). """ template = (WEB_DIR / "index.html").read_text(encoding="utf-8") page = template.replace("SimpleXXX", name) site = WEB_DIR / safe_name(name) (site / "data").mkdir(parents=True, exist_ok=True) (site / "index.html").write_text(page, encoding="utf-8") listing = site / "data" / "listing.json" if not listing.exists(): listing.write_text('{"entries": []}', encoding="utf-8") return f"{safe_name(name)}/index.html" # ── Notifications ─────────────────────────────────────────────────────────────── # In-memory, cross-account feed of received messages. Ephemeral (clears on restart). _notifications: list[dict] = [] _notif_seq = 0 _NOTIF_MAX = 200 def record_notification( profile_id: int, profile_name: str, chat_type: str, chat_id: int, sender: str, text: str ) -> None: global _notif_seq _notif_seq += 1 _notifications.append({ "id": _notif_seq, "profile_id": profile_id, "profile_name": profile_name, "chat_type": chat_type, "chat_id": chat_id, "sender": sender, "text": text[:140], "ts": datetime.now(timezone.utc).isoformat(), "read": False, }) if len(_notifications) > _NOTIF_MAX: del _notifications[:-_NOTIF_MAX] def get_notifications(limit: int = 50) -> list[dict]: """Most-recent-first.""" return list(reversed(_notifications[-limit:])) def unread_count() -> int: return sum(1 for n in _notifications if not n["read"]) def mark_all_read() -> None: for n in _notifications: n["read"] = True # Default system prompt for LLM-backed support bots when none is configured. DEFAULT_SUPPORT_PROMPT = ( "You are a helpful customer-support assistant. Answer concisely and politely. " "If you don't know something, say so rather than guessing." ) async def llm_chat( api_base: str, api_key: str, model: str, messages: list[dict], timeout: float = 60.0 ) -> str: """Call an OpenAI-compatible /chat/completions endpoint and return the reply text. Works with any provider that follows the OpenAI standard — Grok (api.x.ai/v1), Ollama (localhost:11434/v1), OpenAI (api.openai.com/v1), etc. Only the base URL, key and model differ. `api_base` should include the version path (e.g. .../v1). """ url = api_base.rstrip("/") + "/chat/completions" body = json.dumps({"model": model, "messages": messages}).encode("utf-8") def _call() -> str: req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", "application/json") if api_key: req.add_header("Authorization", f"Bearer {api_key}") with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 - user-configured endpoint return resp.read().decode("utf-8") raw = await asyncio.to_thread(_call) data = json.loads(raw) return data["choices"][0]["message"]["content"] # api_list_groups returns BARE GroupInfo dicts (verified against the live API): # g["groupId"], g["groupProfile"]["displayName"], # g["groupSummary"]["currentMembers"], g["membership"]["memberRole"] # There is no "groupInfo" wrapper and no "members" list in this response. # # A "channel" is a group whose join link has acceptMemberRole == "observer" # (joiners are read-only; only the owner broadcasts). A regular group's link # has role "member" (2-way). This is the only thing that distinguishes them. def group_name(g: dict) -> str: return g["groupProfile"]["displayName"] def group_id(g: dict) -> int: return g["groupId"] def group_member_count(g: dict) -> int: return g.get("groupSummary", {}).get("currentMembers", 0) BOT_TYPES = ["echo", "broadcast", "support", "directory", "deadmans"] USER_TYPES = ["user"] ALL_TYPES = BOT_TYPES + USER_TYPES @dataclass class RunningBot: profile_id: int name: str bot_type: str task: asyncio.Task address: str = "" contacts: list[dict] = field(default_factory=list) groups: list[dict] = field(default_factory=list) log_lines: list[str] = field(default_factory=list) chat: Any = None # simplex_chat ChatApi instance # Per-contact LLM conversation history (contactId → [{role, content}, ...]) histories: dict[int, list[dict]] = field(default_factory=dict) # profile_id → RunningBot _running: dict[int, RunningBot] = {} def is_running(profile_id: int) -> bool: b = _running.get(profile_id) return b is not None and not b.task.done() def get_running(profile_id: int) -> RunningBot | None: b = _running.get(profile_id) if b and not b.task.done(): return b return None def all_statuses() -> dict[int, bool]: return {pid: is_running(pid) for pid in _running} async def start_bot(profile: dict, on_address: callable) -> None: """Start a bot for the given profile dict. Idempotent.""" pid = profile["id"] if is_running(pid): return config = json.loads(profile.get("config") or "{}") bot_type = profile["bot_type"] db_prefix = profile["db_prefix"] task = asyncio.create_task( _run_bot(pid, profile["name"], bot_type, db_prefix, config, on_address), name=f"bot-{pid}-{profile['name']}", ) _running[pid] = RunningBot( profile_id=pid, name=profile["name"], bot_type=bot_type, task=task, ) log.info("Started bot %d (%s / %s)", pid, profile["name"], bot_type) async def stop_bot(profile_id: int) -> None: b = _running.get(profile_id) if b and not b.task.done(): b.task.cancel() try: await b.task except asyncio.CancelledError: pass log.info("Stopped bot %d", profile_id) async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool: """Send a text message from a running bot. Returns True on success.""" b = get_running(profile_id) if not b or not b.chat: return False try: for c in b.contacts: if c["localDisplayName"] == contact_or_group: await b.chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) return True for g in b.groups: if group_name(g) == contact_or_group: await b.chat.api_send_text_message( {"chatType": "group", "chatId": group_id(g)}, text ) return True except Exception as e: log.error("send_message error: %s", e) return False async def send_to_chat(profile_id: int, chat_type: str, chat_id: int, text: str) -> bool: """Send a message directly to a chat by its (type, id) ref. Raises on failure.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_send_text_message({"chatType": chat_type, "chatId": chat_id}, text) return True async def refresh_lists(profile_id: int) -> None: """Re-fetch contacts and groups (with channel classification) for a running profile. Called when rendering the profile page so member counts and lists are current — group joins don't emit contactConnected, so the cached lists would otherwise stale. """ b = get_running(profile_id) if not b or not b.chat: return try: user = await b.chat.api_get_active_user() if not user: return uid = user["userId"] b.contacts = await b.chat.api_list_contacts(uid) groups = await b.chat.api_list_groups(uid) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("refresh_lists failed for %d", profile_id) def _normalize_item(ci: dict) -> dict: """Flatten a ChatItem into {id, ts, text, outgoing, sender} for the UI.""" meta = ci.get("meta", {}) chat_dir = ci.get("chatDir", {}) dir_type = chat_dir.get("type", "") outgoing = dir_type.endswith("Snd") # Prefer meta.itemText; fall back to content.msgContent.text text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "") # Sender name: group messages carry the member; direct/own use a generic label sender = "" if dir_type == "groupRcv": sender = chat_dir.get("groupMember", {}).get("localDisplayName", "") return { "id": meta.get("itemId"), "ts": meta.get("itemTs", ""), "text": text, "outgoing": outgoing, "sender": sender, "deleted": "itemDeleted" in meta, } async def get_chat_history( profile_id: int, chat_type: str, chat_id: int, count: int = 50 ) -> list[dict]: """Return the last `count` messages of a chat, oldest-first, normalized for the UI.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") chat = await b.chat.api_get_chat(chat_type, chat_id, count) items = chat.get("chatItems", []) return [_normalize_item(ci) for ci in items] async def clear_chat(profile_id: int, chat_type: str, chat_id: int) -> bool: """Clear a conversation's messages but keep the contact/group (delete mode 'messages').""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") ref = ("@" if chat_type == "direct" else "#") + str(chat_id) r = await b.chat.send_chat_cmd(f"/_delete {ref} messages") if isinstance(r, dict) and r.get("type") == "chatCmdError": raise RuntimeError(f"clear failed: {r}") return True async def delete_contact(profile_id: int, contact_id: int) -> bool: """Delete a contact entirely (delete mode 'full', notifies the contact).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_delete_chat("direct", contact_id, {"type": "full", "notify": True}) # Refresh the cached contact list so the row disappears try: user = await b.chat.api_get_active_user() if user: b.contacts = await b.chat.api_list_contacts(user["userId"]) except Exception: pass return True async def _run_bot( profile_id: int, name: str, bot_type: str, db_prefix: str, config: dict, on_address: callable, ) -> None: """Inner coroutine — runs the simplex-chat event loop for one profile.""" try: from simplex_chat import ChatApi, SqliteDb except ImportError: log.error("simplex-chat Python package not installed. Run: pip install simplex-chat") return b = _running[profile_id] try: chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix)) b.chat = chat # libsimplex /_start requires an active user to exist first user = await chat.api_get_active_user() if not user: user = await chat.api_create_active_user(_profile_dict(name, config)) await chat.start_chat() user_id = user["userId"] # Sync profile from config so edits made while stopped take effect on start await _sync_profile(chat, user_id, name, config) existing = await chat.api_get_user_address(user_id) if existing: # api_get_user_address returns UserContactLink; link is nested under connLinkContact link = existing["connLinkContact"] else: # api_create_user_address returns CreatedConnLink directly link = await chat.api_create_user_address(user_id) address = link.get("connShortLink") or link.get("connFullLink", "") b.address = address await on_address(profile_id, address) # Configure address settings based on profile type settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}} if bot_type == "user": pass # plain user: auto-accept on, no auto-reply elif bot_type == "support": settings["businessAddress"] = True welcome = config.get("welcome_message", f"Welcome to {name} support.") settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "broadcast", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} await chat.api_set_address_settings(user_id, settings) # Refresh contacts/groups async def refresh() -> None: try: b.contacts = await chat.api_list_contacts(user_id) groups = await chat.api_list_groups(user_id) # Classify each group as channel (observer link) vs group (member link) for g in groups: await _classify_group(chat, g) b.groups = groups except Exception: log.exception("refresh failed for bot %d", profile_id) await refresh() # Dead man's switch state: fire `message` if no check-in within checkin_hours. dms_interval_s = float(config.get("checkin_hours", 24)) * 3600 dms_last_checkin = datetime.now(timezone.utc) dms_fired = False if bot_type == "deadmans": _append_log(b, f"Dead man's switch armed — deadline in {config.get('checkin_hours', 24)}h") dir_tick = 0 # directory bots periodically scan for newly-added groups # Event loop while True: evt = await chat.recv_chat_event(500_000) # Directory bot: ~every 30s, refresh groups and register/auto-join new ones if bot_type == "directory": dir_tick += 1 if dir_tick >= 60: dir_tick = 0 await refresh() await _directory_register_new_groups(b, chat, config, name) # Dead man's switch: fire once if the check-in window has elapsed if bot_type == "deadmans" and not dms_fired: elapsed = (datetime.now(timezone.utc) - dms_last_checkin).total_seconds() if elapsed > dms_interval_s: dms_fired = True n = await _fire_deadmans(chat, user_id, config) _append_log(b, f"Dead man's switch FIRED → notified {n} contact(s)") if evt is None: continue tag = evt.get("type", "") b.log_lines.append(f"[{tag}]") if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] if tag == "contactConnected": await refresh() ct = evt.get("contact", {}) ct_name = ct.get("localDisplayName", "?") _append_log(b, f"Contact connected: {ct_name}") if bot_type == "echo": pass # echo handled on message elif bot_type == "broadcast": welcome = config.get("welcome_message", "You are subscribed.") try: await chat.api_send_text_message( {"chatType": "direct", "chatId": ct["contactId"]}, welcome ) except Exception: pass elif tag == "newChatItems": items = evt.get("chatItems", []) for item in items: ci = item.get("chatItem", {}) # Robust incoming-vs-outgoing: chatDir ".../Rcv" = received, # ".../Snd" = sent by us. Avoids replying to our own messages. chat_dir = ci.get("chatDir", {}).get("type", "") incoming = chat_dir.endswith("Rcv") if incoming: content = ci.get("content", {}) mc = content.get("msgContent", {}) text = mc.get("text", "") chat_info = item.get("chatInfo", {}) _append_log(b, f"Message: {text[:80]}") # Cross-account notification for any received message if text: ci_type = chat_info.get("type") if ci_type == "direct": ct = chat_info.get("contact", {}) _notify_ref = ("direct", ct.get("contactId"), ct.get("localDisplayName", "")) elif ci_type == "group": gm = ci.get("chatDir", {}).get("groupMember", {}) or {} _notify_ref = ( "group", chat_info.get("groupInfo", {}).get("groupId"), gm.get("localDisplayName", ""), ) else: _notify_ref = (None, None, "") if _notify_ref[1] is not None: record_notification( profile_id, name, _notify_ref[0], _notify_ref[1], _notify_ref[2], text, ) if bot_type == "echo" and text: try: await chat.api_send_text_reply(item, f"Echo: {text}") except Exception: pass elif bot_type == "support" and text: await _handle_support_message( b, chat, config, item, chat_info, text ) elif bot_type == "directory" and text: await _handle_directory_message( b, chat, config, name, item, chat_info, text ) elif bot_type == "deadmans" and text: owner = config.get("owner") sender = chat_info.get("contact", {}).get("localDisplayName", "") # Any contact checks in, unless an owner is configured if not owner or sender == owner: dms_last_checkin = datetime.now(timezone.utc) dms_fired = False hrs = config.get("checkin_hours", 24) try: await chat.api_send_text_reply( item, f"✓ Check-in received. Switch reset — next deadline in {hrs}h." ) except Exception: pass elif bot_type == "broadcast": publishers = config.get("publishers", []) sender = chat_info.get("contact", {}).get("localDisplayName", "") if sender in publishers and text: # broadcast to all contacts contacts = await chat.api_list_contacts(user_id) for c in contacts: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, text ) except Exception: pass except asyncio.CancelledError: pass except Exception as e: log.exception("Bot %d crashed: %s", profile_id, e) _append_log(b, f"ERROR: {e}") finally: if b.chat: try: await b.chat.close() except Exception: pass def _append_log(b: RunningBot, line: str) -> None: b.log_lines.append(line) if len(b.log_lines) > 200: b.log_lines = b.log_lines[-200:] async def _handle_directory_message( b: RunningBot, chat: Any, config: dict, name: str, item: dict, chat_info: dict, text: str ) -> None: """Directory bot: super-user approval commands, and search for everyone else.""" import directory as _dir safe = safe_name(name) superusers = config.get("superusers", []) or [] sender = chat_info.get("contact", {}).get("localDisplayName", "") is_super = sender in superusers t = text.strip() async def reply(msg: str) -> None: try: await chat.api_send_text_reply(item, msg) except Exception: pass if t.startswith("/"): parts = t.split() cmd = parts[0].lower() if cmd == "/help": await reply("Send a search term to find groups. Admins: /list, /approve , /reject .") elif cmd == "/list": if not is_super: await reply("Only admins can list pending submissions.") return pend = _dir.entries_by_status(safe, "pending") if not pend: await reply("No pending submissions.") else: await reply("Pending:\n" + "\n".join( f"#{e['id']} {e['displayName']} ({'channel' if e['is_channel'] else 'group'})" for e in pend )) elif cmd in ("/approve", "/reject"): if not is_super: await reply("Only admins can approve or reject.") return if len(parts) < 2 or not parts[1].isdigit(): await reply(f"Usage: {cmd} ") return eid = int(parts[1]) status = "approved" if cmd == "/approve" else "rejected" e = _dir.set_status(safe, eid, status) if not e: await reply(f"No submission #{eid}.") return if status == "approved": _dir.publish(safe) await reply(f"✓ Approved #{eid} '{e['displayName']}' — now listed on the directory site.") else: await reply(f"Rejected #{eid} '{e['displayName']}'.") else: await reply("Unknown command. Send /help for options.") else: results = _dir.search(safe, t) if not results: await reply(f"No groups found for '{t}'. Send /help for options.") else: await reply("Found:\n" + "\n".join(f"{e['displayName']}: {e['link'] or '(no link)'}" for e in results[:10])) async def _directory_register_new_groups( b: RunningBot, chat: Any, config: dict, name: str ) -> None: """Auto-join groups the directory bot was added to, and register them as pending.""" import directory as _dir safe = safe_name(name) superusers = config.get("superusers", []) or [] for g in b.groups: gid = group_id(g) membership = g.get("membership") or {} status = membership.get("memberStatus", "") role = membership.get("memberRole", "") if status == "invited": try: await chat.api_join_group(gid) # someone added the bot; accept except Exception: pass continue # registered on a later tick once it syncs if role != "owner" and not _dir.find_by_group(_dir.load_state(safe), gid): link = "" try: link = await chat.api_get_group_link_str(gid) except Exception: pass entry, is_new = _dir.add_pending( safe, gid, group_name(g), link, bool(g.get("is_channel")), g.get("groupSummary") or {}, (g.get("groupProfile") or {}).get("shortDescr"), "group", ) if is_new: _append_log(b, f"Directory: registered pending #{entry['id']} '{entry['displayName']}'") for c in b.contacts: if c["localDisplayName"] in superusers: try: await chat.api_send_text_message( {"chatType": "direct", "chatId": c["contactId"]}, f"New directory submission #{entry['id']}: '{entry['displayName']}'. " f"/approve {entry['id']} or /reject {entry['id']}.", ) except Exception: pass async def _fire_deadmans(chat: Any, user_id: int, config: dict) -> int: """Send the dead-man message to recipients (named, or all contacts). Returns count sent.""" message = config.get("message") or "Dead man's switch triggered — no check-in was received." recipients = config.get("recipients") or [] # list of display names; empty = all contacts try: contacts = await chat.api_list_contacts(user_id) except Exception: return 0 targets = contacts if not recipients else [c for c in contacts if c["localDisplayName"] in recipients] sent = 0 for c in targets: try: await chat.api_send_text_message({"chatType": "direct", "chatId": c["contactId"]}, message) sent += 1 except Exception: pass return sent async def _handle_support_message( b: RunningBot, chat: Any, config: dict, item: dict, chat_info: dict, text: str ) -> None: """Answer an incoming support message via the configured OpenAI-compatible LLM. If no api_base is configured the bot stays silent (the static welcome auto-reply has already greeted the contact) — so support bots without an LLM behave as before. """ api_base = (config.get("api_base") or "").strip() if not api_base: return # no LLM configured for this bot contact_id = chat_info.get("contact", {}).get("contactId") system_prompt = config.get("system_prompt") or DEFAULT_SUPPORT_PROMPT model = config.get("model") or "grok-2" api_key = config.get("api_key") or "" # Maintain a short rolling history per contact for conversational context. hist = b.histories.setdefault(contact_id, []) hist.append({"role": "user", "content": text}) if len(hist) > 20: del hist[:-20] messages = [{"role": "system", "content": system_prompt}, *hist] try: reply = await llm_chat(api_base, api_key, model, messages) except Exception as e: log.error("support LLM error: %s", e) _append_log(b, f"LLM error: {e}") try: await chat.api_send_text_reply( item, "Sorry, I'm having trouble responding right now. Please try again shortly." ) except Exception: pass return hist.append({"role": "assistant", "content": reply}) _append_log(b, f"LLM reply: {reply[:80]}") try: await chat.api_send_text_reply(item, reply) except Exception: log.exception("failed to send support reply") async def global_status() -> dict: """Aggregate manager-wide status: running profiles, totals, and server config. Server/network info (SMP+XFTP counts, operators) is read from the first running profile — these are the shared SimpleX presets, so one profile represents all. """ running = [b for b in _running.values() if not b.task.done()] status = { "profiles_running": len(running), "contacts": sum(len(b.contacts) for b in running), "groups": sum(len(b.groups) for b in running), "smp_servers": 0, "xftp_servers": 0, "operators": [], "proxy_mode": "", } for b in running: if not b.chat: continue try: user = await b.chat.api_get_active_user() r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}") for op in r.get("userServers", []): o = op.get("operator") or {} if o.get("enabled") and o.get("tradeName"): status["operators"].append(o["tradeName"]) status["smp_servers"] += sum( 1 for s in op.get("smpServers", []) if s.get("enabled") and not s.get("deleted") ) status["xftp_servers"] += sum( 1 for s in op.get("xftpServers", []) if s.get("enabled") and not s.get("deleted") ) nc = await b.chat.send_chat_cmd("/network") status["proxy_mode"] = nc.get("networkConfig", {}).get("smpProxyMode", "") break # one running profile is representative except Exception: log.exception("global_status server read failed") continue return status def _server_host(s: str) -> str: """Extract the readable host from a server URI like smp://key@host1,host2.onion.""" try: after = s.split("://", 1)[1] if "://" in s else s if "@" in after: after = after.split("@", 1)[1] return after.split(",")[0] except Exception: return s def _server_row(s: dict) -> dict: return { "host": _server_host(s.get("server", "")), "enabled": bool(s.get("enabled")), "preset": bool(s.get("preset")), "deleted": bool(s.get("deleted")), } def _first_running_chat(): for b in _running.values(): if not b.task.done() and b.chat: return b return None async def get_servers_detail() -> dict: """Full per-operator SMP/XFTP server breakdown + network config (first running profile).""" b = _first_running_chat() if not b: return {"profile_name": None, "operators": [], "network": {}} try: user = await b.chat.api_get_active_user() r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}") nc = await b.chat.send_chat_cmd("/network") operators = [] for op in r.get("userServers", []): o = op.get("operator") or {} operators.append({ "name": o.get("tradeName") or "Custom", "enabled": bool(o.get("enabled")), "smp": [_server_row(s) for s in op.get("smpServers", [])], "xftp": [_server_row(s) for s in op.get("xftpServers", [])], }) return {"profile_name": b.name, "operators": operators, "network": nc.get("networkConfig", {})} except Exception: log.exception("get_servers_detail failed") return {"profile_name": None, "operators": [], "network": {}} async def get_network_config() -> dict: """Just the networkConfig (proxy/host/session modes) from the first running profile.""" b = _first_running_chat() if not b: return {} try: nc = await b.chat.send_chat_cmd("/network") return nc.get("networkConfig", {}) except Exception: log.exception("get_network_config failed") return {} def _profile_dict(name: str, config: dict) -> dict: """Build a SimpleX Profile dict (displayName/fullName/shortDescr/image) from config.""" profile: dict = {"displayName": name, "fullName": config.get("full_name", "")} if config.get("bio"): profile["shortDescr"] = config["bio"] if config.get("avatar"): profile["image"] = config["avatar"] # base64 data URI return profile async def _sync_profile(chat: Any, user_id: int, name: str, config: dict) -> None: """Push the config-derived profile to the live account. No-op if unchanged.""" try: await chat.api_update_profile(user_id, _profile_dict(name, config)) except Exception: log.exception("profile sync failed") async def update_profile( profile_id: int, full_name: str | None, bio: str | None, avatar: str | None ) -> dict: """Persist profile edits to the manager DB and apply them live if running. `avatar` is a base64 data URI; pass None to leave the existing avatar unchanged, or an empty string to remove it. Returns the updated config dict. """ import db as _db prof = _db.get_profile(profile_id) if not prof: raise RuntimeError("Profile not found") config = json.loads(prof.get("config") or "{}") if full_name is not None: config["full_name"] = full_name if bio is not None: config["bio"] = bio if avatar is not None: if avatar: config["avatar"] = avatar else: config.pop("avatar", None) _db.update_config(profile_id, config) b = get_running(profile_id) if b and b.chat: user = await b.chat.api_get_active_user() if user: await _sync_profile(b.chat, user["userId"], prof["name"], config) return config async def _classify_group(chat: Any, g: dict) -> None: """Annotate a GroupInfo in place with link info: is_channel, link. A channel is a group whose join-link role is "observer". Groups with a "member"+ link (or no link at all) are regular 2-way groups. """ g["is_channel"] = False g["link"] = "" try: link_obj = await chat.api_get_group_link(g["groupId"]) except Exception: return # no link → plain private group g["is_channel"] = link_obj.get("acceptMemberRole") == "observer" conn = link_obj.get("connLinkContact", {}) g["link"] = conn.get("connShortLink") or conn.get("connFullLink", "") async def create_channel(profile_id: int, name: str) -> str: """Create a group with an observer join link (a SimpleX channel). Returns the link.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") user = await b.chat.api_get_active_user() if not user: raise RuntimeError("No active user for this profile") info = await b.chat.api_new_group(user["userId"], {"displayName": name, "fullName": ""}) link = await b.chat.api_create_group_link(info["groupId"], "observer") # Refresh cached group list (re-classifies all groups including the new channel) try: groups = await b.chat.api_list_groups(user["userId"]) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("group refresh after create_channel failed") return link async def create_group(profile_id: int, name: str) -> str: """Create a regular 2-way group with a member join link. Returns the link.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") user = await b.chat.api_get_active_user() if not user: raise RuntimeError("No active user for this profile") info = await b.chat.api_new_group(user["userId"], {"displayName": name, "fullName": ""}) link = await b.chat.api_create_group_link(info["groupId"], "member") try: groups = await b.chat.api_list_groups(user["userId"]) for g in groups: await _classify_group(b.chat, g) b.groups = groups except Exception: log.exception("group refresh after create_group failed") return link async def get_group_members(profile_id: int, gid: int) -> list[dict]: """Return the member list for a group/channel. api_list_members excludes the user themselves, so we prepend our own membership — this makes the dialog count match groupSummary.currentMembers. """ b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") members = await b.chat.api_list_members(gid) result = [ {"name": m["localDisplayName"], "role": m["memberRole"], "status": m["memberStatus"]} for m in members ] # Prepend ourselves (the owner), pulled from the cached group's membership. for g in b.groups: if group_id(g) == gid: me = g.get("membership") or {} if me: result.insert(0, { "name": me.get("localDisplayName", "you") + " (you)", "role": me.get("memberRole", "owner"), "status": me.get("memberStatus", ""), }) break return result async def join_group(profile_id: int, gid: int) -> bool: """Accept a pending group invitation (memberStatus 'invited' -> joined).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_join_group(gid) await refresh_lists(profile_id) return True async def leave_group(profile_id: int, gid: int) -> bool: """Leave a group/channel (api_leave_group), then refresh the cached lists.""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_leave_group(gid) await refresh_lists(profile_id) return True async def delete_group(profile_id: int, gid: int) -> bool: """Delete a group/channel entirely (owner action; delete mode 'full', notifies members).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") await b.chat.api_delete_chat("group", gid, {"type": "full", "notify": True}) await refresh_lists(profile_id) return True async def get_group_link(profile_id: int, gid: int) -> str: """Return the existing join link for a group/channel (empty if none).""" b = get_running(profile_id) if not b or not b.chat: raise RuntimeError("Profile is not running") try: return await b.chat.api_get_group_link_str(gid) except Exception: return ""