"""raven-notify core: configuration, persistence, delivery and status rendering.

Notifications are appended to a bounded JSON history, relayed to Discord and
(optionally) email, and summarised into a static HTML/Markdown status page.
"""

import json
import logging
import os
import smtplib
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from email.mime.text import MIMEText
from html import escape
from pathlib import Path
from typing import Optional

import httpx
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s [raven] %(message)s")
logger = logging.getLogger("raven")

# --- Configuration (all read once from the environment at import time) ------
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "")
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM = os.getenv("SMTP_FROM", "")
SMTP_TO = os.getenv("SMTP_TO", "")
SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites"))
AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os"))
PORT = int(os.getenv("PORT", "8400"))

# Number of history entries kept on disk.
MAX_HISTORY = 50
# Discord embed colours (decimal RGB) keyed by lower-case severity.
SEVERITY_COLORS = {"critical": 15158332, "warning": 16776960, "info": 3066993}
# CSS colours for the HTML status page, keyed by lower-case severity.
_HTML_SEVERITY_COLORS = {"critical": "#f85149", "warning": "#d29922", "info": "#3fb950"}
_HTML_DEFAULT_COLOR = "#8b949e"
# Seconds to wait on the SMTP server; mirrors the timeout used for Discord.
_SMTP_TIMEOUT = 10


def _state_path() -> Path:
    """Return the path of the JSON file holding runtime toggles."""
    return SITES_DIR / "raven" / "state.json"


def _load_state() -> dict:
    """Load the toggle state, falling back to ``{"email_enabled": True}``.

    The default is returned when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    default = {"email_enabled": True}
    try:
        state = json.loads(_state_path().read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, I/O error, bad encoding or malformed JSON.
        return default
    return state if isinstance(state, dict) else default


def _save_state(state: dict):
    """Persist *state* as JSON, creating the parent directory if needed."""
    p = _state_path()
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(state, indent=2), encoding="utf-8")


class NotifyRequest(BaseModel):
    """Request body accepted by the generic notification endpoint."""

    message: str
    severity: str = "info"
    source: str = "unknown"


def _history_path() -> Path:
    """Return the path of the JSON file holding the notification history."""
    return SITES_DIR / "raven" / "history.json"


def _load_history() -> list:
    """Load the notification history as a list of dict entries.

    Returns an empty list when the file is missing, unreadable, not valid
    JSON, or not a JSON array. Non-object items are dropped so later
    ``entry.get(...)`` calls cannot fail on a corrupted file.
    """
    try:
        data = json.loads(_history_path().read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if not isinstance(data, list):
        return []
    return [item for item in data if isinstance(item, dict)]


def _save_history(history: list):
    """Persist the newest ``MAX_HISTORY`` entries of *history* as JSON."""
    p = _history_path()
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(history[-MAX_HISTORY:], indent=2), encoding="utf-8")


def _send_discord(message: str, severity: str, source: str, timestamp: str):
    """Post one notification to the Discord webhook as an embed.

    Best effort: a missing webhook URL is logged and skipped, and any
    delivery failure is logged rather than raised.
    """
    if not DISCORD_WEBHOOK_URL:
        logger.warning("DISCORD_WEBHOOK_URL not set — skipping Discord")
        return
    # Unknown severities fall back to the "info" colour.
    color = SEVERITY_COLORS.get(severity.lower(), SEVERITY_COLORS["info"])
    payload = {
        "embeds": [{
            "title": f"[{severity.upper()}] {source}",
            "description": message,
            "color": color,
            "timestamp": timestamp,
            "footer": {"text": "raven-notify · nxm.co.za"},
        }]
    }
    try:
        r = httpx.post(DISCORD_WEBHOOK_URL, json=payload, timeout=10)
        r.raise_for_status()
        logger.info("discord: sent [%s] %s", severity, source)
    except Exception as e:  # delivery must never break the caller
        logger.error("discord send failed: %s", e)


def _send_email(message: str, severity: str, source: str):
    """Send one notification by email over SMTP with STARTTLS.

    Skipped (with a log line) when email is toggled off in the state file or
    when SMTP_USER / SMTP_PASSWORD / SMTP_TO are not all configured.
    Best effort: delivery failures are logged rather than raised.
    """
    if not _load_state().get("email_enabled", True):
        logger.info("email: disabled by toggle — skipping")
        return
    if not all([SMTP_USER, SMTP_PASSWORD, SMTP_TO]):
        logger.warning("SMTP not configured — skipping email")
        return
    # Collapse all whitespace: a newline inside a header value either breaks
    # the header block or makes the email package reject the message.
    subject = " ".join(f"[{severity.upper()}] {source} — {message[:80]}".split())
    body = f"Severity: {severity.upper()}\nSource: {source}\n\n{message}"
    sender = SMTP_FROM or SMTP_USER
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = SMTP_TO
    try:
        # The timeout keeps a stalled SMTP server from hanging the request.
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=_SMTP_TIMEOUT) as smtp:
            smtp.ehlo()
            smtp.starttls()
            smtp.login(SMTP_USER, SMTP_PASSWORD)
            smtp.sendmail(sender, [SMTP_TO], msg.as_string())
        logger.info("email: sent [%s] %s", severity, source)
    except Exception as e:  # delivery must never break the caller
        logger.error("email send failed: %s", e)


def _short_ts(iso_timestamp: str) -> str:
    """Format an ISO timestamp as ``YYYY-MM-DD HH:MM UTC`` (minute precision)."""
    return iso_timestamp[:16].replace("T", " ") + " UTC"


def _render_html(history: list, timestamp: str, email_enabled: bool = True) -> str:
    """Render the HTML status page.

    Shows the newest 20 entries of *history* (newest first), the total entry
    count, the page update time and the current email toggle state.

    Every value that originates from a notification (severity, timestamp,
    source, message) is HTML-escaped because notifications arrive from
    unauthenticated POST bodies.

    NOTE(review): the original template's markup was not recoverable; this is
    a minimal reconstruction that keeps the visible text. Styling, and the
    target of the toggle form (assumed to be the service's own
    ``/email/toggle`` route on the same origin), should be confirmed against
    the deployed page.
    """
    row_parts = []
    for entry in reversed(history[-20:]):
        sev = str(entry.get("severity", "info"))
        color = _HTML_SEVERITY_COLORS.get(sev, _HTML_DEFAULT_COLOR)
        ts = _short_ts(str(entry.get("timestamp", "")))
        row_parts.append(
            "<tr>"
            f'<td style="color:{color}">{escape(sev.upper())}</td>'
            f"<td>{escape(ts)}</td>"
            f"<td>{escape(str(entry.get('source', '')))}</td>"
            f"<td>{escape(str(entry.get('message', '')))}</td>"
            "</tr>"
        )
    if row_parts:
        table = "<table>\n" + "\n".join(row_parts) + "\n</table>"
    else:
        table = "<p>No notifications yet.</p>"

    total = len(history)
    updated = escape(_short_ts(timestamp))
    email_color = "#3fb950" if email_enabled else "#f85149"
    email_label = "ENABLED" if email_enabled else "DISABLED"
    btn_label = "Disable Email" if email_enabled else "Enable Email"
    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Raven — Notifications</title>
</head>
<body>
<h1>Raven — Notifications</h1>
<p>Sends alerts and notifications via Discord and email. Triggered by Varys (service down/up) and Grafana (infrastructure alerts).</p>
<p>Updated {updated} · {total} total</p>
<p>Email alerts: <strong style="color:{email_color}">{email_label}</strong></p>
<form method="post" action="/email/toggle"><input type="submit" value="{btn_label}"></form>
<h2>Recent Notifications (last 20)</h2>
{table}
</body>
</html>
"""


