Files
simplex-manager/manager/profiles.py
Jon 609e91c6de Add 'business' profile type and category
Business accounts are cli profiles with businessAddress=True, so each connecting
customer gets their own group chat (handled via the existing chat UI) with an
optional welcome auto-reply. New BUSINESS_TYPES, a /business page + sidebar entry,
and a business variant of the create form. profile/chat pages route via a
_category helper. Adds business_test.py (customer connects -> lands in a business
group, not a direct contact) — passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 17:09:00 +01:00

1094 lines
42 KiB
Python

"""Bot lifecycle management — start, stop, status, message sending."""
import asyncio
import json
import logging
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
log = logging.getLogger(__name__)
# Directory bot websites live in <repo>/web/<safe_name>/, each self-contained
# (its own index.html + data/), generated from the web/index.html template.
# The path is resolved relative to this file: two levels up, then "web".
WEB_DIR = Path(__file__).parent.parent / "web"
def safe_name(name: str) -> str:
    """Return *name* lower-cased with every space replaced by an underscore."""
    lowered = name.lower()
    return "_".join(lowered.split(" "))
def generate_directory_site(name: str) -> str:
    """Create (or refresh) the static website folder for one directory bot.

    Reads the shared template ``web/index.html``, swaps every ``SimpleXXX``
    placeholder for the bot name and writes the result to
    ``web/<safe_name>/index.html``. An empty ``data/listing.json`` is seeded only
    when none exists yet, so an existing listing is left untouched.

    Every bot gets its own folder, so several directory bots can coexist.
    Returns the index page path relative to the web directory.
    """
    html = (WEB_DIR / "index.html").read_text(encoding="utf-8").replace("SimpleXXX", name)
    folder = safe_name(name)
    site_dir = WEB_DIR / folder
    data_dir = site_dir / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    (site_dir / "index.html").write_text(html, encoding="utf-8")
    listing_file = data_dir / "listing.json"
    if not listing_file.exists():
        listing_file.write_text('{"entries": []}', encoding="utf-8")
    return f"{folder}/index.html"
# ── Notifications ───────────────────────────────────────────────────────────────
# In-memory, cross-account feed of received messages. Ephemeral (clears on restart).
# Feed entries in arrival order (newest last); each is a dict built by
# record_notification.
_notifications: list[dict] = []
# Running counter; its value after each increment becomes the entry's "id".
_notif_seq = 0
# Maximum number of entries kept; older ones are dropped when exceeded.
_NOTIF_MAX = 200
def record_notification(
    profile_id: int, profile_name: str, chat_type: str, chat_id: int, sender: str, text: str
) -> None:
    """Append one received-message entry to the in-memory notification feed.

    The entry gets the next sequence number as its id, a UTC ISO timestamp, the
    message text cut to 140 characters, and starts out unread. Once the feed
    exceeds ``_NOTIF_MAX`` entries only the most recent ones are kept.
    """
    global _notif_seq
    _notif_seq += 1
    entry = {
        "id": _notif_seq,
        "profile_id": profile_id,
        "profile_name": profile_name,
        "chat_type": chat_type,
        "chat_id": chat_id,
        "sender": sender,
        "text": text[:140],
        "ts": datetime.now(timezone.utc).isoformat(),
        "read": False,
    }
    _notifications.append(entry)
    if len(_notifications) > _NOTIF_MAX:
        # Trim in place so other references to the list stay valid.
        _notifications[:] = _notifications[-_NOTIF_MAX:]
def get_notifications(limit: int = 50) -> list[dict]:
    """Return up to `limit` notifications, most recent first.

    A `limit` of zero or less yields an empty list. (A plain ``[-limit:]`` slice
    would return the whole feed for 0, and skip entries for negative values.)
    """
    if limit <= 0:
        return []
    return _notifications[-limit:][::-1]
def unread_count() -> int:
    """Count the notifications whose "read" flag is still false."""
    unread = [entry for entry in _notifications if not entry["read"]]
    return len(unread)
def mark_all_read() -> None:
    """Set the "read" flag on every notification in the feed."""
    for entry in _notifications:
        entry.update(read=True)
