"""qyburn-coder: FastAPI service running a local-LLM coding loop with sandbox review."""

import difflib
import json
import logging
import os
import subprocess
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path

import httpx
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s [qyburn] %(message)s")
logger = logging.getLogger("qyburn")

# Service endpoints and model name; every value can be overridden via environment.
HODOR_URL = os.getenv("HODOR_URL", "http://hodor-gateway:8200")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://172.27.40.20:11434")
RAVEN_URL = os.getenv("RAVEN_URL", "http://raven-notify:8400/notify")
CODER_MODEL = os.getenv("CODER_MODEL", "qwen2.5-coder:7b")

# Filesystem layout.
STACKS_DIR = Path(os.getenv("STACKS_DIR", "/opt/stacks"))
SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites"))
AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os"))
SANDBOX_DIR = STACKS_DIR / "qyburn-coder" / "sandbox"
JOBS_FILE = STACKS_DIR / "qyburn-coder" / "jobs.json"

MAX_ATTEMPTS = 3


def _resolve_target(target_path: str) -> Path:
    """Map a job's ``target_path`` to a concrete filesystem path.

    Paths starting with ``/opt/`` are taken as absolute; anything else is
    interpreted relative to ``STACKS_DIR``.

    The path is normalised lexically and must stay inside its root (``/opt``
    for absolute paths, ``STACKS_DIR`` otherwise). Without this check
    ``STACKS_DIR / "/etc/x"`` silently discards ``STACKS_DIR`` and ``..``
    segments escape the tree, letting a request read or overwrite arbitrary
    files.

    Raises:
        ValueError: if the normalised path falls outside the permitted root.
    """
    if target_path.startswith("/opt/"):
        root = Path("/opt")
        candidate = Path(os.path.normpath(target_path))
    else:
        root = Path(os.path.normpath(STACKS_DIR))
        candidate = Path(os.path.normpath(STACKS_DIR / target_path))

    # `parents` excludes the path itself, so the bare root is rejected too.
    if root not in candidate.parents:
        raise ValueError(f"target path escapes {root}: {target_path!r}")
    return candidate


def _save_jobs():
    """Persist the in-memory ``jobs`` dict to ``JOBS_FILE`` (best effort).

    The JSON is written to a sibling temp file and swapped in with
    ``os.replace`` so an interrupted write cannot leave a truncated
    ``jobs.json`` behind. Failures are logged, never raised.
    """
    tmp_file = JOBS_FILE.with_name(JOBS_FILE.name + ".tmp")
    try:
        tmp_file.write_text(json.dumps(jobs, indent=2))
        os.replace(tmp_file, JOBS_FILE)
    except Exception as e:
        logger.warning("failed to persist jobs: %s", e)


def _load_jobs():
    """Populate ``jobs`` from ``JOBS_FILE`` if it exists.

    A missing file is not an error. Any other failure is logged and the
    service starts with whatever is already in ``jobs``.

    Jobs persisted as ``queued`` or ``running`` are marked ``failed``: their
    background task did not survive the restart, and the cancel/retry routes
    cannot move a job out of those two states on their own.
    """
    try:
        data = json.loads(JOBS_FILE.read_text())
    except FileNotFoundError:
        return
    except Exception as e:
        logger.warning("failed to load jobs from disk: %s", e)
        return

    if not isinstance(data, dict):
        logger.warning("failed to load jobs from disk: %s", "top-level JSON value is not an object")
        return

    now = datetime.now(timezone.utc).isoformat()
    for job in data.values():
        if isinstance(job, dict) and job.get("status") in ("queued", "running"):
            job["status"] = "failed"
            job["error"] = "interrupted by service restart"
            job["updated_at"] = now

    jobs.update(data)
    logger.info("loaded %d job(s) from %s", len(data), JOBS_FILE)


# In-memory job registry keyed by job id; mirrored to JOBS_FILE by _save_jobs().
jobs: dict[str, dict] = {}
_load_jobs()

app = FastAPI(title="qyburn-coder", version="0.1.0")


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------

class TaskRequest(BaseModel):
    """Request body describing one coding task."""

    # Free-text description of the change to make.
    description: str
    # File to edit or create; see _resolve_target() for how it is interpreted.
    target_path: str
    # Optional extra constraints for the task.
    constraints: str | None = None