def _render_md(history: list, timestamp: str) -> str:
    """Render a Markdown log of the newest 10 entries of *history*, newest first."""
    lines = [
        f"# Raven — Notification Log\n\nUpdated: {_short_ts(timestamp)} | {len(history)} total\n"
    ]
    for e in reversed(history[-10:]):
        ts = _short_ts(e.get("timestamp", ""))
        lines.append(
            f"- [{e.get('severity','?').upper()}] {ts} · **{e.get('source','')}** — {e.get('message','')}"
        )
    return "\n".join(lines)


def _write_status(history: list, last_event: Optional[dict] = None):
    """Write the status artefacts for the current *history*.

    Produces ``index.html`` and ``last-output.md`` under ``SITES_DIR/raven``
    and ``last-run.json`` under ``AGENT_OS_DIR/logs/raven-notify``.

    Args:
        history: Notification entries, oldest first.
        last_event: The entry that triggered this write, if any; it is
            summarised in ``last-run.json``.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    email_enabled = _load_state().get("email_enabled", True)

    out_dir = SITES_DIR / "raven"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: both documents contain non-ASCII punctuation.
    (out_dir / "index.html").write_text(
        _render_html(history, timestamp, email_enabled), encoding="utf-8"
    )
    (out_dir / "last-output.md").write_text(
        _render_md(history, timestamp), encoding="utf-8"
    )

    log_dir = AGENT_OS_DIR / "logs" / "raven-notify"
    log_dir.mkdir(parents=True, exist_ok=True)
    if last_event:
        last_msg = (
            f"last: [{last_event['severity']}] {last_event['source']} — "
            f"{last_event['message'][:50]}"
        )
    else:
        last_msg = "running, no notifications yet"
    (log_dir / "last-run.json").write_text(
        json.dumps({
            "agent": "raven-notify",
            "timestamp": timestamp,
            "status": "success",
            "result": f"{len(history)} notifications total; {last_msg}",
        }, indent=2),
        encoding="utf-8",
    )


def _do_notify(message: str, severity: str, source: str) -> dict:
    """Record a notification, deliver it and refresh the status artefacts.

    Args:
        message: Notification text.
        severity: Severity label; lower-cased before storage and delivery.
        source: Name of the system that raised the notification.

    Returns:
        ``{"ok": True, "timestamp": <ISO-8601 UTC time of the entry>}``.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    severity = severity.lower()
    entry = {"timestamp": timestamp, "severity": severity, "source": source, "message": message}

    history = _load_history()
    history.append(entry)
    # Trim in memory as well so the reported totals match what is persisted.
    history = history[-MAX_HISTORY:]
    _save_history(history)

    _send_discord(message, severity, source, timestamp)
    _send_email(message, severity, source)
    _write_status(history, entry)
    logger.info("notified [%s] %s: %s", severity, source, message[:80])
    return {"ok": True, "timestamp": timestamp}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """On startup, regenerate the status artefacts from the stored history."""
    history = _load_history()
    _write_status(history)
    logger.info(
        "raven-notify started on port %s — %s notifications in history",
        PORT,
        len(history),
    )
    yield