# Default system prompt for LLM-backed support bots when none is configured.
# Used by _handle_support_message when the bot config has no "system_prompt".
DEFAULT_SUPPORT_PROMPT = (
    "You are a helpful customer-support assistant. Answer concisely and politely. "
    "If you don't know something, say so rather than guessing."
)
async def llm_chat(
    api_base: str, api_key: str, model: str, messages: list[dict], timeout: float = 60.0
) -> str:
    """POST `messages` to an OpenAI-compatible /chat/completions endpoint.

    `api_base` should already include the version path (e.g. ``.../v1``); a
    trailing slash is tolerated. The Authorization header is sent only when
    `api_key` is non-empty. The blocking HTTP call runs in a worker thread so the
    event loop stays responsive.

    Returns the ``content`` of the first choice's message. HTTP, JSON and
    response-shape errors propagate to the caller.
    """
    endpoint = f"{api_base.rstrip('/')}/chat/completions"
    payload = json.dumps({"model": model, "messages": messages}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    def _post() -> str:
        request = urllib.request.Request(endpoint, data=payload, headers=headers, method="POST")
        with urllib.request.urlopen(request, timeout=timeout) as response:  # noqa: S310 - user-configured endpoint
            return response.read().decode("utf-8")

    response_text = await asyncio.to_thread(_post)
    parsed = json.loads(response_text)
    return parsed["choices"][0]["message"]["content"]
# api_list_groups returns BARE GroupInfo dicts (verified against the live API):
# g["groupId"], g["groupProfile"]["displayName"],
# g["groupSummary"]["currentMembers"], g["membership"]["memberRole"]
# There is no "groupInfo" wrapper and no "members" list in this response.
#
# A "channel" is a group whose join link has acceptMemberRole == "observer"
# (joiners are read-only; only the owner broadcasts). A regular group's link
# has role "member" (2-way). This is the only thing that distinguishes them.
def group_name(g: dict) -> str:
    """Return the display name stored in a bare GroupInfo dict's groupProfile."""
    profile = g["groupProfile"]
    return profile["displayName"]
def group_id(g: dict) -> int:
    """Return the ``groupId`` of a bare GroupInfo dict."""
    identifier = g["groupId"]
    return identifier
def group_member_count(g: dict) -> int:
    """Return ``groupSummary.currentMembers``, or 0 when either key is absent."""
    summary = g.get("groupSummary", {})
    return summary.get("currentMembers", 0)
# Automated profile types; _run_bot has a message-handling branch for each.
BOT_TYPES = ["echo", "broadcast", "support", "directory", "deadmans"]
# Plain accounts operated by a person; no automated replies.
USER_TYPES = ["user"]
BUSINESS_TYPES = ["business"]  # cli accounts with a business address (per-customer group chats)
# Every profile type known to this module.
ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES
@dataclass
class RunningBot:
    """In-memory state of one started profile, held in the `_running` registry."""

    # Manager DB id of the profile; also the key in `_running`.
    profile_id: int
    # Profile name as passed to start_bot.
    name: str
    # One of ALL_TYPES — presumably validated by the caller; not checked here.
    bot_type: str
    # The asyncio task executing _run_bot for this profile.
    task: asyncio.Task
    # Contact address link, filled in by _run_bot once known.
    address: str = ""
    # Cached api_list_contacts result.
    contacts: list[dict] = field(default_factory=list)
    # Cached api_list_groups result, annotated by _classify_group.
    groups: list[dict] = field(default_factory=list)
    # Rolling activity log, capped at 200 lines by _append_log.
    log_lines: list[str] = field(default_factory=list)
    chat: Any = None  # simplex_chat ChatApi instance
    # Per-contact LLM conversation history (contactId → [{role, content}, ...])
    histories: dict[int, list[dict]] = field(default_factory=dict)
# profile_id → RunningBot
# Entries are added by start_bot and are not removed when a task finishes;
# liveness is decided by checking task.done() (see is_running / get_running).
_running: dict[int, RunningBot] = {}
def is_running(profile_id: int) -> bool:
    """Return True when the profile is registered and its task has not finished."""
    bot = _running.get(profile_id)
    if bot is None:
        return False
    return not bot.task.done()
def get_running(profile_id: int) -> RunningBot | None:
    """Return the profile's RunningBot while its task is still active, else None."""
    bot = _running.get(profile_id)
    if not bot or bot.task.done():
        return None
    return bot
def all_statuses() -> dict[int, bool]:
    """Map every profile id that was ever started to its current running state."""
    statuses: dict[int, bool] = {}
    for pid in _running:
        statuses[pid] = is_running(pid)
    return statuses
async def start_bot(profile: dict, on_address: callable) -> None:
    """Launch the event-loop task for `profile` unless it is already running.

    `profile` is a manager DB row providing "id", "name", "bot_type",
    "db_prefix" and an optional JSON "config" string. `on_address` is awaited by
    the task with (profile_id, address) once the contact address is known.
    """
    pid = profile["id"]
    if is_running(pid):
        return  # idempotent: a live task already exists

    config = json.loads(profile.get("config") or "{}")
    kind = profile["bot_type"]
    prefix = profile["db_prefix"]
    display_name = profile["name"]

    runner = _run_bot(pid, display_name, kind, prefix, config, on_address)
    task = asyncio.create_task(runner, name=f"bot-{pid}-{display_name}")
    # Registered before the task first runs: _run_bot looks itself up in _running.
    _running[pid] = RunningBot(profile_id=pid, name=display_name, bot_type=kind, task=task)
    log.info("Started bot %d (%s / %s)", pid, display_name, kind)
async def stop_bot(profile_id: int) -> None:
    """Cancel the profile's task, if it is active, and wait for it to finish."""
    bot = _running.get(profile_id)
    if not bot or bot.task.done():
        return
    bot.task.cancel()
    try:
        await bot.task
    except asyncio.CancelledError:
        pass
    log.info("Stopped bot %d", profile_id)
async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool:
    """Send `text` to the contact or group whose display name matches.

    Cached contacts are searched first, then cached groups; the first match
    wins. Returns True once a message was sent. Returns False when the profile
    is not running, nothing matches, or sending raised (the error is logged).
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        return False
    try:
        contact = next(
            (c for c in bot.contacts if c["localDisplayName"] == contact_or_group), None
        )
        if contact is not None:
            target = {"chatType": "direct", "chatId": contact["contactId"]}
            await bot.chat.api_send_text_message(target, text)
            return True
        group = next((g for g in bot.groups if group_name(g) == contact_or_group), None)
        if group is not None:
            target = {"chatType": "group", "chatId": group_id(group)}
            await bot.chat.api_send_text_message(target, text)
            return True
    except Exception as e:
        log.error("send_message error: %s", e)
    return False
async def send_to_chat(profile_id: int, chat_type: str, chat_id: int, text: str) -> bool:
    """Send `text` to the chat addressed by (chat_type, chat_id).

    Raises RuntimeError when the profile is not running; send errors propagate.
    Always returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    target = {"chatType": chat_type, "chatId": chat_id}
    await bot.chat.api_send_text_message(target, text)
    return True
async def refresh_lists(profile_id: int) -> None:
    """Reload the cached contacts and groups of a running profile.

    Each group is re-classified (channel vs. group) before the cache is
    replaced. Does nothing when the profile is not running or has no active
    user. Failures are logged and leave the cache as it was at that point.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        return
    try:
        user = await bot.chat.api_get_active_user()
        if not user:
            return
        user_id = user["userId"]
        bot.contacts = await bot.chat.api_list_contacts(user_id)
        fetched = await bot.chat.api_list_groups(user_id)
        for group in fetched:
            await _classify_group(bot.chat, group)
        bot.groups = fetched
    except Exception:
        log.exception("refresh_lists failed for %d", profile_id)
def _normalize_item(ci: dict) -> dict:
    """Flatten a ChatItem into {id, ts, text, outgoing, sender, deleted} for the UI.

    * text — ``meta.itemText`` when non-empty, else ``content.msgContent.text``.
    * outgoing — True when the chatDir type ends in "Snd".
    * sender — the member's display name for received group messages, else "".
    * deleted — True when ``meta`` carries an "itemDeleted" key.
    """
    meta = ci.get("meta", {})
    direction = ci.get("chatDir", {})
    kind = direction.get("type", "")

    body = meta.get("itemText")
    if not body:
        body = ci.get("content", {}).get("msgContent", {}).get("text", "")

    if kind == "groupRcv":
        author = direction.get("groupMember", {}).get("localDisplayName", "")
    else:
        author = ""

    return {
        "id": meta.get("itemId"),
        "ts": meta.get("itemTs", ""),
        "text": body,
        "outgoing": kind.endswith("Snd"),
        "sender": author,
        "deleted": "itemDeleted" in meta,
    }
async def get_chat_history(
    profile_id: int, chat_type: str, chat_id: int, count: int = 50
) -> list[dict]:
    """Fetch the last `count` items of a chat, normalized for the UI.

    Items keep the order in which api_get_chat returns them (documented by the
    original author as oldest-first). Raises RuntimeError when the profile is
    not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    chat = await bot.chat.api_get_chat(chat_type, chat_id, count)
    return list(map(_normalize_item, chat.get("chatItems", [])))
async def clear_chat(profile_id: int, chat_type: str, chat_id: int) -> bool:
    """Delete a conversation's messages while keeping the contact/group itself.

    Issues ``/_delete <ref> messages`` where the ref is ``@<id>`` for direct
    chats and ``#<id>`` for anything else. Raises RuntimeError when the profile
    is not running or the command answers with a chatCmdError.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    sigil = "@" if chat_type == "direct" else "#"
    response = await bot.chat.send_chat_cmd(f"/_delete {sigil}{chat_id} messages")
    failed = isinstance(response, dict) and response.get("type") == "chatCmdError"
    if failed:
        raise RuntimeError(f"clear failed: {response}")
    return True
async def delete_contact(profile_id: int, contact_id: int) -> bool:
    """Delete a contact entirely (delete mode 'full', notifies the contact).

    Raises RuntimeError when the profile is not running; errors from the delete
    call itself propagate. The follow-up refresh of the cached contact list is
    best-effort: a failure there is logged but does not undo or fail the
    delete. Returns True once the delete call has succeeded.
    """
    b = get_running(profile_id)
    if not b or not b.chat:
        raise RuntimeError("Profile is not running")
    await b.chat.api_delete_chat("direct", contact_id, {"type": "full", "notify": True})
    # Refresh the cached contact list so the row disappears
    try:
        user = await b.chat.api_get_active_user()
        if user:
            b.contacts = await b.chat.api_list_contacts(user["userId"])
    except Exception:
        # The contact is already deleted; only the cache is stale. Log it rather
        # than hiding it, matching the other refresh paths in this module.
        log.exception("contact refresh after delete_contact failed for %d", profile_id)
    return True
# ── Broadcast bot helpers (mirror the official simplex-broadcast-bot) ────────────
# Publishers are configured as a list of "Name" or "ID:Name" strings. The official
# bot matches a KnownContact by contactId AND display name; we match on either, so
# names alone (what the UI collects) keep working.
def _parse_publishers(pubs: list) -> tuple[set[int], set[str]]:
    """Split configured publisher entries into (contact ids, display names).

    Each entry is either "Name" or "ID:Name". For "ID:Name" with a decimal ID,
    the id goes into the first set and the non-empty name into the second. Any
    other entry (including "text:more") is treated as a whole name. Blank
    entries are skipped; `pubs` may be None.
    """
    ids: set[int] = set()
    names: set[str] = set()
    for raw in pubs or []:
        entry = str(raw).strip()
        if not entry:
            continue
        prefix, sep, rest = entry.partition(":")
        prefix = prefix.strip()
        # isdecimal(), not isdigit(): isdigit() also accepts characters such as
        # "²" that int() rejects with ValueError.
        if sep and prefix.isdecimal():
            ids.add(int(prefix))
            label = rest.strip()
            # "5:" carries no name; adding "" would match senders that have no
            # display name at all.
            if label:
                names.add(label)
            continue
        names.add(entry)
    return ids, names
def _publisher_names(pubs: list) -> str:
    """Return the configured publisher names, sorted and comma-separated.

    Falls back to a placeholder text when no names are configured.
    """
    names = _parse_publishers(pubs)[1]
    if not names:
        return "(no publishers configured)"
    return ", ".join(sorted(names))
def _bc_welcome(config: dict, name: str) -> str:
    """Return the broadcast bot's greeting.

    A non-blank ``welcome_message`` from the config wins; otherwise a default
    text naming the configured publishers is built. `name` is not used.
    """
    custom = (config.get("welcome_message") or "").strip()
    if custom:
        return custom
    publishers = _publisher_names(config.get("publishers", []))
    return (
        "Hello! I am a broadcast bot.\n"
        f"I broadcast messages to all connected users from {publishers}."
    )
def _bc_prohibited(config: dict) -> str:
    """Return the reply sent to non-publishers who message a broadcast bot.

    A non-blank ``prohibited_message`` from the config wins; otherwise a default
    text naming the configured publishers is built.
    """
    custom = (config.get("prohibited_message") or "").strip()
    if custom:
        return custom
    publishers = _publisher_names(config.get("publishers", []))
    return (
        f"Sorry, only these users can broadcast messages: {publishers}. "
        "Your message is deleted."
    )
# Content types the broadcast bot will relay (matches the official allowlist).
# Compared against msgContent["type"] in _handle_broadcast_message.
_BC_ALLOWED_CONTENT = {"text", "link"}
async def _handle_broadcast_message(
    b: "RunningBot", chat: Any, config: dict, item: dict, chat_info: dict, mc: dict, text: str
) -> None:
    """Mirror simplex-broadcast-bot: publishers' messages go to all contacts; everyone
    else gets the prohibited reply and their message is deleted.

    A sender is a publisher when their contact id OR their non-empty display name
    matches the configured publishers. Publisher messages must be text or link
    content; they are fanned out with one ``/feed`` command and the sender is
    told how many deliveries succeeded. Replies are best-effort: a failed reply
    is logged and never aborts handling.
    """
    ids, names = _parse_publishers(config.get("publishers", []))
    ct = chat_info.get("contact", {})
    sender_id = ct.get("contactId")
    sender_name = ct.get("localDisplayName", "")
    # Require a real name before matching by name: a sender without a display
    # name (e.g. no "contact" in chat_info) must never match an empty entry.
    is_publisher = (sender_id in ids) or (bool(sender_name) and sender_name in names)

    async def reply(msg: str) -> None:
        try:
            await chat.api_send_text_reply(item, msg)
        except Exception:
            log.warning("broadcast: failed to send reply", exc_info=True)

    if not is_publisher:
        await reply(_bc_prohibited(config))
        item_id = item.get("chatItem", {}).get("meta", {}).get("itemId")
        if sender_id is not None and item_id is not None:
            try:  # internal delete (a received message can't be deleted for the sender)
                await chat.api_delete_chat_items("direct", sender_id, [item_id], "internal")
            except Exception:
                log.exception("broadcast: failed to delete non-publisher message")
        return

    if mc.get("type") not in _BC_ALLOWED_CONTENT or not text:
        await reply("Message is not supported (text and links only).")
        return

    # Native broadcast: one /feed command fans out to every contact and reports counts.
    try:
        r = await chat.send_chat_cmd(f"/feed {text}")
    except Exception as e:
        log.error("broadcast /feed error: %s", e)
        await reply("Could not broadcast right now, please try again.")
        return

    if isinstance(r, dict) and r.get("type") == "broadcastSent":
        s, f = r.get("successes", 0), r.get("failures", 0)
        _append_log(b, f"Broadcast → {s} ok, {f} errors")
        await reply(f"Forwarded to {s} contact(s), {f} errors")
    else:
        log.error("broadcast unexpected response: %s", r)
        await reply("Broadcast failed.")
async def _run_bot(
    profile_id: int,
    name: str,
    bot_type: str,
    db_prefix: str,
    config: dict,
    on_address: callable,
) -> None:
    """Inner coroutine — runs the simplex-chat event loop for one profile.

    Start-up: open the chat database at `db_prefix`, make sure an active user
    exists, start the chat, push the configured profile, obtain (or create) the
    contact address and report it through ``await on_address(profile_id,
    address)``, then apply the address settings that match `bot_type`.

    Afterwards the coroutine loops on incoming events: it keeps the cached
    contact/group lists and the bot log current, records a notification for each
    received text message, and dispatches the message to the per-type handler.

    Cancellation (see `stop_bot`) ends the loop quietly. Any other exception is
    logged and appended to the bot log. The chat handle is closed either way.
    """
    try:
        from simplex_chat import ChatApi, SqliteDb
    except ImportError:
        log.error("simplex-chat Python package not installed. Run: pip install simplex-chat")
        return
    b = _running[profile_id]

    def welcome_or(default: str) -> str:
        # A blank welcome_message (e.g. an empty form field) falls back to the
        # default, as the business and broadcast types already do; otherwise the
        # auto-reply would be an empty message.
        return (config.get("welcome_message") or "").strip() or default

    try:
        chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix))
        b.chat = chat
        # libsimplex /_start requires an active user to exist first
        user = await chat.api_get_active_user()
        if not user:
            user = await chat.api_create_active_user(_profile_dict(name, config))
        await chat.start_chat()
        user_id = user["userId"]
        # Sync profile from config so edits made while stopped take effect on start
        await _sync_profile(chat, user_id, name, config)
        existing = await chat.api_get_user_address(user_id)
        if existing:
            # api_get_user_address returns UserContactLink; link is nested under connLinkContact
            link = existing["connLinkContact"]
        else:
            # api_create_user_address returns CreatedConnLink directly
            link = await chat.api_create_user_address(user_id)
        address = link.get("connShortLink") or link.get("connFullLink", "")
        b.address = address
        await on_address(profile_id, address)

        # Configure address settings based on profile type
        settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}}
        if bot_type == "user":
            pass  # plain user: auto-accept on, no auto-reply
        elif bot_type == "business":
            # business account: each connecting customer becomes a group chat the
            # operator handles in the UI. Optional welcome auto-reply if configured.
            settings["businessAddress"] = True
            welcome = (config.get("welcome_message") or "").strip()
            if welcome:
                settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "support":
            settings["businessAddress"] = True
            settings["autoReply"] = {
                "type": "text",
                "text": welcome_or(f"Welcome to {name} support."),
            }
        elif bot_type == "broadcast":
            # auto-reply greets each new contact (default lists allowed publishers)
            settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)}
        elif bot_type in ("echo", "directory", "deadmans"):
            settings["autoReply"] = {"type": "text", "text": welcome_or(f"Connected to {name}.")}
        await chat.api_set_address_settings(user_id, settings)

        # Refresh contacts/groups
        async def refresh() -> None:
            try:
                b.contacts = await chat.api_list_contacts(user_id)
                groups = await chat.api_list_groups(user_id)
                # Classify each group as channel (observer link) vs group (member link)
                for g in groups:
                    await _classify_group(chat, g)
                b.groups = groups
            except Exception:
                log.exception("refresh failed for bot %d", profile_id)

        await refresh()

        # Dead man's switch state: fire `message` if no check-in within checkin_hours.
        # The value is parsed for every bot type, so an unparsable leftover must
        # not crash the bot at start; fall back to the 24h default instead.
        checkin_hours = config.get("checkin_hours", 24)
        try:
            dms_interval_s = float(checkin_hours) * 3600
        except (TypeError, ValueError):
            log.warning(
                "bot %d: invalid checkin_hours %r, using 24", profile_id, checkin_hours
            )
            checkin_hours = 24
            dms_interval_s = 24 * 3600.0
        dms_last_checkin = datetime.now(timezone.utc)
        dms_fired = False
        if bot_type == "deadmans":
            _append_log(b, f"Dead man's switch armed — deadline in {checkin_hours}h")
        dir_tick = 0  # directory bots periodically scan for newly-added groups

        # Event loop
        while True:
            # The argument is presumably a receive timeout; None means nothing
            # arrived (see the `evt is None` check below).
            evt = await chat.recv_chat_event(500_000)

            # Directory bot: ~every 30s, refresh groups and register/auto-join new ones
            if bot_type == "directory":
                dir_tick += 1
                if dir_tick >= 60:
                    dir_tick = 0
                    await refresh()
                    await _directory_register_new_groups(b, chat, config, name)

            # Dead man's switch: fire once if the check-in window has elapsed
            if bot_type == "deadmans" and not dms_fired:
                elapsed = (datetime.now(timezone.utc) - dms_last_checkin).total_seconds()
                if elapsed > dms_interval_s:
                    dms_fired = True
                    n = await _fire_deadmans(chat, user_id, config)
                    _append_log(b, f"Dead man's switch FIRED → notified {n} contact(s)")

            if evt is None:
                continue

            tag = evt.get("type", "")
            _append_log(b, f"[{tag}]")

            if tag == "contactConnected":
                await refresh()
                ct = evt.get("contact", {})
                ct_name = ct.get("localDisplayName", "?")
                _append_log(b, f"Contact connected: {ct_name}")
                # echo replies on message; broadcast/others greet via the auto-reply
                # configured in address settings, so nothing to do on connect here.
            elif tag == "newChatItems":
                items = evt.get("chatItems", [])
                for item in items:
                    ci = item.get("chatItem", {})
                    # Robust incoming-vs-outgoing: chatDir ".../Rcv" = received,
                    # ".../Snd" = sent by us. Avoids replying to our own messages.
                    chat_dir = ci.get("chatDir", {}).get("type", "")
                    incoming = chat_dir.endswith("Rcv")
                    if not incoming:
                        continue

                    content = ci.get("content", {})
                    mc = content.get("msgContent", {})
                    text = mc.get("text", "")
                    chat_info = item.get("chatInfo", {})
                    _append_log(b, f"Message: {text[:80]}")

                    # Cross-account notification for any received message
                    if text:
                        ci_type = chat_info.get("type")
                        if ci_type == "direct":
                            ct = chat_info.get("contact", {})
                            _notify_ref = ("direct", ct.get("contactId"), ct.get("localDisplayName", ""))
                        elif ci_type == "group":
                            gm = ci.get("chatDir", {}).get("groupMember", {}) or {}
                            _notify_ref = (
                                "group",
                                chat_info.get("groupInfo", {}).get("groupId"),
                                gm.get("localDisplayName", ""),
                            )
                        else:
                            _notify_ref = (None, None, "")
                        if _notify_ref[1] is not None:
                            record_notification(
                                profile_id, name, _notify_ref[0], _notify_ref[1],
                                _notify_ref[2], text,
                            )

                    if bot_type == "echo" and text:
                        try:
                            await chat.api_send_text_reply(item, f"Echo: {text}")
                        except Exception:
                            pass  # best-effort reply
                    elif bot_type == "support" and text:
                        await _handle_support_message(
                            b, chat, config, item, chat_info, text
                        )
                    elif bot_type == "directory" and text:
                        await _handle_directory_message(
                            b, chat, config, name, item, chat_info, text
                        )
                    elif bot_type == "deadmans" and text:
                        owner = config.get("owner")
                        sender = chat_info.get("contact", {}).get("localDisplayName", "")
                        # Any contact checks in, unless an owner is configured
                        if not owner or sender == owner:
                            dms_last_checkin = datetime.now(timezone.utc)
                            dms_fired = False
                            try:
                                await chat.api_send_text_reply(
                                    item,
                                    f"✓ Check-in received. Switch reset — next deadline in {checkin_hours}h.",
                                )
                            except Exception:
                                pass  # best-effort reply
                    elif bot_type == "broadcast":
                        await _handle_broadcast_message(
                            b, chat, config, item, chat_info, mc, text
                        )
    except asyncio.CancelledError:
        pass
    except Exception as e:
        log.exception("Bot %d crashed: %s", profile_id, e)
        _append_log(b, f"ERROR: {e}")
    finally:
        if b.chat:
            try:
                await b.chat.close()
            except Exception:
                pass
def _append_log(b: RunningBot, line: str) -> None:
    """Append `line` to the bot's log, keeping only the newest 200 lines."""
    entries = b.log_lines
    entries.append(line)
    excess = len(entries) - 200
    if excess > 0:
        b.log_lines = entries[excess:]
async def _handle_directory_message(
    b: RunningBot, chat: Any, config: dict, name: str, item: dict, chat_info: dict, text: str
) -> None:
    """Directory bot: super-user approval commands, and search for everyone else.

    Messages starting with "/" are commands: ``/help`` for anyone; ``/list``,
    ``/approve <id>`` and ``/reject <id>`` only for senders whose display name is
    listed in ``config["superusers"]``. Any other text is a search term, answered
    with up to ten matching entries. Replies are best-effort: a failed reply is
    logged and never aborts handling.
    """
    import directory as _dir

    safe = safe_name(name)
    superusers = config.get("superusers", []) or []
    sender = chat_info.get("contact", {}).get("localDisplayName", "")
    is_super = sender in superusers
    t = text.strip()

    async def reply(msg: str) -> None:
        try:
            await chat.api_send_text_reply(item, msg)
        except Exception:
            log.warning("directory: failed to send reply", exc_info=True)

    if not t.startswith("/"):
        results = _dir.search(safe, t)
        if not results:
            await reply(f"No groups found for '{t}'. Send /help for options.")
        else:
            await reply("Found:\n" + "\n".join(f"{e['displayName']}: {e['link'] or '(no link)'}" for e in results[:10]))
        return

    parts = t.split()
    cmd = parts[0].lower()
    if cmd == "/help":
        await reply("Send a search term to find groups. Admins: /list, /approve <id>, /reject <id>.")
    elif cmd == "/list":
        if not is_super:
            await reply("Only admins can list pending submissions.")
            return
        pend = _dir.entries_by_status(safe, "pending")
        if not pend:
            await reply("No pending submissions.")
        else:
            await reply("Pending:\n" + "\n".join(
                f"#{e['id']} {e['displayName']} ({'channel' if e['is_channel'] else 'group'})" for e in pend
            ))
    elif cmd in ("/approve", "/reject"):
        if not is_super:
            await reply("Only admins can approve or reject.")
            return
        # isdecimal(), not isdigit(): isdigit() accepts characters such as "²"
        # that int() rejects, and that ValueError would escape into the caller's
        # event loop.
        if len(parts) < 2 or not parts[1].isdecimal():
            await reply(f"Usage: {cmd} <id>")
            return
        eid = int(parts[1])
        status = "approved" if cmd == "/approve" else "rejected"
        e = _dir.set_status(safe, eid, status)
        if not e:
            await reply(f"No submission #{eid}.")
            return
        if status == "approved":
            _dir.publish(safe)
            await reply(f"✓ Approved #{eid} '{e['displayName']}' — now listed on the directory site.")
        else:
            await reply(f"Rejected #{eid} '{e['displayName']}'.")
    else:
        await reply("Unknown command. Send /help for options.")
async def _directory_register_new_groups(
    b: RunningBot, chat: Any, config: dict, name: str
) -> None:
    """Accept pending group invitations and register unknown groups as pending.

    For every cached group: an "invited" membership is accepted and the group is
    skipped for this pass; a group the bot does not own and that is not yet in
    the directory state is added as a pending entry. For each newly added entry
    the bot log is updated and every configured super-user who is a contact gets
    a direct message with the approve/reject commands.
    """
    import directory as _dir

    site_key = safe_name(name)
    admins = config.get("superusers", []) or []

    for group in b.groups:
        gid = group_id(group)
        own = group.get("membership") or {}
        member_status = own.get("memberStatus", "")
        member_role = own.get("memberRole", "")

        if member_status == "invited":
            try:
                await chat.api_join_group(gid)  # someone added the bot; accept
            except Exception:
                pass
            continue  # registered on a later tick once it syncs

        if member_role == "owner":
            continue
        if _dir.find_by_group(_dir.load_state(site_key), gid):
            continue

        join_link = ""
        try:
            join_link = await chat.api_get_group_link_str(gid)
        except Exception:
            pass

        entry, is_new = _dir.add_pending(
            site_key,
            gid,
            group_name(group),
            join_link,
            bool(group.get("is_channel")),
            group.get("groupSummary") or {},
            (group.get("groupProfile") or {}).get("shortDescr"),
            "group",
        )
        if not is_new:
            continue

        _append_log(b, f"Directory: registered pending #{entry['id']} '{entry['displayName']}'")
        notice = (
            f"New directory submission #{entry['id']}: '{entry['displayName']}'. "
            f"/approve {entry['id']} or /reject {entry['id']}."
        )
        for contact in b.contacts:
            if contact["localDisplayName"] not in admins:
                continue
            try:
                await chat.api_send_text_message(
                    {"chatType": "direct", "chatId": contact["contactId"]}, notice
                )
            except Exception:
                pass
async def _fire_deadmans(chat: Any, user_id: int, config: dict) -> int:
    """Send the dead-man message to recipients (named, or all contacts).

    ``config["message"]`` overrides the default text. ``config["recipients"]``
    is a list of display names; when empty or missing, every contact is
    notified. Returns the number of contacts the message was sent to.

    Failures are logged instead of being dropped silently: if the contact list
    cannot be read nothing is sent and 0 is returned; a failed send to one
    contact does not stop delivery to the others.
    """
    message = config.get("message") or "Dead man's switch triggered — no check-in was received."
    recipients = config.get("recipients") or []  # list of display names; empty = all contacts
    try:
        contacts = await chat.api_list_contacts(user_id)
    except Exception:
        log.exception("dead man's switch: could not list contacts; nothing was sent")
        return 0

    if recipients:
        targets = [c for c in contacts if c["localDisplayName"] in recipients]
    else:
        targets = contacts

    sent = 0
    for c in targets:
        try:
            await chat.api_send_text_message({"chatType": "direct", "chatId": c["contactId"]}, message)
            sent += 1
        except Exception:
            log.exception(
                "dead man's switch: failed to notify contact %s", c.get("localDisplayName")
            )
    return sent
async def _handle_support_message(
    b: RunningBot, chat: Any, config: dict, item: dict, chat_info: dict, text: str
) -> None:
    """Answer an incoming support message via the configured OpenAI-compatible LLM.

    If no api_base is configured the bot stays silent (the static welcome auto-reply
    has already greeted the contact) — so support bots without an LLM behave as before.

    A rolling history of at most 20 messages is kept per conversation and sent
    along with the system prompt. On an LLM error the customer gets a short
    apology and the error is logged; the assistant reply is stored in the
    history only when the LLM call succeeded.
    """
    api_base = (config.get("api_base") or "").strip()
    if not api_base:
        return  # no LLM configured for this bot

    # Support bots use a business address, so each customer talks to the bot in
    # their own group chat and chat_info carries no "contact". Keying only on
    # contactId would put every customer into one shared history (key None).
    # Direct chats keep the plain contactId key; group chats use a
    # ("group", groupId) tuple, which cannot collide with a contact id.
    if chat_info.get("type") == "group":
        history_key: Any = ("group", chat_info.get("groupInfo", {}).get("groupId"))
    else:
        history_key = chat_info.get("contact", {}).get("contactId")

    system_prompt = config.get("system_prompt") or DEFAULT_SUPPORT_PROMPT
    model = config.get("model") or "grok-2"
    api_key = config.get("api_key") or ""

    # Maintain a short rolling history per conversation for conversational context.
    hist = b.histories.setdefault(history_key, [])
    hist.append({"role": "user", "content": text})
    if len(hist) > 20:
        del hist[:-20]
    messages = [{"role": "system", "content": system_prompt}, *hist]

    try:
        reply = await llm_chat(api_base, api_key, model, messages)
    except Exception as e:
        log.error("support LLM error: %s", e)
        _append_log(b, f"LLM error: {e}")
        try:
            await chat.api_send_text_reply(
                item, "Sorry, I'm having trouble responding right now. Please try again shortly."
            )
        except Exception:
            pass  # best-effort apology
        return

    hist.append({"role": "assistant", "content": reply})
    _append_log(b, f"LLM reply: {reply[:80]}")
    try:
        await chat.api_send_text_reply(item, reply)
    except Exception:
        log.exception("failed to send support reply")
async def global_status() -> dict:
    """Aggregate manager-wide status: running profiles, totals, and server config.

    Server/network info (SMP+XFTP counts, operators) is read from the first running
    profile — these are the shared SimpleX presets, so one profile represents all.

    If reading from a profile fails at any point, nothing from that profile is
    counted and the next running profile is tried. When every profile fails,
    the server fields keep their empty defaults.
    """
    running = [b for b in _running.values() if not b.task.done()]
    status = {
        "profiles_running": len(running),
        "contacts": sum(len(b.contacts) for b in running),
        "groups": sum(len(b.groups) for b in running),
        "smp_servers": 0,
        "xftp_servers": 0,
        "operators": [],
        "proxy_mode": "",
    }
    for b in running:
        if not b.chat:
            continue
        try:
            user = await b.chat.api_get_active_user()
            r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}")
            nc = await b.chat.send_chat_cmd("/network")
            # Collect into locals first: writing into `status` before every call
            # has succeeded would double-count this profile's servers when a
            # later step fails and the next profile is read as well.
            operators: list = []
            smp_count = 0
            xftp_count = 0
            for op in r.get("userServers", []):
                o = op.get("operator") or {}
                if o.get("enabled") and o.get("tradeName"):
                    operators.append(o["tradeName"])
                smp_count += sum(
                    1 for s in op.get("smpServers", []) if s.get("enabled") and not s.get("deleted")
                )
                xftp_count += sum(
                    1 for s in op.get("xftpServers", []) if s.get("enabled") and not s.get("deleted")
                )
            proxy_mode = nc.get("networkConfig", {}).get("smpProxyMode", "")
        except Exception:
            log.exception("global_status server read failed")
            continue
        status["operators"] = operators
        status["smp_servers"] = smp_count
        status["xftp_servers"] = xftp_count
        status["proxy_mode"] = proxy_mode
        break  # one running profile is representative
    return status
