Files
simplex-manager/manager/profiles.py
Jon 7c712c9ee3 Rich chat messages (reactions, replies, files, images); RSS poll countdown; Speakers' Corner directory page updates
- Chat: extract reactions, quoted replies, file/image data in _normalize_item
- Chat: render emoji reaction pills, reply-quote blocks, inline image previews, file blocks with Accept/Download
- Chat: reply UI (hover → set reply → preview bar above compose → send with quotedItemId)
- Chat: emoji picker strip (6 quick-react emojis) on message hover
- Chat: POST /react and POST /file/{id}/receive and GET /file/{id}/download endpoints
- Chat: file decryption via core.chat_read_file (native libsimplex FFI), served with correct MIME type
- List: RSS bot cards show live next-poll countdown (ticks every second via status API poll_next field)
- Directory: rename SimpleXXX → Speakers' Corner Online Directory throughout
- Directory: add hero banner image, About page link, QR popout, title hyperlink
- Directory: new about.html — Online Safety Act, Digital ID, 65k arrests stat, community rules

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 20:23:00 +01:00

1420 lines
55 KiB
Python

"""Bot lifecycle management — start, stop, status, message sending."""
import asyncio
import json
import logging
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
log = logging.getLogger(__name__)
# Directory bot websites live in <repo>/web/<safe_name>/, each self-contained
# (its own index.html + data/), generated from the web/index.html template.
# Resolved relative to this module: the parent of this file's directory, plus "web".
WEB_DIR = Path(__file__).parent.parent / "web"
def safe_name(name: str) -> str:
    """Return *name* lowercased, with every space turned into an underscore.

    Only the space character is rewritten; all other characters pass through.
    """
    lowered = name.lower()
    return "_".join(lowered.split(" "))
def generate_directory_site(name: str) -> str:
    """Create (or refresh) the static site folder for one directory bot.

    Reads ``WEB_DIR/index.html``, replaces every ``SimpleXXX`` placeholder with
    *name* and writes the result to ``WEB_DIR/<safe_name(name)>/index.html``.
    A ``data/listing.json`` holding an empty ``entries`` list is written only
    when no listing file exists yet, so an existing listing is left untouched.

    Each bot gets its own folder, so several directory sites can coexist.

    Returns:
        The site path relative to ``WEB_DIR``: ``"<safe_name>/index.html"``.
    """
    # Read the template first so a missing template fails before any folder is created.
    rendered = (WEB_DIR / "index.html").read_text(encoding="utf-8").replace("SimpleXXX", name)

    folder = safe_name(name)
    site_dir = WEB_DIR / folder
    data_dir = site_dir / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    (site_dir / "index.html").write_text(rendered, encoding="utf-8")

    listing_file = data_dir / "listing.json"
    if not listing_file.exists():
        listing_file.write_text('{"entries": []}', encoding="utf-8")
    return f"{folder}/index.html"
# ── Notifications ───────────────────────────────────────────────────────────────
# In-memory, cross-account feed of received messages. Ephemeral (clears on restart).
# _notifications holds the entries oldest-first; _notif_seq is the last id handed
# out; _NOTIF_MAX caps how many entries are retained.
_notifications: list[dict] = []
_notif_seq = 0
_NOTIF_MAX = 200
def record_notification(
    profile_id: int, profile_name: str, chat_type: str, chat_id: int, sender: str, text: str
) -> None:
    """Append one received-message entry to the in-memory notification feed.

    The entry gets the next sequence id, a UTC ISO-8601 timestamp, ``read`` set
    to ``False`` and *text* truncated to 140 characters. Once the feed exceeds
    ``_NOTIF_MAX`` entries the oldest ones are dropped.
    """
    global _notif_seq
    _notif_seq += 1
    entry = {
        "id": _notif_seq,
        "profile_id": profile_id,
        "profile_name": profile_name,
        "chat_type": chat_type,
        "chat_id": chat_id,
        "sender": sender,
        "text": text[:140],
        "ts": datetime.now(timezone.utc).isoformat(),
        "read": False,
    }
    _notifications.append(entry)

    overflow = len(_notifications) - _NOTIF_MAX
    if overflow > 0:
        del _notifications[:overflow]
def get_notifications(limit: int = 50) -> list[dict]:
    """Return up to *limit* notifications, most recent first.

    A non-positive *limit* yields an empty list. Without this guard ``limit=0``
    would return the whole feed, because ``seq[-0:]`` is the full slice, and a
    negative limit would slice from the front instead of the back.
    """
    if limit <= 0:
        return []
    return _notifications[-limit:][::-1]
def unread_count() -> int:
    """Return how many notifications in the feed are not yet marked read."""
    unread = [entry for entry in _notifications if not entry["read"]]
    return len(unread)
def mark_all_read() -> None:
    """Flag every notification currently in the feed as read."""
    for entry in _notifications:
        entry.update(read=True)
# Default system prompts when none is configured.
# The event loop passes DEFAULT_SUPPORT_PROMPT for "support" bots and
# DEFAULT_LLM_PROMPT for "llm" bots to the LLM message handler.
DEFAULT_SUPPORT_PROMPT = (
    "You are a helpful customer-support assistant. Answer concisely and politely. "
    "If you don't know something, say so rather than guessing."
)
DEFAULT_LLM_PROMPT = (
    "You are a helpful assistant. Answer concisely. "
    "If you don't know something, say so rather than guessing."
)
async def llm_chat(
    api_base: str, api_key: str, model: str, messages: list[dict], timeout: float = 60.0
) -> str:
    """POST *messages* to an OpenAI-compatible chat endpoint and return the reply text.

    The request goes to ``<api_base>/chat/completions`` (a trailing slash on
    *api_base* is tolerated), so *api_base* should already contain the version
    path, e.g. ``.../v1``. Any provider following the OpenAI standard works —
    Grok (api.x.ai/v1), Ollama (localhost:11434/v1), OpenAI (api.openai.com/v1);
    only base URL, key and model differ. The ``Authorization`` header is sent
    only when *api_key* is non-empty.

    The blocking HTTP call runs in a worker thread. Network errors, JSON decode
    errors and an unexpected response shape propagate to the caller.
    """
    endpoint = f"{api_base.rstrip('/')}/chat/completions"
    payload = json.dumps({"model": model, "messages": messages}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    def _post() -> str:
        request = urllib.request.Request(endpoint, data=payload, headers=headers, method="POST")
        with urllib.request.urlopen(request, timeout=timeout) as response:  # noqa: S310 - user-configured endpoint
            return response.read().decode("utf-8")

    reply = json.loads(await asyncio.to_thread(_post))
    return reply["choices"][0]["message"]["content"]
# api_list_groups returns BARE GroupInfo dicts (verified against the live API):
# g["groupId"], g["groupProfile"]["displayName"],
# g["groupSummary"]["currentMembers"], g["membership"]["memberRole"]
# There is no "groupInfo" wrapper and no "members" list in this response.
#
# A "channel" is a group whose join link has acceptMemberRole == "observer"
# (joiners are read-only; only the owner broadcasts). A regular group's link
# has role "member" (2-way). This is the only thing that distinguishes them.
def group_name(g: dict) -> str:
    """Return the display name stored in the group's ``groupProfile``."""
    profile = g["groupProfile"]
    return profile["displayName"]
def group_id(g: dict) -> int:
    """Return the ``groupId`` of a bare GroupInfo dict."""
    identifier = g["groupId"]
    return identifier
def group_member_count(g: dict) -> int:
    """Return ``groupSummary.currentMembers`` for a group, or 0 when unavailable.

    Both a missing key and an explicit ``null`` (``None``) for ``groupSummary``
    or ``currentMembers`` count as "no data", so a sparse payload never raises.
    """
    summary = g.get("groupSummary") or {}
    return summary.get("currentMembers") or 0
# Profile categories: each list holds the bot_type strings of one category, and
# ALL_TYPES is the concatenation of all of them.
BOT_TYPES = ["echo", "llm", "crypto", "broadcast", "support", "directory", "deadmans"]
USER_TYPES = ["user"]
BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats)
RSS_TYPES = ["rss"] # feed bots that post to a channel (their own category)
ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES + RSS_TYPES
@dataclass
class RunningBot:
    """In-memory state of one started profile, stored in the ``_running`` registry."""

    profile_id: int
    name: str
    bot_type: str
    task: asyncio.Task  # the asyncio task running this profile's event loop
    address: str = ""  # connection address, filled in once the event loop has it
    contacts: list[dict] = field(default_factory=list)  # cached contact list
    groups: list[dict] = field(default_factory=list)  # cached group list
    log_lines: list[str] = field(default_factory=list)  # recent log lines (trimmed to the last 200)
    chat: Any = None # simplex_chat ChatApi instance
    # Per-contact LLM conversation history (contactId → [{role, content}, ...])
    histories: dict[int, list[dict]] = field(default_factory=dict)
    # Feed-style bot state (rss / crypto)
    rss_seen: set = field(default_factory=set) # rss: entry ids already posted
    poll_next: float = 0.0 # next scheduled poll (epoch seconds)
    channel_gid: int | None = None # broadcast channel group id
    file_cache: dict[int, dict] = field(default_factory=dict) # file_id → {name, path, crypto_args}
