Files
simplex-manager/manager/profiles.py
Jon 7cda767408 RSS bot: populate channel on first run so joiners see content
Previously the bot seeded all existing feed items on startup WITHOUT posting, so a
freshly-created channel stayed empty and new subscribers saw nothing (only items
appearing after start were posted). Now on first run it posts the latest items
(max 5) to fill the channel — recent history then shows them to joiners — and sets
an rss_populated flag so restarts don't replay. Existing (empty) bots get filled
once on next start. Update rss_test.py.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 22:02:15 +01:00

1230 lines
48 KiB
Python

"""Bot lifecycle management — start, stop, status, message sending."""
import asyncio
import json
import logging
import time
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Directory bot websites live in <repo>/web/<safe_name>/, each self-contained
# (its own index.html + data/), generated from the web/index.html template.
# Resolved relative to this file: two directory levels up, then "web".
WEB_DIR = Path(__file__).parent.parent / "web"
def safe_name(name: str) -> str:
    """Return a lowercase slug for *name* that is safe to use as one path component.

    Spaces become underscores (unchanged behaviour). Because the slug is joined
    onto a directory path by callers, path separators and NUL are also replaced
    with underscores, and the special names "", "." and ".." get a leading
    underscore so the result can never point at the parent or the base directory.
    Names made only of ordinary characters map exactly as before.
    """
    slug = name.lower().replace(" ", "_")
    for forbidden in ("/", "\\", "\x00"):
        slug = slug.replace(forbidden, "_")
    if slug in ("", ".", ".."):
        slug = f"_{slug}"
    return slug
def generate_directory_site(name: str) -> str:
    """Create (or refresh) the website folder for one directory bot.

    Reads the shared web/index.html template, swaps the "SimpleXXX" placeholder
    for *name*, and writes the result to <web>/<slug>/index.html. An empty
    data/listing.json is created only if none exists yet, so an existing listing
    is never overwritten. Each bot has its own folder, so several can coexist.

    Returns the site's index path relative to the web directory.
    """
    slug = safe_name(name)
    site_dir = WEB_DIR / slug
    data_dir = site_dir / "data"

    html = (WEB_DIR / "index.html").read_text(encoding="utf-8").replace("SimpleXXX", name)

    data_dir.mkdir(parents=True, exist_ok=True)
    (site_dir / "index.html").write_text(html, encoding="utf-8")

    listing_file = data_dir / "listing.json"
    if not listing_file.exists():
        listing_file.write_text('{"entries": []}', encoding="utf-8")

    return f"{slug}/index.html"
# ── Notifications ───────────────────────────────────────────────────────────────
# In-memory, cross-account feed of received messages. Ephemeral (clears on restart).
# Stored oldest-first; record_notification() appends to the end.
_notifications: list[dict] = []
# Last notification id handed out; incremented once per recorded notification.
_notif_seq = 0
# Maximum number of notifications kept; the oldest are dropped beyond this.
_NOTIF_MAX = 200
def record_notification(
    profile_id: int, profile_name: str, chat_type: str, chat_id: int, sender: str, text: str
) -> None:
    """Append one received-message notification to the in-memory feed.

    The entry gets the next sequential id, a UTC ISO-8601 timestamp, a text
    preview capped at 140 characters and an unread flag. After appending, the
    feed is trimmed so that only the newest _NOTIF_MAX entries remain.
    """
    global _notif_seq
    _notif_seq += 1

    preview = text[:140]
    stamp = datetime.now(timezone.utc).isoformat()
    entry = dict(
        id=_notif_seq,
        profile_id=profile_id,
        profile_name=profile_name,
        chat_type=chat_type,
        chat_id=chat_id,
        sender=sender,
        text=preview,
        ts=stamp,
        read=False,
    )
    _notifications.append(entry)

    # Trim in place so the module-level list object itself is kept.
    if len(_notifications) > _NOTIF_MAX:
        _notifications[:] = _notifications[-_NOTIF_MAX:]
def get_notifications(limit: int = 50) -> list[dict]:
    """Return up to *limit* of the latest notifications, most recent first.

    A non-positive *limit* yields an empty list. (A plain ``[-limit:]`` slice
    would return the whole feed for 0 and an arbitrary tail for negatives.)
    """
    if limit <= 0:
        return []
    return _notifications[-limit:][::-1]
def unread_count() -> int:
    """Return how many stored notifications have not been marked read."""
    pending = [note for note in _notifications if not note["read"]]
    return len(pending)
def mark_all_read() -> None:
    """Set the "read" flag on every stored notification."""
    for note in _notifications:
        note.update(read=True)
# Default system prompts when none is configured.
# They are the fallback "system" message for LLM replies when the profile config
# has no system_prompt: support bots use DEFAULT_SUPPORT_PROMPT, llm bots use
# DEFAULT_LLM_PROMPT.
DEFAULT_SUPPORT_PROMPT = (
"You are a helpful customer-support assistant. Answer concisely and politely. "
"If you don't know something, say so rather than guessing."
)
DEFAULT_LLM_PROMPT = (
"You are a helpful assistant. Answer concisely. "
"If you don't know something, say so rather than guessing."
)
async def llm_chat(
    api_base: str, api_key: str, model: str, messages: list[dict], timeout: float = 60.0
) -> str:
    """POST *messages* to an OpenAI-compatible /chat/completions endpoint.

    Any provider following the OpenAI convention works (Grok, Ollama, OpenAI, …);
    only the base URL, key and model differ. *api_base* should already contain
    the version path (e.g. ``.../v1``); a trailing slash is tolerated. The
    Authorization header is sent only when *api_key* is non-empty.

    The blocking HTTP call runs in a worker thread. Returns the content of the
    first choice's message; network, JSON and shape errors propagate to the caller.
    """
    endpoint = f"{api_base.rstrip('/')}/chat/completions"
    payload = json.dumps({"model": model, "messages": messages}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    def _post() -> str:
        request = urllib.request.Request(endpoint, data=payload, headers=headers, method="POST")
        with urllib.request.urlopen(request, timeout=timeout) as response:  # noqa: S310 - user-configured endpoint
            return response.read().decode("utf-8")

    completion = json.loads(await asyncio.to_thread(_post))
    first_choice = completion["choices"][0]
    return first_choice["message"]["content"]
# api_list_groups returns BARE GroupInfo dicts (verified against the live API):
# g["groupId"], g["groupProfile"]["displayName"],
# g["groupSummary"]["currentMembers"], g["membership"]["memberRole"]
# There is no "groupInfo" wrapper and no "members" list in this response.
#
# A "channel" is a group whose join link has acceptMemberRole == "observer"
# (joiners are read-only; only the owner broadcasts). A regular group's link
# has role "member" (2-way). This is the only thing that distinguishes them.
def group_name(g: dict) -> str:
    """Return the display name of a bare GroupInfo dict (KeyError if absent)."""
    profile = g["groupProfile"]
    return profile["displayName"]
def group_id(g: dict) -> int:
    """Return the numeric id of a bare GroupInfo dict (KeyError if absent)."""
    identifier = g["groupId"]
    return identifier
def group_member_count(g: dict) -> int:
    """Return the group's current member count, or 0 when it is not reported.

    Both a missing and a null ``groupSummary`` are treated as "no summary";
    ``dict.get`` with a default alone would still fail on an explicit null.
    """
    summary = g.get("groupSummary") or {}
    return summary.get("currentMembers", 0)
# Profile kinds, as stored in a profile's "bot_type" field.
# Automated bot behaviours:
BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"]
# Plain user accounts (no automated replies):
USER_TYPES = ["user"]
BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats)
# Every accepted profile type, in the order bots, users, business.
ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES
@dataclass
class RunningBot:
    """In-memory state of one started profile (one asyncio task per profile)."""

    profile_id: int
    name: str
    bot_type: str
    task: asyncio.Task  # the task running the profile's event loop
    address: str = ""  # connection link, filled in once the profile has started
    contacts: list[dict] = field(default_factory=list)  # cached contact list
    groups: list[dict] = field(default_factory=list)  # cached group list
    log_lines: list[str] = field(default_factory=list)  # recent activity log, capped at 200 lines
    chat: Any = None # simplex_chat ChatApi instance
    # Per-contact LLM conversation history (contactId → [{role, content}, ...])
    histories: dict[int, list[dict]] = field(default_factory=dict)
    # RSS bot state
    rss_seen: set = field(default_factory=set) # entry ids already posted
    rss_next_poll: float = 0.0  # time.time() value at which the next feed poll is due
    rss_gid: int | None = None # broadcast channel group id