def _server_host(s: str) -> str:
    """Extract the readable host from a server URI like smp://key@host1,host2.onion.

    Strips the scheme (up to the first "://"), then everything up to the first
    "@", and returns the part before the first comma. If `s` cannot be processed
    this way it is returned unchanged.
    """
    try:
        _, scheme_sep, tail = s.partition("://")
        rest = tail if scheme_sep else s
        _, at_sign, after_at = rest.partition("@")
        if at_sign:
            rest = after_at
        return rest.partition(",")[0]
    except Exception:
        return s
def _server_row(s: dict) -> dict:
    """Reduce one server record to its host plus enabled/preset/deleted booleans."""
    row = {"host": _server_host(s.get("server", ""))}
    for flag in ("enabled", "preset", "deleted"):
        row[flag] = bool(s.get(flag))
    return row
def _first_running_chat():
    """Return the first registered bot with an unfinished task and a chat handle.

    Returns None when no such bot exists.
    """
    candidates = (bot for bot in _running.values() if not bot.task.done() and bot.chat)
    return next(candidates, None)
async def get_servers_detail() -> dict:
    """Return the per-operator SMP/XFTP server breakdown plus network config.

    Data comes from the first running profile. The result has the keys
    "profile_name", "operators" (name, enabled flag, smp and xftp server rows)
    and "network". With no running profile, or on any error (which is logged),
    the empty shape with ``profile_name`` None is returned.
    """
    bot = _first_running_chat()
    if not bot:
        return {"profile_name": None, "operators": [], "network": {}}
    try:
        user = await bot.chat.api_get_active_user()
        servers = await bot.chat.send_chat_cmd(f"/_servers {user['userId']}")
        network = await bot.chat.send_chat_cmd("/network")
        breakdown = []
        for group in servers.get("userServers", []):
            operator = group.get("operator") or {}
            smp_rows = [_server_row(s) for s in group.get("smpServers", [])]
            xftp_rows = [_server_row(s) for s in group.get("xftpServers", [])]
            breakdown.append({
                "name": operator.get("tradeName") or "Custom",
                "enabled": bool(operator.get("enabled")),
                "smp": smp_rows,
                "xftp": xftp_rows,
            })
        return {
            "profile_name": bot.name,
            "operators": breakdown,
            "network": network.get("networkConfig", {}),
        }
    except Exception:
        log.exception("get_servers_detail failed")
        return {"profile_name": None, "operators": [], "network": {}}
