"""End-to-end test of the crypto price bot (Pattern 3, in-process FFI).

Serves a mock CoinGecko simple/price endpoint, starts a crypto bot, and checks
it creates a channel and posts a price snapshot of the selected
coins/currencies.

Run: .venv/bin/python crypto_test.py
"""
import asyncio
import json
import sys
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

# Make the sibling ``profiles`` module importable when run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent))
import profiles as pm  # noqa: E402

DATA = Path("data")
BOT_PREFIX = str(DATA / "cryptotest_bot")
BOT_PID = 99005

# Canned payload the mock server returns for every GET request.
PRICES = {
    "bitcoin": {"usd": 65000, "gbp": 51000},
    "ethereum": {"usd": 3200, "gbp": 2500},
}


class CGHandler(BaseHTTPRequestHandler):
    """Mock price endpoint: answers every GET with ``PRICES`` as JSON."""

    def do_GET(self) -> None:
        """Send a 200 response whose body is the JSON-encoded ``PRICES``."""
        body = json.dumps(PRICES).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        # Declare the body size so clients do not have to rely on the
        # connection being closed to find the end of the response.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args) -> None:
        """Suppress per-request logging to keep the test output readable."""


def cleanup() -> None:
    """Delete every ``cryptotest_bot_*`` file under ``DATA``."""
    for path in DATA.glob("cryptotest_bot_*"):
        try:
            path.unlink()
        except FileNotFoundError:
            # Already gone between listing and unlinking; that is the
            # state we want, so there is nothing left to do.
            continue


async def wait_until(fn, timeout=120, every=1):
    """Poll the async callable ``fn`` until it returns a truthy value.

    Args:
        fn: Zero-argument callable returning an awaitable.
        timeout: Maximum number of seconds to keep polling.
        every: Seconds to sleep between polls.

    Returns:
        The first truthy value produced by ``fn``, or ``None`` if the
        timeout elapsed first.
    """
    # Monotonic clock: the deadline must not move if the wall clock is
    # adjusted while the test is running.
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        value = await fn()
        if value:
            return value
        await asyncio.sleep(every)
    return None


async def channel_texts(chat, gid):
    """Return the message texts of up to 50 items of group chat ``gid``.

    Items without a text field contribute an empty string.
    """
    chat_data = await chat.api_get_chat("group", gid, 50)
    return [
        item.get("content", {}).get("msgContent", {}).get("text", "")
        for item in chat_data.get("chatItems", [])
    ]


async def main() -> int:
    """Run the end-to-end scenario.

    Returns:
        ``0`` if the bot created a channel and posted a valid price
        snapshot, ``1`` if one of those checks failed.
    """
    cleanup()
    srv = HTTPServer(("127.0.0.1", 0), CGHandler)  # port 0: OS picks a free port
    port = srv.server_address[1]
    threading.Thread(target=srv.serve_forever, daemon=True).start()

    base = f"http://127.0.0.1:{port}/"
    orig_fetch = pm._fetch_crypto

    def mock_fetch(ids, vs):
        """Stand-in fetcher: ignore the arguments and query the mock server."""
        with urllib.request.urlopen(base, timeout=10) as resp:
            return json.loads(resp.read())

    profile = {
        "id": BOT_PID,
        "name": "cryptotestbot",
        "bot_type": "crypto",
        "db_prefix": BOT_PREFIX,
        "config": json.dumps(
            {
                "coins": ["bitcoin", "ethereum"],
                "currencies": ["usd", "gbp"],
                "poll_seconds": 60,
            }
        ),
    }

    # point the bot's fetcher at the mock server; installed right before the
    # try block so the finally clause below is guaranteed to restore it.
    pm._fetch_crypto = mock_fetch
    print("mock CoinGecko on", base)

    ok = True
    try:
        # The callback only needs to be awaitable here; it does nothing.
        await pm.start_bot(profile, lambda pid, addr: asyncio.sleep(0))
        bot = pm.get_running(BOT_PID)

        # asyncio.sleep(0, result) yields to the event loop once and then
        # returns ``result``, giving wait_until an awaitable attribute read.
        gid = await wait_until(
            lambda: asyncio.sleep(0, bot.channel_gid), timeout=90
        )
        print("channel created:", bool(gid), "gid", gid)
        # Explicit raise instead of ``assert`` so the check still runs
        # under ``python -O``.
        if not gid:
            raise AssertionError("crypto bot did not create a channel")

        got = await wait_until(
            lambda: _has_price(channel_texts(bot.chat, gid)),
            timeout=30,
            every=2,
        )
        print("price snapshot posted:", got)
        if not (got and "Bitcoin" in got and "$" in got):
            raise AssertionError("no valid price snapshot posted")
    except AssertionError as e:
        ok = False
        print("ASSERT FAIL:", e)
    finally:
        pm._fetch_crypto = orig_fetch
        try:
            await pm.stop_bot(BOT_PID)
        finally:
            # Always release the server and the bot files, even if
            # stopping the bot raised.
            srv.shutdown()
            srv.server_close()  # shutdown() stops the loop but keeps the socket open
            cleanup()

    print("\nRESULT:", "PASS — crypto bot posts price snapshots" if ok else "FAIL")
    return 0 if ok else 1


async def _has_price(coro):
    """Await ``coro`` and return its first text containing "Crypto prices".

    Returns ``None`` when no text matches; ``None`` entries are skipped.
    """
    for text in await coro:
        if "Crypto prices" in (text or ""):
            return text
    return None


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))