"""Tool helpers for the jon-snow agent.

Reads the per-agent status/output files from disk and talks to the Plane,
Raven and Qyburn HTTP services. Service locations are taken from the
``PLANE_URL``, ``PLANE_WORKSPACE``, ``PLANE_API_KEY``, ``RAVEN_URL`` and
``QYBURN_URL`` environment variables at import time.
"""

import asyncio
import json
import logging
import os
import re
from pathlib import Path
from urllib.parse import quote

import httpx

logger = logging.getLogger("jon-snow.tools")

PLANE_URL = os.getenv("PLANE_URL", "http://172.27.40.3:8095")
PLANE_WORKSPACE = os.getenv("PLANE_WORKSPACE", "nxm")
PLANE_API_KEY = os.getenv("PLANE_API_KEY", "")
RAVEN_URL = os.getenv("RAVEN_URL", "http://raven-notify:8400")
QYBURN_URL = os.getenv("QYBURN_URL", "http://qyburn-coder:8700")

AGENT_FULL_NAMES = [
    "hodor-gateway",
    "bran-changelog",
    "varys-monitor",
    "sam-research",
    "raven-notify",
    "qyburn-coder",
    "citadel-mcp",
    "jon-snow",
]

SHORT_TO_FULL = {
    "hodor": "hodor-gateway",
    "bran": "bran-changelog",
    "varys": "varys-monitor",
    "sam": "sam-research",
    "raven": "raven-notify",
    "qyburn": "qyburn-coder",
    "citadel": "citadel-mcp",
    "jon": "jon-snow",
}

# Everything except A-Z / 0-9 is stripped when deriving a project identifier.
_NON_IDENTIFIER_CHARS = re.compile(r"[^A-Z0-9]")

# Timeout (seconds) applied to every Plane and Raven request.
_DEFAULT_TIMEOUT = 10

# Upper bound on the number of characters returned by get_agent_output().
_MAX_OUTPUT_CHARS = 3000


def _as_text(value: object, default: str) -> str:
    """Return *value* as a string, or *default* when it is ``None``."""
    return default if value is None else str(value)


def _format_status_line(full_name: str, data: object) -> str:
    """Render one markdown bullet from a parsed ``last-run.json`` document.

    Raises:
        ValueError: if *data* is not a JSON object (dict).
    """
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    # Coerce to text first so a JSON null or number cannot break the slicing.
    ts = _as_text(data.get("timestamp"), "unknown")[:19].replace("T", " ")
    status = data.get("status", "unknown")
    result = _as_text(data.get("result"), "")[:120]
    icon = "OK" if status == "success" else "FAIL"
    return f"- **{full_name}**: [{icon}] {ts} UTC — {result}"


def get_all_agent_status(agent_os_dir: Path) -> str:
    """Summarise the last run of every known agent as a markdown list.

    For each name in ``AGENT_FULL_NAMES`` this reads
    ``<agent_os_dir>/logs/<name>/last-run.json`` and emits one bullet line.
    Unreadable or malformed logs are reported inline instead of raising, so
    one bad file never hides the status of the other agents.

    Args:
        agent_os_dir: Directory that contains the ``logs`` folder.

    Returns:
        Newline-joined bullet lines, or a fixed message when the ``logs``
        directory does not exist.
    """
    logs_dir = agent_os_dir / "logs"
    if not logs_dir.exists():
        return "Agent log directory not found."

    lines = []
    for full_name in AGENT_FULL_NAMES:
        log_file = logs_dir / full_name / "last-run.json"
        if not log_file.exists():
            lines.append(
                f"- **{full_name}**: no log (never run or not yet deployed)"
            )
            continue
        try:
            data = json.loads(log_file.read_text(encoding="utf-8"))
            lines.append(_format_status_line(full_name, data))
        except Exception as e:  # best-effort report: surface, never raise
            lines.append(f"- **{full_name}**: error reading log ({e})")
    return "\n".join(lines)