async def get_network_config() -> dict:
    """Return the ``networkConfig`` reported by the first running profile.

    Yields an empty dict when no profile is running or the ``/network`` command
    fails (the failure is logged).
    """
    bot = _first_running_chat()
    if not bot:
        return {}
    try:
        response = await bot.chat.send_chat_cmd("/network")
        config = response.get("networkConfig", {})
    except Exception:
        log.exception("get_network_config failed")
        config = {}
    return config
def _profile_dict(name: str, config: dict) -> dict:
    """Build a SimpleX Profile dict from the bot name and its config.

    ``displayName`` and ``fullName`` are always present; ``shortDescr`` (from
    "bio") and ``image`` (from "avatar", a base64 data URI) are added only when
    the config value is non-empty.
    """
    profile: dict = {"displayName": name, "fullName": config.get("full_name", "")}
    optional_fields = (("bio", "shortDescr"), ("avatar", "image"))
    for config_key, profile_key in optional_fields:
        value = config.get(config_key)
        if value:
            profile[profile_key] = value
    return profile
async def _sync_profile(chat: Any, user_id: int, name: str, config: dict) -> None:
    """Push the config-derived profile to the live account.

    Best-effort: a failing update is logged and not re-raised. (The original
    author notes the update is a no-op when nothing changed — presumably
    handled by the chat API; not visible here.)
    """
    desired = _profile_dict(name, config)
    try:
        await chat.api_update_profile(user_id, desired)
    except Exception:
        log.exception("profile sync failed")