# ---------------------------------------------------------------------------
#
Coding loop helpers # --------------------------------------------------------------------------- def _extract_diff(response: str) -> str: lines = response.strip().splitlines() if lines and lines[0].strip().startswith("```"): lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] return "\n".join(lines) def _apply_diff(original: str, diff_text: str) -> tuple[str | None, str | None]: """Apply a unified diff to original. Returns (patched_content, error_or_None).""" with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: f.write(original) tmp_path = f.name try: result = subprocess.run( ["patch", "--no-backup-if-mismatch", tmp_path], input=diff_text, capture_output=True, text=True, timeout=30, ) if result.returncode != 0: return None, f"patch failed:\n{(result.stderr or result.stdout).strip()[:400]}" return Path(tmp_path).read_text(), None finally: Path(tmp_path).unlink(missing_ok=True) Path(tmp_path + ".orig").unlink(missing_ok=True) def _syntax_check(code: str, filename: str) -> str | None: with tempfile.NamedTemporaryFile(suffix=".py", mode="w", delete=False) as f: f.write(code) tmp = f.name try: result = subprocess.run( ["python", "-m", "py_compile", tmp], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: return result.stderr.replace(tmp, filename) return None finally: Path(tmp).unlink(missing_ok=True) def _validate_output(content: str, filename: str) -> str | None: """Returns error string or None if valid. 
Handles .py, .html, and generic files.""" if not content.strip(): return "model returned empty output" ext = Path(filename).suffix.lower() if ext == ".py": return _syntax_check(content, filename) if ext in (".html", ".htm"): low = content.lower() if " or )" return None return None def _make_diff(original: str, modified: str, filename: str) -> str: a = original.splitlines(keepends=True) b = modified.splitlines(keepends=True) return "".join(difflib.unified_diff(a, b, fromfile=f"a/{filename}", tofile=f"b/{filename}")) or "(no changes)" def _build_create_prompt(job: dict, attempt: int, last_error: str | None) -> str: lines = [f"Task: {job['description']}"] if job.get("constraints"): lines.append(f"Constraints: {job['constraints']}") if attempt > 1 and last_error: lines.append(f"\nPrevious attempt failed:\n{last_error}") lines.append("Fix the issue in the output.") lines += [ "", f"Create the file: {job['target_path']}", "", "Output the complete file content:", ] return "\n".join(lines) def _build_prompt(job: dict, original: str, attempt: int, last_error: str | None) -> str: lines = [f"Task: {job['description']}"] if job.get("constraints"): lines.append(f"Constraints: {job['constraints']}") if attempt > 1 and last_error: lines.append(f"\nPrevious attempt failed:\n{last_error}") lines.append("Fix the diff so it applies cleanly.") lines += [ "", f"File to modify ({job['target_path']}):", "---", original, "---", "", "Output the unified diff:", ] return "\n".join(lines) async def _notify_raven(title: str, message: str, level: str = "warning"): try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.post(RAVEN_URL, json={"title": title, "message": message, "level": level}) logger.info(f"Raven notified: {r.status_code}") except Exception as e: logger.warning(f"Raven notification failed: {e}") async def _warm_coder_model(): try: async with httpx.AsyncClient(timeout=120.0) as client: logger.info(f"warming coder model {CODER_MODEL}...") await 
client.post(f"{OLLAMA_URL}/api/chat", json={ "model": CODER_MODEL, "messages": [{"role": "user", "content": "ready"}], "stream": False, }) logger.info("coder model warm") except Exception as e: logger.warning(f"model warm-up failed (continuing anyway): {e}") _CODER_SYSTEM = ( "You are a Python code editor. " "When given a task and a file, output ONLY a unified diff showing the changes needed. " "Use standard unified diff format with --- and +++ headers and @@ hunk markers. " "No markdown fences, no explanations, no comments. " "Output only the diff lines: ---, +++, @@, context lines (space prefix), " "+ added lines, - removed lines." ) _CREATOR_SYSTEM = ( "You are a code and file generator. " "When given a task, output ONLY the complete new file as raw text. " "No markdown fences, no explanations, no preamble. " "Output only the file content itself, starting with the very first character of the file." ) async def _ask_creator(task_prompt: str) -> str: async with httpx.AsyncClient(timeout=300.0) as client: r = await client.post(f"{OLLAMA_URL}/api/chat", json={ "model": CODER_MODEL, "messages": [ {"role": "system", "content": _CREATOR_SYSTEM}, {"role": "user", "content": task_prompt}, ], "stream": False, "options": {"temperature": 0.1, "num_predict": 8192}, }) r.raise_for_status() return r.json()["message"]["content"] async def _ask_coder(task_prompt: str) -> str: async with httpx.AsyncClient(timeout=300.0) as client: r = await client.post(f"{OLLAMA_URL}/api/chat", json={ "model": CODER_MODEL, "messages": [ {"role": "system", "content": _CODER_SYSTEM}, {"role": "user", "content": task_prompt}, ], "stream": False, "options": {"temperature": 0.1, "num_predict": 4096}, }) r.raise_for_status() return r.json()["message"]["content"] def _write_handoff(job: dict, original: str, last_diff: str): out_dir = SITES_DIR / "qyburn" out_dir.mkdir(parents=True, exist_ok=True) content = ( f"# Qyburn Escalation Handoff\n\n" f"**Job:** {job['job_id']}\n" f"**Task:** 
{job['description']}\n" f"**Target:** {job['target_path']}\n" f"**Attempts:** {job['attempts']}\n" f"**Last error:** {job.get('error', 'unknown')}\n\n" f"---\n\n" f"## Last Diff Attempt\n\n```diff\n{last_diff}\n```\n\n" f"## Instructions\n\n" f"The diff above failed to apply cleanly or produced a syntax error.\n" f"Edit the target file directly at `{job['target_path']}`, then POST the corrected complete file to `/handoff/{job['job_id']}`.\n" ) (out_dir / "handoff.md").write_text(content) logger.info(f"[{job['job_id'][:8]}] handoff written → /opt/sites/qyburn/handoff.md") def _write_last_run(): log_dir = AGENT_OS_DIR / "logs" / "qyburn-coder" log_dir.mkdir(parents=True, exist_ok=True) counts: dict[str, int] = {} for j in jobs.values(): counts[j["status"]] = counts.get(j["status"], 0) + 1 (log_dir / "last-run.json").write_text(json.dumps({ "agent": "qyburn-coder", "timestamp": datetime.now(timezone.utc).isoformat(), "status": "running", "result": f"{len(jobs)} jobs — " + ", ".join(f"{v} {k}" for k, v in counts.items()), }, indent=2)) _save_jobs() # --------------------------------------------------------------------------- # Coding loop (background task) # --------------------------------------------------------------------------- async def run_coding_loop(job_id: str): job = jobs[job_id] job["status"] = "running" job["updated_at"] = datetime.now(timezone.utc).isoformat() target = _resolve_target(job["target_path"]) mode = "edit" if target.exists() else "create" job["mode"] = mode job["updated_at"] = datetime.now(timezone.utc).isoformat() if mode == "edit": original = target.read_text() else: original = "" logger.info(f"[{job_id[:8]}] target does not exist — create mode") await _warm_coder_model() sandbox_dir = SANDBOX_DIR / job_id sandbox_dir.mkdir(parents=True, exist_ok=True) sandbox_file = sandbox_dir / Path(job["target_path"]).name last_diff = "" last_error = None for attempt in range(1, MAX_ATTEMPTS + 1): if job.get("cancelled"): job["status"] = "cancelled" 
job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] cancelled between attempts") _write_last_run() return job["attempts"] = attempt job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] attempt {attempt}/{MAX_ATTEMPTS}") try: if mode == "edit": response = await _ask_coder(_build_prompt(job, original, attempt, last_error)) else: response = await _ask_creator(_build_create_prompt(job, attempt, last_error)) except Exception as e: last_error = f"Ollama error: {e}" logger.warning(f"[{job_id[:8]}] attempt {attempt} — LLM error: {e}") continue job["tokens_estimated"] = job.get("tokens_estimated", 0) + len(response) // 4 raw = _extract_diff(response) last_diff = raw if mode == "edit": content, patch_error = _apply_diff(original, raw) if patch_error: last_error = patch_error logger.warning(f"[{job_id[:8]}] attempt {attempt} patch error: {patch_error[:120]}") continue else: content = raw error = _validate_output(content, Path(job["target_path"]).name) if error is None: sandbox_file.write_text(content) job["status"] = "pending_review" job["diff"] = raw if mode == "edit" else f"[NEW FILE — {Path(job['target_path']).name}]\n\n{content}" job["sandbox_file"] = str(sandbox_file) job["original"] = original job["error"] = None job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] {mode} OK on attempt {attempt} — pending review") _write_last_run() return last_error = error logger.warning(f"[{job_id[:8]}] attempt {attempt} error: {error[:120]}") _write_handoff(job, original, last_diff) job["status"] = "escalated" job["error"] = last_error job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] escalated after {MAX_ATTEMPTS} attempts") await _notify_raven( title="Qyburn handoff ready", message=( f"Job {job_id[:8]} escalated after {MAX_ATTEMPTS} attempts.\n" f"Task: {job['description']}\n" f"Target: {job['target_path']}\n" f"Last error: 
{str(last_error)[:200]}\n" f"Read handoff: /opt/sites/qyburn/handoff.md" ), level="warning", ) _write_last_run() # --------------------------------------------------------------------------- # Status page # --------------------------------------------------------------------------- _STATUS_COLORS = { "queued": "#8b949e", "running": "#58a6ff", "pending_review": "#f0883e", "applied": "#56d364", "escalated": "#ff7b72", "failed": "#ff7b72", "cancelled": "#8b949e", "rejected": "#8b949e", } _BTN = '
' def _buttons(*specs) -> str: return '
' + "".join( f'
' f'
' for action, jid, cls, label in specs ) + "
" def _render_page() -> str: timestamp = datetime.now(timezone.utc).isoformat() active = any(j["status"] in ("queued", "running") for j in jobs.values()) refresh = '' if active else "" cards = "" for job_id, job in sorted(jobs.items(), key=lambda x: x[1]["created_at"], reverse=True): color = _STATUS_COLORS.get(job["status"], "#8b949e") badge = f'{job["status"]}' extra = "" status = job["status"] if status == "pending_review" and job.get("diff"): stack_dir = str(Path(job["target_path"]).parent) extra = ( f'
{job["diff"]}
' + _buttons( ("approve", job_id, "approve", "Approve & Apply"), ("reject", job_id, "danger", "Reject"), ("retry", job_id, "secondary", "Retry"), ) + f'

After approving, use the docker_rebuild tool ' f'or run: cd /opt/stacks/{stack_dir} && docker compose up -d --build

' ) elif status in ("queued", "running"): extra = _buttons(("cancel", job_id, "danger", "Cancel")) elif status in ("escalated", "rejected", "failed"): extra = _buttons(("retry", job_id, "secondary", "Retry")) if status == "escalated": extra += ('

Handoff written to ' '/opt/sites/qyburn/handoff.md — paste to Claude Code.

') if job.get("error"): extra += f'
{job["error"]}
' elif status == "applied": stack_dir = str(Path(job["target_path"]).parent) extra = ( f'

Applied. Use the docker_rebuild tool ' f'or run: cd /opt/stacks/{stack_dir} && docker compose up -d --build

' ) elif job.get("error"): extra = f'
{job["error"]}
' cards += f"""
{job_id[:8]} {badge} attempt {job.get('attempts', 0)}/{MAX_ATTEMPTS}

