"""Jon Snow approval gate — token management and pending action store."""

import base64
import hashlib
import hmac
import os
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional

# NOTE(review): the fallback "change-me" is a publicly known value; any
# deployment that does not set JON_SECRET issues forgeable tokens. The default
# is kept so existing setups keep working — confirm it is overridden in prod.
JON_SECRET = os.getenv("JON_SECRET", "change-me")
TOKEN_TTL = 900  # 15 minutes


@dataclass
class PendingAction:
    """An action waiting for an approve/reject decision."""

    id: str
    description: str
    action_type: str  # "docker_rebuild", "docker_restart", "file_write"
    params: dict
    plane_issue_id: Optional[str] = None
    plane_project_id: Optional[str] = None
    # Wall-clock creation time (seconds since the epoch); read by
    # cleanup_expired() to decide when the action is stale.
    created_at: float = field(default_factory=time.time)


# In-memory stores: pending actions keyed by action id, and the canonical form
# of every token that has already been accepted by verify_token().
_pending: dict[str, PendingAction] = {}
_used_tokens: set[str] = set()


def queue_action(
    description: str,
    action_type: str,
    params: dict,
    plane_issue_id: Optional[str] = None,
    plane_project_id: Optional[str] = None,
) -> PendingAction:
    """Create and store a pending action. Returns the action with a fresh UUID."""
    action = PendingAction(
        id=str(uuid.uuid4()),
        description=description,
        action_type=action_type,
        params=params,
        plane_issue_id=plane_issue_id,
        plane_project_id=plane_project_id,
    )
    _pending[action.id] = action
    return action


def _sign(action_id: str, ts: str, purpose: str) -> str:
    """Return the truncated hex HMAC-SHA256 over ``action_id:ts:purpose``."""
    msg = f"{action_id}:{ts}:{purpose}".encode()
    return hmac.new(JON_SECRET.encode(), msg, hashlib.sha256).hexdigest()[:20]


def _encode(raw: str) -> str:
    """Return the unpadded base64url encoding of *raw* (the canonical token form)."""
    return base64.urlsafe_b64encode(raw.encode()).decode().rstrip("=")


def _pad(s: str) -> str:
    return s + "=" * (-len(s) % 4)  # 0 when already aligned, not 4


def _decode(token: str) -> str:
    """Return the text payload of *token*; raise ValueError if it cannot be decoded."""
    try:
        return base64.urlsafe_b64decode(_pad(token)).decode()
    except (TypeError, ValueError) as exc:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses;
        # TypeError covers a token that is not a string at all.
        raise ValueError("invalid token format") from exc


def generate_token(action_id: str, purpose: str) -> str:
    """Generate a signed, time-stamped base64url token for approve or reject.

    The payload is ``action_id:ts:purpose:sig`` where ``ts`` is the current
    time in whole seconds. Neither *action_id* nor *purpose* may contain ``:``,
    otherwise verify_token() will reject the result as malformed.
    """
    ts = str(int(time.time()))
    sig = _sign(action_id, ts, purpose)
    return _encode(f"{action_id}:{ts}:{purpose}:{sig}")


def verify_token(token: str) -> tuple[str, str]:
    """Decode and verify a token. Returns (action_id, purpose).

    A token is accepted at most once. On success its canonical form is
    recorded so that later presentations are rejected.

    Raises:
        ValueError: the token is malformed, already used, expired, or carries
            an invalid signature.
    """
    raw = _decode(token)
    parts = raw.split(":")
    if len(parts) != 4:
        raise ValueError("invalid token format")
    action_id, ts, purpose, sig = parts

    # base64 decoding is lenient (characters outside the alphabet are dropped),
    # so several different strings can decode to the same signed payload.
    # Single-use is therefore enforced on the canonical re-encoding of the
    # payload, not on the literal string the caller supplied.
    canonical = _encode(raw)
    if canonical in _used_tokens:
        raise ValueError("token already used")

    try:
        issued_at = int(ts)
    except ValueError as exc:
        raise ValueError("invalid token format") from exc
    if int(time.time()) - issued_at > TOKEN_TTL:
        raise ValueError("token expired")

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, and `sig` is attacker-controlled.
    expected_sig = _sign(action_id, ts, purpose)
    if not hmac.compare_digest(sig.encode(), expected_sig.encode()):
        raise ValueError("invalid token signature")

    _used_tokens.add(canonical)
    return action_id, purpose


def pop_action(action_id: str) -> Optional[PendingAction]:
    """Remove and return a pending action by ID. Returns None if not found."""
    return _pending.pop(action_id, None)


def get_action(action_id: str) -> Optional[PendingAction]:
    """Get a pending action without removing it."""
    return _pending.get(action_id)


def cleanup_expired() -> int:
    """Remove actions older than TOKEN_TTL. Returns count removed.

    Also forgets used-token records whose timestamp is already past TOKEN_TTL:
    verify_token() rejects such tokens as expired regardless, so keeping them
    would only make the used-token set grow without bound. The returned count
    covers removed actions only.
    """
    now = time.time()
    cutoff = now - TOKEN_TTL
    expired = [aid for aid, a in _pending.items() if a.created_at < cutoff]
    for aid in expired:
        del _pending[aid]

    now_s = int(now)
    stale = set()
    for tok in _used_tokens:
        try:
            issued_at = int(_decode(tok).split(":")[1])
        except (ValueError, IndexError):
            continue  # unparseable entry: keep it rather than guess
        # Same integer arithmetic as verify_token(), so a token that could
        # still pass the expiry check never loses its "used" record.
        if now_s - issued_at > TOKEN_TTL:
            stale.add(tok)
    _used_tokens.difference_update(stale)

    return len(expired)