app = FastAPI(title="raven-notify", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST"],
    allow_headers=["*"],
)


@app.get("/health")
def health():
    """Liveness probe: return a static OK payload."""
    return {"status": "ok", "agent": "raven-notify"}


@app.post("/notify")
def handle_notify(req: NotifyRequest):
    """Record and deliver a single notification from the request body."""
    return _do_notify(req.message, req.severity, req.source)


@app.post("/email/toggle")
def toggle_email():
    """Flip the persisted ``email_enabled`` flag and refresh the status page.

    Returns:
        ``{"ok": True, "email_enabled": <new value>}``.
    """
    state = _load_state()
    state["email_enabled"] = not state.get("email_enabled", True)
    _save_state(state)
    _write_status(_load_history())
    logger.info(
        "email alerts %s via toggle",
        "enabled" if state["email_enabled"] else "disabled",
    )
    return {"ok": True, "email_enabled": state["email_enabled"]}


@app.post("/webhook/grafana")
def handle_grafana(payload: dict):
    """Translate a Grafana webhook payload into notifications.

    When the payload carries an ``alerts`` list, each alert is notified
    separately (``critical`` while firing, ``info`` once resolved) and the
    number of processed alerts is returned. Otherwise a single notification
    is built from the payload's ``message`` / ``title``.

    Null or missing ``alerts``, ``labels``, ``annotations``, ``message`` and
    ``title`` values are tolerated instead of raising.
    """
    status = payload.get("status", "firing")
    alerts = payload.get("alerts") or []
    if alerts:
        for alert in alerts:
            alert_status = alert.get("status", status)
            name = (alert.get("labels") or {}).get("alertname", "unknown")
            if alert_status == "resolved":
                # Grafana always sends the firing annotation text in resolved payloads —
                # override with a clear recovery message so Discord/email isn't confusing.
                msg = f"✅ RESOLVED: {name} — alert has cleared, service is back online."
                sev = "info"
            else:
                annotations = alert.get("annotations") or {}
                # Prefer the description, then the summary, then the alert
                # name, skipping empty strings along the way.
                msg = str(
                    annotations.get("description")
                    or annotations.get("summary")
                    or name
                )
                sev = "critical"
            _do_notify(msg, sev, f"grafana/{name}")
        return {"ok": True, "processed": len(alerts)}

    msg = str(payload.get("message") or payload.get("title") or "Grafana Alert")
    sev = "critical" if status == "firing" else "info"
    return _do_notify(msg, sev, "grafana")


# ---------------------------------------------------------------------------
# Jon Snow approval gate — action notification endpoint
# ---------------------------------------------------------------------------

class ActionNotifyRequest(BaseModel):
    """Request body for an approval prompt carrying approve/reject links."""

    description: str
    approve_url: str
    reject_url: str
    action_type: str
    source: str = "jon-snow"


@app.post("/notify-with-actions")
def handle_action_notify(req: ActionNotifyRequest):
    """Send a Discord embed with clickable approve/reject links for Jon Snow's approval gate.

    Returns:
        ``{"ok": True}`` on success, otherwise ``{"ok": False, "error": ...}``
        when no webhook is configured or the Discord request fails.
    """
    if not DISCORD_WEBHOOK_URL:
        logger.warning("DISCORD_WEBHOOK_URL not set — skipping Discord")
        return {"ok": False, "error": "no webhook configured"}
    payload = {
        "embeds": [{
            "title": "⚡ Action Approval Required",
            "description": f"**{req.source}** wants to: `{req.description}`",
            "color": 16776960,  # yellow
            "fields": [
                {"name": "Action Type", "value": f"`{req.action_type}`", "inline": True},
                {"name": "Expires", "value": "15 minutes", "inline": True},
            ],
            "footer": {"text": "jon-snow approval gate · nxm.co.za"},
        }],
        "content": f"✅ **Approve:** {req.approve_url}\n❌ **Reject:** {req.reject_url}",
    }
    try:
        r = httpx.post(DISCORD_WEBHOOK_URL, json=payload, timeout=10)
        r.raise_for_status()
        logger.info("discord: action approval request sent for %s", req.action_type)
        return {"ok": True}
    except Exception as e:  # report the failure to the caller instead of raising
        logger.error("discord action notify failed: %s", e)
        return {"ok": False, "error": str(e)}


# Keep the entry point last: uvicorn.run() blocks, so any route defined after
# it would never be registered when the file is executed as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=PORT)