# profile_id → RunningBot
# Entries are added when a profile is started; a finished task is detected via
# task.done() rather than by removing the entry.
_running: dict[int, RunningBot] = {}
def is_running(profile_id: int) -> bool:
    """Tell whether *profile_id* has a registered task that has not finished."""
    bot = _running.get(profile_id)
    if bot is None:
        return False
    return not bot.task.done()
def get_running(profile_id: int) -> RunningBot | None:
    """Return the RunningBot for *profile_id* if its task is still active, else None."""
    bot = _running.get(profile_id)
    return bot if bot and not bot.task.done() else None
def all_statuses() -> dict[int, bool]:
    """Map every profile id that was ever started to whether it is running now."""
    return {
        pid: bot is not None and not bot.task.done()
        for pid, bot in _running.items()
    }
async def start_bot(profile: dict, on_address: callable) -> None:
    """Launch the event-loop task for *profile*; do nothing if it already runs.

    *profile* supplies "id", "name", "bot_type", "db_prefix" and an optional
    JSON-encoded "config" string (empty/missing means ``{}``). *on_address* is
    forwarded to the bot coroutine. The new task is registered in ``_running``
    under the profile id.
    """
    pid = profile["id"]
    if is_running(pid):
        return

    settings = json.loads(profile.get("config") or "{}")
    kind = profile["bot_type"]
    prefix = profile["db_prefix"]
    name = profile["name"]

    runner = _run_bot(pid, name, kind, prefix, settings, on_address)
    task = asyncio.create_task(runner, name=f"bot-{pid}-{name}")
    _running[pid] = RunningBot(profile_id=pid, name=name, bot_type=kind, task=task)
    log.info("Started bot %d (%s / %s)", pid, name, kind)
async def stop_bot(profile_id: int) -> None:
    """Cancel the profile's task (if still active) and wait for it to finish.

    The "Stopped" line is logged whether or not a task had to be cancelled.
    """
    bot = _running.get(profile_id)
    needs_cancel = bool(bot) and not bot.task.done()
    if needs_cancel:
        bot.task.cancel()
        try:
            await bot.task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the task.
            pass
    log.info("Stopped bot %d", profile_id)
async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool:
    """Send *text* from a running profile to a contact or group chosen by name.

    Cached contacts are searched first (by localDisplayName), then cached groups
    (by display name); the first match receives the message. Returns True when a
    message was sent, False if the profile is not running, nothing matched, or
    an error occurred (which is logged).
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        return False
    try:
        contact = next(
            (c for c in bot.contacts if c["localDisplayName"] == contact_or_group), None
        )
        if contact is not None:
            target = {"chatType": "direct", "chatId": contact["contactId"]}
        else:
            group = next((g for g in bot.groups if group_name(g) == contact_or_group), None)
            if group is None:
                return False
            target = {"chatType": "group", "chatId": group_id(group)}
        await bot.chat.api_send_text_message(target, text)
        return True
    except Exception as exc:
        log.error("send_message error: %s", exc)
    return False
async def send_to_chat(profile_id: int, chat_type: str, chat_id: int, text: str) -> bool:
    """Send *text* to the chat addressed by (*chat_type*, *chat_id*).

    Returns True on success. Raises RuntimeError when the profile is not
    running; errors from the send itself propagate unchanged.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    target = {"chatType": chat_type, "chatId": chat_id}
    await bot.chat.api_send_text_message(target, text)
    return True
async def refresh_lists(profile_id: int) -> None:
    """Reload the cached contacts and groups of a running profile.

    Intended for use when rendering the profile page: group joins emit no
    contactConnected event, so the cache would otherwise go stale. Each group is
    passed through _classify_group before the cache is replaced. Does nothing
    when the profile is not running or has no active user; failures are logged
    and never raised.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        return
    chat = bot.chat
    try:
        active = await chat.api_get_active_user()
        if not active:
            return
        user_id = active["userId"]
        bot.contacts = await chat.api_list_contacts(user_id)
        fetched = await chat.api_list_groups(user_id)
        for group in fetched:
            await _classify_group(chat, group)
        bot.groups = fetched
    except Exception:
        log.exception("refresh_lists failed for %d", profile_id)
def _normalize_item(ci: dict) -> dict:
"""Flatten a ChatItem into {id, ts, text, outgoing, sender} for the UI."""
meta = ci.get("meta", {})
chat_dir = ci.get("chatDir", {})
dir_type = chat_dir.get("type", "")
outgoing = dir_type.endswith("Snd")
# Prefer meta.itemText; fall back to content.msgContent.text
text = meta.get("itemText") or ci.get("content", {}).get("msgContent", {}).get("text", "")
# Sender name: group messages carry the member; direct/own use a generic label
sender = ""
if dir_type == "groupRcv":
sender = chat_dir.get("groupMember", {}).get("localDisplayName", "")
return {
"id": meta.get("itemId"),
"ts": meta.get("itemTs", ""),
"text": text,
"outgoing": outgoing,
"sender": sender,
"deleted": "itemDeleted" in meta,
}
async def get_chat_history(
    profile_id: int, chat_type: str, chat_id: int, count: int = 50
) -> list[dict]:
    """Fetch the last *count* items of a chat and normalize each for the UI.

    Items are returned in the order the chat API provides them (documented by
    the original author as oldest-first). Raises RuntimeError when the profile
    is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    conversation = await bot.chat.api_get_chat(chat_type, chat_id, count)
    return list(map(_normalize_item, conversation.get("chatItems", [])))