async def update_profile(
    profile_id: int, full_name: str | None, bio: str | None, avatar: str | None
) -> dict:
    """Store profile edits in the manager DB and apply them live if running.

    Passing None for `full_name`, `bio` or `avatar` leaves that field as it is.
    `avatar` is a base64 data URI; an empty string removes the stored avatar.
    Raises RuntimeError when the profile does not exist. Returns the updated
    config dict.
    """
    import db as _db

    record = _db.get_profile(profile_id)
    if not record:
        raise RuntimeError("Profile not found")

    config = json.loads(record.get("config") or "{}")
    for key, value in (("full_name", full_name), ("bio", bio)):
        if value is not None:
            config[key] = value
    if avatar:
        config["avatar"] = avatar
    elif avatar is not None:
        config.pop("avatar", None)  # empty string: remove the avatar
    _db.update_config(profile_id, config)

    bot = get_running(profile_id)
    if bot and bot.chat:
        user = await bot.chat.api_get_active_user()
        if user:
            await _sync_profile(bot.chat, user["userId"], record["name"], config)
    return config
async def _classify_group(chat: Any, g: dict) -> None:
    """Annotate a GroupInfo dict in place with ``is_channel`` and ``link``.

    A group counts as a channel when its join link's acceptMemberRole is
    "observer". When the link lookup raises, the group keeps the defaults
    (not a channel, empty link).
    """
    g.update(is_channel=False, link="")
    try:
        details = await chat.api_get_group_link(g["groupId"])
    except Exception:
        return  # no link → plain private group
    contact_link = details.get("connLinkContact", {})
    g["is_channel"] = details.get("acceptMemberRole") == "observer"
    g["link"] = contact_link.get("connShortLink") or contact_link.get("connFullLink", "")
