Files
admin 3896af2a43 fix: _pad() wrong for reject tokens (6-char purpose)
4 - len(s) % 4 = 4 when len % 4 == 0, adding 4 padding chars instead of 0.
-len(s) % 4 correctly returns 0 in that case.
2026-05-27 13:14:07 +00:00

109 lines
3.3 KiB
Python

"""Jon Snow approval gate — token management and pending action store."""
import base64
import hashlib
import hmac
import os
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
# Key used for the HMAC signature on tokens; read from the environment once,
# at import time.
# NOTE(review): the "change-me" fallback is a fixed value visible in this
# file, so tokens signed with it can be forged by anyone who has read it.
# Confirm that every deployment sets JON_SECRET.
JON_SECRET = os.environ.get("JON_SECRET", "change-me")

# Maximum token age in seconds (15 minutes); also the age after which
# cleanup_expired() drops a pending action.
TOKEN_TTL = 15 * 60
@dataclass
class PendingAction:
    """One queued action waiting for an approve/reject decision.

    Instances are created by ``queue_action`` and kept in the module-level
    ``_pending`` store until popped or cleaned up.
    """

    # Unique identifier; queue_action fills this with a UUID4 string.
    id: str
    # Free-text description of the action.
    description: str
    # Kind of action, e.g. "docker_rebuild", "docker_restart", "file_write".
    action_type: str
    # Arguments for the action; the schema is not defined in this module.
    params: dict
    # Optional identifiers, presumably linking to an issue/project in an
    # external tracker called "plane" -- TODO confirm against callers.
    plane_issue_id: Optional[str] = None
    plane_project_id: Optional[str] = None
    # Creation time as a Unix timestamp (time.time()); cleanup_expired()
    # compares it against TOKEN_TTL.
    created_at: float = field(default_factory=time.time)
# In-memory store of actions awaiting a decision, keyed by PendingAction.id.
# Filled by queue_action(); emptied by pop_action() and cleanup_expired().
_pending: dict[str, PendingAction] = {}
# Tokens that verify_token() has already accepted once; it consults this set
# to reject a second use of the same token.
# NOTE(review): nothing visible in this module ever removes entries from this
# set, so it grows for the lifetime of the process -- confirm that is intended.
_used_tokens: set[str] = set()
def queue_action(
    description: str,
    action_type: str,
    params: dict,
    plane_issue_id: Optional[str] = None,
    plane_project_id: Optional[str] = None,
) -> PendingAction:
    """Register a new pending action and hand it back to the caller.

    A fresh UUID4 string is generated as the action's ID, and the action is
    stored in the module-level ``_pending`` dict under that ID. ``created_at``
    is left to the dataclass default (the current time).
    """
    new_id = str(uuid.uuid4())
    # Positional order follows the PendingAction field order.
    queued = PendingAction(
        new_id,
        description,
        action_type,
        params,
        plane_issue_id,
        plane_project_id,
    )
    _pending[queued.id] = queued
    return queued
def generate_token(action_id: str, purpose: str) -> str:
    """Build a signed, time-stamped token for one action and purpose.

    The payload has the form ``action_id:ts:purpose:sig``: ``ts`` is the
    current Unix time in whole seconds and ``sig`` is the first 20 hex
    characters of an HMAC-SHA256 over ``action_id:ts:purpose`` keyed with
    ``JON_SECRET``. The payload is returned base64url-encoded with the
    trailing ``=`` padding removed.

    Note that ``verify_token`` splits the payload on ``:`` and requires
    exactly four fields, so neither argument should contain a colon.
    """
    issued_at = int(time.time())
    unsigned = f"{action_id}:{issued_at}:{purpose}"
    digest = hmac.new(
        JON_SECRET.encode(), unsigned.encode(), hashlib.sha256
    ).hexdigest()
    # Only the first 20 hex characters (80 bits) of the digest are kept.
    payload = f"{unsigned}:{digest[:20]}".encode()
    encoded = base64.urlsafe_b64encode(payload).decode()
    return encoded.rstrip("=")
def _pad(s: str) -> str:
    """Return *s* with enough ``=`` appended to make its length a multiple of 4.

    Nothing is appended when the length is already a multiple of 4.
    """
    # -len % 4 is in 0..3; it is 0 (not 4) for an already-aligned string.
    missing = -len(s) % 4
    return s + "=" * missing
def verify_token(token: str) -> tuple[str, str]:
    """Decode and verify a token, marking it as used on success.

    Returns:
        ``(action_id, purpose)`` as carried in the token payload.

    Raises:
        ValueError: if the token cannot be decoded into four colon-separated
            fields with an integer timestamp ("invalid token format"), has
            already been accepted once ("token already used"), carries a
            wrong signature ("invalid token signature"), or is older than
            ``TOKEN_TTL`` seconds ("token expired").
    """
    try:
        payload = base64.urlsafe_b64decode(_pad(token))
        action_id, ts, purpose, sig = payload.decode().split(":")
        issued_at = int(ts)
    except (TypeError, ValueError):
        # binascii.Error and UnicodeDecodeError are ValueError subclasses;
        # TypeError covers a token that is not a str.
        raise ValueError("invalid token format") from None

    # base64 decoding is lenient (extra padding, ignored non-alphabet
    # characters, spare trailing bits), so several different strings decode
    # to the same payload. Track use by the canonical encoding so that a
    # re-spelled copy of a consumed token is still recognised as used.
    canonical = base64.urlsafe_b64encode(payload).decode().rstrip("=")
    if canonical in _used_tokens:
        raise ValueError("token already used")

    # Authenticate the payload before acting on any of its fields.
    msg = f"{action_id}:{ts}:{purpose}".encode()
    expected_sig = hmac.new(JON_SECRET.encode(), msg, hashlib.sha256).hexdigest()[:20]
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a crafted token could supply.
    if not hmac.compare_digest(sig.encode(), expected_sig.encode()):
        raise ValueError("invalid token signature")

    if int(time.time()) - issued_at > TOKEN_TTL:
        raise ValueError("token expired")

    _used_tokens.add(canonical)
    return action_id, purpose
def pop_action(action_id: str) -> Optional[PendingAction]:
    """Take a pending action out of the store.

    Returns the removed action, or None when no action is stored under
    *action_id*.
    """
    removed = _pending.pop(action_id, None)
    return removed
def get_action(action_id: str) -> Optional[PendingAction]:
    """Look up a pending action and leave it in the store.

    Returns None when no action is stored under *action_id*.
    """
    found = _pending.get(action_id)
    return found
def cleanup_expired() -> int:
    """Drop pending actions created more than TOKEN_TTL seconds ago.

    Returns the number of actions that were removed.
    """
    threshold = time.time() - TOKEN_TTL
    removed = 0
    # Iterate over a snapshot so entries can be deleted during the loop.
    for action_id, action in list(_pending.items()):
        if action.created_at < threshold:
            del _pending[action_id]
            removed += 1
    return removed