"""raven-notify: sends alerts and notifications via Discord and email.

Triggered by Varys (service down/up) and Grafana (infrastructure alerts).
Every notification is appended to a bounded JSON history, and a small HTML /
Markdown status page plus a ``last-run.json`` record are regenerated.
"""

import html
import json
import logging
import os
import smtplib
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional

import httpx
from fastapi import FastAPI
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s [raven] %(message)s")
logger = logging.getLogger("raven")

# --- Configuration (all overridable through environment variables) ----------
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "")
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM = os.getenv("SMTP_FROM", "")
SMTP_TO = os.getenv("SMTP_TO", "")
SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites"))
AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os"))
PORT = int(os.getenv("PORT", "8400"))

# Number of notifications kept in history.json.
MAX_HISTORY = 50
# Discord embed colours (decimal RGB) keyed by severity.
SEVERITY_COLORS = {"critical": 15158332, "warning": 16776960, "info": 3066993}

# Discord rejects embeds whose title/description exceed these lengths.
DISCORD_TITLE_LIMIT = 256
DISCORD_DESCRIPTION_LIMIT = 4096
# Seconds to wait on the SMTP server before giving up (avoids hanging a worker).
SMTP_TIMEOUT = 10
# Status-page colours keyed by severity; unknown severities use the fallback.
_HTML_SEVERITY_COLORS = {"critical": "#f85149", "warning": "#d29922", "info": "#3fb950"}
_HTML_FALLBACK_COLOR = "#8b949e"


class NotifyRequest(BaseModel):
    # Request body for POST /notify.
    message: str
    severity: str = "info"
    source: str = "unknown"


def _history_path() -> Path:
    """Return the path of the JSON file that stores the notification history."""
    return SITES_DIR / "raven" / "history.json"


def _load_history() -> list:
    """Load the stored notification history.

    Returns:
        The list of stored entries, or an empty list when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON array.
        Non-dict items are dropped so callers can rely on ``entry.get``.
    """
    path = _history_path()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as e:
        # ValueError covers JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"history unreadable, starting empty: {e}")
        return []
    if not isinstance(data, list):
        logger.warning("history file does not contain a list, starting empty")
        return []
    return [entry for entry in data if isinstance(entry, dict)]


def _save_history(history: list):
    """Persist the newest ``MAX_HISTORY`` entries of *history*.

    The data is written to a sibling temp file and moved into place with
    ``os.replace`` so a crash mid-write cannot leave a truncated history.json.
    """
    path = _history_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(history[-MAX_HISTORY:], indent=2), encoding="utf-8")
    os.replace(tmp_path, path)


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to at most *limit* characters, marking cuts with an ellipsis."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


def _send_discord(message: str, severity: str, source: str, timestamp: str):
    """Post the notification to the configured Discord webhook (best effort).

    Does nothing but log a warning when ``DISCORD_WEBHOOK_URL`` is unset.
    Delivery failures are logged and never raised.
    """
    if not DISCORD_WEBHOOK_URL:
        logger.warning("DISCORD_WEBHOOK_URL not set — skipping Discord")
        return
    color = SEVERITY_COLORS.get(severity.lower(), SEVERITY_COLORS["info"])
    payload = {
        "embeds": [{
            # Over-long fields make Discord reject the whole request, so clip them.
            "title": _truncate(f"[{severity.upper()}] {source}", DISCORD_TITLE_LIMIT),
            "description": _truncate(message, DISCORD_DESCRIPTION_LIMIT),
            "color": color,
            "timestamp": timestamp,
            "footer": {"text": "raven-notify · nxm.co.za"},
        }]
    }
    try:
        r = httpx.post(DISCORD_WEBHOOK_URL, json=payload, timeout=10)
        r.raise_for_status()
        logger.info(f"discord: sent [{severity}] {source}")
    except Exception as e:  # best-effort channel: never let it break the request
        logger.error(f"discord send failed: {e}")