async def create_channel(profile_id: int, name: str) -> str:
    """Create a group with an "observer" join link (a channel); return the link.

    Raises RuntimeError when the profile is not running or has no active user.
    The cached group list is reloaded and re-classified afterwards; a failure
    of that reload is logged and does not affect the returned link.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    user_id = user["userId"]
    created = await bot.chat.api_new_group(user_id, {"displayName": name, "fullName": ""})
    link = await bot.chat.api_create_group_link(created["groupId"], "observer")

    # Reload the cache so the new channel shows up, classified like the others.
    try:
        fetched = await bot.chat.api_list_groups(user["userId"])
        for group in fetched:
            await _classify_group(bot.chat, group)
        bot.groups = fetched
    except Exception:
        log.exception("group refresh after create_channel failed")
    return link
async def create_group(profile_id: int, name: str) -> str:
    """Create a regular 2-way group with a "member" join link; return the link.

    Raises RuntimeError when the profile is not running or has no active user.
    The cached group list is reloaded and re-classified afterwards; a failure
    of that reload is logged and does not affect the returned link.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    user_id = user["userId"]
    created = await bot.chat.api_new_group(user_id, {"displayName": name, "fullName": ""})
    link = await bot.chat.api_create_group_link(created["groupId"], "member")

    # Reload the cache so the new group shows up, classified like the others.
    try:
        fetched = await bot.chat.api_list_groups(user["userId"])
        for group in fetched:
            await _classify_group(bot.chat, group)
        bot.groups = fetched
    except Exception:
        log.exception("group refresh after create_group failed")
    return link