async def clear_chat(profile_id: int, chat_type: str, chat_id: int) -> bool:
    """Delete a conversation's messages while keeping the contact or group.

    Issues ``/_delete <ref> messages`` where the ref is ``@id`` for direct chats
    and ``#id`` for everything else. Returns True on success; raises
    RuntimeError when the profile is not running or the command reports a
    chatCmdError.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    sigil = "@" if chat_type == "direct" else "#"
    r = await bot.chat.send_chat_cmd(f"/_delete {sigil}{chat_id} messages")
    failed = isinstance(r, dict) and r.get("type") == "chatCmdError"
    if failed:
        raise RuntimeError(f"clear failed: {r}")
    return True
async def delete_contact(profile_id: int, contact_id: int) -> bool:
    """Delete a contact entirely (delete mode 'full', notifies the contact).

    Returns True once the delete call has succeeded. Raises RuntimeError when
    the profile is not running; errors from the delete itself propagate.
    The follow-up refresh of the cached contact list is best-effort: a failure
    there is logged (not raised), because the contact is already deleted.
    """
    b = get_running(profile_id)
    if not b or not b.chat:
        raise RuntimeError("Profile is not running")
    await b.chat.api_delete_chat("direct", contact_id, {"type": "full", "notify": True})
    # Refresh the cached contact list so the row disappears
    try:
        user = await b.chat.api_get_active_user()
        if user:
            b.contacts = await b.chat.api_list_contacts(user["userId"])
    except Exception:
        log.exception("delete_contact: contact list refresh failed for %d", profile_id)
    return True
# ── Broadcast bot helpers (mirror the official simplex-broadcast-bot) ────────────
# Publishers are configured as a list of "Name" or "ID:Name" strings. The official
# bot matches a KnownContact by contactId AND display name; we match on either, so
# names alone (what the UI collects) keep working.
def _parse_publishers(pubs: list) -> tuple[set[int], set[str]]:
ids: set[int] = set()
names: set[str] = set()
for p in pubs or []:
s = str(p).strip()
if not s:
continue
if ":" in s:
left, right = s.split(":", 1)
if left.strip().isdigit():
ids.add(int(left.strip()))
names.add(right.strip())
continue
names.add(s)
return ids, names
def _publisher_names(pubs: list) -> str:
    """Render the configured publisher names as a sorted, comma-separated string.

    Falls back to a fixed placeholder when no names are configured.
    """
    names = _parse_publishers(pubs)[1]
    if not names:
        return "(no publishers configured)"
    return ", ".join(sorted(names))
def _bc_welcome(config: dict, name: str) -> str:
    """Return the broadcast bot's greeting.

    A non-blank ``welcome_message`` in *config* wins; otherwise a default text
    naming the allowed publishers is built. *name* is accepted but not used.
    """
    custom = (config.get("welcome_message") or "").strip()
    if not custom:
        publishers = _publisher_names(config.get("publishers", []))
        return (
            "Hello! I am a broadcast bot.\n"
            f"I broadcast messages to all connected users from {publishers}."
        )
    return custom
def _bc_prohibited(config: dict) -> str:
    """Return the reply sent to non-publishers who message the broadcast bot.

    A non-blank ``prohibited_message`` in *config* wins; otherwise a default
    text listing the allowed publishers is built.
    """
    custom = (config.get("prohibited_message") or "").strip()
    if not custom:
        publishers = _publisher_names(config.get("publishers", []))
        return (
            f"Sorry, only these users can broadcast messages: {publishers}. "
            "Your message is deleted."
        )
    return custom
# Content types the broadcast bot will relay (matches the official allowlist).
# Compared against the "type" field of an incoming message's msgContent.
_BC_ALLOWED_CONTENT = {"text", "link"}
async def _handle_broadcast_message(
    b: "RunningBot", chat: Any, config: dict, item: dict, chat_info: dict, mc: dict, text: str
) -> None:
    """Process one message received by a broadcast bot.

    A sender counts as a publisher when the contact id OR the local display name
    matches the configured publishers. Non-publishers get the "prohibited" reply
    and their message is deleted locally. Publishers' text/link messages are
    fanned out with a single ``/feed`` command and the sender is told the
    success/failure counts; other content types are refused.
    """

    async def respond(message: str) -> None:
        # Replies are best-effort: a failed reply must not abort handling.
        try:
            await chat.api_send_text_reply(item, message)
        except Exception:
            pass

    allowed_ids, allowed_names = _parse_publishers(config.get("publishers", []))
    contact = chat_info.get("contact", {})
    sender_id = contact.get("contactId")
    sender_name = contact.get("localDisplayName", "")

    if sender_id not in allowed_ids and sender_name not in allowed_names:
        await respond(_bc_prohibited(config))
        item_id = item.get("chatItem", {}).get("meta", {}).get("itemId")
        if sender_id is not None and item_id is not None:
            # internal delete (a received message can't be deleted for the sender)
            try:
                await chat.api_delete_chat_items("direct", sender_id, [item_id], "internal")
            except Exception:
                log.exception("broadcast: failed to delete non-publisher message")
        return

    if mc.get("type") not in _BC_ALLOWED_CONTENT or not text:
        await respond("Message is not supported (text and links only).")
        return

    # Native broadcast: one /feed command fans out to every contact and reports counts.
    try:
        result = await chat.send_chat_cmd(f"/feed {text}")
    except Exception as exc:
        log.error("broadcast /feed error: %s", exc)
        await respond("Could not broadcast right now, please try again.")
        return

    delivered = isinstance(result, dict) and result.get("type") == "broadcastSent"
    if not delivered:
        log.error("broadcast unexpected response: %s", result)
        await respond("Broadcast failed.")
        return

    ok_count = result.get("successes", 0)
    err_count = result.get("failures", 0)
    _append_log(b, f"Broadcast → {ok_count} ok, {err_count} errors")
    await respond(f"Forwarded to {ok_count} contact(s), {err_count} errors")
# ── RSS bot helpers ──────────────────────────────────────────────────────────────
def _fetch_feed(url: str) -> list[dict]:
"""Fetch + parse an RSS 2.0 or Atom feed → newest-first list of {id,title,link}.
Uses only the stdlib (urllib + ElementTree), no extra dependencies."""
req = urllib.request.Request(url, headers={"User-Agent": "simplex-rss-bot/1.0"})
with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - user-configured feed
data = r.read()
root = ET.fromstring(data)
out: list[dict] = []
items = root.findall(".//item") # RSS 2.0
if items:
for it in items:
title = (it.findtext("title") or "").strip()
link = (it.findtext("link") or "").strip()
eid = (it.findtext("guid") or link or title).strip()
out.append({"id": eid, "title": title, "link": link})
else: # Atom
ns = "{http://www.w3.org/2005/Atom}"
for e in root.findall(f".//{ns}entry"):
title = (e.findtext(f"{ns}title") or "").strip()
le = e.find(f"{ns}link")
link = (le.get("href") if le is not None else "") or ""
eid = (e.findtext(f"{ns}id") or link or title).strip()
out.append({"id": eid, "title": title, "link": link})
return out
def _rss_format(e: dict) -> str:
title = e.get("title") or "(untitled)"
return f"{title}\n{e['link']}" if e.get("link") else title
async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int,
                              name: str, config: dict) -> int | None:
    """Return the group id of this feed's broadcast channel, creating it if needed.

    When config already holds ``channel_gid`` and that group still exists (or the
    lookup itself fails), the stored id is reused. Otherwise a new group named
    "<name> Feed" is created with history enabled and an observer join link, its
    id is saved into *config* and persisted via the db module. Returns None if
    the channel could not be created (the error is logged).
    """
    import db as _db

    stored_gid = config.get("channel_gid")
    if stored_gid:
        try:
            for group in await chat.api_list_groups(user_id):
                if group_id(group) == stored_gid:
                    return stored_gid
        except Exception:
            return stored_gid  # assume still valid if the lookup failed

    profile = {
        "displayName": f"{name} Feed", "fullName": "",
        "groupPreferences": {"history": {"enable": "on"}},  # new subscribers see recent posts
    }
    try:
        created = await chat.api_new_group(user_id, profile)
        new_gid = created["groupId"]
        await chat.api_create_group_link(new_gid, "observer")  # channel = observer link
        config["channel_gid"] = new_gid
        _db.update_config(profile_id, config)
        _append_log(b, f"RSS channel created (group {new_gid})")
        return new_gid
    except Exception:
        log.exception("rss: failed to create channel")
        return None
async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict,
seed: bool, max_post: int | None = None) -> None:
"""Fetch the feed and broadcast items not seen before.
seed=True → only record ids (no posting), used on restart so we don't replay.
max_post=N → post at most the N latest new items (used to populate a fresh channel
without dumping the bot's entire backlog)."""
url = (config.get("feed_url") or "").strip()
if not url:
return
try:
entries = await asyncio.to_thread(_fetch_feed, url)
except Exception as e:
log.error("rss fetch error: %s", e)
_append_log(b, f"RSS fetch error: {e}")
return
new = [e for e in entries if e["id"] not in b.rss_seen]
for e in new:
b.rss_seen.add(e["id"]) # mark all seen, even if we only post the latest few
if seed:
_append_log(b, f"RSS seeded {len(entries)} existing item(s)")
return
if not gid or not new:
return
to_post = new if max_post is None else new[:max_post] # new is newest-first
for e in reversed(to_post): # post oldest → newest
try:
await chat.api_send_text_message({"chatType": "group", "chatId": gid}, _rss_format(e))
except Exception:
log.exception("rss: failed to post to channel")
_append_log(b, f"RSS posted {len(to_post)} item(s)")
async def _run_bot(
    profile_id: int,
    name: str,
    bot_type: str,
    db_prefix: str,
    config: dict,
    on_address: callable,
) -> None:
    """Inner coroutine — runs the simplex-chat event loop for one profile.

    Steps, in order: open the chat database, make sure an active user exists,
    start the chat, sync the profile, obtain (or create) the contact address and
    report it through ``on_address``, apply per-type address settings, load the
    contact/group caches, run the RSS first-run setup, then loop forever on
    incoming events. The loop ends on cancellation or on an unhandled error
    (logged and appended to the bot log); the chat is closed in either case.
    """
    # Imported lazily so a missing package is reported when a bot is started.
    try:
        from simplex_chat import ChatApi, SqliteDb
    except ImportError:
        log.error("simplex-chat Python package not installed. Run: pip install simplex-chat")
        return
    # Registry entry for this profile; the lookup expects it to exist already.
    b = _running[profile_id]
    try:
        chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix))
        b.chat = chat
        # libsimplex /_start requires an active user to exist first
        user = await chat.api_get_active_user()
        if not user:
            user = await chat.api_create_active_user(_profile_dict(name, config))
        await chat.start_chat()
        user_id = user["userId"]
        # Sync profile from config so edits made while stopped take effect on start
        await _sync_profile(chat, user_id, name, config)
        existing = await chat.api_get_user_address(user_id)
        if existing:
            # api_get_user_address returns UserContactLink; link is nested under connLinkContact
            link = existing["connLinkContact"]
        else:
            # api_create_user_address returns CreatedConnLink directly
            link = await chat.api_create_user_address(user_id)
        # Prefer the short link; fall back to the full one.
        address = link.get("connShortLink") or link.get("connFullLink", "")
        b.address = address
        await on_address(profile_id, address)
        # Configure address settings based on profile type
        settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}}
        if bot_type == "user":
            pass # plain user: auto-accept on, no auto-reply
        elif bot_type == "business":
            # business account: each connecting customer becomes a group chat the
            # operator handles in the UI. Optional welcome auto-reply if configured.
            settings["businessAddress"] = True
            welcome = (config.get("welcome_message") or "").strip()
            if welcome:
                settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "support":
            settings["businessAddress"] = True
            welcome = config.get("welcome_message", f"Welcome to {name} support.")
            settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "broadcast":
            # auto-reply greets each new contact (default lists allowed publishers)
            settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)}
        elif bot_type == "rss":
            welcome = config.get("welcome_message") or f"You're subscribed to {name}."
            settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type in ("echo", "llm", "directory", "deadmans"):
            welcome = config.get("welcome_message", f"Connected to {name}.")
            settings["autoReply"] = {"type": "text", "text": welcome}
        await chat.api_set_address_settings(user_id, settings)
        # Refresh contacts/groups
        async def refresh() -> None:
            """Reload the cached contact and group lists; failures are only logged."""
            try:
                b.contacts = await chat.api_list_contacts(user_id)
                groups = await chat.api_list_groups(user_id)
                # Classify each group as channel (observer link) vs group (member link)
                for g in groups:
                    await _classify_group(chat, g)
                b.groups = groups
            except Exception:
                log.exception("refresh failed for bot %d", profile_id)
        await refresh()
        # Dead man's switch state: fire `message` if no check-in within checkin_hours.
        dms_interval_s = float(config.get("checkin_hours", 24)) * 3600
        dms_last_checkin = datetime.now(timezone.utc)
        dms_fired = False
        if bot_type == "deadmans":
            _append_log(b, f"Dead man's switch armed — deadline in {config.get('checkin_hours', 24)}h")
        dir_tick = 0 # directory bots periodically scan for newly-added groups
        # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the
        # whole feed on startup; then poll on an interval.
        rss_poll_s = float(config.get("poll_seconds", 300))
        if bot_type == "rss":
            b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config)
            if b.rss_gid and not config.get("rss_populated"):
                # first run for this feed: fill the channel with the latest items so new
                # subscribers see content (recent history). Done once, then flagged.
                await _rss_poll(b, chat, b.rss_gid, config, seed=False, max_post=5)
                config["rss_populated"] = True
                import db as _db
                _db.update_config(profile_id, config)
            else:
                # already populated on a previous run — just record current ids, don't replay
                await _rss_poll(b, chat, b.rss_gid, config, seed=True)
            b.rss_next_poll = time.time() + rss_poll_s
            # Reload the caches again after the channel setup above.
            await refresh()
        # Event loop
        while True:
            # The periodic checks below run on every pass, before the
            # `evt is None` test, i.e. also when no event was delivered.
            evt = await chat.recv_chat_event(500_000)
            # RSS bot: poll the feed on its interval and broadcast new items to the channel
            if bot_type == "rss" and time.time() >= b.rss_next_poll:
                await _rss_poll(b, chat, b.rss_gid, config, seed=False)
                b.rss_next_poll = time.time() + rss_poll_s
            # Directory bot: ~every 30s, refresh groups and register/auto-join new ones
            if bot_type == "directory":
                dir_tick += 1
                if dir_tick >= 60:
                    dir_tick = 0
                    await refresh()
                    await _directory_register_new_groups(b, chat, config, name)
            # Dead man's switch: fire once if the check-in window has elapsed
            if bot_type == "deadmans" and not dms_fired:
                elapsed = (datetime.now(timezone.utc) - dms_last_checkin).total_seconds()
                if elapsed > dms_interval_s:
                    dms_fired = True
                    n = await _fire_deadmans(chat, user_id, config)
                    _append_log(b, f"Dead man's switch FIRED → notified {n} contact(s)")
            if evt is None:
                continue
            # Record the event tag in the bot log (same 200-line cap as _append_log).
            tag = evt.get("type", "")
            b.log_lines.append(f"[{tag}]")
            if len(b.log_lines) > 200:
                b.log_lines = b.log_lines[-200:]
            if tag == "contactConnected":
                await refresh()
                ct = evt.get("contact", {})
                ct_name = ct.get("localDisplayName", "?")
                _append_log(b, f"Contact connected: {ct_name}")
                # echo replies on message; broadcast/others greet via the auto-reply
                # configured in address settings, so nothing to do on connect here.
            elif tag == "newChatItems":
                items = evt.get("chatItems", [])
                for item in items:
                    ci = item.get("chatItem", {})
                    # Robust incoming-vs-outgoing: chatDir ".../Rcv" = received,
                    # ".../Snd" = sent by us. Avoids replying to our own messages.
                    chat_dir = ci.get("chatDir", {}).get("type", "")
                    incoming = chat_dir.endswith("Rcv")
                    if incoming:
                        content = ci.get("content", {})
                        mc = content.get("msgContent", {})
                        text = mc.get("text", "")
                        chat_info = item.get("chatInfo", {})
                        _append_log(b, f"Message: {text[:80]}")
                        # Cross-account notification for any received message
                        if text:
                            # _notify_ref is (chat type, chat id, sender display name)
                            ci_type = chat_info.get("type")
                            if ci_type == "direct":
                                ct = chat_info.get("contact", {})
                                _notify_ref = ("direct", ct.get("contactId"), ct.get("localDisplayName", ""))
                            elif ci_type == "group":
                                gm = ci.get("chatDir", {}).get("groupMember", {}) or {}
                                _notify_ref = (
                                    "group",
                                    chat_info.get("groupInfo", {}).get("groupId"),
                                    gm.get("localDisplayName", ""),
                                )
                            else:
                                _notify_ref = (None, None, "")
                            # Skip the notification when no chat id could be determined.
                            if _notify_ref[1] is not None:
                                record_notification(
                                    profile_id, name, _notify_ref[0], _notify_ref[1],
                                    _notify_ref[2], text,
                                )
                        # Per-type reaction to the received message.
                        if bot_type == "echo" and text:
                            try:
                                await chat.api_send_text_reply(item, f"Echo: {text}")
                            except Exception:
                                pass
                        elif bot_type == "support" and text:
                            await _handle_llm_message(
                                b, chat, config, item, chat_info, text, DEFAULT_SUPPORT_PROMPT
                            )
                        elif bot_type == "llm" and text:
                            await _handle_llm_message(
                                b, chat, config, item, chat_info, text, DEFAULT_LLM_PROMPT
                            )
                        elif bot_type == "directory" and text:
                            await _handle_directory_message(
                                b, chat, config, name, item, chat_info, text
                            )
                        elif bot_type == "deadmans" and text:
                            owner = config.get("owner")
                            sender = chat_info.get("contact", {}).get("localDisplayName", "")
                            # Any contact checks in, unless an owner is configured
                            if not owner or sender == owner:
                                dms_last_checkin = datetime.now(timezone.utc)
                                dms_fired = False
                                hrs = config.get("checkin_hours", 24)
                                try:
                                    await chat.api_send_text_reply(
                                        item, f"✓ Check-in received. Switch reset — next deadline in {hrs}h."
                                    )
                                except Exception:
                                    pass
                        elif bot_type == "broadcast":
                            # No `and text` here: the broadcast handler does its own content check.
                            await _handle_broadcast_message(
                                b, chat, config, item, chat_info, mc, text
                            )
    except asyncio.CancelledError:
        # Cancellation is not treated as an error; fall through to the cleanup below.
        pass
    except Exception as e:
        log.exception("Bot %d crashed: %s", profile_id, e)
        _append_log(b, f"ERROR: {e}")
    finally:
        # Always close the chat handle if one was opened; close errors are ignored.
        if b.chat:
            try:
                await b.chat.close()
            except Exception:
                pass
