3896af2a43
4 - len(s) % 4 = 4 when len % 4 == 0, adding 4 padding chars instead of 0. -len(s) % 4 correctly returns 0 in that case.
109 lines
3.3 KiB
Python
109 lines
3.3 KiB
Python
"""Jon Snow approval gate — token management and pending action store."""
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import os
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
# Key used to HMAC-sign approval tokens; read from the environment once, at
# import time.
# NOTE(review): when JON_SECRET is unset this falls back to the literal
# "change-me", i.e. tokens are then signed with a publicly known key.
JON_SECRET = os.environ.get("JON_SECRET", "change-me")

# Lifetime, in seconds, of a token and of a queued action (15 minutes).
TOKEN_TTL = 15 * 60
|
|
|
|
|
|
@dataclass
class PendingAction:
    """A queued action waiting for an approve/reject decision.

    Attributes:
        id: Identifier the action is stored and looked up under.
        description: Free-text description of the action.
        action_type: Kind of action, e.g. "docker_rebuild",
            "docker_restart" or "file_write".
        params: Parameters for the action; the schema is not defined here.
        plane_issue_id: Optional associated issue ID (None when absent).
        plane_project_id: Optional associated project ID (None when absent).
        created_at: Creation time as seconds since the epoch; filled in
            from ``time.time()`` when not supplied.
    """

    # Required fields (positional order is part of the constructor contract).
    id: str
    description: str
    action_type: str
    params: dict

    # Optional fields with defaults.
    plane_issue_id: Optional[str] = None
    plane_project_id: Optional[str] = None
    created_at: float = field(default_factory=time.time)
|
|
|
|
|
|
# Module-level registry of queued actions, keyed by PendingAction.id.
# Filled by queue_action(); entries leave via pop_action() or cleanup_expired().
_pending: dict[str, PendingAction] = {}
|
|
# Tokens that verify_token() has already accepted; consulted to reject reuse.
_used_tokens: set[str] = set()
|
|
|
|
|
|
def queue_action(
    description: str,
    action_type: str,
    params: dict,
    plane_issue_id: Optional[str] = None,
    plane_project_id: Optional[str] = None,
) -> PendingAction:
    """Register a new pending action and hand it back to the caller.

    A random UUID4 string becomes the action's ID, and the action is stored
    in the module-level ``_pending`` registry under that ID.

    Args:
        description: Free-text description of the action.
        action_type: Kind of action being queued.
        params: Parameters for the action (stored as given, not copied).
        plane_issue_id: Optional associated issue ID.
        plane_project_id: Optional associated project ID.

    Returns:
        The newly created ``PendingAction``.
    """
    new_id = str(uuid.uuid4())
    queued = PendingAction(
        new_id,
        description,
        action_type,
        params,
        plane_issue_id,
        plane_project_id,
    )
    _pending[new_id] = queued
    return queued
