diff --git a/manager/profiles.py b/manager/profiles.py
index f9adfe8..f50062a 100644
--- a/manager/profiles.py
+++ b/manager/profiles.py
@@ -3,7 +3,9 @@
import asyncio
import json
import logging
+import time
import urllib.request
+import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
@@ -136,7 +138,7 @@ def group_member_count(g: dict) -> int:
return g.get("groupSummary", {}).get("currentMembers", 0)
-BOT_TYPES = ["echo", "llm", "broadcast", "support", "directory", "deadmans"]
+BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"]
USER_TYPES = ["user"]
BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats)
ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES
@@ -155,6 +157,11 @@ class RunningBot:
chat: Any = None # simplex_chat ChatApi instance
# Per-contact LLM conversation history (contactId → [{role, content}, ...])
histories: dict[int, list[dict]] = field(default_factory=dict)
+ # RSS bot state
+ rss_seen: set = field(default_factory=set) # entry ids already posted
+ rss_items: list = field(default_factory=list) # latest fetched entries (newest first)
+ rss_next_poll: float = 0.0
+ rss_gid: int | None = None # broadcast channel group id
# profile_id → RunningBot
@@ -426,6 +433,110 @@ async def _handle_broadcast_message(
await reply("Broadcast failed.")
+# ── RSS bot helpers ──────────────────────────────────────────────────────────────
+def _fetch_feed(url: str) -> list[dict]:
+ """Fetch + parse an RSS 2.0 or Atom feed → newest-first list of {id,title,link}.
+ Uses only the stdlib (urllib + ElementTree), no extra dependencies."""
+ req = urllib.request.Request(url, headers={"User-Agent": "simplex-rss-bot/1.0"})
+ with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - user-configured feed
+ data = r.read()
+ root = ET.fromstring(data)
+ out: list[dict] = []
+ items = root.findall(".//item") # RSS 2.0
+ if items:
+ for it in items:
+ title = (it.findtext("title") or "").strip()
+ link = (it.findtext("link") or "").strip()
+ eid = (it.findtext("guid") or link or title).strip()
+ out.append({"id": eid, "title": title, "link": link})
+ else: # Atom
+ ns = "{http://www.w3.org/2005/Atom}"
+ for e in root.findall(f".//{ns}entry"):
+ title = (e.findtext(f"{ns}title") or "").strip()
+ le = e.find(f"{ns}link")
+ link = (le.get("href") if le is not None else "") or ""
+ eid = (e.findtext(f"{ns}id") or link or title).strip()
+ out.append({"id": eid, "title": title, "link": link})
+ return out
+
+
+def _rss_format(e: dict) -> str:
+ title = e.get("title") or "(untitled)"
+ return f"{title}\n{e['link']}" if e.get("link") else title
+
+
+async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int,
+ name: str, config: dict) -> int | None:
+ """Find or create the broadcast channel (observer group) for this feed; persist its id."""
+ import db as _db
+ gid = config.get("channel_gid")
+ if gid:
+ try:
+ groups = await chat.api_list_groups(user_id)
+ if any(group_id(g) == gid for g in groups):
+ return gid
+ except Exception:
+ return gid # assume still valid if the lookup failed
+ try:
+ info = await chat.api_new_group(user_id, {
+ "displayName": f"{name} Feed", "fullName": "",
+ "groupPreferences": {"history": {"enable": "on"}}, # new subscribers see recent posts
+ })
+ gid = info["groupId"]
+ await chat.api_create_group_link(gid, "observer") # channel = observer link
+ config["channel_gid"] = gid
+ _db.update_config(profile_id, config)
+ _append_log(b, f"RSS channel created (group {gid})")
+ return gid
+ except Exception:
+ log.exception("rss: failed to create channel")
+ return None
+
+
+async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict, seed: bool) -> None:
+ """Fetch the feed; on the seed run just record existing ids, otherwise broadcast new items."""
+ url = (config.get("feed_url") or "").strip()
+ if not url:
+ return
+ try:
+ entries = await asyncio.to_thread(_fetch_feed, url)
+ except Exception as e:
+ log.error("rss fetch error: %s", e)
+ _append_log(b, f"RSS fetch error: {e}")
+ return
+ b.rss_items = entries
+ new = [e for e in entries if e["id"] not in b.rss_seen]
+ for e in new:
+ b.rss_seen.add(e["id"])
+ if seed:
+ _append_log(b, f"RSS seeded {len(entries)} existing item(s)")
+ return
+ if not gid or not new:
+ return
+ for e in reversed(new): # post oldest → newest
+ try:
+ await chat.api_send_text_message({"chatType": "group", "chatId": gid}, _rss_format(e))
+ except Exception:
+ log.exception("rss: failed to post to channel")
+ _append_log(b, f"RSS posted {len(new)} new item(s)")
+
+
+async def _rss_send_latest(chat: Any, item: dict, b: "RunningBot", n: int = 3) -> None:
+ """Reply to a direct request (e.g. /new) with the latest feed items."""
+ items = b.rss_items[:n]
+ if not items:
+ try:
+ await chat.api_send_text_reply(item, "No items yet — check back soon.")
+ except Exception:
+ pass
+ return
+ for e in items:
+ try:
+ await chat.api_send_text_reply(item, _rss_format(e))
+ except Exception:
+ pass
+
+
async def _run_bot(
profile_id: int,
name: str,
@@ -487,6 +598,9 @@ async def _run_bot(
elif bot_type == "broadcast":
# auto-reply greets each new contact (default lists allowed publishers)
settings["autoReply"] = {"type": "text", "text": _bc_welcome(config, name)}
+ elif bot_type == "rss":
+ welcome = config.get("welcome_message") or f"Subscribed to {name}. Send /new for the latest posts."
+ settings["autoReply"] = {"type": "text", "text": welcome}
elif bot_type in ("echo", "llm", "directory", "deadmans"):
welcome = config.get("welcome_message", f"Connected to {name}.")
settings["autoReply"] = {"type": "text", "text": welcome}
@@ -516,10 +630,24 @@ async def _run_bot(
dir_tick = 0 # directory bots periodically scan for newly-added groups
+ # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the
+ # whole feed on startup; then poll on an interval.
+ rss_poll_s = float(config.get("poll_seconds", 300))
+ if bot_type == "rss":
+ b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config)
+ await _rss_poll(b, chat, b.rss_gid, config, seed=True)
+ b.rss_next_poll = time.time() + rss_poll_s
+ await refresh()
+
# Event loop
while True:
evt = await chat.recv_chat_event(500_000)
+ # RSS bot: poll the feed on its interval and broadcast new items to the channel
+ if bot_type == "rss" and time.time() >= b.rss_next_poll:
+ await _rss_poll(b, chat, b.rss_gid, config, seed=False)
+ b.rss_next_poll = time.time() + rss_poll_s
+
# Directory bot: ~every 30s, refresh groups and register/auto-join new ones
if bot_type == "directory":
dir_tick += 1
@@ -551,7 +679,16 @@ async def _run_bot(
_append_log(b, f"Contact connected: {ct_name}")
# echo replies on message; broadcast/others greet via the auto-reply
- # configured in address settings, so nothing to do on connect here.
+ # configured in address settings. RSS also sends the latest items on connect.
+ if bot_type == "rss":
+ cid = ct.get("contactId")
+ if cid is not None: # send the latest items directly to the new contact
+ for e in b.rss_items[:3]:
+ try:
+ await chat.api_send_text_message(
+ {"chatType": "direct", "chatId": cid}, _rss_format(e))
+ except Exception:
+ pass
elif tag == "newChatItems":
items = evt.get("chatItems", [])
@@ -606,6 +743,10 @@ async def _run_bot(
b, chat, config, item, chat_info, text, DEFAULT_LLM_PROMPT
)
+ elif bot_type == "rss" and text:
+ if text.strip().lower() == "/new":
+ await _rss_send_latest(chat, item, b)
+
elif bot_type == "directory" and text:
await _handle_directory_message(
b, chat, config, name, item, chat_info, text
diff --git a/manager/rss_test.py b/manager/rss_test.py
new file mode 100644
index 0000000..5a4d799
--- /dev/null
+++ b/manager/rss_test.py
@@ -0,0 +1,133 @@
+"""End-to-end test of the RSS bot (Pattern 3, in-process FFI).
+
+Serves a mock RSS feed locally, starts an rss bot pointed at it, and checks:
+ - the bot creates a broadcast channel (observer group)
+ - the initial feed item is seeded (not re-posted)
+ - a newly-added feed item is broadcast to the channel
+
+Verifies via the bot's own view of the channel chat (no subscriber needed).
+
+Run: .venv/bin/python rss_test.py
+"""
+
+import asyncio
+import json
+import sys
+import threading
+import time
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent))
+import profiles as pm # noqa: E402
+
+DATA = Path("data")
+BOT_PREFIX = str(DATA / "rsstest_bot")
+BOT_PID = 99004
+
+# mutable feed state the mock server serves
+FEED = {"items": [("First post", "https://example.com/1")]}
+
+
+def feed_xml():
+ items = "".join(
+ f"
| echo | Repeats every message back to the sender — handy for testing a connection end to end. |
| llm | Chat with a local or remote LLM (OpenAI-compatible, e.g. Ollama). Give it context, it replies to your messages. |
| rss | Watches an RSS/Atom feed and broadcasts new posts to a channel it creates. Subscribers join the channel; send /new for the latest. |
| broadcast | Relays messages from authorized publishers out to all of the bot's contacts. |
| support | Business inbox — auto-replies with a welcome message and collects incoming inquiries. |
| directory | Directory service for discovering and listing groups or contacts. |