def get_agent_output(sites_dir: Path, agent_name: str) -> str | None:
    """Return the start of an agent's ``last-output.md``, if it exists.

    *agent_name* may be a short name (a key of ``SHORT_TO_FULL``) or a full
    name (a value of it); full names are mapped back to the short name, which
    is the directory name under *sites_dir*. Unknown names are used as-is.

    Args:
        sites_dir: Directory holding one sub-directory per agent short name.
        agent_name: Short or full agent name.

    Returns:
        At most the first 3000 characters of the file, or ``None`` when the
        file does not exist or the name is not a plain directory name.
    """
    if agent_name in SHORT_TO_FULL:
        short_name = agent_name
    else:
        short_name = next(
            (k for k, v in SHORT_TO_FULL.items() if v == agent_name),
            agent_name,
        )

    # The name becomes a path component: refuse anything that could escape
    # sites_dir rather than reading an arbitrary file.
    if short_name == ".." or "/" in short_name or "\\" in short_name:
        logger.warning("rejecting agent name with path components: %r", agent_name)
        return None

    output_file = sites_dir / short_name / "last-output.md"
    if not output_file.is_file():
        return None
    content = output_file.read_text(encoding="utf-8")
    return content[:_MAX_OUTPUT_CHARS]


def _make_identifier(name: str) -> str:
    """Derive a project identifier from *name*.

    Upper-cases the name, drops every character outside ``A-Z0-9`` and keeps
    the first 10 characters; falls back to ``"PROJ"`` when nothing is left.
    """
    return _NON_IDENTIFIER_CHARS.sub("", name.upper())[:10] or "PROJ"


def _plane_projects_url() -> str:
    """Return the projects collection URL of the configured workspace."""
    return f"{PLANE_URL}/api/v1/workspaces/{PLANE_WORKSPACE}/projects/"


def _plane_headers(*, json_body: bool = False) -> dict[str, str]:
    """Return Plane request headers, with a JSON content type if requested."""
    headers = {"X-Api-Key": PLANE_API_KEY}
    if json_body:
        headers["Content-Type"] = "application/json"
    return headers


def _plane_error_message(data: object) -> str:
    """Extract a readable message from the body of a 400 response.

    Prefers the ``identifier`` field (first entry when it is a list) and
    otherwise falls back to the string form of the whole body.
    """
    if isinstance(data, dict) and "identifier" in data:
        detail = data["identifier"]
        if isinstance(detail, (list, tuple)):
            # An empty list carries no message; show the whole body instead.
            return str(detail[0]) if detail else str(data)
        return str(detail)
    return str(data)


async def create_plane_project(name: str) -> dict:
    """Create a project in the configured Plane workspace.

    Args:
        name: Project name; the identifier is derived from it.

    Returns:
        ``{"name": ..., "identifier": ...}`` taken from the response body.

    Raises:
        ValueError: if the server answers 400; the message is taken from the
            response body.
        httpx.HTTPStatusError: for any other error status.
    """
    identifier = _make_identifier(name)
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            _plane_projects_url(),
            headers=_plane_headers(json_body=True),
            json={"name": name, "identifier": identifier, "network": 2},
            timeout=_DEFAULT_TIMEOUT,
        )
        if resp.status_code == 400:
            try:
                data = resp.json()
            except ValueError:
                # Body is not JSON: report the raw text instead.
                data = resp.text
            raise ValueError(_plane_error_message(data))
        resp.raise_for_status()
        data = resp.json()
        return {"name": data["name"], "identifier": data["identifier"]}