def _append_log(b: RunningBot, line: str) -> None:
    """Add *line* to the bot's activity log, keeping only the newest 200 lines."""
    b.log_lines.append(line)
    too_long = len(b.log_lines) > 200
    if too_long:
        b.log_lines = b.log_lines[-200:]
async def _handle_directory_message(
    b: RunningBot, chat: Any, config: dict, name: str, item: dict, chat_info: dict, text: str
) -> None:
    """Directory bot: super-user approval commands, and search for everyone else.

    Messages starting with "/" are commands: /help (anyone), /list, /approve <id>
    and /reject <id> (only senders whose display name is in config "superusers").
    Any other text is a search term; up to 10 matches are returned.

    The submission id is validated with ``str.isdecimal`` so that every accepted
    value is convertible by ``int``; ``isdigit`` also accepts characters such as
    "²", for which ``int`` raises.
    """
    import directory as _dir

    safe = safe_name(name)
    superusers = config.get("superusers", []) or []
    sender = chat_info.get("contact", {}).get("localDisplayName", "")
    is_super = sender in superusers
    t = text.strip()

    async def reply(msg: str) -> None:
        # Best-effort: a failed reply must not abort message handling.
        try:
            await chat.api_send_text_reply(item, msg)
        except Exception:
            pass

    # Plain text → search.
    if not t.startswith("/"):
        results = _dir.search(safe, t)
        if not results:
            await reply(f"No groups found for '{t}'. Send /help for options.")
        else:
            await reply("Found:\n" + "\n".join(
                f"{e['displayName']}: {e['link'] or '(no link)'}" for e in results[:10]
            ))
        return

    parts = t.split()
    cmd = parts[0].lower()

    if cmd == "/help":
        await reply("Send a search term to find groups. Admins: /list, /approve <id>, /reject <id>.")
        return

    if cmd == "/list":
        if not is_super:
            await reply("Only admins can list pending submissions.")
            return
        pend = _dir.entries_by_status(safe, "pending")
        if not pend:
            await reply("No pending submissions.")
        else:
            await reply("Pending:\n" + "\n".join(
                f"#{e['id']} {e['displayName']} ({'channel' if e['is_channel'] else 'group'})" for e in pend
            ))
        return

    if cmd in ("/approve", "/reject"):
        if not is_super:
            await reply("Only admins can approve or reject.")
            return
        if len(parts) < 2 or not parts[1].isdecimal():
            await reply(f"Usage: {cmd} <id>")
            return
        eid = int(parts[1])
        status = "approved" if cmd == "/approve" else "rejected"
        e = _dir.set_status(safe, eid, status)
        if not e:
            await reply(f"No submission #{eid}.")
            return
        if status == "approved":
            # Regenerate the published listing so the approval shows on the site.
            _dir.publish(safe)
            await reply(f"✓ Approved #{eid} '{e['displayName']}' — now listed on the directory site.")
        else:
            await reply(f"Rejected #{eid} '{e['displayName']}'.")
        return

    await reply("Unknown command. Send /help for options.")
