"""End-to-end test of the 'llm' bot (Pattern 3, in-process FFI). Stands up a tiny OpenAI-compatible mock server (so no Ollama needed), starts an llm bot pointed at it with a known context, connects a customer, sends a message, and verifies the bot's reply reflects both the configured context and the message. Run: .venv/bin/python llm_test.py """ import asyncio import json import sys import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) import profiles as pm # noqa: E402 from simplex_chat import ChatApi, SqliteDb # noqa: E402 DATA = Path("data") BOT_PREFIX = str(DATA / "llmtest_bot") CUST_PREFIX = str(DATA / "llmtest_cust") BOT_PID = 99003 CONTEXT = "TESTCTX" def cleanup(): for pat in ("llmtest_bot_*", "llmtest_cust_*"): for p in DATA.glob(pat): p.unlink() class MockLLM(BaseHTTPRequestHandler): def do_POST(self): n = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(n) or b"{}") msgs = body.get("messages", []) system = next((m["content"] for m in msgs if m["role"] == "system"), "") last_user = next((m["content"] for m in reversed(msgs) if m["role"] == "user"), "") content = f"ctx:{system}|got:{last_user}" out = json.dumps({"choices": [{"message": {"role": "assistant", "content": content}}]}).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(out) def log_message(self, *a): pass async def wait_until(fn, timeout=120, every=1): start = time.time() while time.time() - start < timeout: v = await fn() if v: return v await asyncio.sleep(every) return None async def incoming_texts(chat, contact_id): c = await chat.api_get_chat("direct", contact_id, 50) return [ ci.get("content", {}).get("msgContent", {}).get("text", "") for ci in c.get("chatItems", []) if ci.get("chatDir", {}).get("type", "").endswith("Rcv") ] async def main() -> int: cleanup() srv = HTTPServer(("127.0.0.1", 0), MockLLM) port = srv.server_address[1] threading.Thread(target=srv.serve_forever, daemon=True).start() print("mock LLM on port", port) addr_box = {} async def on_address(pid, addr): addr_box["addr"] = addr profile = { "id": BOT_PID, "name": "llmtestbot", "bot_type": "llm", "db_prefix": BOT_PREFIX, "config": json.dumps({ "api_base": f"http://127.0.0.1:{port}/v1", "model": "test-model", "api_key": "x", "system_prompt": CONTEXT, }), } cust = None ok = True try: await pm.start_bot(profile, on_address) addr = await wait_until(lambda: asyncio.sleep(0, addr_box.get("addr")), timeout=90) print("bot address:", bool(addr)) assert addr, "llm bot never published an address" cust = await ChatApi.init(SqliteDb(file_prefix=CUST_PREFIX)) if not await cust.api_get_active_user(): await cust.api_create_active_user({"displayName": "customer", "fullName": ""}) await cust.start_chat() await cust.send_chat_cmd(f"/connect {addr}") u = await cust.api_get_active_user() cid = await wait_until( lambda: _first_contact(cust, u["userId"]), timeout=90, every=2 ) assert cid, "customer did not connect" await asyncio.sleep(2) await cust.api_send_text_message({"chatType": "direct", "chatId": cid}, "ping") reply = await wait_until( lambda: _find_reply(cust, cid), timeout=60, every=2 ) print("bot reply:", reply) assert reply and "ctx:TESTCTX" in reply and "got:ping" in reply, \ "reply did not reflect context + message" except AssertionError as e: ok = False print("ASSERT FAIL:", e) finally: await pm.stop_bot(BOT_PID) if cust: try: await cust.close() except Exception: pass srv.shutdown() cleanup() print("\nRESULT:", "PASS — llm bot replies using its context" if ok else "FAIL") return 0 if ok else 1 async def _first_contact(chat, uid): cs = await chat.api_list_contacts(uid) return cs[0]["contactId"] if cs else None async def _find_reply(chat, cid): for t in await incoming_texts(chat, cid): if t.startswith("ctx:"): return t return None if __name__ == "__main__": raise SystemExit(asyncio.run(main()))