import asyncio import json import logging import os import re import time from datetime import datetime, timezone from pathlib import Path from typing import AsyncGenerator from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from pydantic import BaseModel from .approval import cleanup_expired, generate_token, pop_action, queue_action, verify_token from .brain import stream_completion from .intent import ( classify_intent, extract_agent_name, extract_compound_task, extract_execute_target, extract_new_project_name, extract_project_name, extract_task_destination, extract_task_title, is_issue_query, ) from .tools import ( call_qyburn_rebuild, call_qyburn_restart, create_plane_issue, create_plane_project, get_agent_output, get_all_agent_status, get_plane_issues, get_plane_projects, notify_raven_with_actions, ) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger("jon-snow") AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os")) SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites")) JON_PUBLIC_URL = os.getenv("JON_PUBLIC_URL", "https://jon.nxm.co.za") app = FastAPI(title="Jon Snow", version="0.3.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) SYSTEM_PROMPT = """You are Jon Snow, chief of staff for the NxM home lab agent ecosystem on a self-hosted Linux server (172.27.40.3). Live agents you coordinate: - hodor (8200): HTTP gateway — routes requests to Ollama - bran: Daily changelog summariser — runs 06:00 SAST, writes to /opt/sites/changelog/ - varys: Infrastructure monitor — runs every 15 min, HTTP health checks + agent watchdog - sam (8500): Research agent — SearXNG + Ollama synthesis - raven (8400): Notifications — Discord webhook + Gmail SMTP - qyburn (8700): LLM coding agent — qwen2.5-coder:14b, approve/reject workflow - citadel (8300): MCP tool registry — 21 tools including Plane integration - hermes: Cloud intelligence agent — claude-sonnet-4-6 brain, connected to Citadel read-only Infrastructure: Ubuntu Docker host 172.27.40.3, Ollama at 172.27.40.20:11434, Headscale VPN, Plane project management, Gitea at git.nxm.co.za. Your capabilities (Phase 3): 1. Report agent status — last run time, success/failure, output summary 2. List Plane projects and log tasks to Plane project management 3. Answer questions about the infrastructure 4. Route complex questions to your SMART_MODEL brain 5. Queue execution actions (restart/rebuild agents) — sends approval request via Discord before executing For execution requests (restart/rebuild/redeploy), always queue for approval — never execute directly. Be concise — the user is often on mobile. Use short markdown lists, not long paragraphs.""" # --- OpenAI-compatible request/response models --- class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str = "jon-snow" messages: list[Message] stream: bool = True temperature: float | None = None max_tokens: int | None = None class QueueActionRequest(BaseModel): description: str action_type: str params: dict # --- Output helpers --- def _write_status(intent: str, summary: str, status: str = "success") -> None: log_dir = AGENT_OS_DIR / "logs" / "jon-snow" log_dir.mkdir(parents=True, exist_ok=True) payload = { "agent": "jon-snow", "timestamp": datetime.now(timezone.utc).isoformat(), "status": status, "result": f"[{intent}] {summary[:200]}", } (log_dir / "last-run.json").write_text(json.dumps(payload, indent=2)) # Rolling 50-run history runs_file = log_dir / "runs.jsonl" try: existing = runs_file.read_text().splitlines() if runs_file.exists() else [] existing.append(json.dumps(payload)) runs_file.write_text("\n".join(existing[-50:]) + "\n") except Exception: pass out_dir = SITES_DIR / "jon-snow" out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "last-output.md").write_text( f"# Jon Snow — Last Response\n\n**{payload['timestamp']}**\n\nIntent: `{intent}`\n\n{summary[:500]}\n" ) # --- SSE streaming helpers --- def _sse_chunk(content: str, chunk_id: str) -> str: data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "delta": {"content": content}, "finish_reason": None}], } return f"data: {json.dumps(data)}\n\n" def _sse_done(chunk_id: str) -> str: data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } return f"data: {json.dumps(data)}\n\ndata: [DONE]\n\n" async def _stream_text(text: str) -> AsyncGenerator[str, None]: chunk_id = f"chatcmpl-{int(time.time())}" # Emit word by word for a natural feel words = text.split(" ") for i, word in enumerate(words): token = word + (" " if i < len(words) - 1 else "") yield _sse_chunk(token, chunk_id) await asyncio.sleep(0.005) yield _sse_done(chunk_id) async def _stream_llm(messages: list[dict], use_smart: bool = False) -> AsyncGenerator[str, None]: chunk_id = f"chatcmpl-{int(time.time())}" collected = [] try: response = await stream_completion(messages, use_smart=use_smart) async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content collected.append(content) yield _sse_chunk(content, chunk_id) yield _sse_done(chunk_id) except Exception as e: logger.error(f"LLM stream error: {e}") error_msg = f"Error reaching LLM: {e}" async for part in _stream_text(error_msg): yield part collected.append(error_msg) return # --- Routes --- @app.get("/health") async def health(): return {"status": "ok", "agent": "jon-snow", "version": "0.3.0"} @app.get("/v1/models") async def list_models(): return { "object": "list", "data": [{"id": "jon-snow", "object": "model", "created": 0, "owned_by": "nxm"}], } def _html_page(title: str, icon: str, heading: str, body: str, color: str = "#3fb950") -> HTMLResponse: """Render a simple mobile-friendly result page.""" html = f""" {title} — Jon Snow
{icon}

{heading}

{body}

jon-snow · nxm.co.za
""" return HTMLResponse(html) @app.get("/approve/{token}") async def approve_action(token: str): """Public endpoint — called when user clicks Approve in Discord. Executes the queued action.""" cleanup_expired() try: action_id, purpose = verify_token(token) except ValueError as e: return _html_page("Error", "⚠️", "Token Invalid", f"{e}. The link may have expired (15 min) or already been used.", color="#f85149") if purpose != "approve": return _html_page("Error", "⚠️", "Wrong Link", "This doesn't look like an approval link.", color="#f85149") action = pop_action(action_id) if not action: return _html_page("Not Found", "🔍", "Action Not Found", "This action has already been executed, rejected, or expired.", color="#d29922") logger.info(f"approved: {action.action_type} — {action.description}") try: if action.action_type == "docker_rebuild": result = await call_qyburn_rebuild(action.params["stack_name"]) ok = result.get("ok", False) if ok: return _html_page("Approved", "✅", "Rebuilding", f"{action.params['stack_name']} is rebuilding. " f"It will be back online in ~30 seconds.") else: return _html_page("Failed", "❌", "Rebuild Failed", f"Could not rebuild {action.params['stack_name']}: " f"{result.get('stderr','unknown error')[:200]}", color="#f85149") elif action.action_type == "docker_restart": result = await call_qyburn_restart(action.params["stack_name"]) ok = result.get("ok", False) if ok: return _html_page("Approved", "✅", "Restarted", f"{action.params['stack_name']} has been restarted successfully.") else: return _html_page("Failed", "❌", "Restart Failed", f"Could not restart {action.params['stack_name']}: " f"{result.get('stderr','unknown error')[:200]}", color="#f85149") elif action.action_type == "file_write": path = action.params.get("path", "") content = action.params.get("content", "") safe_path = re.sub(r"\.\.", "", path).lstrip("/") target = SITES_DIR / safe_path target.parent.mkdir(parents=True, exist_ok=True) target.write_text(content) return _html_page("Approved", "✅", "File Written", f"/opt/sites/{safe_path} has been updated.") else: return _html_page("Error", "⚠️", "Unknown Action", f"Action type '{action.action_type}' is not supported.", color="#f85149") except Exception as e: logger.error(f"action execution failed: {e}") return _html_page("Error", "❌", "Execution Failed", f"Something went wrong: {str(e)[:200]}", color="#f85149") @app.get("/reject/{token}") async def reject_action(token: str): """Public endpoint — called when user clicks Reject in Discord. Discards the queued action.""" cleanup_expired() try: action_id, purpose = verify_token(token) except ValueError as e: return _html_page("Error", "⚠️", "Token Invalid", f"{e}. The link may have expired (15 min) or already been used.", color="#f85149") if purpose != "reject": return _html_page("Error", "⚠️", "Wrong Link", "This doesn't look like a rejection link.", color="#f85149") action = pop_action(action_id) if not action: return _html_page("Not Found", "🔍", "Already Handled", "This action has already been executed, rejected, or expired.", color="#d29922") logger.info(f"rejected: {action.action_type} — {action.description}") return _html_page("Rejected", "❌", "Action Cancelled", f"{action.description} was rejected and will not be executed.", color="#d29922") @app.post("/internal/queue-action") async def internal_queue_action(req: QueueActionRequest): """Internal endpoint — called by Citadel's propose_file_change tool to queue a file write for approval.""" action = queue_action( description=req.description, action_type=req.action_type, params=req.params, ) approve_url = f"{JON_PUBLIC_URL}/approve/{generate_token(action.id, 'approve')}" reject_url = f"{JON_PUBLIC_URL}/reject/{generate_token(action.id, 'reject')}" await notify_raven_with_actions( description=req.description, approve_url=approve_url, reject_url=reject_url, action_type=req.action_type, ) logger.info(f"queued from citadel: {req.action_type} — {req.description}") return { "ok": True, "action_id": action.id, "approve_url": approve_url, "reject_url": reject_url, } @app.post("/v1/chat/completions") async def chat_completions(req: ChatRequest): user_message = next((m.content for m in reversed(req.messages) if m.role == "user"), "") intent = classify_intent(user_message) logger.info(f"intent={intent} msg={user_message[:100]!r}") messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages += [{"role": m.role, "content": m.content} for m in req.messages] async def generate() -> AsyncGenerator[str, None]: summary = "" if intent == "status": status_ctx = get_all_agent_status(AGENT_OS_DIR) agent_name = extract_agent_name(user_message) if agent_name: output = get_agent_output(SITES_DIR, agent_name) if output: status_ctx += f"\n\n### {agent_name} last output:\n{output}" messages[0]["content"] += f"\n\n## Current Agent Status\n{status_ctx}" async for chunk in _stream_llm(messages, use_smart=False): yield chunk summary = f"Status query: {user_message[:100]}" elif intent == "execute": target = extract_execute_target(user_message) if not target: response_text = "I couldn't identify what to execute. Try: _restart citadel_ or _rebuild raven_." summary = "execute: no target extracted" else: action_type, stack_name = target verb = "Restart" if action_type == "docker_restart" else "Rebuild" description = f"{verb} {stack_name}" action = queue_action( description=description, action_type=action_type, params={"stack_name": stack_name}, ) approve_url = f"{JON_PUBLIC_URL}/approve/{generate_token(action.id, 'approve')}" reject_url = f"{JON_PUBLIC_URL}/reject/{generate_token(action.id, 'reject')}" await notify_raven_with_actions( description=description, approve_url=approve_url, reject_url=reject_url, action_type=action_type, ) response_text = ( f"⚡ Approval requested for: **{description}**\n\n" f"A Discord notification has been sent with approve/reject links.\n" f"Token expires in 15 minutes." ) summary = f"execute queued: {description}" async for chunk in _stream_text(response_text): yield chunk elif intent == "create_project": name = extract_new_project_name(user_message) if not name: response_text = "What should the project be called?" summary = "create_project: no name extracted" else: try: project = await create_plane_project(name) response_text = ( f"Project created.\n\n" f"**{project['name']}** (`{project['identifier']}`)" ) summary = f"Created project: {project['name']}" # Handle compound: "create project X and add [task]" task_title = extract_compound_task(user_message) if task_title: issue = await create_plane_issue(task_title, project["name"]) response_text += ( f"\n\n**{issue['title']}** added.\n" f"Project: *{issue['project']}* | #{issue['sequence_id']}" ) summary += f" + issue: {task_title[:60]}" else: response_text += f"\n\nAdd issues with: _add [task] to {project['name']}_" except Exception as e: response_text = f"Couldn't create project: {e}" summary = f"create_project error: {e}" async for chunk in _stream_text(response_text): yield chunk elif intent == "query": try: if is_issue_query(user_message): # Match against live project list — most robust, no static keywords needed projects = await get_plane_projects() msg_lower = user_message.lower() project = None # Longest-name-first so "General / Admin" beats "General" for p in sorted(projects, key=lambda x: len(x["name"]), reverse=True): name_words = p["name"].lower().replace("/", " ").replace("-", " ") # Match if all significant words of the project name appear in the message sig_words = [w for w in name_words.split() if len(w) > 2] if sig_words and all(w in msg_lower for w in sig_words): project = p break # Or if the identifier appears if p.get("identifier", "").lower() in msg_lower: project = p break if not project: # Fall back to static keyword map hint = extract_project_name(user_message) if hint: _, _ = await get_plane_issues(hint) # resolve via tools project_name_fb, issues_fb = await get_plane_issues(hint) if project_name_fb: project = {"id": None, "name": project_name_fb} if not project: names = "\n".join(f"- {p['name']}" for p in projects) response_text = f"Which project? Available:\n\n{names}" else: project_name, issues = await get_plane_issues(project["name"]) if not issues: response_text = f"**{project['name']}** — no issues found." else: PRIORITY_ICON = {"urgent": "🔴", "high": "🟠", "medium": "🟡", "low": "🔵", "none": "⚪"} lines = "\n".join( f"- [{i['state']}] {PRIORITY_ICON.get(i['priority'], '')} {i['title']}" for i in issues ) response_text = f"**{project['name']} — {len(issues)} issue(s):**\n\n{lines}" summary = f"Issues query: {user_message[:80]}" else: projects = await get_plane_projects() lines = "\n".join( f"- **{p.get('identifier', '?')}** — {p['name']}" for p in projects ) response_text = f"**Plane Projects ({len(projects)}):**\n\n{lines}" summary = f"Listed {len(projects)} Plane projects" except Exception as e: response_text = f"Couldn't reach Plane: {e}" summary = f"Plane query error: {e}" async for chunk in _stream_text(response_text): yield chunk elif intent == "task": # Live destination match first, static keyword map as fallback project_hint = extract_task_destination(user_message) or extract_project_name(user_message) title = extract_task_title(user_message) try: issue = await create_plane_issue(title, project_hint) response_text = ( f"Task logged to Plane.\n\n" f"**{issue['title']}** \n" f"Project: *{issue['project']}* | #{issue['sequence_id']}" ) summary = f"Task created: {issue['title']}" except Exception as e: response_text = f"Couldn't log to Plane ({e}). Task noted locally: {user_message[:100]}" summary = f"Plane error: {e}" async for chunk in _stream_text(response_text): yield chunk else: # planning / general async for chunk in _stream_llm(messages, use_smart=True): yield chunk summary = f"Planning query: {user_message[:100]}" _write_status(intent, summary) if req.stream: return StreamingResponse(generate(), media_type="text/event-stream") # Non-streaming: collect full response full_text = "" async for chunk in generate(): if chunk.startswith("data: ") and "[DONE]" not in chunk: try: data = json.loads(chunk[6:]) token = data.get("choices", [{}])[0].get("delta", {}).get("content", "") full_text += token except Exception: pass return JSONResponse({ "id": f"chatcmpl-{int(time.time())}", "object": "chat.completion", "created": int(time.time()), "model": "jon-snow", "choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, })