Broadcast bot: parity with official simplex-broadcast-bot

Relay publishers' text/links to all contacts via the native /feed command
(reports 'Forwarded to N contact(s), M errors'); reply to non-publishers with
the prohibited message and internally delete their message (CIDMInternal, as
upstream does). Filter content to text/links. Publishers accept 'Name' or
'ID:Name'; welcome/prohibited defaults list the publishers. Add publishers +
prohibited-reply fields to the create form. Adds broadcast_test.py (3 in-process
controllers: bot + publisher + subscriber) — passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Jon
2026-06-05 16:27:56 +01:00
parent c1bb9cb955
commit 6232c1589d
3 changed files with 274 additions and 23 deletions

View File

@@ -323,6 +323,104 @@ async def delete_contact(profile_id: int, contact_id: int) -> bool:
return True
# ── Broadcast bot helpers (mirror the official simplex-broadcast-bot) ────────────
# Publishers are configured as a list of "Name" or "ID:Name" strings. The official
# bot matches a KnownContact by contactId AND display name; we match on either, so
# names alone (what the UI collects) keep working.
def _parse_publishers(pubs: list) -> tuple[set[int], set[str]]:
ids: set[int] = set()
names: set[str] = set()
for p in pubs or []:
s = str(p).strip()
if not s:
continue
if ":" in s:
left, right = s.split(":", 1)
if left.strip().isdigit():
ids.add(int(left.strip()))
names.add(right.strip())
continue
names.add(s)
return ids, names
def _publisher_names(pubs: list) -> str:
_, names = _parse_publishers(pubs)
return ", ".join(sorted(names)) if names else "(no publishers configured)"
def _bc_welcome(config: dict, name: str) -> str:
w = (config.get("welcome_message") or "").strip()
if w:
return w
return (
"Hello! I am a broadcast bot.\n"
f"I broadcast messages to all connected users from {_publisher_names(config.get('publishers', []))}."
)
def _bc_prohibited(config: dict) -> str:
p = (config.get("prohibited_message") or "").strip()
if p:
return p
return (
f"Sorry, only these users can broadcast messages: {_publisher_names(config.get('publishers', []))}. "
"Your message is deleted."
)
# Content types the broadcast bot will relay (matches the official allowlist).
_BC_ALLOWED_CONTENT = {"text", "link"}
async def _handle_broadcast_message(
b: "RunningBot", chat: Any, config: dict, item: dict, chat_info: dict, mc: dict, text: str
) -> None:
"""Mirror simplex-broadcast-bot: publishers' messages go to all contacts; everyone
else gets the prohibited reply and their message is deleted."""
ids, names = _parse_publishers(config.get("publishers", []))
ct = chat_info.get("contact", {})
sender_id = ct.get("contactId")
sender_name = ct.get("localDisplayName", "")
is_publisher = (sender_id in ids) or (sender_name in names)
async def reply(msg: str) -> None:
try:
await chat.api_send_text_reply(item, msg)
except Exception:
pass
if not is_publisher:
await reply(_bc_prohibited(config))
item_id = item.get("chatItem", {}).get("meta", {}).get("itemId")
if sender_id is not None and item_id is not None:
try: # internal delete (a received message can't be deleted for the sender)
await chat.api_delete_chat_items("direct", sender_id, [item_id], "internal")
except Exception:
log.exception("broadcast: failed to delete non-publisher message")
return
if mc.get("type") not in _BC_ALLOWED_CONTENT or not text:
await reply("Message is not supported (text and links only).")
return
# Native broadcast: one /feed command fans out to every contact and reports counts.
try:
r = await chat.send_chat_cmd(f"/feed {text}")
except Exception as e:
log.error("broadcast /feed error: %s", e)
await reply("Could not broadcast right now, please try again.")
return
if isinstance(r, dict) and r.get("type") == "broadcastSent":
s, f = r.get("successes", 0), r.get("failures", 0)
_append_log(b, f"Broadcast → {s} ok, {f} errors")
await reply(f"Forwarded to {s} contact(s), {f} errors")
else:
log.error("broadcast unexpected response: %s", r)
await reply("Broadcast failed.")
async def _run_bot(
profile_id: int,
name: str,
@@ -374,7 +472,10 @@ async def _run_bot(
settings["businessAddress"] = True
welcome = config.get("welcome_message", f"Welcome to {name} support.")
settings["autoReply"] = {"type": "text", "text": welcome}
elif bot_type in ("echo", "broadcast", "directory", "deadmans"):
elif bot_type == "broadcast":
# auto-reply greets each new contact (default lists allowed publishers)
settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)}
elif bot_type in ("echo", "directory", "deadmans"):
welcome = config.get("welcome_message", f"Connected to {name}.")
settings["autoReply"] = {"type": "text", "text": welcome}
@@ -437,16 +538,8 @@ async def _run_bot(
ct_name = ct.get("localDisplayName", "?")
_append_log(b, f"Contact connected: {ct_name}")
if bot_type == "echo":
pass # echo handled on message
elif bot_type == "broadcast":
welcome = config.get("welcome_message", "You are subscribed.")
try:
await chat.api_send_text_message(
{"chatType": "direct", "chatId": ct["contactId"]}, welcome
)
except Exception:
pass
# echo replies on message; broadcast/others greet via the auto-reply
# configured in address settings, so nothing to do on connect here.
elif tag == "newChatItems":
items = evt.get("chatItems", [])
@@ -517,18 +610,9 @@ async def _run_bot(
pass
elif bot_type == "broadcast":
publishers = config.get("publishers", [])
sender = chat_info.get("contact", {}).get("localDisplayName", "")
if sender in publishers and text:
# broadcast to all contacts
contacts = await chat.api_list_contacts(user_id)
for c in contacts:
try:
await chat.api_send_text_message(
{"chatType": "direct", "chatId": c["contactId"]}, text
)
except Exception:
pass
await _handle_broadcast_message(
b, chat, config, item, chat_info, mc, text
)
except asyncio.CancelledError:
pass