def _send_email(message: str, severity: str, source: str):
    """Send the notification by email over SMTP with STARTTLS (best effort).

    Does nothing but log a warning when SMTP credentials or recipients are
    missing. ``SMTP_TO`` may hold one address or a comma-separated list.
    Delivery failures are logged and never raised.
    """
    recipients = [addr.strip() for addr in SMTP_TO.split(",") if addr.strip()]
    if not (SMTP_USER and SMTP_PASSWORD and recipients):
        logger.warning("SMTP not configured — skipping email")
        return
    sender = SMTP_FROM or SMTP_USER
    # Message and source come from callers; collapse line breaks so they cannot
    # inject extra headers through the Subject line.
    raw_subject = f"[{severity.upper()}] {source} — {message[:80]}"
    subject = " ".join(raw_subject.splitlines())
    body = f"Severity: {severity.upper()}\nSource: {source}\n\n{message}"
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=SMTP_TIMEOUT) as smtp:
            smtp.ehlo()
            smtp.starttls()
            smtp.login(SMTP_USER, SMTP_PASSWORD)
            smtp.sendmail(sender, recipients, msg.as_string())
        logger.info(f"email: sent [{severity}] {source}")
    except Exception as e:  # best-effort channel: never let it break the request
        logger.error(f"email send failed: {e}")


def _render_html(history: list, timestamp: str) -> str:
    """Render the status page showing the 20 most recent notifications, newest first.

    All values taken from history entries are HTML-escaped because messages
    and sources originate from external callers.
    """
    # NOTE(review): the original page markup was not recoverable from the
    # reviewed source (only its visible text survived). This is a minimal
    # reconstruction that keeps that text and the per-severity colours —
    # restore the original styling from version control if it is available.
    updated = html.escape(timestamp[:16].replace("T", " ") + " UTC")
    rows = []
    for entry in reversed(history[-20:]):
        sev = str(entry.get("severity", "info"))
        color = _HTML_SEVERITY_COLORS.get(sev, _HTML_FALLBACK_COLOR)
        ts = html.escape(str(entry.get("timestamp", ""))[:16].replace("T", " "))
        sev_label = html.escape(sev.upper())
        src = html.escape(str(entry.get("source", "")))
        text = html.escape(str(entry.get("message", "")))
        rows.append(
            f'<div style="border-left:3px solid {color};padding:4px 10px;margin:6px 0">'
            f'<span style="color:{color}">[{sev_label}]</span> '
            f'<span style="color:#8b949e">{ts}</span> '
            f"<strong>{src}</strong> — {text}"
            "</div>"
        )
    body = "\n".join(rows) if rows else "<p>No notifications yet.</p>"
    return (
        "<!DOCTYPE html>\n"
        '<html lang="en">\n'
        "<head>\n"
        '<meta charset="utf-8">\n'
        "<title>Raven — Notification Log</title>\n"
        "</head>\n"
        '<body style="background:#0d1117;color:#c9d1d9;font-family:monospace;padding:24px">\n'
        "<h1>Raven — Notification Log</h1>\n"
        "<p>Sends alerts and notifications via Discord and email. "
        "Triggered by Varys (service down/up) and Grafana (infrastructure alerts).</p>\n"
        f"<p>Updated: {updated} | {len(history)} total</p>\n"
        f"{body}\n"
        "</body>\n"
        "</html>\n"
    )


def _render_md(history: list, timestamp: str) -> str:
    """Render a Markdown summary listing the 10 most recent notifications, newest first."""
    now = timestamp[:16].replace("T", " ") + " UTC"
    lines = [f"# Raven — Notification Log\n\nUpdated: {now} | {len(history)} total\n"]
    for e in reversed(history[-10:]):
        ts = e.get("timestamp", "")[:16].replace("T", " ") + " UTC"
        lines.append(
            f"- [{e.get('severity','?').upper()}] {ts} · **{e.get('source','')}** — {e.get('message','')}"
        )
    return "\n".join(lines)


