import asyncio import json import logging import os import time from datetime import datetime, timezone from pathlib import Path from typing import AsyncGenerator from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel from .brain import stream_completion from .intent import classify_intent, extract_agent_name, extract_project_name from .tools import create_plane_issue, get_agent_output, get_all_agent_status logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger("jon-snow") AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os")) SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites")) app = FastAPI(title="Jon Snow", version="0.2.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) SYSTEM_PROMPT = """You are Jon Snow, chief of staff for the NxM home lab agent ecosystem on a self-hosted Linux server (172.27.40.3). Live agents you coordinate: - hodor (8200): HTTP gateway — routes requests to Ollama - bran: Daily changelog summariser — runs 06:00 SAST, writes to /opt/sites/changelog/ - varys: Infrastructure monitor — runs every 15 min, HTTP health checks + agent watchdog - sam (8500): Research agent — SearXNG + Ollama synthesis - raven (8400): Notifications — Discord webhook + Gmail SMTP - qyburn (8700): LLM coding agent — qwen2.5-coder:14b, approve/reject workflow - citadel (8300): MCP tool registry — 16 tools including Plane integration Infrastructure: Ubuntu Docker host 172.27.40.3, Ollama at 172.27.40.20:11434, Netbird VPN (100.119.x.x), Plane project management, Gitea at git.nxm.co.za. Your current capabilities (Phase 2): 1. Report agent status — last run time, success/failure, output summary 2. Log tasks to Plane project management 3. Answer questions about the infrastructure 4. Route complex questions to your SMART_MODEL brain You cannot yet execute tasks autonomously — that is Phase 3. When a user submits a task, log it to Plane and confirm. Be concise — the user is often on mobile. Use short markdown lists, not long paragraphs.""" # --- OpenAI-compatible request/response models --- class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str = "jon-snow" messages: list[Message] stream: bool = True temperature: float | None = None max_tokens: int | None = None # --- Output helpers --- def _write_status(intent: str, summary: str, status: str = "success") -> None: log_dir = AGENT_OS_DIR / "logs" / "jon-snow" log_dir.mkdir(parents=True, exist_ok=True) payload = { "agent": "jon-snow", "timestamp": datetime.now(timezone.utc).isoformat(), "status": status, "result": f"[{intent}] {summary[:200]}", } (log_dir / "last-run.json").write_text(json.dumps(payload, indent=2)) out_dir = SITES_DIR / "jon-snow" out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "last-output.md").write_text( f"# Jon Snow — Last Response\n\n**{payload['timestamp']}**\n\nIntent: `{intent}`\n\n{summary[:500]}\n" ) # --- SSE streaming helpers --- def _sse_chunk(content: str, chunk_id: str) -> str: data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "delta": {"content": content}, "finish_reason": None}], } return f"data: {json.dumps(data)}\n\n" def _sse_done(chunk_id: str) -> str: data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } return f"data: {json.dumps(data)}\n\ndata: [DONE]\n\n" async def _stream_text(text: str) -> AsyncGenerator[str, None]: chunk_id = f"chatcmpl-{int(time.time())}" # Emit word by word for a natural feel words = text.split(" ") for i, word in enumerate(words): token = word + (" " if i < len(words) - 1 else "") yield _sse_chunk(token, chunk_id) await asyncio.sleep(0.005) yield _sse_done(chunk_id) async def _stream_llm(messages: list[dict], use_smart: bool = False) -> AsyncGenerator[str, None]: chunk_id = f"chatcmpl-{int(time.time())}" collected = [] try: response = await stream_completion(messages, use_smart=use_smart) async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content collected.append(content) yield _sse_chunk(content, chunk_id) yield _sse_done(chunk_id) except Exception as e: logger.error(f"LLM stream error: {e}") error_msg = f"Error reaching LLM: {e}" async for part in _stream_text(error_msg): yield part collected.append(error_msg) return # --- Routes --- @app.get("/health") async def health(): return {"status": "ok", "agent": "jon-snow", "version": "0.2.0"} @app.get("/v1/models") async def list_models(): return { "object": "list", "data": [{"id": "jon-snow", "object": "model", "created": 0, "owned_by": "nxm"}], } @app.post("/v1/chat/completions") async def chat_completions(req: ChatRequest): user_message = next((m.content for m in reversed(req.messages) if m.role == "user"), "") intent = classify_intent(user_message) logger.info(f"intent={intent} msg={user_message[:100]!r}") messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages += [{"role": m.role, "content": m.content} for m in req.messages] async def generate() -> AsyncGenerator[str, None]: summary = "" if intent == "status": status_ctx = get_all_agent_status(AGENT_OS_DIR) agent_name = extract_agent_name(user_message) if agent_name: output = get_agent_output(SITES_DIR, agent_name) if output: status_ctx += f"\n\n### {agent_name} last output:\n{output}" messages[0]["content"] += f"\n\n## Current Agent Status\n{status_ctx}" async for chunk in _stream_llm(messages, use_smart=False): yield chunk summary = f"Status query: {user_message[:100]}" elif intent == "task": project_hint = extract_project_name(user_message) title = user_message.strip() try: issue = await create_plane_issue(title, project_hint) response_text = ( f"Task logged to Plane.\n\n" f"**{issue['title']}** \n" f"Project: *{issue['project']}* | #{issue['sequence_id']}\n\n" f"I can't execute tasks yet (Phase 3). It's in the backlog." ) summary = f"Task created: {issue['title']}" except Exception as e: response_text = f"Couldn't log to Plane ({e}). Task noted locally: {user_message[:100]}" summary = f"Plane error: {e}" async for chunk in _stream_text(response_text): yield chunk else: # planning / general async for chunk in _stream_llm(messages, use_smart=True): yield chunk summary = f"Planning query: {user_message[:100]}" _write_status(intent, summary) if req.stream: return StreamingResponse(generate(), media_type="text/event-stream") # Non-streaming: collect full response full_text = "" async for chunk in generate(): if chunk.startswith("data: ") and "[DONE]" not in chunk: try: data = json.loads(chunk[6:]) token = data.get("choices", [{}])[0].get("delta", {}).get("content", "") full_text += token except Exception: pass return JSONResponse({ "id": f"chatcmpl-{int(time.time())}", "object": "chat.completion", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, })