async def get_group_members(profile_id: int, gid: int) -> list[dict]:
    """Return name/role/status rows for the members of a group or channel.

    api_list_members leaves out the profile's own user, so when the group is in
    the cache and carries a membership record, an own row (name suffixed with
    " (you)") is put first. Raises RuntimeError when the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")

    members = await bot.chat.api_list_members(gid)
    rows = []
    for member in members:
        rows.append({
            "name": member["localDisplayName"],
            "role": member["memberRole"],
            "status": member["memberStatus"],
        })

    # Own membership comes from the first cached group with a matching id.
    cached = next((g for g in bot.groups if group_id(g) == gid), None)
    own = (cached.get("membership") or {}) if cached is not None else {}
    if own:
        own_row = {
            "name": own.get("localDisplayName", "you") + " (you)",
            "role": own.get("memberRole", "owner"),
            "status": own.get("memberStatus", ""),
        }
        rows.insert(0, own_row)
    return rows
async def join_group(profile_id: int, gid: int) -> bool:
    """Accept a pending group invitation, then reload the cached lists.

    Raises RuntimeError when the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_join_group(gid)
    await refresh_lists(profile_id)
    return True
async def leave_group(profile_id: int, gid: int) -> bool:
    """Leave a group or channel, then reload the cached lists.

    Raises RuntimeError when the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_leave_group(gid)
    await refresh_lists(profile_id)
    return True
async def delete_group(profile_id: int, gid: int) -> bool:
    """Delete a group or channel (mode "full", with notify), then reload the lists.

    Raises RuntimeError when the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    delete_mode = {"type": "full", "notify": True}
    await bot.chat.api_delete_chat("group", gid, delete_mode)
    await refresh_lists(profile_id)
    return True
async def get_group_link(profile_id: int, gid: int) -> str:
    """Return the join link of a group or channel, or "" when the lookup fails.

    Raises RuntimeError when the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    try:
        link = await bot.chat.api_get_group_link_str(gid)
    except Exception:
        link = ""
    return link