def _write_status(history: list, last_event: Optional[dict] = None):
    """Regenerate the status artefacts for the current *history*.

    Writes ``index.html`` and ``last-output.md`` under ``SITES_DIR/raven`` and
    ``last-run.json`` under ``AGENT_OS_DIR/logs/raven-notify``.

    Args:
        history: Notification entries to render.
        last_event: The entry that triggered this refresh, if any; it is
            summarised in the ``result`` field of ``last-run.json``.
    """
    timestamp = datetime.now(timezone.utc).isoformat()

    out_dir = SITES_DIR / "raven"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Both documents contain non-ASCII punctuation, so pin the encoding rather
    # than relying on the platform default.
    (out_dir / "index.html").write_text(_render_html(history, timestamp), encoding="utf-8")
    (out_dir / "last-output.md").write_text(_render_md(history, timestamp), encoding="utf-8")

    log_dir = AGENT_OS_DIR / "logs" / "raven-notify"
    log_dir.mkdir(parents=True, exist_ok=True)
    if last_event:
        last_msg = (
            f"last: [{last_event['severity']}] {last_event['source']} — "
            f"{last_event['message'][:50]}"
        )
    else:
        last_msg = "running, no notifications yet"
    (log_dir / "last-run.json").write_text(json.dumps({
        "agent": "raven-notify",
        "timestamp": timestamp,
        "status": "success",
        "result": f"{len(history)} notifications total; {last_msg}",
    }, indent=2))


def _do_notify(message: str, severity: str, source: str) -> dict:
    """Record a notification, deliver it to every channel and refresh the status files.

    Args:
        message: Notification text.
        severity: Severity label; lower-cased before use.
        source: Name of the system that raised the notification.

    Returns:
        ``{"ok": True, "timestamp": <ISO-8601 UTC timestamp of the entry>}``.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    severity = severity.lower()
    entry = {"timestamp": timestamp, "severity": severity, "source": source, "message": message}

    history = _load_history()
    history.append(entry)
    # Trim here as well so the totals reported below match what is persisted.
    history = history[-MAX_HISTORY:]
    _save_history(history)

    _send_discord(message, severity, source, timestamp)
    _send_email(message, severity, source)
    _write_status(history, entry)
    logger.info(f"notified [{severity}] {source}: {message[:80]}")
    return {"ok": True, "timestamp": timestamp}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """On startup, regenerate the status files from the stored history."""
    history = _load_history()
    _write_status(history)
    logger.info(f"raven-notify started on port {PORT} — {len(history)} notifications in history")
    yield


app = FastAPI(title="raven-notify", lifespan=lifespan)
@app.get("/health")
def health():
    """Liveness probe: report that the service is running."""
    return {"status": "ok", "agent": "raven-notify"}


@app.post("/notify")
def handle_notify(req: NotifyRequest):
    """Dispatch the notification described by the request body."""
    return _do_notify(req.message, req.severity, req.source)


@app.post("/webhook/grafana")
def handle_grafana(payload: dict):
    """Translate a Grafana webhook payload into notifications.

    When the payload carries an ``alerts`` list, one notification is sent per
    alert object: the text comes from the ``description`` annotation (falling
    back to ``summary``, then the alert name) and the severity is ``critical``
    while the alert is firing, ``info`` otherwise. Without alerts, a single
    notification is built from the payload's ``message`` / ``title``.

    Returns:
        ``{"ok": True, "processed": <n>}`` for alert lists, otherwise the
        result of the single notification.
    """
    status = payload.get("status", "firing")

    raw_alerts = payload.get("alerts")
    # Ignore anything that is not an alert object instead of crashing on it.
    alerts = [a for a in raw_alerts if isinstance(a, dict)] if isinstance(raw_alerts, list) else []

    if alerts:
        for alert in alerts:
            # "labels"/"annotations" may be present but null or malformed.
            labels = alert.get("labels")
            labels = labels if isinstance(labels, dict) else {}
            annotations = alert.get("annotations")
            annotations = annotations if isinstance(annotations, dict) else {}

            name = str(labels.get("alertname") or "unknown")
            msg = str(annotations.get("description") or annotations.get("summary") or name)
            sev = "critical" if alert.get("status", status) == "firing" else "info"
            _do_notify(msg, sev, f"grafana/{name}")
        return {"ok": True, "processed": len(alerts)}

    # A title that is present but empty/null must not yield an empty message.
    msg = str(payload.get("message") or payload.get("title") or "Grafana Alert")
    sev = "critical" if status == "firing" else "info"
    return _do_notify(msg, sev, "grafana")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=PORT)