diff --git a/manager/crypto_test.py b/manager/crypto_test.py new file mode 100644 index 0000000..1f64c9b --- /dev/null +++ b/manager/crypto_test.py @@ -0,0 +1,116 @@ +"""End-to-end test of the crypto price bot (Pattern 3, in-process FFI). + +Serves a mock CoinGecko simple/price endpoint, starts a crypto bot, and checks it +creates a channel and posts a price snapshot of the selected coins/currencies. + +Run: .venv/bin/python crypto_test.py +""" + +import asyncio +import json +import sys +import threading +import time +import urllib.request +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import profiles as pm # noqa: E402 + +DATA = Path("data") +BOT_PREFIX = str(DATA / "cryptotest_bot") +BOT_PID = 99005 + +PRICES = {"bitcoin": {"usd": 65000, "gbp": 51000}, "ethereum": {"usd": 3200, "gbp": 2500}} + + +class CGHandler(BaseHTTPRequestHandler): + def do_GET(self): + body = json.dumps(PRICES).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + + def log_message(self, *a): + pass + + +def cleanup(): + for p in DATA.glob("cryptotest_bot_*"): + p.unlink() + + +async def wait_until(fn, timeout=120, every=1): + start = time.time() + while time.time() - start < timeout: + v = await fn() + if v: + return v + await asyncio.sleep(every) + return None + + +async def channel_texts(chat, gid): + c = await chat.api_get_chat("group", gid, 50) + return [ci.get("content", {}).get("msgContent", {}).get("text", "") for ci in c.get("chatItems", [])] + + +async def main() -> int: + cleanup() + srv = HTTPServer(("127.0.0.1", 0), CGHandler) + port = srv.server_address[1] + threading.Thread(target=srv.serve_forever, daemon=True).start() + + # point the bot's fetcher at the mock server + base = f"http://127.0.0.1:{port}/" + orig_fetch = pm._fetch_crypto + + def mock_fetch(ids, vs): + with urllib.request.urlopen(base, timeout=10) as r: + return json.loads(r.read()) + pm._fetch_crypto = mock_fetch + print("mock CoinGecko on", base) + + profile = { + "id": BOT_PID, "name": "cryptotestbot", "bot_type": "crypto", + "db_prefix": BOT_PREFIX, + "config": json.dumps({"coins": ["bitcoin", "ethereum"], + "currencies": ["usd", "gbp"], "poll_seconds": 60}), + } + ok = True + try: + await pm.start_bot(profile, lambda pid, addr: asyncio.sleep(0)) + b = pm.get_running(BOT_PID) + gid = await wait_until(lambda: asyncio.sleep(0, b.channel_gid), timeout=90) + print("channel created:", bool(gid), "gid", gid) + assert gid, "crypto bot did not create a channel" + + got = await wait_until( + lambda: _has_price(channel_texts(b.chat, gid)), timeout=30, every=2 + ) + print("price snapshot posted:", got) + assert got and "Bitcoin" in got and "$" in got, "no valid price snapshot posted" + except AssertionError as e: + ok = False + print("ASSERT FAIL:", e) + finally: + pm._fetch_crypto = orig_fetch + await pm.stop_bot(BOT_PID) + srv.shutdown() + cleanup() + + print("\nRESULT:", "PASS — crypto bot posts price snapshots" if ok else "FAIL") + return 0 if ok else 1 + + +async def _has_price(coro): + for t in await coro: + if "Crypto prices" in (t or ""): + return t + return None + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/manager/profiles.py b/manager/profiles.py index 6f63e52..ab0c7ac 100644 --- a/manager/profiles.py +++ b/manager/profiles.py @@ -4,6 +4,7 @@ import asyncio import json import logging import time +import urllib.parse import urllib.request import xml.etree.ElementTree as ET from dataclasses import dataclass, field @@ -138,7 +139,7 @@ def group_member_count(g: dict) -> int: return g.get("groupSummary", {}).get("currentMembers", 0) -BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"] +BOT_TYPES = ["echo", "llm", "rss", "crypto", "broadcast", "support", "directory", "deadmans"] USER_TYPES = ["user"] BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats) ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES @@ -157,10 +158,10 @@ class RunningBot: chat: Any = None # simplex_chat ChatApi instance # Per-contact LLM conversation history (contactId → [{role, content}, ...]) histories: dict[int, list[dict]] = field(default_factory=dict) - # RSS bot state - rss_seen: set = field(default_factory=set) # entry ids already posted - rss_next_poll: float = 0.0 - rss_gid: int | None = None # broadcast channel group id + # Feed-style bot state (rss / crypto) + rss_seen: set = field(default_factory=set) # rss: entry ids already posted + poll_next: float = 0.0 # next scheduled poll (epoch seconds) + channel_gid: int | None = None # broadcast channel group id # profile_id → RunningBot @@ -464,9 +465,9 @@ def _rss_format(e: dict) -> str: return f"{title}\n{e['link']}" if e.get("link") else title -async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int, - name: str, config: dict) -> int | None: - """Find or create the broadcast channel (observer group) for this feed; persist its id.""" +async def _ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int, + channel_name: str, config: dict) -> int | None: + """Find or create the broadcast channel (observer group) for a feed/ticker bot; persist its id.""" import db as _db gid = config.get("channel_gid") if gid: @@ -478,17 +479,17 @@ async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_ return gid # assume still valid if the lookup failed try: info = await chat.api_new_group(user_id, { - "displayName": f"{name} Feed", "fullName": "", + "displayName": channel_name, "fullName": "", "groupPreferences": {"history": {"enable": "on"}}, # new subscribers see recent posts }) gid = info["groupId"] await chat.api_create_group_link(gid, "observer") # channel = observer link config["channel_gid"] = gid _db.update_config(profile_id, config) - _append_log(b, f"RSS channel created (group {gid})") + _append_log(b, f"Channel created (group {gid})") return gid except Exception: - log.exception("rss: failed to create channel") + log.exception("failed to create channel") return None @@ -525,6 +526,74 @@ async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict, _append_log(b, f"RSS posted {len(to_post)} item(s)") +# ── Crypto price bot helpers (CoinGecko) ───────────────────────────────────────── +# Popular coins offered in the UI: CoinGecko id → display name. +CRYPTO_COINS = { + "bitcoin": "Bitcoin", "ethereum": "Ethereum", "tether": "Tether", "binancecoin": "BNB", + "solana": "Solana", "ripple": "XRP", "cardano": "Cardano", "dogecoin": "Dogecoin", + "polkadot": "Polkadot", "litecoin": "Litecoin", "tron": "TRON", "chainlink": "Chainlink", +} +# vs_currency → display symbol. +CRYPTO_CURRENCIES = {"usd": "$", "eur": "€", "gbp": "£", "jpy": "¥", "aud": "A$", "cad": "C$"} + + +def _fetch_crypto(ids: list, vs: list) -> dict: + """Fetch current prices from CoinGecko's simple/price endpoint (JSON).""" + q = urllib.parse.urlencode({"ids": ",".join(ids), "vs_currencies": ",".join(vs)}) + url = f"https://api.coingecko.com/api/v3/simple/price?{q}" + req = urllib.request.Request(url, headers={"User-Agent": "simplex-crypto-bot/1.0"}) + with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - fixed CoinGecko host + return json.loads(r.read()) + + +def _fmt_price(v) -> str: + try: + v = float(v) + except (TypeError, ValueError): + return str(v) + if v >= 1000: + return f"{v:,.0f}" + if v >= 1: + return f"{v:,.2f}" + return f"{v:.8f}".rstrip("0").rstrip(".") + + +def _crypto_format(prices: dict, coins: list, currencies: list) -> str | None: + lines = [] + for cid in coins: + d = prices.get(cid) + if not d: + continue + nm = CRYPTO_COINS.get(cid, cid.title()) + parts = [f"{CRYPTO_CURRENCIES.get(cur, '')}{_fmt_price(d[cur])}" + for cur in currencies if d.get(cur) is not None] + if parts: + lines.append(f"{nm}: " + " · ".join(parts)) + if not lines: + return None + return f"Crypto prices @ {datetime.now().strftime('%H:%M')}\n" + "\n".join(lines) + + +async def _crypto_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict) -> None: + """Fetch selected prices and post a snapshot to the channel (the 'stream').""" + coins = config.get("coins") or ["bitcoin", "ethereum"] + currencies = config.get("currencies") or ["usd"] + try: + prices = await asyncio.to_thread(_fetch_crypto, coins, currencies) + except Exception as e: + log.error("crypto fetch error: %s", e) + _append_log(b, f"Crypto fetch error: {e}") + return + msg = _crypto_format(prices, coins, currencies) + if not gid or not msg: + return + try: + await chat.api_send_text_message({"chatType": "group", "chatId": gid}, msg) + _append_log(b, "Crypto prices posted") + except Exception: + log.exception("crypto: failed to post to channel") + + async def _run_bot( profile_id: int, name: str, @@ -589,6 +658,9 @@ async def _run_bot( elif bot_type == "rss": welcome = config.get("welcome_message") or f"You're subscribed to {name}." settings["autoReply"] = {"type": "text", "text": welcome} + elif bot_type == "crypto": + welcome = config.get("welcome_message") or f"You're subscribed to {name} crypto prices." + settings["autoReply"] = {"type": "text", "text": welcome} elif bot_type in ("echo", "llm", "directory", "deadmans"): welcome = config.get("welcome_message", f"Connected to {name}.") settings["autoReply"] = {"type": "text", "text": welcome} @@ -620,20 +692,24 @@ async def _run_bot( # RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the # whole feed on startup; then poll on an interval. - rss_poll_s = float(config.get("poll_seconds", 300)) - if bot_type == "rss": - b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config) - if b.rss_gid and not config.get("rss_populated"): - # first run for this feed: fill the channel with the latest items so new - # subscribers see content (recent history). Done once, then flagged. - await _rss_poll(b, chat, b.rss_gid, config, seed=False, max_post=5) - config["rss_populated"] = True - import db as _db - _db.update_config(profile_id, config) - else: - # already populated on a previous run — just record current ids, don't replay - await _rss_poll(b, chat, b.rss_gid, config, seed=True) - b.rss_next_poll = time.time() + rss_poll_s + poll_s = float(config.get("poll_seconds", 300)) + if bot_type in ("rss", "crypto"): + cname = f"{name} Feed" if bot_type == "rss" else f"{name} Prices" + b.channel_gid = await _ensure_channel(profile_id, b, chat, user_id, cname, config) + if bot_type == "rss": + if b.channel_gid and not config.get("rss_populated"): + # first run for this feed: fill the channel with the latest items so new + # subscribers see content (recent history). Done once, then flagged. + await _rss_poll(b, chat, b.channel_gid, config, seed=False, max_post=5) + config["rss_populated"] = True + import db as _db + _db.update_config(profile_id, config) + else: + # already populated on a previous run — just record current ids, don't replay + await _rss_poll(b, chat, b.channel_gid, config, seed=True) + else: # crypto: post an immediate price snapshot so the channel isn't empty + await _crypto_poll(b, chat, b.channel_gid, config) + b.poll_next = time.time() + poll_s await refresh() # Event loop @@ -641,9 +717,14 @@ async def _run_bot( evt = await chat.recv_chat_event(500_000) # RSS bot: poll the feed on its interval and broadcast new items to the channel - if bot_type == "rss" and time.time() >= b.rss_next_poll: - await _rss_poll(b, chat, b.rss_gid, config, seed=False) - b.rss_next_poll = time.time() + rss_poll_s + if bot_type == "rss" and time.time() >= b.poll_next: + await _rss_poll(b, chat, b.channel_gid, config, seed=False) + b.poll_next = time.time() + poll_s + + # Crypto bot: post a fresh price snapshot to the channel on its interval + if bot_type == "crypto" and time.time() >= b.poll_next: + await _crypto_poll(b, chat, b.channel_gid, config) + b.poll_next = time.time() + poll_s # Directory bot: ~every 30s, refresh groups and register/auto-join new ones if bot_type == "directory": diff --git a/manager/rss_test.py b/manager/rss_test.py index 06eda5f..4f0eabe 100644 --- a/manager/rss_test.py +++ b/manager/rss_test.py @@ -96,7 +96,7 @@ async def main() -> int: b = pm.get_running(BOT_PID) # 1) channel created - gid = await wait_until(lambda: asyncio.sleep(0, b.rss_gid), timeout=90) + gid = await wait_until(lambda: asyncio.sleep(0, b.channel_gid), timeout=90) print("channel created:", bool(gid), "gid", gid) assert gid, "rss bot did not create a channel" diff --git a/manager/templates/list.html b/manager/templates/list.html index cf364b8..32eecaa 100644 --- a/manager/templates/list.html +++ b/manager/templates/list.html @@ -19,6 +19,11 @@ .bot-types-card table td { vertical-align: top; } .bot-types-card .tag { white-space: nowrap; } + + .chk-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(120px, 1fr)); gap: 6px 12px; } + .chk { display: flex; align-items: center; gap: 7px; font-size: 13px; font-weight: 500; + color: var(--text); cursor: pointer; } + .chk input { width: auto; } {% endblock %} @@ -50,6 +55,7 @@ echoRepeats every message back to the sender — handy for testing a connection end to end. llmChat with a local or remote LLM (OpenAI-compatible, e.g. Ollama). Give it context, it replies to your messages. rssWatches an RSS/Atom feed and broadcasts new posts to a channel it creates. Subscribers join the channel to receive them. + cryptoStreams selected crypto prices (CoinGecko) to a channel on an interval. Pick coins & currencies below. broadcastRelays messages from authorized publishers out to all of the bot's contacts. supportBusiness inbox — auto-replies with a welcome message and collects incoming inquiries. directoryDirectory service for discovering and listing groups or contacts. @@ -242,6 +248,45 @@ + {% endif %}