"""End-to-end test of the RSS bot (Pattern 3, in-process FFI). Serves a mock RSS feed locally, starts an rss bot pointed at it, and checks: - the bot creates a broadcast channel (observer group) - the initial feed item is seeded (not re-posted) - a newly-added feed item is broadcast to the channel Verifies via the bot's own view of the channel chat (no subscriber needed). Run: .venv/bin/python rss_test.py """ import asyncio import json import sys import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) import profiles as pm # noqa: E402 DATA = Path("data") BOT_PREFIX = str(DATA / "rsstest_bot") BOT_PID = 99004 # mutable feed state the mock server serves FEED = {"items": [("First post", "https://example.com/1")]} def feed_xml(): items = "".join( f"{t}{l}{l}" for t, l in FEED["items"] ) return (f'' f'Test Feed{items}').encode() class FeedHandler(BaseHTTPRequestHandler): def do_GET(self): body = feed_xml() self.send_response(200) self.send_header("Content-Type", "application/rss+xml") self.end_headers() self.wfile.write(body) def log_message(self, *a): pass def cleanup(): for p in DATA.glob("rsstest_bot_*"): p.unlink() bf = DATA / "manager.db" # leave the manager db alone; we don't use it here async def wait_until(fn, timeout=120, every=1): start = time.time() while time.time() - start < timeout: v = await fn() if v: return v await asyncio.sleep(every) return None async def channel_texts(chat, gid): c = await chat.api_get_chat("group", gid, 50) out = [] for ci in c.get("chatItems", []): out.append(ci.get("content", {}).get("msgContent", {}).get("text", "")) return out async def main() -> int: cleanup() srv = HTTPServer(("127.0.0.1", 0), FeedHandler) port = srv.server_address[1] threading.Thread(target=srv.serve_forever, daemon=True).start() url = f"http://127.0.0.1:{port}/feed.xml" print("mock feed at", url) async def on_address(pid, addr): pass profile = { "id": BOT_PID, "name": "rsstestbot", "bot_type": "rss", "db_prefix": BOT_PREFIX, "config": json.dumps({"feed_url": url, "poll_seconds": 5}), } ok = True try: await pm.start_bot(profile, on_address) b = pm.get_running(BOT_PID) # 1) channel created gid = await wait_until(lambda: asyncio.sleep(0, b.channel_gid), timeout=90) print("channel created:", bool(gid), "gid", gid) assert gid, "rss bot did not create a channel" # 2) first run populates the channel with the existing item(s) got_initial = await wait_until( lambda: _contains(channel_texts(b.chat, gid), "First post"), timeout=20, every=2 ) print("initial item populated to channel:", bool(got_initial)) assert got_initial, "channel was not populated on first run" # 3) add a new item → it should be broadcast on the next poll FEED["items"].insert(0, ("Breaking news", "https://example.com/2")) got = await wait_until( lambda: _contains(channel_texts(b.chat, gid), "Breaking news"), timeout=30, every=2 ) print("new item broadcast to channel:", bool(got)) ok = bool(got) except AssertionError as e: ok = False print("ASSERT FAIL:", e) finally: await pm.stop_bot(BOT_PID) srv.shutdown() cleanup() print("\nRESULT:", "PASS — rss bot broadcasts new feed posts" if ok else "FAIL") return 0 if ok else 1 async def _contains(coro, needle): return any(needle in (t or "") for t in await coro) if __name__ == "__main__": raise SystemExit(asyncio.run(main()))