async def get_plane_projects() -> list[dict]:
    """Return the ``results`` list of the workspace's projects endpoint.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            _plane_projects_url(),
            headers=_plane_headers(),
            timeout=_DEFAULT_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json().get("results", [])


async def get_plane_issues(project_hint: str) -> tuple[str, list[dict]]:
    """Return (project_name, issues). Resolves project by name/identifier fragment.

    An exact, case-insensitive identifier match wins; otherwise the first
    project whose name contains the hint is used.

    Args:
        project_hint: Project identifier or a fragment of the project name.

    Returns:
        ``(project_name, issues)`` where each issue is a dict with ``title``,
        ``state`` (state name, ``""`` when unknown) and ``priority``.
        ``("", [])`` when no project matches.

    Raises:
        httpx.HTTPStatusError: if any request answers with an error status.
    """
    projects = await get_plane_projects()
    hint = project_hint.lower()

    project = next(
        (p for p in projects if (p.get("identifier") or "").lower() == hint),
        None,
    )
    if project is None:
        project = next(
            (p for p in projects if hint in (p.get("name") or "").lower()),
            None,
        )
    if project is None:
        return ("", [])

    project_url = f"{_plane_projects_url()}{project['id']}"
    headers = _plane_headers()
    async with httpx.AsyncClient() as client:
        # The two lookups are independent, so fetch them concurrently.
        states_resp, issues_resp = await asyncio.gather(
            client.get(
                f"{project_url}/states/",
                headers=headers,
                timeout=_DEFAULT_TIMEOUT,
            ),
            client.get(
                f"{project_url}/issues/",
                headers=headers,
                timeout=_DEFAULT_TIMEOUT,
            ),
        )
    states_resp.raise_for_status()
    issues_resp.raise_for_status()

    state_map = {
        s["id"]: s["name"] for s in states_resp.json().get("results", [])
    }
    issues = [
        {
            "title": i.get("name", ""),
            "state": state_map.get(i.get("state", ""), ""),
            "priority": i.get("priority", "none"),
        }
        for i in issues_resp.json().get("results", [])
    ]
    return (project.get("name", ""), issues)


def _resolve_project(projects: list[dict], hint: str | None) -> dict:
    """Pick the project an issue should be filed under.

    Order of preference: first project whose name contains *hint*
    (case-insensitive), then the first project whose name contains
    ``"agent"``, then the first project in the list.

    Args:
        projects: Non-empty list of project dicts.
        hint: Optional fragment of the project name.

    Raises:
        IndexError: if *projects* is empty.
    """
    if hint:
        hint_lower = hint.lower()
        for p in projects:
            if hint_lower in (p.get("name") or "").lower():
                return p
    # Default: Agent Ecosystem
    for p in projects:
        if "agent" in (p.get("name") or "").lower():
            return p
    return projects[0]


async def create_plane_issue(title: str, project_hint: str | None = None) -> dict:
    """Create an issue with priority ``"none"`` in the resolved project.

    Args:
        title: Issue title; truncated to 255 characters in the request.
        project_hint: Optional fragment of the target project's name.

    Returns:
        Dict with ``sequence_id`` (``"?"`` when the response lacks it),
        ``title`` (first 80 characters) and ``project`` (project name).

    Raises:
        ValueError: if the workspace has no projects.
        httpx.HTTPStatusError: if a request answers with an error status.
    """
    projects = await get_plane_projects()
    if not projects:
        raise ValueError("No Plane projects found")

    project = _resolve_project(projects, project_hint)
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{_plane_projects_url()}{project['id']}/issues/",
            headers=_plane_headers(json_body=True),
            json={"name": title[:255], "priority": "none"},
            timeout=_DEFAULT_TIMEOUT,
        )
        resp.raise_for_status()
        issue = resp.json()
        return {
            "sequence_id": issue.get("sequence_id", "?"),
            "title": title[:80],
            "project": project.get("name", ""),
        }


# ---------------------------------------------------------------------------
# Approval gate helpers
# ---------------------------------------------------------------------------


async def notify_raven_with_actions(
    description: str,
    approve_url: str,
    reject_url: str,
    action_type: str,
) -> None:
    """Send a Discord approval request via Raven's /notify-with-actions endpoint.

    Best-effort: any failure is logged and swallowed, so this never raises
    because of a request error.
    """
    payload = {
        "description": description,
        "approve_url": approve_url,
        "reject_url": reject_url,
        "action_type": action_type,
        "source": "jon-snow",
    }
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(
                f"{RAVEN_URL}/notify-with-actions",
                json=payload,
                timeout=_DEFAULT_TIMEOUT,
            )
            resp.raise_for_status()
            logger.info("raven notified for action: %s", action_type)
        except Exception as e:  # deliberate: notification must not break callers
            logger.error("raven notify failed: %s", e)


async def _post_qyburn(action: str, stack_name: str, *, timeout: float) -> dict:
    """POST to ``<QYBURN_URL>/<action>/<stack_name>`` and return the JSON body.

    The stack name is percent-encoded so that characters such as ``/`` or
    ``?`` cannot change which route is hit.
    """
    encoded_stack = quote(stack_name, safe="")
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{QYBURN_URL}/{action}/{encoded_stack}",
            timeout=timeout,
        )
        resp.raise_for_status()
        return resp.json()


async def call_qyburn_rebuild(stack_name: str) -> dict:
    """Trigger a docker rebuild (compose up -d --build) via Qyburn.

    Raises:
        httpx.HTTPStatusError: if Qyburn answers with an error status.
    """
    # builds can take a while
    return await _post_qyburn("rebuild", stack_name, timeout=300)


async def call_qyburn_restart(stack_name: str) -> dict:
    """Trigger a docker restart (compose restart) via Qyburn.

    Raises:
        httpx.HTTPStatusError: if Qyburn answers with an error status.
    """
    return await _post_qyburn("restart", stack_name, timeout=120)