# profile_id → RunningBot
# Entries are added by start_bot; a bot counts as running only while its task is not done.
_running: dict[int, RunningBot] = {}
def is_running(profile_id: int) -> bool:
    """Return True when the profile has a registry entry whose task has not finished."""
    bot = _running.get(profile_id)
    if bot is None:
        return False
    return not bot.task.done()
def get_running(profile_id: int) -> RunningBot | None:
    """Return the profile's ``RunningBot`` while its task is still active, else ``None``."""
    bot = _running.get(profile_id)
    if not bot or bot.task.done():
        return None
    return bot
def all_statuses() -> dict[int, bool]:
    """Map every profile id in the registry to whether it is currently running."""
    statuses: dict[int, bool] = {}
    for pid in _running:
        statuses[pid] = is_running(pid)
    return statuses
async def start_bot(profile: dict, on_address: callable) -> None:
    """Launch the event-loop task for *profile* unless it is already running.

    *profile* must provide ``id``, ``name``, ``bot_type`` and ``db_prefix``;
    ``config`` is an optional JSON string (missing or empty means ``{}``).
    *on_address* is handed to ``_run_bot`` unchanged. The new task is recorded
    in ``_running`` under the profile id.
    """
    pid = profile["id"]
    if is_running(pid):
        return  # idempotent: nothing to do for an active profile

    cfg = json.loads(profile.get("config") or "{}")
    kind = profile["bot_type"]
    prefix = profile["db_prefix"]
    name = profile["name"]

    runner = _run_bot(pid, name, kind, prefix, cfg, on_address)
    task = asyncio.create_task(runner, name=f"bot-{pid}-{name}")
    _running[pid] = RunningBot(profile_id=pid, name=name, bot_type=kind, task=task)
    log.info("Started bot %d (%s / %s)", pid, name, kind)
async def stop_bot(profile_id: int) -> None:
    """Cancel a running profile's task and wait for it to finish.

    Does nothing when the profile has no registry entry or its task is already done.
    """
    bot = _running.get(profile_id)
    if not bot or bot.task.done():
        return

    bot.task.cancel()
    try:
        await bot.task
    except asyncio.CancelledError:
        pass
    log.info("Stopped bot %d", profile_id)
async def send_message(profile_id: int, contact_or_group: str, text: str) -> bool:
    """Send *text* from a running bot to a contact or group looked up by name.

    Cached contacts are searched first (by ``localDisplayName``), then cached
    groups (by display name); the first match receives the message.

    Returns:
        True when a message was sent; False when the profile is not running,
        nothing matched, or sending raised (the error is logged).
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        return False

    try:
        contact = next(
            (c for c in bot.contacts if c["localDisplayName"] == contact_or_group), None
        )
        if contact is not None:
            target = {"chatType": "direct", "chatId": contact["contactId"]}
            await bot.chat.api_send_text_message(target, text)
            return True

        group = next((g for g in bot.groups if group_name(g) == contact_or_group), None)
        if group is not None:
            target = {"chatType": "group", "chatId": group_id(group)}
            await bot.chat.api_send_text_message(target, text)
            return True
    except Exception as e:
        log.error("send_message error: %s", e)
    return False
async def send_to_chat(
    profile_id: int, chat_type: str, chat_id: int, text: str,
    reply_to_id: int | None = None,
) -> bool:
    """Send *text* to the chat addressed by ``(chat_type, chat_id)``.

    *reply_to_id* is passed to the chat API as ``in_reply_to``.

    Returns:
        Always True; failures surface as exceptions.

    Raises:
        RuntimeError: the profile is not running. Errors from the chat API propagate.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        raise RuntimeError("Profile is not running")

    target = {"chatType": chat_type, "chatId": chat_id}
    await bot.chat.api_send_text_message(target, text, in_reply_to=reply_to_id)
    return True
async def send_reaction(
    profile_id: int, chat_type: str, chat_id: int,
    item_id: int, emoji: str, add: bool = True,
) -> None:
    """Add (``add=True``) or remove (``add=False``) an emoji reaction on a message.

    Raises:
        RuntimeError: the profile is not running. Errors from the chat API propagate.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        raise RuntimeError("Profile is not running")

    reaction = {"type": "emoji", "emoji": emoji}
    await bot.chat.api_chat_item_reaction(chat_type, chat_id, item_id, add, reaction)
async def accept_file(profile_id: int, file_id: int) -> None:
    """Accept an incoming file transfer (moves it from rcvInvitation → downloading).

    Raises:
        RuntimeError: the profile is not running. Errors from the chat API propagate.
    """
    bot = get_running(profile_id)
    if bot and bot.chat:
        await bot.chat.api_receive_file(file_id)
        return
    raise RuntimeError("Profile is not running")
async def refresh_lists(profile_id: int) -> None:
    """Reload the cached contacts and groups of a running profile.

    Called when rendering the profile page so member counts and lists are
    current — group joins don't emit contactConnected, so the cached lists
    would otherwise go stale. Every fetched group is passed through
    ``_classify_group`` before the cache is replaced.

    Silently returns when the profile is not running or has no active user;
    any other failure is logged and swallowed.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        return

    try:
        active = await bot.chat.api_get_active_user()
        if not active:
            return
        user_id = active["userId"]
        bot.contacts = await bot.chat.api_list_contacts(user_id)
        fetched = await bot.chat.api_list_groups(user_id)
        for grp in fetched:
            await _classify_group(bot.chat, grp)
        bot.groups = fetched
    except Exception:
        log.exception("refresh_lists failed for %d", profile_id)
