From 908d16bfc3a39af4b4aaedbe15cf96e105276515 Mon Sep 17 00:00:00 2001 From: Jon Date: Fri, 5 Jun 2026 21:08:05 +0100 Subject: [PATCH] Add 'rss' bot: broadcasts an RSS/Atom feed to a channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New rss bot type: on start it creates a broadcast channel (observer group with recent history on) and polls a configured feed URL; new posts are broadcast to the channel. Subscribers join the channel link (seen on the bot's profile); direct contacts get a welcome + the latest items and can send /new for the latest. Stdlib-only feed parsing (urllib + ElementTree), seeds existing items on startup so it doesn't replay the whole feed. Config: feed_url, poll_seconds. Adds rss_test.py (mock feed) — passes. Co-Authored-By: Claude Opus 4.8 --- manager/profiles.py | 145 +++++++++++++++++++++++++++++++++++- manager/rss_test.py | 133 +++++++++++++++++++++++++++++++++ manager/templates/list.html | 26 +++++++ 3 files changed, 302 insertions(+), 2 deletions(-) create mode 100644 manager/rss_test.py diff --git a/manager/profiles.py b/manager/profiles.py index f9adfe8..f50062a 100644 --- a/manager/profiles.py +++ b/manager/profiles.py @@ -3,7 +3,9 @@ import asyncio import json import logging +import time import urllib.request +import xml.etree.ElementTree as ET from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path @@ -136,7 +138,7 @@ def group_member_count(g: dict) -> int: return g.get("groupSummary", {}).get("currentMembers", 0) -BOT_TYPES = ["echo", "llm", "broadcast", "support", "directory", "deadmans"] +BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"] USER_TYPES = ["user"] BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats) ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES @@ -155,6 +157,11 @@ class RunningBot: chat: Any = None # simplex_chat ChatApi instance # Per-contact LLM conversation history (contactId → [{role, content}, ...]) histories: dict[int, list[dict]] = field(default_factory=dict) + # RSS bot state + rss_seen: set = field(default_factory=set) # entry ids already posted + rss_items: list = field(default_factory=list) # latest fetched entries (newest first) + rss_next_poll: float = 0.0 + rss_gid: int | None = None # broadcast channel group id # profile_id → RunningBot @@ -426,6 +433,110 @@ async def _handle_broadcast_message( await reply("Broadcast failed.") +# ── RSS bot helpers ────────────────────────────────────────────────────────────── +def _fetch_feed(url: str) -> list[dict]: + """Fetch + parse an RSS 2.0 or Atom feed → newest-first list of {id,title,link}. + Uses only the stdlib (urllib + ElementTree), no extra dependencies.""" + req = urllib.request.Request(url, headers={"User-Agent": "simplex-rss-bot/1.0"}) + with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - user-configured feed + data = r.read() + root = ET.fromstring(data) + out: list[dict] = [] + items = root.findall(".//item") # RSS 2.0 + if items: + for it in items: + title = (it.findtext("title") or "").strip() + link = (it.findtext("link") or "").strip() + eid = (it.findtext("guid") or link or title).strip() + out.append({"id": eid, "title": title, "link": link}) + else: # Atom + ns = "{http://www.w3.org/2005/Atom}" + for e in root.findall(f".//{ns}entry"): + title = (e.findtext(f"{ns}title") or "").strip() + le = e.find(f"{ns}link") + link = (le.get("href") if le is not None else "") or "" + eid = (e.findtext(f"{ns}id") or link or title).strip() + out.append({"id": eid, "title": title, "link": link}) + return out + + +def _rss_format(e: dict) -> str: + title = e.get("title") or "(untitled)" + return f"{title}\n{e['link']}" if e.get("link") else title + + +async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int, + name: str, config: dict) -> int | None: + """Find or create the broadcast channel (observer group) for this feed; persist its id.""" + import db as _db + gid = config.get("channel_gid") + if gid: + try: + groups = await chat.api_list_groups(user_id) + if any(group_id(g) == gid for g in groups): + return gid + except Exception: + return gid # assume still valid if the lookup failed + try: + info = await chat.api_new_group(user_id, { + "displayName": f"{name} Feed", "fullName": "", + "groupPreferences": {"history": {"enable": "on"}}, # new subscribers see recent posts + }) + gid = info["groupId"] + await chat.api_create_group_link(gid, "observer") # channel = observer link + config["channel_gid"] = gid + _db.update_config(profile_id, config) + _append_log(b, f"RSS channel created (group {gid})") + return gid + except Exception: + log.exception("rss: failed to create channel") + return None + + +async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict, seed: bool) -> None: + """Fetch the feed; on the seed run just record existing ids, otherwise broadcast new items.""" + url = (config.get("feed_url") or "").strip() + if not url: + return + try: + entries = await asyncio.to_thread(_fetch_feed, url) + except Exception as e: + log.error("rss fetch error: %s", e) + _append_log(b, f"RSS fetch error: {e}") + return + b.rss_items = entries + new = [e for e in entries if e["id"] not in b.rss_seen] + for e in new: + b.rss_seen.add(e["id"]) + if seed: + _append_log(b, f"RSS seeded {len(entries)} existing item(s)") + return + if not gid or not new: + return + for e in reversed(new): # post oldest → newest + try: + await chat.api_send_text_message({"chatType": "group", "chatId": gid}, _rss_format(e)) + except Exception: + log.exception("rss: failed to post to channel") + _append_log(b, f"RSS posted {len(new)} new item(s)") + + +async def _rss_send_latest(chat: Any, item: dict, b: "RunningBot", n: int = 3) -> None: + """Reply to a direct request (e.g. /new) with the latest feed items.""" + items = b.rss_items[:n] + if not items: + try: + await chat.api_send_text_reply(item, "No items yet — check back soon.") + except Exception: + pass + return + for e in items: + try: + await chat.api_send_text_reply(item, _rss_format(e)) + except Exception: + pass + + async def _run_bot( profile_id: int, name: str, @@ -487,6 +598,9 @@ async def _run_bot( elif bot_type == "broadcast": # auto-reply greets each new contact (default lists allowed publishers) settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)} + elif bot_type == "rss": + welcome = config.get("welcome_message") or f"Subscribed to {name}. Send /new for the latest posts." + settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "llm", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} @@ -516,10 +630,24 @@ async def _run_bot( dir_tick = 0 # directory bots periodically scan for newly-added groups + # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the + # whole feed on startup; then poll on an interval. + rss_poll_s = float(config.get("poll_seconds", 300)) + if bot_type == "rss": + b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config) + await _rss_poll(b, chat, b.rss_gid, config, seed=True) + b.rss_next_poll = time.time() + rss_poll_s + await refresh() + # Event loop while True: evt = await chat.recv_chat_event(500_000) + # RSS bot: poll the feed on its interval and broadcast new items to the channel + if bot_type == "rss" and time.time() >= b.rss_next_poll: + await _rss_poll(b, chat, b.rss_gid, config, seed=False) + b.rss_next_poll = time.time() + rss_poll_s + # Directory bot: ~every 30s, refresh groups and register/auto-join new ones if bot_type == "directory": dir_tick += 1 @@ -551,7 +679,16 @@ async def _run_bot( _append_log(b, f"Contact connected: {ct_name}") # echo replies on message; broadcast/others greet via the auto-reply - # configured in address settings, so nothing to do on connect here. + # configured in address settings. RSS also sends the latest items on connect. + if bot_type == "rss": + cid = ct.get("contactId") + if cid is not None: # send the latest items directly to the new contact + for e in b.rss_items[:3]: + try: + await chat.api_send_text_message( + {"chatType": "direct", "chatId": cid}, _rss_format(e)) + except Exception: + pass elif tag == "newChatItems": items = evt.get("chatItems", []) @@ -606,6 +743,10 @@ async def _run_bot( b, chat, config, item, chat_info, text, DEFAULT_LLM_PROMPT ) + elif bot_type == "rss" and text: + if text.strip().lower() == "/new": + await _rss_send_latest(chat, item, b) + elif bot_type == "directory" and text: await _handle_directory_message( b, chat, config, name, item, chat_info, text diff --git a/manager/rss_test.py b/manager/rss_test.py new file mode 100644 index 0000000..5a4d799 --- /dev/null +++ b/manager/rss_test.py @@ -0,0 +1,133 @@ +"""End-to-end test of the RSS bot (Pattern 3, in-process FFI). + +Serves a mock RSS feed locally, starts an rss bot pointed at it, and checks: + - the bot creates a broadcast channel (observer group) + - the initial feed item is seeded (not re-posted) + - a newly-added feed item is broadcast to the channel + +Verifies via the bot's own view of the channel chat (no subscriber needed). + +Run: .venv/bin/python rss_test.py +""" + +import asyncio +import json +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import profiles as pm # noqa: E402 + +DATA = Path("data") +BOT_PREFIX = str(DATA / "rsstest_bot") +BOT_PID = 99004 + +# mutable feed state the mock server serves +FEED = {"items": [("First post", "https://example.com/1")]} + + +def feed_xml(): + items = "".join( + f"{t}{l}{l}" + for t, l in FEED["items"] + ) + return (f'' + f'Test Feed{items}').encode() + + +class FeedHandler(BaseHTTPRequestHandler): + def do_GET(self): + body = feed_xml() + self.send_response(200) + self.send_header("Content-Type", "application/rss+xml") + self.end_headers() + self.wfile.write(body) + + def log_message(self, *a): + pass + + +def cleanup(): + for p in DATA.glob("rsstest_bot_*"): + p.unlink() + bf = DATA / "manager.db" # leave the manager db alone; we don't use it here + + +async def wait_until(fn, timeout=120, every=1): + start = time.time() + while time.time() - start < timeout: + v = await fn() + if v: + return v + await asyncio.sleep(every) + return None + + +async def channel_texts(chat, gid): + c = await chat.api_get_chat("group", gid, 50) + out = [] + for ci in c.get("chatItems", []): + out.append(ci.get("content", {}).get("msgContent", {}).get("text", "")) + return out + + +async def main() -> int: + cleanup() + srv = HTTPServer(("127.0.0.1", 0), FeedHandler) + port = srv.server_address[1] + threading.Thread(target=srv.serve_forever, daemon=True).start() + url = f"http://127.0.0.1:{port}/feed.xml" + print("mock feed at", url) + + async def on_address(pid, addr): + pass + + profile = { + "id": BOT_PID, "name": "rsstestbot", "bot_type": "rss", + "db_prefix": BOT_PREFIX, + "config": json.dumps({"feed_url": url, "poll_seconds": 5}), + } + ok = True + try: + await pm.start_bot(profile, on_address) + b = pm.get_running(BOT_PID) + + # 1) channel created + gid = await wait_until(lambda: asyncio.sleep(0, b.rss_gid), timeout=90) + print("channel created:", bool(gid), "gid", gid) + assert gid, "rss bot did not create a channel" + + # 2) the first item was seeded, not posted + await asyncio.sleep(2) + texts = await channel_texts(b.chat, gid) + assert not any("First post" in t for t in texts), "seeded item was wrongly broadcast" + print("seed OK (first item not broadcast)") + + # 3) add a new item → it should be broadcast on the next poll + FEED["items"].insert(0, ("Breaking news", "https://example.com/2")) + got = await wait_until( + lambda: _contains(channel_texts(b.chat, gid), "Breaking news"), timeout=30, every=2 + ) + print("new item broadcast to channel:", bool(got)) + ok = bool(got) + except AssertionError as e: + ok = False + print("ASSERT FAIL:", e) + finally: + await pm.stop_bot(BOT_PID) + srv.shutdown() + cleanup() + + print("\nRESULT:", "PASS — rss bot broadcasts new feed posts" if ok else "FAIL") + return 0 if ok else 1 + + +async def _contains(coro, needle): + return any(needle in (t or "") for t in await coro) + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/manager/templates/list.html b/manager/templates/list.html index dc7377c..1bc37c9 100644 --- a/manager/templates/list.html +++ b/manager/templates/list.html @@ -49,6 +49,7 @@ + @@ -224,6 +225,23 @@ + {% endif %}
echoRepeats every message back to the sender — handy for testing a connection end to end.
llmChat with a local or remote LLM (OpenAI-compatible, e.g. Ollama). Give it context, it replies to your messages.
rssWatches an RSS/Atom feed and broadcasts new posts to a channel it creates. Subscribers join the channel; send /new for the latest.
broadcastRelays messages from authorized publishers out to all of the bot's contacts.
supportBusiness inbox — auto-replies with a welcome message and collects incoming inquiries.
directoryDirectory service for discovering and listing groups or contacts.