async def _directory_register_new_groups(
    b: RunningBot, chat: Any, config: dict, name: str
) -> None:
    """Accept pending group invitations and register unknown groups as "pending".

    For each cached group: an "invited" membership is accepted and the group is
    left for a later pass; a group the bot does not own and that the directory
    state does not know yet is added as a pending submission. Each newly added
    submission is logged and announced to every cached contact whose display
    name is listed in config "superusers".
    """
    import directory as _dir

    safe = safe_name(name)
    admins = config.get("superusers", []) or []

    for group in b.groups:
        gid = group_id(group)
        membership = group.get("membership") or {}
        member_status = membership.get("memberStatus", "")
        member_role = membership.get("memberRole", "")

        if member_status == "invited":
            try:
                await chat.api_join_group(gid)  # someone added the bot; accept
            except Exception:
                pass
            continue  # registered on a later tick once it syncs

        # Skip groups the bot owns and groups that are already registered.
        if member_role == "owner" or _dir.find_by_group(_dir.load_state(safe), gid):
            continue

        join_link = ""
        try:
            join_link = await chat.api_get_group_link_str(gid)
        except Exception:
            pass

        entry, is_new = _dir.add_pending(
            safe, gid, group_name(group), join_link, bool(group.get("is_channel")),
            group.get("groupSummary") or {}, (group.get("groupProfile") or {}).get("shortDescr"), "group",
        )
        if not is_new:
            continue

        _append_log(b, f"Directory: registered pending #{entry['id']} '{entry['displayName']}'")
        for contact in b.contacts:
            if contact["localDisplayName"] not in admins:
                continue
            try:
                await chat.api_send_text_message(
                    {"chatType": "direct", "chatId": contact["contactId"]},
                    f"New directory submission #{entry['id']}: '{entry['displayName']}'. "
                    f"/approve {entry['id']} or /reject {entry['id']}.",
                )
            except Exception:
                pass