def _normalize_item(ci: dict) -> dict:
"""Flatten a ChatItem into a dict for the UI, including reactions, quote, file."""
meta = ci.get("meta", {})
chat_dir = ci.get("chatDir", {})
dir_type = chat_dir.get("type", "")
outgoing = dir_type.endswith("Snd")
msg_content = ci.get("content", {}).get("msgContent", {})
content_type = msg_content.get("type", "text")
# Prefer meta.itemText; fall back to content.msgContent.text
text = meta.get("itemText") or msg_content.get("text", "")
# Inline image/video preview (base64 data URI embedded in the message)
image_preview = msg_content.get("image", "") if content_type in ("image", "video") else ""
# Sender name: group messages carry the member; direct/own use a generic label
sender = ""
if dir_type == "groupRcv":
sender = chat_dir.get("groupMember", {}).get("localDisplayName", "")
# Emoji reactions: [{emoji, count, me}]
reactions = []
for rc in ci.get("reactions", []):
r = rc.get("reaction", {})
if r.get("type") == "emoji":
reactions.append({
"emoji": r["emoji"],
"count": rc.get("totalReacted", 0),
"me": rc.get("userReacted", False),
})
# Quoted/reply item
quote = None
qi = ci.get("quotedItem")
if qi:
q_content = qi.get("content", {})
q_text = q_content.get("text", "")
q_dir = qi.get("chatDir") or {}
q_dir_type = q_dir.get("type", "")
if q_dir_type in ("directSnd", "groupSnd"):
q_sender = "You"
elif q_dir_type == "groupRcv":
q_sender = q_dir.get("groupMember", {}).get("localDisplayName", "")
else:
q_sender = ""
quote = {"id": qi.get("itemId"), "text": q_text, "sender": q_sender}
# File attachment
file_info = None
f = ci.get("file")
if f:
status = f.get("fileStatus", {}).get("type", "")
path = (f.get("fileSource") or {}).get("filePath", "")
file_info = {
"id": f.get("fileId"),
"name": f.get("fileName", ""),
"size": f.get("fileSize", 0),
"status": status,
"path": path,
}
return {
"id": meta.get("itemId"),
"ts": meta.get("itemTs", ""),
"text": text,
"content_type": content_type,
"image_preview": image_preview,
"outgoing": outgoing,
"sender": sender,
"deleted": "itemDeleted" in meta,
"reactions": reactions,
"quote": quote,
"file": file_info,
}
async def get_chat_history(
    profile_id: int, chat_type: str, chat_id: int, count: int = 50
) -> list[dict]:
    """Fetch up to *count* items of a chat and return them normalized for the UI.

    Items are returned in the order the chat API delivers them. As a side
    effect, every item carrying a file with a ``fileId`` is recorded in the
    bot's ``file_cache`` (name, path, crypto args).

    Raises:
        RuntimeError: the profile is not running. Errors from the chat API propagate.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        raise RuntimeError("Profile is not running")

    response = await bot.chat.api_get_chat(chat_type, chat_id, count)
    result: list[dict] = []
    for raw in response.get("chatItems", []):
        flattened = _normalize_item(raw)
        attachment = raw.get("file")
        if attachment and attachment.get("fileId"):
            source = attachment.get("fileSource") or {}
            bot.file_cache[attachment["fileId"]] = {
                "name": attachment.get("fileName", "file"),
                "path": source.get("filePath", ""),
                "crypto_args": source.get("cryptoArgs"),
            }
        result.append(flattened)
    return result
async def read_file_bytes(profile_id: int, file_id: int) -> tuple[bytes, str]:
    """Return ``(raw_bytes, filename)`` for a downloaded file, decrypting if needed.

    The file must already be in the bot's ``file_cache``. When the cache entry
    carries crypto args the file is read through
    ``simplex_chat.core.chat_read_file``; otherwise it is read straight from
    disk. The plain read runs in a worker thread so a large file does not
    stall the event loop.

    Raises:
        RuntimeError: the profile is not running.
        FileNotFoundError: the file id is not cached or has no local path yet.
    """
    b = get_running(profile_id)
    if not b:
        raise RuntimeError("Profile is not running")
    entry = b.file_cache.get(file_id)
    if not entry or not entry.get("path"):
        raise FileNotFoundError(f"File {file_id} not found in cache — load the chat first")
    path = entry["path"]
    crypto_args = entry.get("crypto_args")
    if crypto_args:
        from simplex_chat import core as sc_core
        data = await sc_core.chat_read_file(path, crypto_args)
    else:
        # Blocking disk I/O: keep it off the event-loop thread.
        data = await asyncio.to_thread(Path(path).read_bytes)
    return data, entry["name"]
async def clear_chat(profile_id: int, chat_type: str, chat_id: int) -> bool:
    """Delete a conversation's messages while keeping the contact/group itself.

    Issues ``/_delete <ref> messages`` where ``<ref>`` is ``@<id>`` for a
    ``"direct"`` chat and ``#<id>`` for any other chat type.

    Returns:
        Always True; failures surface as exceptions.

    Raises:
        RuntimeError: the profile is not running, or the command answered
            with a ``chatCmdError``.
    """
    bot = get_running(profile_id)
    if not bot or not bot.chat:
        raise RuntimeError("Profile is not running")

    sigil = "@" if chat_type == "direct" else "#"
    r = await bot.chat.send_chat_cmd(f"/_delete {sigil}{chat_id} messages")
    failed = isinstance(r, dict) and r.get("type") == "chatCmdError"
    if failed:
        raise RuntimeError(f"clear failed: {r}")
    return True
async def delete_contact(profile_id: int, contact_id: int) -> bool:
    """Delete a contact entirely (delete mode 'full', notifies the contact).

    After the deletion the cached contact list is reloaded so the row
    disappears. That reload is best-effort: a failure is logged and does not
    affect the return value, because the contact has already been deleted.

    Returns:
        Always True; a failed deletion surfaces as an exception.

    Raises:
        RuntimeError: the profile is not running. Errors from the delete call propagate.
    """
    b = get_running(profile_id)
    if not b or not b.chat:
        raise RuntimeError("Profile is not running")
    await b.chat.api_delete_chat("direct", contact_id, {"type": "full", "notify": True})
    # Refresh the cached contact list so the row disappears
    try:
        user = await b.chat.api_get_active_user()
        if user:
            b.contacts = await b.chat.api_list_contacts(user["userId"])
    except Exception:
        log.exception("delete_contact: could not refresh contacts for %d", profile_id)
    return True
# ── Broadcast bot helpers (mirror the official simplex-broadcast-bot) ────────────
# Publishers are configured as a list of "Name" or "ID:Name" strings. The official
# bot matches a KnownContact by contactId AND display name; we match on either, so
# names alone (what the UI collects) keep working.
def _parse_publishers(pubs: list) -> tuple[set[int], set[str]]:
ids: set[int] = set()
names: set[str] = set()
for p in pubs or []:
s = str(p).strip()
if not s:
continue
if ":" in s:
left, right = s.split(":", 1)
if left.strip().isdigit():
ids.add(int(left.strip()))
names.add(right.strip())
continue
names.add(s)
return ids, names
def _publisher_names(pubs: list) -> str:
    """Return the publisher names sorted and comma-separated, or a placeholder when there are none."""
    names = _parse_publishers(pubs)[1]
    if not names:
        return "(no publishers configured)"
    return ", ".join(sorted(names))
def _bc_welcome(config: dict, name: str) -> str:
w = (config.get("welcome_message") or "").strip()
if w:
return w
return (
"Hello! I am a broadcast bot.\n"
f"I broadcast messages to all connected users from {_publisher_names(config.get('publishers', []))}."
)
def _bc_prohibited(config: dict) -> str:
p = (config.get("prohibited_message") or "").strip()
if p:
return p
return (
f"Sorry, only these users can broadcast messages: {_publisher_names(config.get('publishers', []))}. "
"Your message is deleted."
)
# Content types the broadcast bot will relay (matches the official allowlist).
# Compared against msgContent["type"]; any other type gets a "not supported" reply.
_BC_ALLOWED_CONTENT = {"text", "link"}
async def _handle_broadcast_message(
    b: "RunningBot", chat: Any, config: dict, item: dict, chat_info: dict, mc: dict, text: str
) -> None:
    """Handle one incoming message for a broadcast bot (mirrors simplex-broadcast-bot).

    A sender is a publisher when their contact id or display name matches the
    configured publishers. Non-publishers receive the prohibited reply and
    their message is deleted locally. A publisher's text/link message is fanned
    out with a single ``/feed`` command and the result is reported back to them.
    Replies are best-effort: a failed reply is ignored.
    """
    async def reply(msg: str) -> None:
        try:
            await chat.api_send_text_reply(item, msg)
        except Exception:
            pass

    allowed_ids, allowed_names = _parse_publishers(config.get("publishers", []))
    contact = chat_info.get("contact", {})
    sender_id = contact.get("contactId")
    sender_name = contact.get("localDisplayName", "")

    if sender_id not in allowed_ids and sender_name not in allowed_names:
        await reply(_bc_prohibited(config))
        item_id = item.get("chatItem", {}).get("meta", {}).get("itemId")
        if sender_id is not None and item_id is not None:
            try:  # internal delete (a received message can't be deleted for the sender)
                await chat.api_delete_chat_items("direct", sender_id, [item_id], "internal")
            except Exception:
                log.exception("broadcast: failed to delete non-publisher message")
        return

    if mc.get("type") not in _BC_ALLOWED_CONTENT or not text:
        await reply("Message is not supported (text and links only).")
        return

    # Native broadcast: one /feed command fans out to every contact and reports counts.
    try:
        outcome = await chat.send_chat_cmd(f"/feed {text}")
    except Exception as e:
        log.error("broadcast /feed error: %s", e)
        await reply("Could not broadcast right now, please try again.")
        return

    if not (isinstance(outcome, dict) and outcome.get("type") == "broadcastSent"):
        log.error("broadcast unexpected response: %s", outcome)
        await reply("Broadcast failed.")
        return

    ok = outcome.get("successes", 0)
    failed = outcome.get("failures", 0)
    _append_log(b, f"Broadcast → {ok} ok, {failed} errors")
    await reply(f"Forwarded to {ok} contact(s), {failed} errors")
# ── RSS bot helpers ──────────────────────────────────────────────────────────────
def _fetch_feed(url: str) -> list[dict]:
"""Fetch + parse an RSS 2.0 or Atom feed → newest-first list of {id,title,link}.
Uses only the stdlib (urllib + ElementTree), no extra dependencies."""
req = urllib.request.Request(url, headers={"User-Agent": "simplex-rss-bot/1.0"})
with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - user-configured feed
data = r.read()
root = ET.fromstring(data)
out: list[dict] = []
items = root.findall(".//item") # RSS 2.0
if items:
for it in items:
title = (it.findtext("title") or "").strip()
link = (it.findtext("link") or "").strip()
eid = (it.findtext("guid") or link or title).strip()
out.append({"id": eid, "title": title, "link": link})
else: # Atom
ns = "{http://www.w3.org/2005/Atom}"
for e in root.findall(f".//{ns}entry"):
title = (e.findtext(f"{ns}title") or "").strip()
le = e.find(f"{ns}link")
link = (le.get("href") if le is not None else "") or ""
eid = (e.findtext(f"{ns}id") or link or title).strip()
out.append({"id": eid, "title": title, "link": link})
return out
def _rss_format(e: dict) -> str:
title = e.get("title") or "(untitled)"
return f"{title}\n{e['link']}" if e.get("link") else title
async def _ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int,
                          channel_name: str, config: dict) -> int | None:
    """Return the group id of the feed/ticker bot's broadcast channel, creating it if needed.

    When ``config["channel_gid"]`` is set and the group still exists, that id is
    returned; it is also returned when the existence check itself fails. When
    the id is unset or the group is gone, a new group named *channel_name* is
    created with history enabled and an ``observer`` join link, and its id is
    stored in *config* and persisted through ``db.update_config``.

    Returns:
        The channel's group id, or ``None`` when creation failed (logged).
    """
    import db as _db

    saved_gid = config.get("channel_gid")
    if saved_gid:
        try:
            for grp in await chat.api_list_groups(user_id):
                if group_id(grp) == saved_gid:
                    return saved_gid
        except Exception:
            return saved_gid  # assume still valid if the lookup failed

    group_profile = {
        "displayName": channel_name, "fullName": "",
        "groupPreferences": {"history": {"enable": "on"}},  # new subscribers see recent posts
    }
    try:
        created = await chat.api_new_group(user_id, group_profile)
        new_gid = created["groupId"]
        await chat.api_create_group_link(new_gid, "observer")  # channel = observer link
        config["channel_gid"] = new_gid
        _db.update_config(profile_id, config)
        _append_log(b, f"Channel created (group {new_gid})")
    except Exception:
        log.exception("failed to create channel")
        return None
    return new_gid
async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict,
                    seed: bool, max_post: int | None = None) -> None:
    """Fetch the feed and broadcast items not seen before.

    seed=True → only record ids (no posting), used on restart so we don't replay.
    max_post=N → post at most the N latest new items (used to populate a fresh channel
    without dumping the bot's entire backlog).

    Every new entry id is added to ``b.rss_seen`` whether or not it is posted.
    Nothing happens without a configured ``feed_url``; a fetch error is logged
    and ends the poll. The "RSS posted" log line reports only the items that
    were actually sent, so failed sends are not counted as posted.
    """
    url = (config.get("feed_url") or "").strip()
    if not url:
        return
    try:
        entries = await asyncio.to_thread(_fetch_feed, url)
    except Exception as e:
        log.error("rss fetch error: %s", e)
        _append_log(b, f"RSS fetch error: {e}")
        return
    new = [e for e in entries if e["id"] not in b.rss_seen]
    for e in new:
        b.rss_seen.add(e["id"])  # mark all seen, even if we only post the latest few
    if seed:
        _append_log(b, f"RSS seeded {len(entries)} existing item(s)")
        return
    if not gid or not new:
        return
    to_post = new if max_post is None else new[:max_post]  # new is newest-first
    posted = 0
    for e in reversed(to_post):  # post oldest → newest
        try:
            await chat.api_send_text_message({"chatType": "group", "chatId": gid}, _rss_format(e))
        except Exception:
            log.exception("rss: failed to post to channel")
        else:
            posted += 1
    _append_log(b, f"RSS posted {posted} item(s)")
# ── Crypto price bot helpers (CoinGecko) ─────────────────────────────────────────
# Popular coins offered in the UI: CoinGecko id → display name.
CRYPTO_COINS = {
    "bitcoin": "Bitcoin", "ethereum": "Ethereum", "tether": "Tether", "binancecoin": "BNB",
    "solana": "Solana", "ripple": "XRP", "cardano": "Cardano", "dogecoin": "Dogecoin",
    "polkadot": "Polkadot", "litecoin": "Litecoin", "tron": "TRON", "chainlink": "Chainlink",
}
# vs_currency → display symbol, prefixed to each formatted price. The euro sign
# is written as an escape so it cannot be dropped by a lossy re-encoding.
CRYPTO_CURRENCIES = {"usd": "$", "eur": "\u20ac", "gbp": "£", "jpy": "¥", "aud": "A$", "cad": "C$"}
def _fetch_crypto(ids: list, vs: list) -> dict:
    """Query CoinGecko's ``simple/price`` endpoint and return the decoded JSON.

    *ids* are CoinGecko coin ids and *vs* the quote currencies; each list is
    sent as one comma-joined query parameter. Network and JSON errors propagate.
    """
    params = {"ids": ",".join(ids), "vs_currencies": ",".join(vs)}
    endpoint = "https://api.coingecko.com/api/v3/simple/price?" + urllib.parse.urlencode(params)
    request = urllib.request.Request(endpoint, headers={"User-Agent": "simplex-crypto-bot/1.0"})
    with urllib.request.urlopen(request, timeout=20) as response:  # noqa: S310 - fixed CoinGecko host
        payload = response.read()
    return json.loads(payload)
def _fmt_price(v) -> str:
try:
v = float(v)
except (TypeError, ValueError):
return str(v)
if v >= 1000:
return f"{v:,.0f}"
if v >= 1:
return f"{v:,.2f}"
return f"{v:.8f}".rstrip("0").rstrip(".")
def _crypto_format(prices: dict, coins: list, currencies: list) -> str | None:
lines = []
for cid in coins:
d = prices.get(cid)
if not d:
continue
nm = CRYPTO_COINS.get(cid, cid.title())
parts = [f"{CRYPTO_CURRENCIES.get(cur, '')}{_fmt_price(d[cur])}"
for cur in currencies if d.get(cur) is not None]
if parts:
lines.append(f"{nm}: " + " · ".join(parts))
if not lines:
return None
return f"Crypto prices @ {datetime.now().strftime('%H:%M')}\n" + "\n".join(lines)
async def _crypto_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict) -> None:
    """Fetch the configured prices and post one snapshot to the channel (the 'stream').

    Coins default to bitcoin + ethereum and currencies to usd when *config*
    leaves them unset. A fetch error is logged and ends the poll; nothing is
    posted without a channel id or when there is no price line to show.
    """
    coins = config.get("coins") or ["bitcoin", "ethereum"]
    currencies = config.get("currencies") or ["usd"]
    try:
        prices = await asyncio.to_thread(_fetch_crypto, coins, currencies)
    except Exception as e:
        log.error("crypto fetch error: %s", e)
        _append_log(b, f"Crypto fetch error: {e}")
        return

    snapshot = _crypto_format(prices, coins, currencies)
    if gid and snapshot:
        try:
            await chat.api_send_text_message({"chatType": "group", "chatId": gid}, snapshot)
            _append_log(b, "Crypto prices posted")
        except Exception:
            log.exception("crypto: failed to post to channel")
async def _run_bot(
    profile_id: int,
    name: str,
    bot_type: str,
    db_prefix: str,
    config: dict,
    on_address: callable,
) -> None:
    """Inner coroutine — runs the simplex-chat event loop for one profile.

    Startup: opens the chat database at *db_prefix*, makes sure an active user
    and a user address exist, reports the address through
    ``await on_address(profile_id, address)``, applies the address settings for
    *bot_type*, and (for rss/crypto bots) prepares the broadcast channel.

    Main loop: waits for chat events, runs the per-type periodic work (feed and
    price polls, directory scans, dead man's switch deadline) and dispatches
    each received message to the handler for *bot_type*.

    Cancellation ends the loop quietly; any other exception is logged and
    added to the bot's log lines. The chat handle is closed in every case.
    Returns immediately (after logging) when ``simplex_chat`` is not installed.
    """
    try:
        from simplex_chat import ChatApi, SqliteDb
    except ImportError:
        log.error("simplex-chat Python package not installed. Run: pip install simplex-chat")
        return
    b = _running[profile_id]
    try:
        chat = await ChatApi.init(SqliteDb(file_prefix=db_prefix))
        b.chat = chat
        # libsimplex /_start requires an active user to exist first
        user = await chat.api_get_active_user()
        if not user:
            user = await chat.api_create_active_user(_profile_dict(name, config))
        await chat.start_chat()
        user_id = user["userId"]
        # Sync profile from config so edits made while stopped take effect on start
        await _sync_profile(chat, user_id, name, config)
        existing = await chat.api_get_user_address(user_id)
        if existing:
            # api_get_user_address returns UserContactLink; link is nested under connLinkContact
            link = existing["connLinkContact"]
        else:
            # api_create_user_address returns CreatedConnLink directly
            link = await chat.api_create_user_address(user_id)
        address = link.get("connShortLink") or link.get("connFullLink", "")
        b.address = address
        await on_address(profile_id, address)

        # Configure address settings based on profile type
        settings: dict = {"businessAddress": False, "autoAccept": {"acceptIncognito": False}}
        if bot_type == "user":
            pass  # plain user: auto-accept on, no auto-reply
        elif bot_type == "business":
            # business account: each connecting customer becomes a group chat the
            # operator handles in the UI. Optional welcome auto-reply if configured.
            settings["businessAddress"] = True
            welcome = (config.get("welcome_message") or "").strip()
            if welcome:
                settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "support":
            settings["businessAddress"] = True
            # `or` (not a .get default) so a blank or null configured value
            # still falls back to the default greeting.
            welcome = config.get("welcome_message") or f"Welcome to {name} support."
            settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "broadcast":
            # auto-reply greets each new contact (default lists allowed publishers)
            settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)}
        elif bot_type == "rss":
            welcome = config.get("welcome_message") or f"You're subscribed to {name}."
            settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type == "crypto":
            welcome = config.get("welcome_message") or f"You're subscribed to {name} crypto prices."
            settings["autoReply"] = {"type": "text", "text": welcome}
        elif bot_type in ("echo", "llm", "directory", "deadmans"):
            welcome = config.get("welcome_message") or f"Connected to {name}."
            settings["autoReply"] = {"type": "text", "text": welcome}
        await chat.api_set_address_settings(user_id, settings)

        # Refresh contacts/groups
        async def refresh() -> None:
            try:
                b.contacts = await chat.api_list_contacts(user_id)
                groups = await chat.api_list_groups(user_id)
                # Classify each group as channel (observer link) vs group (member link)
                for g in groups:
                    await _classify_group(chat, g)
                b.groups = groups
            except Exception:
                log.exception("refresh failed for bot %d", profile_id)

        await refresh()

        # Dead man's switch state: fire `message` if no check-in within checkin_hours.
        dms_interval_s = float(config.get("checkin_hours", 24)) * 3600
        dms_last_checkin = datetime.now(timezone.utc)
        dms_fired = False
        if bot_type == "deadmans":
            _append_log(b, f"Dead man's switch armed — deadline in {config.get('checkin_hours', 24)}h")
        dir_tick = 0  # directory bots periodically scan for newly-added groups

        # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the
        # whole feed on startup; then poll on an interval.
        poll_s = float(config.get("poll_seconds", 300))
        if bot_type in ("rss", "crypto"):
            cname = f"{name} Feed" if bot_type == "rss" else f"{name} Prices"
            b.channel_gid = await _ensure_channel(profile_id, b, chat, user_id, cname, config)
            if bot_type == "rss":
                if b.channel_gid and not config.get("rss_populated"):
                    # first run for this feed: fill the channel with the latest items so new
                    # subscribers see content (recent history). Done once, then flagged.
                    await _rss_poll(b, chat, b.channel_gid, config, seed=False, max_post=20)
                    config["rss_populated"] = True
                    import db as _db
                    _db.update_config(profile_id, config)
                else:
                    # already populated on a previous run — just record current ids, don't replay
                    await _rss_poll(b, chat, b.channel_gid, config, seed=True)
            else:  # crypto: post an immediate price snapshot so the channel isn't empty
                await _crypto_poll(b, chat, b.channel_gid, config)
            b.poll_next = time.time() + poll_s
            await refresh()

        # Event loop
        while True:
            evt = await chat.recv_chat_event(500_000)

            # RSS bot: poll the feed on its interval and broadcast new items to the channel
            if bot_type == "rss" and time.time() >= b.poll_next:
                await _rss_poll(b, chat, b.channel_gid, config, seed=False)
                b.poll_next = time.time() + poll_s

            # Crypto bot: post a fresh price snapshot to the channel on its interval
            if bot_type == "crypto" and time.time() >= b.poll_next:
                await _crypto_poll(b, chat, b.channel_gid, config)
                b.poll_next = time.time() + poll_s

            # Directory bot: ~every 30s, refresh groups and register/auto-join new ones
            if bot_type == "directory":
                dir_tick += 1
                if dir_tick >= 60:
                    dir_tick = 0
                    await refresh()
                    await _directory_register_new_groups(b, chat, config, name)

            # Dead man's switch: fire once if the check-in window has elapsed
            if bot_type == "deadmans" and not dms_fired:
                elapsed = (datetime.now(timezone.utc) - dms_last_checkin).total_seconds()
                if elapsed > dms_interval_s:
                    dms_fired = True
                    n = await _fire_deadmans(chat, user_id, config)
                    _append_log(b, f"Dead man's switch FIRED → notified {n} contact(s)")

            if evt is None:
                continue

            tag = evt.get("type", "")
            # One shared helper appends and trims the log (last 200 lines kept).
            _append_log(b, f"[{tag}]")

            if tag == "contactConnected":
                await refresh()
                ct = evt.get("contact", {})
                ct_name = ct.get("localDisplayName", "?")
                _append_log(b, f"Contact connected: {ct_name}")
                # echo replies on message; broadcast/others greet via the auto-reply
                # configured in address settings, so nothing to do on connect here.
            elif tag == "newChatItems":
                items = evt.get("chatItems", [])
                for item in items:
                    ci = item.get("chatItem", {})
                    # Robust incoming-vs-outgoing: chatDir ".../Rcv" = received,
                    # ".../Snd" = sent by us. Avoids replying to our own messages.
                    chat_dir = ci.get("chatDir", {}).get("type", "")
                    incoming = chat_dir.endswith("Rcv")
                    if incoming:
                        content = ci.get("content", {})
                        mc = content.get("msgContent", {})
                        text = mc.get("text", "")
                        chat_info = item.get("chatInfo", {})
                        _append_log(b, f"Message: {text[:80]}")

                        # Cross-account notification for any received message
                        if text:
                            ci_type = chat_info.get("type")
                            if ci_type == "direct":
                                ct = chat_info.get("contact", {})
                                _notify_ref = ("direct", ct.get("contactId"), ct.get("localDisplayName", ""))
                            elif ci_type == "group":
                                gm = ci.get("chatDir", {}).get("groupMember", {}) or {}
                                _notify_ref = (
                                    "group",
                                    chat_info.get("groupInfo", {}).get("groupId"),
                                    gm.get("localDisplayName", ""),
                                )
                            else:
                                _notify_ref = (None, None, "")
                            if _notify_ref[1] is not None:
                                record_notification(
                                    profile_id, name, _notify_ref[0], _notify_ref[1],
                                    _notify_ref[2], text,
                                )

                        if bot_type == "echo" and text:
                            try:
                                await chat.api_send_text_reply(item, f"Echo: {text}")
                            except Exception:
                                pass
                        elif bot_type == "support" and text:
                            await _handle_llm_message(
                                b, chat, config, item, chat_info, text, DEFAULT_SUPPORT_PROMPT
                            )
                        elif bot_type == "llm" and text:
                            await _handle_llm_message(
                                b, chat, config, item, chat_info, text, DEFAULT_LLM_PROMPT
                            )
                        elif bot_type == "directory" and text:
                            await _handle_directory_message(
                                b, chat, config, name, item, chat_info, text
                            )
                        elif bot_type == "deadmans" and text:
                            owner = config.get("owner")
                            sender = chat_info.get("contact", {}).get("localDisplayName", "")
                            # Any contact checks in, unless an owner is configured
                            if not owner or sender == owner:
                                dms_last_checkin = datetime.now(timezone.utc)
                                dms_fired = False
                                hrs = config.get("checkin_hours", 24)
                                try:
                                    await chat.api_send_text_reply(
                                        item, f"✓ Check-in received. Switch reset — next deadline in {hrs}h."
                                    )
                                except Exception:
                                    pass
                        elif bot_type == "broadcast":
                            await _handle_broadcast_message(
                                b, chat, config, item, chat_info, mc, text
                            )
    except asyncio.CancelledError:
        pass
    except Exception as e:
        log.exception("Bot %d crashed: %s", profile_id, e)
        _append_log(b, f"ERROR: {e}")
    finally:
        if b.chat:
            try:
                await b.chat.close()
            except Exception:
                pass
def _append_log(b: RunningBot, line: str) -> None:
    """Record `line` in the bot's in-memory log, keeping only the newest 200 entries."""
    entries = b.log_lines
    entries.append(line)
    excess = len(entries) - 200
    if excess > 0:
        # Rebind to a trimmed copy (rather than trimming in place), dropping the oldest lines.
        b.log_lines = entries[excess:]
async def _handle_directory_message(
    b: RunningBot, chat: Any, config: dict, name: str, item: dict, chat_info: dict, text: str
) -> None:
    """Directory bot: super-user approval commands, and search for everyone else.

    Text starting with "/" is treated as a command (/help, /list, /approve <id>,
    /reject <id>); any other text is used as a search term. /list, /approve and
    /reject are only honoured for senders whose display name is listed in
    config["superusers"].

    Args:
        b: Running bot record (currently unused by this handler).
        chat: Chat client; only ``api_send_text_reply`` is called on it.
        config: Bot config; ``"superusers"`` is read as the admin display names.
        name: Bot name; its ``safe_name()`` form selects the directory state.
        item: Incoming chat item that replies are attached to.
        chat_info: ``chatInfo`` of the incoming item; the sender name is taken
            from ``chat_info["contact"]["localDisplayName"]``.
        text: Message text.
    """
    import directory as _dir

    safe = safe_name(name)
    superusers = config.get("superusers", []) or []
    sender = chat_info.get("contact", {}).get("localDisplayName", "")
    # An empty sender (no "contact" in chat_info) must never count as an admin,
    # even if the configured list happens to contain an empty string.
    is_super = bool(sender) and sender in superusers
    t = text.strip()

    async def reply(msg: str) -> None:
        # Best effort: a failed reply must not take the bot down, but it should
        # leave a trace in the log instead of vanishing silently.
        try:
            await chat.api_send_text_reply(item, msg)
        except Exception:
            log.exception("failed to send directory reply")

    if not t.startswith("/"):
        # Plain text: search the directory and return at most ten hits.
        results = _dir.search(safe, t)
        if not results:
            await reply(f"No groups found for '{t}'. Send /help for options.")
        else:
            await reply("Found:\n" + "\n".join(f"{e['displayName']}: {e['link'] or '(no link)'}" for e in results[:10]))
        return

    parts = t.split()
    cmd = parts[0].lower()
    if cmd == "/help":
        await reply("Send a search term to find groups. Admins: /list, /approve <id>, /reject <id>.")
    elif cmd == "/list":
        if not is_super:
            await reply("Only admins can list pending submissions.")
            return
        pend = _dir.entries_by_status(safe, "pending")
        if not pend:
            await reply("No pending submissions.")
        else:
            await reply("Pending:\n" + "\n".join(
                f"#{e['id']} {e['displayName']} ({'channel' if e['is_channel'] else 'group'})" for e in pend
            ))
    elif cmd in ("/approve", "/reject"):
        if not is_super:
            await reply("Only admins can approve or reject.")
            return
        # isdecimal(), not isdigit(): isdigit() also accepts characters such as
        # "²" that int() rejects, which would raise ValueError out of the handler.
        if len(parts) < 2 or not parts[1].isdecimal():
            await reply(f"Usage: {cmd} <id>")
            return
        eid = int(parts[1])
        status = "approved" if cmd == "/approve" else "rejected"
        e = _dir.set_status(safe, eid, status)
        if not e:
            await reply(f"No submission #{eid}.")
            return
        if status == "approved":
            # Regenerate the published directory data so the entry shows up on the site.
            _dir.publish(safe)
            await reply(f"✓ Approved #{eid} '{e['displayName']}' — now listed on the directory site.")
        else:
            await reply(f"Rejected #{eid} '{e['displayName']}'.")
    else:
        await reply("Unknown command. Send /help for options.")
async def _directory_register_new_groups(
    b: RunningBot, chat: Any, config: dict, name: str
) -> None:
    """Accept pending group invitations and file not-yet-known groups as pending entries.

    For every cached group: an "invited" membership is accepted and the group is
    skipped for this pass; a group the bot does not own and that the directory
    state does not know yet is added as pending, and each contact listed in
    config["superusers"] is told about the new submission.
    """
    import directory as _dir

    safe = safe_name(name)
    superusers = config.get("superusers", []) or []
    for grp in b.groups:
        gid = group_id(grp)
        own = grp.get("membership") or {}
        member_status = own.get("memberStatus", "")
        member_role = own.get("memberRole", "")

        if member_status == "invited":
            try:
                await chat.api_join_group(gid)  # someone added the bot; accept
            except Exception:
                pass
            continue  # registered on a later tick once it syncs

        if member_role == "owner":
            continue
        if _dir.find_by_group(_dir.load_state(safe), gid):
            continue

        # The join link is optional; fall back to an empty string when it cannot be read.
        try:
            link = await chat.api_get_group_link_str(gid)
        except Exception:
            link = ""

        entry, is_new = _dir.add_pending(
            safe,
            gid,
            group_name(grp),
            link,
            bool(grp.get("is_channel")),
            grp.get("groupSummary") or {},
            (grp.get("groupProfile") or {}).get("shortDescr"),
            "group",
        )
        if not is_new:
            continue

        _append_log(b, f"Directory: registered pending #{entry['id']} '{entry['displayName']}'")
        notice = (
            f"New directory submission #{entry['id']}: '{entry['displayName']}'. "
            f"/approve {entry['id']} or /reject {entry['id']}."
        )
        for contact in b.contacts:
            if contact["localDisplayName"] not in superusers:
                continue
            try:
                await chat.api_send_text_message(
                    {"chatType": "direct", "chatId": contact["contactId"]},
                    notice,
                )
            except Exception:
                pass
async def _fire_deadmans(chat: Any, user_id: int, config: dict) -> int:
    """Deliver the dead-man message and report how many sends succeeded.

    config["message"] overrides the built-in text. config["recipients"] is a
    list of display names; when it is empty or missing every contact is
    messaged. Returns 0 if the contact list cannot be fetched; individual send
    failures are skipped and simply not counted.
    """
    text = config.get("message") or "Dead man's switch triggered — no check-in was received."
    wanted = config.get("recipients") or []  # list of display names; empty = all contacts
    try:
        everyone = await chat.api_list_contacts(user_id)
    except Exception:
        return 0

    if wanted:
        targets = [person for person in everyone if person["localDisplayName"] in wanted]
    else:
        targets = everyone

    delivered = 0
    for person in targets:
        try:
            await chat.api_send_text_message({"chatType": "direct", "chatId": person["contactId"]}, text)
        except Exception:
            continue
        delivered += 1
    return delivered
async def _handle_llm_message(
    b: RunningBot, chat: Any, config: dict, item: dict, chat_info: dict, text: str,
    default_prompt: str = DEFAULT_LLM_PROMPT,
) -> None:
    """Answer an incoming message through the configured OpenAI-compatible chat API.

    config supplies `api_base` (endpoint URL), `model`, an optional `api_key`
    and `system_prompt`; `default_prompt` is used when no system prompt is
    configured. Without an `api_base` nothing is sent. A rolling history of at
    most 20 messages is kept in `b.histories`, keyed by the contact id found in
    `chat_info`. When the LLM call fails an apology is sent instead of a reply.
    """
    endpoint = (config.get("api_base") or "").strip()
    if not endpoint:
        return  # no LLM configured for this bot

    peer_id = chat_info.get("contact", {}).get("contactId")
    prompt = config.get("system_prompt") or default_prompt
    model_name = config.get("model") or "grok-2"
    secret = config.get("api_key") or ""

    # Rolling per-contact conversation: add the new user turn, then drop anything
    # older than the most recent 20 messages.
    turns = b.histories.setdefault(peer_id, [])
    turns.append({"role": "user", "content": text})
    if len(turns) > 20:
        del turns[:-20]
    payload = [{"role": "system", "content": prompt}, *turns]

    try:
        answer = await llm_chat(endpoint, secret, model_name, payload)
    except Exception as err:
        log.error("LLM error: %s", err)
        _append_log(b, f"LLM error: {err}")
        try:
            await chat.api_send_text_reply(
                item, "Sorry, I'm having trouble responding right now. Please try again shortly."
            )
        except Exception:
            pass
        return

    turns.append({"role": "assistant", "content": answer})
    _append_log(b, f"LLM reply: {answer[:80]}")
    try:
        await chat.api_send_text_reply(item, answer)
    except Exception:
        log.exception("failed to send LLM reply")
async def global_status() -> dict:
    """Aggregate manager-wide status: running profiles, totals, and server config.

    Server/network info (SMP+XFTP counts, operators, proxy mode) is read from the
    first running profile that answers; per the original note these are shared
    SimpleX presets, so one profile is taken to represent all.

    Returns:
        A dict with keys ``profiles_running``, ``contacts``, ``groups``,
        ``smp_servers``, ``xftp_servers``, ``operators`` and ``proxy_mode``.
        The server fields keep their zero/empty defaults when no running
        profile could be queried.
    """
    running = [b for b in _running.values() if not b.task.done()]
    status = {
        "profiles_running": len(running),
        "contacts": sum(len(b.contacts) for b in running),
        "groups": sum(len(b.groups) for b in running),
        "smp_servers": 0,
        "xftp_servers": 0,
        "operators": [],
        "proxy_mode": "",
    }
    for b in running:
        if not b.chat:
            continue
        try:
            user = await b.chat.api_get_active_user()
            r = await b.chat.send_chat_cmd(f"/_servers {user['userId']}")
            # Tally into locals first. Adding straight into `status` would count
            # servers/operators twice whenever this profile fails later (e.g. on
            # /network) and the loop falls through to the next profile.
            operators: list = []
            smp_count = 0
            xftp_count = 0
            for op in r.get("userServers", []):
                o = op.get("operator") or {}
                if o.get("enabled") and o.get("tradeName"):
                    operators.append(o["tradeName"])
                smp_count += sum(
                    1 for s in op.get("smpServers", []) if s.get("enabled") and not s.get("deleted")
                )
                xftp_count += sum(
                    1 for s in op.get("xftpServers", []) if s.get("enabled") and not s.get("deleted")
                )
            # Overwrite (never accumulate) so the figures always describe one profile.
            status["operators"] = operators
            status["smp_servers"] = smp_count
            status["xftp_servers"] = xftp_count
            nc = await b.chat.send_chat_cmd("/network")
            status["proxy_mode"] = nc.get("networkConfig", {}).get("smpProxyMode", "")
            break  # one running profile is representative
        except Exception:
            log.exception("global_status server read failed")
            continue
    return status
def _server_host(s: str) -> str:
    """Return the first host of a server URI such as smp://key@host1,host2.onion.

    The scheme ("...://") and the part up to the first "@" are dropped when
    present, and only the text before the first comma is kept. If `s` cannot be
    processed it is returned unchanged.
    """
    try:
        _, scheme_sep, remainder = s.partition("://")
        if not scheme_sep:
            remainder = s
        _, at_sign, after_key = remainder.partition("@")
        if at_sign:
            remainder = after_key
        return remainder.partition(",")[0]
    except Exception:
        return s
def _server_row(s: dict) -> dict:
    """Reduce a server record to its readable host plus boolean enabled/preset/deleted flags."""
    row: dict = {"host": _server_host(s.get("server", ""))}
    for flag in ("enabled", "preset", "deleted"):
        row[flag] = bool(s.get(flag))
    return row
def _first_running_chat():
    """Return the first entry of `_running` whose task is not done and that has a chat, else None."""
    live = (bot for bot in _running.values() if not bot.task.done() and bot.chat)
    return next(live, None)
async def get_servers_detail() -> dict:
    """Describe every operator's SMP/XFTP servers plus the network config.

    Data comes from the first running profile. The result has the keys
    `profile_name`, `operators` and `network`; when no profile is running, or
    any query fails, `profile_name` is None and the other two are empty.
    """
    bot = _first_running_chat()
    if not bot:
        return {"profile_name": None, "operators": [], "network": {}}

    def describe(group: dict) -> dict:
        # One row per operator; servers without an operator are labelled "Custom".
        meta = group.get("operator") or {}
        return {
            "name": meta.get("tradeName") or "Custom",
            "enabled": bool(meta.get("enabled")),
            "smp": [_server_row(srv) for srv in group.get("smpServers", [])],
            "xftp": [_server_row(srv) for srv in group.get("xftpServers", [])],
        }

    try:
        user = await bot.chat.api_get_active_user()
        servers = await bot.chat.send_chat_cmd(f"/_servers {user['userId']}")
        net = await bot.chat.send_chat_cmd("/network")
        operators = [describe(group) for group in servers.get("userServers", [])]
        return {"profile_name": bot.name, "operators": operators, "network": net.get("networkConfig", {})}
    except Exception:
        log.exception("get_servers_detail failed")
        return {"profile_name": None, "operators": [], "network": {}}
async def get_network_config() -> dict:
    """Return the `networkConfig` reported by the first running profile.

    Yields an empty dict when no profile is running or the query fails (the
    failure is logged).
    """
    bot = _first_running_chat()
    if not bot:
        return {}
    try:
        response = await bot.chat.send_chat_cmd("/network")
        network = response.get("networkConfig", {})
    except Exception:
        log.exception("get_network_config failed")
        network = {}
    return network
def _profile_dict(name: str, config: dict) -> dict:
    """Assemble the profile payload (displayName/fullName/shortDescr/image) from config.

    `displayName` and `fullName` are always present; `shortDescr` and `image`
    are only added when config has a non-empty "bio" / "avatar".
    """
    result: dict = {
        "displayName": name,
        "fullName": config.get("full_name", ""),
    }
    # (config key, profile key) pairs for the optional fields; the avatar is a base64 data URI.
    for source_key, target_key in (("bio", "shortDescr"), ("avatar", "image")):
        if config.get(source_key):
            result[target_key] = config[source_key]
    return result
async def _sync_profile(chat: Any, user_id: int, name: str, config: dict) -> None:
    """Send the profile derived from `config` to the live account, best effort.

    Any failure is logged and swallowed, so callers never see an exception.
    NOTE(review): the original docstring calls this a no-op when nothing
    changed; that depends on api_update_profile — confirm.
    """
    try:
        desired = _profile_dict(name, config)
        await chat.api_update_profile(user_id, desired)
    except Exception:
        log.exception("profile sync failed")
async def update_profile(
    profile_id: int, full_name: str | None, bio: str | None, avatar: str | None
) -> dict:
    """Store profile edits in the manager DB and push them to the live account if running.

    A None argument leaves that field untouched. `avatar` is a base64 data URI;
    an empty string removes the stored avatar. Raises RuntimeError when the
    profile does not exist. Returns the updated config dict.
    """
    import db as _db

    record = _db.get_profile(profile_id)
    if not record:
        raise RuntimeError("Profile not found")

    config = json.loads(record.get("config") or "{}")
    for key, value in (("full_name", full_name), ("bio", bio)):
        if value is not None:
            config[key] = value
    if avatar:
        config["avatar"] = avatar
    elif avatar is not None:
        # Empty string: the caller asked for the avatar to be removed.
        config.pop("avatar", None)
    _db.update_config(profile_id, config)

    bot = get_running(profile_id)
    if bot and bot.chat:
        user = await bot.chat.api_get_active_user()
        if user:
            await _sync_profile(bot.chat, user["userId"], record["name"], config)
    return config
async def _classify_group(chat: Any, g: dict) -> None:
    """Add `is_channel` and `link` to a GroupInfo dict, in place.

    A group counts as a channel when its join link admits new members as
    "observer". If the link cannot be fetched the group keeps the defaults
    (`is_channel` False, `link` ""). The short link is preferred over the full one.
    """
    g.update(is_channel=False, link="")
    try:
        link_info = await chat.api_get_group_link(g["groupId"])
    except Exception:
        return  # no link → plain private group
    g["is_channel"] = "observer" == link_info.get("acceptMemberRole")
    contact_link = link_info.get("connLinkContact", {})
    short_link = contact_link.get("connShortLink")
    g["link"] = short_link if short_link else contact_link.get("connFullLink", "")
async def create_channel(profile_id: int, name: str) -> str:
    """Create a SimpleX channel: a new group whose join link admits observers.

    Raises RuntimeError if the profile is not running or has no active user.
    Returns the join link. The cached group list is refreshed afterwards; a
    failed refresh is logged and does not affect the returned link.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    group_profile = {"displayName": name, "fullName": ""}
    created = await bot.chat.api_new_group(user["userId"], group_profile)
    link = await bot.chat.api_create_group_link(created["groupId"], "observer")

    # Re-read and re-classify every group so the cache includes the new channel.
    try:
        refreshed = await bot.chat.api_list_groups(user["userId"])
        for grp in refreshed:
            await _classify_group(bot.chat, grp)
        bot.groups = refreshed
    except Exception:
        log.exception("group refresh after create_channel failed")
    return link
async def create_group(profile_id: int, name: str) -> str:
    """Create an ordinary two-way group whose join link admits members.

    Raises RuntimeError if the profile is not running or has no active user.
    Returns the join link. The cached group list is refreshed afterwards; a
    failed refresh is logged and does not affect the returned link.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    user = await bot.chat.api_get_active_user()
    if not user:
        raise RuntimeError("No active user for this profile")

    group_profile = {"displayName": name, "fullName": ""}
    created = await bot.chat.api_new_group(user["userId"], group_profile)
    link = await bot.chat.api_create_group_link(created["groupId"], "member")

    # Re-read and re-classify every group so the cache includes the new group.
    try:
        refreshed = await bot.chat.api_list_groups(user["userId"])
        for grp in refreshed:
            await _classify_group(bot.chat, grp)
        bot.groups = refreshed
    except Exception:
        log.exception("group refresh after create_group failed")
    return link
async def get_group_members(profile_id: int, gid: int) -> list[dict]:
    """List a group's/channel's members as name/role/status dicts, ourselves first.

    Per the original note, api_list_members leaves out the user's own
    membership, so it is taken from the cached group and put at the front —
    this keeps the count in line with groupSummary.currentMembers.
    Raises RuntimeError if the profile is not running.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")

    rows = [
        {"name": member["localDisplayName"], "role": member["memberRole"], "status": member["memberStatus"]}
        for member in await bot.chat.api_list_members(gid)
    ]

    # Our own membership comes from the first cached group with this id;
    # the role falls back to "owner" when the cache does not carry one.
    cached = next((grp for grp in bot.groups if group_id(grp) == gid), None)
    own = (cached.get("membership") or {}) if cached is not None else {}
    if own:
        own_row = {
            "name": own.get("localDisplayName", "you") + " (you)",
            "role": own.get("memberRole", "owner"),
            "status": own.get("memberStatus", ""),
        }
        rows = [own_row, *rows]
    return rows
async def join_group(profile_id: int, gid: int) -> bool:
    """Accept the pending invitation to group `gid`, then refresh the cached lists.

    Raises RuntimeError if the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_join_group(gid)
    await refresh_lists(profile_id)
    return True
async def leave_group(profile_id: int, gid: int) -> bool:
    """Leave group/channel `gid` via api_leave_group, then refresh the cached lists.

    Raises RuntimeError if the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    await bot.chat.api_leave_group(gid)
    await refresh_lists(profile_id)
    return True
async def delete_group(profile_id: int, gid: int) -> bool:
    """Delete group/channel `gid` with delete mode "full" and notify set, then refresh the cache.

    Raises RuntimeError if the profile is not running; returns True otherwise.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    delete_mode = {"type": "full", "notify": True}
    await bot.chat.api_delete_chat("group", gid, delete_mode)
    await refresh_lists(profile_id)
    return True
async def get_group_link(profile_id: int, gid: int) -> str:
    """Look up the existing join link of group/channel `gid`.

    Raises RuntimeError if the profile is not running. Any failure while
    fetching the link yields an empty string.
    """
    bot = get_running(profile_id)
    if not (bot and bot.chat):
        raise RuntimeError("Profile is not running")
    try:
        link = await bot.chat.api_get_group_link_str(gid)
    except Exception:
        link = ""
    return link