|
|
|
|
|
|
def generate_token(action_id: str, purpose: str) -> str:
    """Generate a signed, time-stamped base64url token for approve or reject.

    The token is the unpadded URL-safe base64 encoding of
    ``"<action_id>:<unix_ts>:<purpose>:<sig>"``, where ``sig`` is the first
    20 hex characters of the HMAC-SHA256 of ``"<action_id>:<unix_ts>:<purpose>"``
    keyed with ``JON_SECRET``.

    Args:
        action_id: ID of the action the token refers to; must not contain ":".
        purpose: What the token authorises (e.g. approve or reject); must not
            contain ":".

    Returns:
        The token as a base64url string with trailing "=" padding removed.

    Raises:
        ValueError: If ``action_id`` or ``purpose`` contains ":". Such a token
            would split into more than four fields and could never be
            accepted by ``verify_token``.
    """
    # ":" is the field separator of the token payload, so it cannot appear
    # inside a field. str() keeps non-str inputs (e.g. UUID objects) working,
    # since the payload below formats them the same way.
    for label, value in (("action_id", action_id), ("purpose", purpose)):
        if ":" in str(value):
            raise ValueError(f"{label} must not contain ':'")

    issued_at = str(int(time.time()))
    payload = f"{action_id}:{issued_at}:{purpose}"
    # Signature is truncated to 20 hex chars; verify_token() truncates the same way.
    sig = hmac.new(JON_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()[:20]
    encoded = base64.urlsafe_b64encode(f"{payload}:{sig}".encode()).decode()
    return encoded.rstrip("=")
|
|
|
|
|
|
def _pad(s: str) -> str:
|
|
return s + "=" * (-len(s) % 4) # 0 when already aligned, not 4
|
|
|
|
|
|
def verify_token(token: str) -> tuple[str, str]:
    """Decode and verify a token. Returns (action_id, purpose).

    Checks run in this order: format, HMAC signature, single use, expiry.
    The signature is checked first so that no field of an unauthenticated
    token influences which error is reported. A token that passes is
    recorded as used, so a second call with the same token fails.

    Args:
        token: base64url token as produced by ``generate_token`` (trailing
            "=" padding optional).

    Returns:
        ``(action_id, purpose)`` taken from the token.

    Raises:
        ValueError: "invalid token format" if the token cannot be decoded
            into four ":"-separated fields with a numeric timestamp;
            "invalid token signature" if the HMAC does not match;
            "token already used" on replay; "token expired" if the token is
            older than ``TOKEN_TTL`` seconds.
    """
    try:
        raw = base64.urlsafe_b64decode(_pad(token)).decode()
        # Unpacking raises ValueError unless there are exactly four fields.
        action_id, ts, purpose, sig = raw.split(":")
        issued_at = int(ts)
    except (ValueError, TypeError):
        # Covers bad base64 (binascii.Error), bad UTF-8, wrong field count,
        # a non-numeric timestamp, and a non-str token.
        raise ValueError("invalid token format") from None

    # Verify HMAC. Compare as bytes: compare_digest() on str raises TypeError
    # when the attacker-supplied signature field contains non-ASCII text.
    msg = f"{action_id}:{ts}:{purpose}".encode()
    expected_sig = hmac.new(JON_SECRET.encode(), msg, hashlib.sha256).hexdigest()[:20]
    if not hmac.compare_digest(sig.encode(), expected_sig.encode()):
        raise ValueError("invalid token signature")

    # Track use by the canonical encoding of the payload, not by the string
    # that was submitted. The decoder accepts many spellings of the same
    # payload (extra "=" padding, ignored non-alphabet characters, "+" for
    # "-"), which would otherwise let a used token be replayed. Tokens from
    # generate_token() are already in this canonical form.
    canonical = base64.urlsafe_b64encode(raw.encode()).decode().rstrip("=")
    if canonical in _used_tokens:
        raise ValueError("token already used")

    if int(time.time()) - issued_at > TOKEN_TTL:
        raise ValueError("token expired")

    _used_tokens.add(canonical)
    return action_id, purpose
|
|
|
|
|
|
def pop_action(action_id: str) -> Optional[PendingAction]:
    """Take a pending action out of the store.

    Returns the action registered under ``action_id`` and removes it from
    ``_pending``, or returns None when no such action exists.
    """
    try:
        return _pending.pop(action_id)
    except KeyError:
        return None
|
|
|
|
|
|
def get_action(action_id: str) -> Optional[PendingAction]:
    """Look up a pending action, leaving it in the store.

    Returns the action registered under ``action_id``, or None when no such
    action exists.
    """
    try:
        return _pending[action_id]
    except KeyError:
        return None
|
|
|
|
|
|
def _used_token_issued_at(token: str) -> Optional[int]:
    """Return the issue timestamp embedded in a token, or None if unreadable."""
    try:
        fields = base64.urlsafe_b64decode(_pad(token)).decode().split(":")
        return int(fields[1])
    except (ValueError, TypeError, IndexError):
        return None


def cleanup_expired() -> int:
    """Remove actions older than TOKEN_TTL. Returns count removed.

    Also drops entries from ``_used_tokens`` whose embedded timestamp is
    already past ``TOKEN_TTL``. ``verify_token`` rejects such tokens as
    expired regardless, so remembering them only makes the set grow without
    bound in a long-running process. Dropped tokens are not counted in the
    return value.

    Returns:
        Number of pending actions removed.
    """
    now = time.time()

    cutoff = now - TOKEN_TTL
    expired = [aid for aid, a in _pending.items() if a.created_at < cutoff]
    for aid in expired:
        del _pending[aid]

    # Same expiry rule as verify_token(): int(now) - issued_at > TOKEN_TTL.
    # Tokens whose timestamp cannot be read are kept, to stay on the safe side.
    token_cutoff = int(now) - TOKEN_TTL
    stale_tokens = set()
    for used in _used_tokens:
        issued_at = _used_token_issued_at(used)
        if issued_at is not None and issued_at < token_cutoff:
            stale_tokens.add(used)
    _used_tokens.difference_update(stale_tokens)

    return len(expired)
|