async def _fire_deadmans(chat: Any, user_id: int, config: dict) -> int:
    """Deliver the dead-man message and return how many sends succeeded.

    Recipients are the contacts whose display name appears in config
    "recipients"; an empty/missing list means every contact. The text comes from
    config "message", with a built-in default. Returns 0 if the contact list
    cannot be read; individual send failures are skipped.
    """
    body = config.get("message") or "Dead man's switch triggered — no check-in was received."
    wanted = config.get("recipients") or []  # list of display names; empty = all contacts

    try:
        contacts = await chat.api_list_contacts(user_id)
    except Exception:
        return 0

    if wanted:
        targets = [c for c in contacts if c["localDisplayName"] in wanted]
    else:
        targets = contacts

    delivered = 0
    for contact in targets:
        try:
            await chat.api_send_text_message(
                {"chatType": "direct", "chatId": contact["contactId"]}, body
            )
        except Exception:
            continue
        delivered += 1
    return delivered
async def _handle_llm_message(
    b: RunningBot, chat: Any, config: dict, item: dict, chat_info: dict, text: str,
    default_prompt: str = DEFAULT_LLM_PROMPT,
) -> None:
    """Reply to an incoming message via the configured OpenAI-compatible LLM
    (works with Ollama's `ollama serve` at http://localhost:11434/v1, OpenAI, Grok…).

    `system_prompt` (the startup context), `api_base` (the API URL), `model` and an
    optional `api_key` come from config. If no api_base is set the bot stays silent.

    A short rolling history is kept per conversation. Direct chats are keyed by
    contact id (as before). Chats without a contact — e.g. the per-customer group
    chats of a business address — are keyed by ``("group", groupId)`` so separate
    conversations never share context. A reply that is not non-empty text is
    handled like any other LLM failure: logged, with an apology sent instead.
    """
    api_base = (config.get("api_base") or "").strip()
    if not api_base:
        return  # no LLM configured for this bot

    contact_id = chat_info.get("contact", {}).get("contactId")
    if contact_id is not None:
        history_key = contact_id
    else:
        group_ref = (chat_info.get("groupInfo") or {}).get("groupId")
        history_key = ("group", group_ref)

    system_prompt = config.get("system_prompt") or default_prompt
    model = config.get("model") or "grok-2"
    api_key = config.get("api_key") or ""

    # Maintain a short rolling history per conversation for context.
    hist = b.histories.setdefault(history_key, [])
    hist.append({"role": "user", "content": text})
    if len(hist) > 20:
        del hist[:-20]
    messages = [{"role": "system", "content": system_prompt}, *hist]

    try:
        reply = await llm_chat(api_base, api_key, model, messages)
        # The API may return null/empty content; slicing it below would raise.
        if not isinstance(reply, str) or not reply.strip():
            raise ValueError("LLM returned no text content")
    except Exception as e:
        log.error("LLM error: %s", e)
        _append_log(b, f"LLM error: {e}")
        try:
            await chat.api_send_text_reply(
                item, "Sorry, I'm having trouble responding right now. Please try again shortly."
            )
        except Exception:
            pass
        return

    hist.append({"role": "assistant", "content": reply})
    _append_log(b, f"LLM reply: {reply[:80]}")
    try:
        await chat.api_send_text_reply(item, reply)
    except Exception:
        log.exception("failed to send LLM reply")
