Add 'crypto' bot: streams CoinGecko prices to a channel

New crypto bot type: creates a broadcast channel and posts a price snapshot of the
selected coins/currencies (CoinGecko simple/price JSON) every interval — same
channel-streaming model as RSS. Create form has checkbox grids for popular coins
and currencies plus a poll interval. Generalize the channel helper and feed-poll
state (channel_gid/poll_next) shared by rss + crypto. Adds crypto_test.py (mock
CoinGecko) — passes; rss_test updated for the renamed field.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Jon
2026-06-05 22:31:44 +01:00
parent 7cda767408
commit 3456ed9411
4 changed files with 282 additions and 29 deletions

View File

@@ -4,6 +4,7 @@ import asyncio
import json
import logging
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
@@ -138,7 +139,7 @@ def group_member_count(g: dict) -> int:
return g.get("groupSummary", {}).get("currentMembers", 0)
BOT_TYPES = ["echo", "llm", "rss", "broadcast", "support", "directory", "deadmans"]
BOT_TYPES = ["echo", "llm", "rss", "crypto", "broadcast", "support", "directory", "deadmans"]
USER_TYPES = ["user"]
BUSINESS_TYPES = ["business"] # cli accounts with a business address (per-customer group chats)
ALL_TYPES = BOT_TYPES + USER_TYPES + BUSINESS_TYPES
@@ -157,10 +158,10 @@ class RunningBot:
chat: Any = None # simplex_chat ChatApi instance
# Per-contact LLM conversation history (contactId → [{role, content}, ...])
histories: dict[int, list[dict]] = field(default_factory=dict)
# RSS bot state
rss_seen: set = field(default_factory=set) # entry ids already posted
rss_next_poll: float = 0.0
rss_gid: int | None = None # broadcast channel group id
# Feed-style bot state (rss / crypto)
rss_seen: set = field(default_factory=set) # rss: entry ids already posted
poll_next: float = 0.0 # next scheduled poll (epoch seconds)
channel_gid: int | None = None # broadcast channel group id
# profile_id → RunningBot
@@ -464,9 +465,9 @@ def _rss_format(e: dict) -> str:
return f"{title}\n{e['link']}" if e.get("link") else title
async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int,
name: str, config: dict) -> int | None:
"""Find or create the broadcast channel (observer group) for this feed; persist its id."""
async def _ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_id: int,
channel_name: str, config: dict) -> int | None:
"""Find or create the broadcast channel (observer group) for a feed/ticker bot; persist its id."""
import db as _db
gid = config.get("channel_gid")
if gid:
@@ -478,17 +479,17 @@ async def _rss_ensure_channel(profile_id: int, b: "RunningBot", chat: Any, user_
return gid # assume still valid if the lookup failed
try:
info = await chat.api_new_group(user_id, {
"displayName": f"{name} Feed", "fullName": "",
"displayName": channel_name, "fullName": "",
"groupPreferences": {"history": {"enable": "on"}}, # new subscribers see recent posts
})
gid = info["groupId"]
await chat.api_create_group_link(gid, "observer") # channel = observer link
config["channel_gid"] = gid
_db.update_config(profile_id, config)
_append_log(b, f"RSS channel created (group {gid})")
_append_log(b, f"Channel created (group {gid})")
return gid
except Exception:
log.exception("rss: failed to create channel")
log.exception("failed to create channel")
return None
@@ -525,6 +526,74 @@ async def _rss_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict,
_append_log(b, f"RSS posted {len(to_post)} item(s)")
# ── Crypto price bot helpers (CoinGecko) ─────────────────────────────────────────
# Popular coins offered in the UI: CoinGecko id → display name.
CRYPTO_COINS = {
"bitcoin": "Bitcoin", "ethereum": "Ethereum", "tether": "Tether", "binancecoin": "BNB",
"solana": "Solana", "ripple": "XRP", "cardano": "Cardano", "dogecoin": "Dogecoin",
"polkadot": "Polkadot", "litecoin": "Litecoin", "tron": "TRON", "chainlink": "Chainlink",
}
# vs_currency → display symbol.
CRYPTO_CURRENCIES = {"usd": "$", "eur": "", "gbp": "£", "jpy": "¥", "aud": "A$", "cad": "C$"}
def _fetch_crypto(ids: list, vs: list) -> dict:
"""Fetch current prices from CoinGecko's simple/price endpoint (JSON)."""
q = urllib.parse.urlencode({"ids": ",".join(ids), "vs_currencies": ",".join(vs)})
url = f"https://api.coingecko.com/api/v3/simple/price?{q}"
req = urllib.request.Request(url, headers={"User-Agent": "simplex-crypto-bot/1.0"})
with urllib.request.urlopen(req, timeout=20) as r: # noqa: S310 - fixed CoinGecko host
return json.loads(r.read())
def _fmt_price(v) -> str:
try:
v = float(v)
except (TypeError, ValueError):
return str(v)
if v >= 1000:
return f"{v:,.0f}"
if v >= 1:
return f"{v:,.2f}"
return f"{v:.8f}".rstrip("0").rstrip(".")
def _crypto_format(prices: dict, coins: list, currencies: list) -> str | None:
lines = []
for cid in coins:
d = prices.get(cid)
if not d:
continue
nm = CRYPTO_COINS.get(cid, cid.title())
parts = [f"{CRYPTO_CURRENCIES.get(cur, '')}{_fmt_price(d[cur])}"
for cur in currencies if d.get(cur) is not None]
if parts:
lines.append(f"{nm}: " + " · ".join(parts))
if not lines:
return None
return f"Crypto prices @ {datetime.now().strftime('%H:%M')}\n" + "\n".join(lines)
async def _crypto_poll(b: "RunningBot", chat: Any, gid: int | None, config: dict) -> None:
"""Fetch selected prices and post a snapshot to the channel (the 'stream')."""
coins = config.get("coins") or ["bitcoin", "ethereum"]
currencies = config.get("currencies") or ["usd"]
try:
prices = await asyncio.to_thread(_fetch_crypto, coins, currencies)
except Exception as e:
log.error("crypto fetch error: %s", e)
_append_log(b, f"Crypto fetch error: {e}")
return
msg = _crypto_format(prices, coins, currencies)
if not gid or not msg:
return
try:
await chat.api_send_text_message({"chatType": "group", "chatId": gid}, msg)
_append_log(b, "Crypto prices posted")
except Exception:
log.exception("crypto: failed to post to channel")
async def _run_bot(
profile_id: int,
name: str,
@@ -589,6 +658,9 @@ async def _run_bot(
elif bot_type == "rss":
welcome = config.get("welcome_message") or f"You're subscribed to {name}."
settings["autoReply"] = {"type": "text", "text": welcome}
elif bot_type == "crypto":
welcome = config.get("welcome_message") or f"You're subscribed to {name} crypto prices."
settings["autoReply"] = {"type": "text", "text": welcome}
elif bot_type in ("echo", "llm", "directory", "deadmans"):
welcome = config.get("welcome_message", f"Connected to {name}.")
settings["autoReply"] = {"type": "text", "text": welcome}
@@ -620,20 +692,24 @@ async def _run_bot(
# RSS bot: ensure a broadcast channel and seed seen-items so we don't replay the
# whole feed on startup; then poll on an interval.
rss_poll_s = float(config.get("poll_seconds", 300))
if bot_type == "rss":
b.rss_gid = await _rss_ensure_channel(profile_id, b, chat, user_id, name, config)
if b.rss_gid and not config.get("rss_populated"):
# first run for this feed: fill the channel with the latest items so new
# subscribers see content (recent history). Done once, then flagged.
await _rss_poll(b, chat, b.rss_gid, config, seed=False, max_post=5)
config["rss_populated"] = True
import db as _db
_db.update_config(profile_id, config)
else:
# already populated on a previous run — just record current ids, don't replay
await _rss_poll(b, chat, b.rss_gid, config, seed=True)
b.rss_next_poll = time.time() + rss_poll_s
poll_s = float(config.get("poll_seconds", 300))
if bot_type in ("rss", "crypto"):
cname = f"{name} Feed" if bot_type == "rss" else f"{name} Prices"
b.channel_gid = await _ensure_channel(profile_id, b, chat, user_id, cname, config)
if bot_type == "rss":
if b.channel_gid and not config.get("rss_populated"):
# first run for this feed: fill the channel with the latest items so new
# subscribers see content (recent history). Done once, then flagged.
await _rss_poll(b, chat, b.channel_gid, config, seed=False, max_post=5)
config["rss_populated"] = True
import db as _db
_db.update_config(profile_id, config)
else:
# already populated on a previous run — just record current ids, don't replay
await _rss_poll(b, chat, b.channel_gid, config, seed=True)
else: # crypto: post an immediate price snapshot so the channel isn't empty
await _crypto_poll(b, chat, b.channel_gid, config)
b.poll_next = time.time() + poll_s
await refresh()
# Event loop
@@ -641,9 +717,14 @@ async def _run_bot(
evt = await chat.recv_chat_event(500_000)
# RSS bot: poll the feed on its interval and broadcast new items to the channel
if bot_type == "rss" and time.time() >= b.rss_next_poll:
await _rss_poll(b, chat, b.rss_gid, config, seed=False)
b.rss_next_poll = time.time() + rss_poll_s
if bot_type == "rss" and time.time() >= b.poll_next:
await _rss_poll(b, chat, b.channel_gid, config, seed=False)
b.poll_next = time.time() + poll_s
# Crypto bot: post a fresh price snapshot to the channel on its interval
if bot_type == "crypto" and time.time() >= b.poll_next:
await _crypto_poll(b, chat, b.channel_gid, config)
b.poll_next = time.time() + poll_s
# Directory bot: ~every 30s, refresh groups and register/auto-join new ones
if bot_type == "directory":