"""Sam — Research Agent.

Web search via SearXNG + synthesis via Ollama. POST /research to use.
"""
import json import logging import os import re from datetime import datetime, timezone from pathlib import Path import httpx from fastapi import FastAPI, HTTPException from pydantic import BaseModel VERSION_PATTERN = re.compile(r'\bv?\d+\.\d+[\.\d]*\b') VERSION_CONTEXT = re.compile(r'\b(latest|stable|current|released?|new version)\b', re.IGNORECASE) logging.basicConfig(level=logging.INFO, format="%(asctime)s [sam] %(message)s") logger = logging.getLogger("sam") app = FastAPI(title="sam-research", version="0.1.0") OLLAMA_URL = os.getenv("OLLAMA_URL", "http://172.27.40.20:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "gemma4") SEARXNG_URL = os.getenv("SEARXNG_URL", "http://searxng:8080") SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites")) AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os")) HISTORY_FILE = SITES_DIR / "sam" / "history.json" MAX_HISTORY = 10 def load_history() -> list[dict]: if HISTORY_FILE.exists(): try: return json.loads(HISTORY_FILE.read_text()) except Exception: pass return [] def save_history(entry: dict): history = load_history() history.insert(0, entry) HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) HISTORY_FILE.write_text(json.dumps(history[:MAX_HISTORY], indent=2)) async def search_web(query: str, max_results: int) -> list[dict]: params = {"q": query, "format": "json", "categories": "general", "language": "en"} async with httpx.AsyncClient(timeout=30.0) as client: r = await client.get(f"{SEARXNG_URL}/search", params=params) r.raise_for_status() data = r.json() return [ {"title": item.get("title", ""), "url": item.get("url", ""), "snippet": item.get("content", "")} for item in data.get("results", [])[:max_results] ] def _has_version(results: list[dict]) -> bool: for r in results: text = r.get("snippet", "") + " " + r.get("title", "") if VERSION_PATTERN.search(text) and VERSION_CONTEXT.search(text): return True return False async def search_with_retry(query: str, max_results: int) -> tuple[list[dict], bool]: """Search 
SearXNG, retrying with a refined query if no version number found in results. Returns (results, retried).""" results = await search_web(query, max_results) if _has_version(results): return results, False retry_query = query + " latest release version changelog" logger.info(f"no version found in results — retrying: '{retry_query}'") retry_results = await search_web(retry_query, max_results) seen_urls = {r["url"] for r in results} merged = results + [r for r in retry_results if r["url"] not in seen_urls] return merged[:max_results], True async def ollama_synthesize(prompt: str, model: str) -> str: payload = { "model": model, "messages": [{"role": "user", "content": prompt}], "stream": False, } async with httpx.AsyncClient(timeout=120.0) as client: r = await client.post(f"{OLLAMA_URL}/api/chat", json=payload) r.raise_for_status() return r.json()["message"]["content"] def build_prompt(query: str, results: list[dict], instructions: str | None) -> str: lines = [f"You are a research assistant. The user wants information on: {query}\n"] if instructions: lines.append(f"Instructions: {instructions}\n") lines.append("Web search results:\n") for i, r in enumerate(results, 1): lines.append(f"{i}. **{r['title']}**") lines.append(f" URL: {r['url']}") if r["snippet"]: lines.append(f" {r['snippet']}") lines.append("") lines.append( "Based on these results, provide a thorough answer. " "Follow any format instructions given above. " "Cite sources by referencing their URLs inline." ) return "\n".join(lines) def render_html(history: list[dict]) -> str: now_iso = datetime.now(timezone.utc).isoformat() rows = "" for entry in history: ts = entry.get("timestamp", "") ts_span = f'' if ts else "—" query = entry.get("query", "") model = entry.get("model", "") result_len = entry.get("summary_chars", 0) rows += f"""
Web search via SearXNG + synthesis via Ollama. POST /research to use.