async def global_status() -> dict:
    """Summarize the whole manager: running profiles, totals and server setup.

    Contact/group totals are summed over the cached lists of all running
    profiles. Server counts, enabled operator names and the proxy mode are read
    from the first running profile that answers successfully (the presets are
    shared, so one profile stands for all); a profile whose query fails is
    logged and the next one is tried.
    """
    live = [bot for bot in _running.values() if not bot.task.done()]
    summary = {
        "profiles_running": len(live),
        "contacts": sum(len(bot.contacts) for bot in live),
        "groups": sum(len(bot.groups) for bot in live),
        "smp_servers": 0,
        "xftp_servers": 0,
        "operators": [],
        "proxy_mode": "",
    }

    def _active(servers: list) -> int:
        # Servers that are enabled and not flagged as deleted.
        return sum(1 for srv in servers if srv.get("enabled") and not srv.get("deleted"))

    for bot in live:
        if not bot.chat:
            continue
        try:
            user = await bot.chat.api_get_active_user()
            reply = await bot.chat.send_chat_cmd(f"/_servers {user['userId']}")
            for group in reply.get("userServers", []):
                operator = group.get("operator") or {}
                if operator.get("enabled") and operator.get("tradeName"):
                    summary["operators"].append(operator["tradeName"])
                summary["smp_servers"] += _active(group.get("smpServers", []))
                summary["xftp_servers"] += _active(group.get("xftpServers", []))
            network = await bot.chat.send_chat_cmd("/network")
            summary["proxy_mode"] = network.get("networkConfig", {}).get("smpProxyMode", "")
            break  # one running profile is representative
        except Exception:
            log.exception("global_status server read failed")
            continue
    return summary
def _server_host(s: str) -> str:
"""Extract the readable host from a server URI like smp://key@host1,host2.onion."""
try:
after = s.split("://", 1)[1] if "://" in s else s
if "@" in after:
after = after.split("@", 1)[1]
return after.split(",")[0]
except Exception:
return s
def _server_row(s: dict) -> dict:
    """Condense one server entry to its host plus enabled/preset/deleted flags.

    The host comes from the entry's ``server`` URI (empty string if absent);
    each flag is coerced to ``bool`` so missing keys read as ``False``.
    """
    row: dict = {"host": _server_host(s.get("server", ""))}
    for flag in ("enabled", "preset", "deleted"):
        row[flag] = bool(s.get(flag))
    return row
def _first_running_chat():
    """Return the first entry of ``_running`` that is usable, or ``None``.

    An entry is usable when its task has not finished and its ``chat``
    attribute is truthy.
    """
    usable = (bot for bot in _running.values() if not bot.task.done() and bot.chat)
    return next(usable, None)
async def get_servers_detail() -> dict:
    """Return the per-operator SMP/XFTP server breakdown plus the network config.

    Data is read from the first running profile. The result has the keys
    ``profile_name``, ``operators`` (one dict per operator with ``name``,
    ``enabled``, ``smp`` and ``xftp`` rows) and ``network``. When no profile is
    running, or any query fails (the failure is logged), an empty result with
    ``profile_name`` set to ``None`` is returned instead.
    """
    fallback = {"profile_name": None, "operators": [], "network": {}}
    source = _first_running_chat()
    if not source:
        return fallback

    def describe(entry: dict) -> dict:
        # Entries without operator info are reported under the name "Custom".
        operator = entry.get("operator") or {}
        return {
            "name": operator.get("tradeName") or "Custom",
            "enabled": bool(operator.get("enabled")),
            "smp": [_server_row(srv) for srv in entry.get("smpServers", [])],
            "xftp": [_server_row(srv) for srv in entry.get("xftpServers", [])],
        }

    try:
        user = await source.chat.api_get_active_user()
        servers = await source.chat.send_chat_cmd(f"/_servers {user['userId']}")
        network = await source.chat.send_chat_cmd("/network")
        breakdown = [describe(entry) for entry in servers.get("userServers", [])]
        return {
            "profile_name": source.name,
            "operators": breakdown,
            "network": network.get("networkConfig", {}),
        }
    except Exception:
        log.exception("get_servers_detail failed")
        return fallback
async def get_network_config() -> dict:
    """Return only the ``networkConfig`` dict reported by the first running profile.

    Yields an empty dict when no profile is running or when the ``/network``
    query fails; a failure is logged rather than raised.
    """
    config: dict = {}
    source = _first_running_chat()
    if source:
        try:
            config = (await source.chat.send_chat_cmd("/network")).get("networkConfig", {})
        except Exception:
            log.exception("get_network_config failed")
    return config