{job['description']}

Target: {job['target_path']}  · 

{extra}
""" body = cards if cards else '

No jobs yet. POST to /task to submit one.

' return f""" Qyburn — Coder Agent {refresh}

Qyburn — Coder Agent

Local LLM coding loop · {CODER_MODEL} · sandbox review before apply

Updated  ·  {len(jobs)} job(s)

{body} """ # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/health") def health(): return {"status": "ok", "agent": "qyburn-coder", "model": CODER_MODEL, "hodor_url": HODOR_URL, "jobs": len(jobs)} @app.get("/", response_class=HTMLResponse) def index(): return _render_page() @app.post("/task") async def submit_task(req: TaskRequest, background_tasks: BackgroundTasks): job_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() jobs[job_id] = { "job_id": job_id, "status": "queued", "description": req.description, "target_path": req.target_path, "constraints": req.constraints, "attempts": 0, "created_at": now, "updated_at": now, "cancelled": False, "error": None, "diff": None, "mode": None, "sandbox_file": None, "original": None, "tokens_estimated": 0, } background_tasks.add_task(run_coding_loop, job_id) logger.info(f"[{job_id[:8]}] queued: {req.description[:80]}") _write_last_run() return {"job_id": job_id, "status": "queued"} @app.get("/status/{job_id}") def get_status(job_id: str): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") return {k: v for k, v in job.items() if k not in ("original", "sandbox_file", "cancelled")} @app.get("/jobs") def list_jobs(): return [ { "job_id": j["job_id"], "description": j["description"], "status": j["status"], "attempts": j.get("attempts", 0), "tokens_estimated": j.get("tokens_estimated", 0), "created_at": j["created_at"], "updated_at": j["updated_at"], } for j in sorted(jobs.values(), key=lambda x: x["created_at"], reverse=True) ] @app.get("/metrics") def get_metrics(): total = len(jobs) total_tokens = sum(j.get("tokens_estimated", 0) for j in jobs.values()) by_status: dict[str, int] = {} for j in jobs.values(): by_status[j["status"]] = by_status.get(j["status"], 0) + 1 return { "total_jobs": total, "total_tokens_estimated": total_tokens, 
"avg_tokens_per_job": total_tokens // total if total else 0, "jobs_by_status": by_status, } @app.post("/approve/{job_id}") async def approve_job(job_id: str, request: Request): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") if job["status"] != "pending_review": raise HTTPException(status_code=400, detail=f"job is '{job['status']}', expected 'pending_review'") sandbox_file = Path(job["sandbox_file"]) if not sandbox_file.exists(): raise HTTPException(status_code=500, detail="sandbox file missing") target = _resolve_target(job["target_path"]) target.parent.mkdir(parents=True, exist_ok=True) if job.get("original"): backup = target.with_suffix(target.suffix + ".qyburn-bak") backup.write_text(job["original"]) target.write_text(sandbox_file.read_text()) job["status"] = "applied" job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] applied → {target} (backup: {backup.name})") _write_last_run() if "text/html" in request.headers.get("accept", ""): return RedirectResponse(url="/", status_code=303) return {"status": "applied", "target": str(target), "backup": str(backup)} @app.post("/reject/{job_id}") async def reject_job(job_id: str, request: Request): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") if job["status"] != "pending_review": raise HTTPException(status_code=400, detail=f"job is '{job['status']}', expected 'pending_review'") job["status"] = "rejected" job["diff"] = None job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] rejected") _write_last_run() if "text/html" in request.headers.get("accept", ""): return RedirectResponse(url="/", status_code=303) return {"status": "rejected", "job_id": job_id} @app.post("/cancel/{job_id}") async def cancel_job(job_id: str, request: Request): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") if job["status"] not in 
("queued", "running"): raise HTTPException(status_code=400, detail=f"job is '{job['status']}', can only cancel queued/running") job["cancelled"] = True job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] cancel requested") _write_last_run() if "text/html" in request.headers.get("accept", ""): return RedirectResponse(url="/", status_code=303) return {"status": "cancelling", "job_id": job_id} @app.post("/retry/{job_id}") async def retry_job(job_id: str, request: Request, background_tasks: BackgroundTasks): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") if job["status"] not in ("pending_review", "rejected", "escalated", "failed", "cancelled"): raise HTTPException(status_code=400, detail=f"job is '{job['status']}', cannot retry") job["status"] = "queued" job["attempts"] = 0 job["cancelled"] = False job["error"] = None job["diff"] = None job["sandbox_file"] = None job["original"] = None job["updated_at"] = datetime.now(timezone.utc).isoformat() background_tasks.add_task(run_coding_loop, job_id) logger.info(f"[{job_id[:8]}] retrying") _write_last_run() if "text/html" in request.headers.get("accept", ""): return RedirectResponse(url="/", status_code=303) return {"status": "queued", "job_id": job_id} @app.post("/rebuild/{stack_name}") async def rebuild_stack(stack_name: str): """Run docker compose up -d --build for a named stack in /opt/stacks/.""" if not stack_name or "/" in stack_name or ".." 
in stack_name: raise HTTPException(status_code=400, detail="invalid stack name") stack_path = STACKS_DIR / stack_name if not stack_path.is_dir(): raise HTTPException(status_code=404, detail=f"stack '{stack_name}' not found in /opt/stacks/") compose_file = stack_path / "docker-compose.yml" if not compose_file.exists(): raise HTTPException(status_code=404, detail=f"no docker-compose.yml in {stack_name}") logger.info(f"rebuilding {stack_name}") result = subprocess.run( ["docker", "compose", "up", "-d", "--build"], cwd=str(stack_path), capture_output=True, text=True, timeout=300, ) success = result.returncode == 0 logger.info(f"rebuild {stack_name}: {'ok' if success else 'failed'}") return { "stack": stack_name, "success": success, "stdout": result.stdout, "stderr": result.stderr, } @app.post("/handoff/{job_id}") async def receive_handoff(job_id: str, request: Request): job = jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") body = await request.body() code = body.decode() error = _syntax_check(code, Path(job["target_path"]).name) if error: raise HTTPException(status_code=400, detail=f"syntax error in submitted code: {error}") original = job.get("original") or (STACKS_DIR / job["target_path"]).read_text() sandbox_dir = SANDBOX_DIR / job_id sandbox_dir.mkdir(parents=True, exist_ok=True) sandbox_file = sandbox_dir / Path(job["target_path"]).name sandbox_file.write_text(code) job["status"] = "pending_review" job["diff"] = _make_diff(original, code, Path(job["target_path"]).name) job["sandbox_file"] = str(sandbox_file) job["original"] = original job["error"] = None job["updated_at"] = datetime.now(timezone.utc).isoformat() logger.info(f"[{job_id[:8]}] handoff received — pending review") _write_last_run() return {"status": "pending_review", "job_id": job_id} # --------------------------------------------------------------------------- # Jon Snow approval gate — rebuild/restart endpoints (pure subprocess, no LLM) # 
# ---------------------------------------------------------------------------


async def _run_compose(action: str, stack_name: str, compose_args: list[str], timeout: int) -> JSONResponse:
    """Run ``docker compose <compose_args>`` inside ``STACKS_DIR / stack_name``.

    Shared implementation for the rebuild and restart endpoints.

    Args:
        action: Short label used in log lines and the timeout error
            ("rebuild" or "restart").
        stack_name: Directory name of the stack under ``STACKS_DIR``.
        compose_args: Arguments appended after ``docker compose``.
        timeout: Seconds to wait for the subprocess before giving up.

    Returns:
        A ``JSONResponse``: 400 for an invalid name, 404 when the stack
        directory is missing, 408 on timeout, 500 on any other failure, and
        otherwise 200 with ``ok``, ``returncode`` and the last 2000
        characters of stdout/stderr.
    """
    import asyncio
    import re

    # fullmatch, not match with '$': '$' also matches before a trailing newline.
    if not re.fullmatch(r"[a-zA-Z0-9_-]+", stack_name):
        return JSONResponse({"ok": False, "error": "invalid stack name"}, status_code=400)

    stack_path = STACKS_DIR / stack_name
    # Must be a directory: it is used as the subprocess working directory.
    if not stack_path.is_dir():
        return JSONResponse({"ok": False, "error": f"stack not found: {stack_name}"}, status_code=404)

    logger.info(f"[{action}] {stack_name} — starting compose {' '.join(compose_args)}")
    try:
        # Run in a worker thread so a slow docker call does not stall the
        # event loop (and every other request) for the whole timeout.
        result = await asyncio.to_thread(
            subprocess.run,
            ["docker", "compose", *compose_args],
            cwd=str(stack_path),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"[{action}] {stack_name} — timed out after {timeout}s")
        return JSONResponse({"ok": False, "error": f"{action} timed out ({timeout}s)"}, status_code=408)
    except Exception as e:
        logger.error(f"[{action}] {stack_name} — error: {e}")
        return JSONResponse({"ok": False, "error": str(e)}, status_code=500)

    ok = result.returncode == 0
    logger.info(f"[{action}] {stack_name} — {'ok' if ok else 'failed'} (rc={result.returncode})")
    return JSONResponse({
        "ok": ok,
        "stack": stack_name,
        "returncode": result.returncode,
        # Only the tail of the output is returned to keep responses small.
        "stdout": result.stdout[-2000:] if result.stdout else "",
        "stderr": result.stderr[-2000:] if result.stderr else "",
    })


# NOTE(review): POST /rebuild/{stack_name} is also registered by an earlier
# handler in this file that has the same function name. Routes are matched in
# registration order, so while that earlier handler exists this one is never
# reached — one of the two should be removed.
@app.post("/rebuild/{stack_name}")
async def rebuild_stack(stack_name: str):
    """Rebuild a Docker Compose stack (compose up -d --build).

    Called by Jon Snow approval gate. No LLM involved — pure subprocess execution.
    """
    return await _run_compose("rebuild", stack_name, ["up", "-d", "--build"], timeout=300)


@app.post("/restart/{stack_name}")
async def restart_stack(stack_name: str):
    """Restart a Docker Compose stack without rebuilding (compose restart).

    Called by Jon Snow approval gate. No LLM involved — pure subprocess execution.
    """
    return await _run_compose("restart", stack_name, ["restart"], timeout=120)