import json import logging import os import time from datetime import datetime, timezone from pathlib import Path import httpx import yaml logging.basicConfig(level=logging.INFO, format="%(asctime)s [varys] %(message)s") logger = logging.getLogger("varys") SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites")) AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os")) RAVEN_URL = os.getenv("RAVEN_URL", "") CONFIG_FILE = Path(__file__).parent / "config.yaml" def _load_prev_states() -> dict: path = AGENT_OS_DIR / "logs" / "varys-monitor" / "service-states.json" if not path.exists(): return {"services": {}, "agents": {}} try: data = json.loads(path.read_text()) # migrate old flat format (services only) if "services" not in data: return {"services": data, "agents": {}} return data except Exception: return {"services": {}, "agents": {}} def _save_states(services: list, agents: list): path = AGENT_OS_DIR / "logs" / "varys-monitor" / "service-states.json" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps({ "services": {s["name"]: s["status"] for s in services}, "agents": {a["name"]: a["status"] for a in agents}, })) def _notify_raven(message: str, severity: str): if not RAVEN_URL: return try: httpx.post(f"{RAVEN_URL}/notify", json={"message": message, "severity": severity, "source": "varys"}, timeout=5) logger.info(f"raven notified: {message}") except Exception as e: logger.warning(f"raven notify failed (raven not live yet?): {e}") def check_service(name: str, url: str) -> dict: start = time.monotonic() try: # stream=True so we get the status code without reading the body (handles SSE endpoints) with httpx.stream("GET", url, timeout=5, verify=False, follow_redirects=True) as r: ms = int((time.monotonic() - start) * 1000) status = "up" if r.status_code < 500 else "degraded" return {"name": name, "status": status, "code": r.status_code, "ms": ms} except Exception: ms = int((time.monotonic() - start) * 1000) return {"name": name, "status": "down", "code": None, "ms": ms} def read_agent_status(name: str) -> dict: path = AGENT_OS_DIR / "logs" / name / "last-run.json" if not path.exists(): return {"name": name, "status": "unknown", "timestamp": None, "result": "no data"} try: data = json.loads(path.read_text()) return { "name": name, "status": data.get("status", "unknown"), "timestamp": data.get("timestamp", ""), "result": data.get("result", ""), } except Exception as e: return {"name": name, "status": "error", "timestamp": None, "result": str(e)} def render_html(services: list, agents: list) -> str: now_iso = datetime.now(timezone.utc).isoformat() SERVICE_COLOURS = {"up": "#3fb950", "degraded": "#d29922", "down": "#f85149"} AGENT_COLOURS = {"success": "#3fb950", "failure": "#f85149"} service_cards = "" for s in services: colour = SERVICE_COLOURS.get(s["status"], "#8b949e") label = s["status"].upper() ms_text = f"{s['ms']} ms" if s["status"] != "down" else "—" service_cards += f"""
Checks HTTP reachability for all services in the NxM stack and monitors agent run status. Refreshes every 15 minutes.