def _profile_dict(name: str, config: dict) -> dict:
"""Build a SimpleX Profile dict (displayName/fullName/shortDescr/image) from config."""
profile: dict = {"displayName": name, "fullName": config.get("full_name", "")}
if config.get("bio"):
profile["shortDescr"] = config["bio"]
if config.get("avatar"):
profile["image"] = config["avatar"] # base64 data URI
return profile
async def _sync_profile(chat: Any, user_id: int, name: str, config: dict) -> None:
    """Send the profile derived from ``config`` to the live account.

    Best-effort: any error raised while building or pushing the profile is
    logged and swallowed, so callers never see an exception from here.
    """
    try:
        desired = _profile_dict(name, config)
        await chat.api_update_profile(user_id, desired)
    except Exception:
        log.exception("profile sync failed")
async def update_profile(
    profile_id: int, full_name: str | None, bio: str | None, avatar: str | None
) -> dict:
    """Store profile edits in the manager DB and push them to a running bot.

    Each argument left as ``None`` keeps the stored value. ``avatar`` is a
    base64 data URI; an empty string removes the stored avatar.

    Returns:
        The updated config dict.

    Raises:
        RuntimeError: if no profile with ``profile_id`` exists.
    """
    import db as _db

    record = _db.get_profile(profile_id)
    if not record:
        raise RuntimeError("Profile not found")

    config = json.loads(record.get("config") or "{}")
    for key, value in (("full_name", full_name), ("bio", bio)):
        if value is not None:
            config[key] = value
    if avatar:
        config["avatar"] = avatar
    elif avatar is not None:
        # Empty string: the caller asked for the avatar to be removed.
        config.pop("avatar", None)
    _db.update_config(profile_id, config)

    # Apply live only when the profile is running with a connected chat client.
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        return config
    user = await bot.chat.api_get_active_user()
    if user:
        await _sync_profile(bot.chat, user["userId"], record["name"], config)
    return config
async def _classify_group(chat: Any, g: dict) -> None:
"""Annotate a GroupInfo in place with link info: is_channel, link.
A channel is a group whose join-link role is "observer". Groups with a
"member"+ link (or no link at all) are regular 2-way groups.
"""
g["is_channel"] = False
g["link"] = ""
try:
link_obj = await chat.api_get_group_link(g["groupId"])
except Exception:
return # no link → plain private group
g["is_channel"] = link_obj.get("acceptMemberRole") == "observer"
conn = link_obj.get("connLinkContact", {})
g["link"] = conn.get("connShortLink") or conn.get("connFullLink", "")
async def create_channel(profile_id: int, name: str) -> str:
    """Create a group whose join link uses the "observer" role (a SimpleX channel).

    After creation the profile's cached group list is reloaded and every group
    re-classified; a failure during that refresh is logged and does not affect
    the result.

    Returns:
        The value returned by ``api_create_group_link`` for the new group.

    Raises:
        RuntimeError: if the profile is not running or has no active user.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    group_profile = {"displayName": name, "fullName": ""}
    created = await bot.chat.api_new_group(user["userId"], group_profile)
    link = await bot.chat.api_create_group_link(created["groupId"], "observer")

    # Reload the cached list so the new channel shows up with its link info.
    try:
        refreshed = await bot.chat.api_list_groups(user["userId"])
        for entry in refreshed:
            await _classify_group(bot.chat, entry)
        bot.groups = refreshed
    except Exception:
        log.exception("group refresh after create_channel failed")
    return link
async def create_group(profile_id: int, name: str) -> str:
    """Create a regular two-way group whose join link uses the "member" role.

    After creation the profile's cached group list is reloaded and every group
    re-classified; a failure during that refresh is logged and does not affect
    the result.

    Returns:
        The value returned by ``api_create_group_link`` for the new group.

    Raises:
        RuntimeError: if the profile is not running or has no active user.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    group_profile = {"displayName": name, "fullName": ""}
    created = await bot.chat.api_new_group(user["userId"], group_profile)
    link = await bot.chat.api_create_group_link(created["groupId"], "member")

    # Reload the cached list so the new group shows up with its link info.
    try:
        refreshed = await bot.chat.api_list_groups(user["userId"])
        for entry in refreshed:
            await _classify_group(bot.chat, entry)
        bot.groups = refreshed
    except Exception:
        log.exception("group refresh after create_group failed")
    return link
async def get_group_members(profile_id: int, gid: int) -> list[dict]:
    """List the members of a group/channel as name/role/status dicts.

    Our own membership, taken from the cached group list, is placed first and
    marked with " (you)". Per the original author's note, api_list_members
    leaves the user out, and adding it makes the count match
    groupSummary.currentMembers.

    Raises:
        RuntimeError: if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")

    roster = []
    for member in await bot.chat.api_list_members(gid):
        roster.append({
            "name": member["localDisplayName"],
            "role": member["memberRole"],
            "status": member["memberStatus"],
        })

    # Only the first cached group with a matching id is consulted.
    cached = next((grp for grp in bot.groups if group_id(grp) == gid), None)
    if cached is not None:
        own = cached.get("membership") or {}
        if own:
            roster.insert(0, {
                "name": own.get("localDisplayName", "you") + " (you)",
                "role": own.get("memberRole", "owner"),
                "status": own.get("memberStatus", ""),
            })
    return roster
async def join_group(profile_id: int, gid: int) -> bool:
    """Join the group ``gid`` (accepting a pending invitation), then refresh the cached lists.

    Returns:
        ``True`` once both calls have completed.

    Raises:
        RuntimeError: if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_join_group(gid)
    await refresh_lists(profile_id)
    return True
async def leave_group(profile_id: int, gid: int) -> bool:
    """Leave the group/channel ``gid``, then refresh the cached lists.

    Returns:
        ``True`` once both calls have completed.

    Raises:
        RuntimeError: if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_leave_group(gid)
    await refresh_lists(profile_id)
    return True
async def delete_group(profile_id: int, gid: int) -> bool:
    """Delete the group/channel ``gid`` entirely, then refresh the cached lists.

    The deletion is requested with mode "full" and ``notify`` enabled.

    Returns:
        ``True`` once both calls have completed.

    Raises:
        RuntimeError: if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    delete_mode = {"type": "full", "notify": True}
    await bot.chat.api_delete_chat("group", gid, delete_mode)
    await refresh_lists(profile_id)
    return True
async def get_group_link(profile_id: int, gid: int) -> str:
    """Look up the existing join link of a group/channel.

    Returns:
        The link string, or an empty string when the lookup raises (for
        example because the group has no link).

    Raises:
        RuntimeError: if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    try:
        link = await bot.chat.api_get_group_link_str(gid)
    except Exception:
        link = ""
    return link