"""qyburn-coder: local-LLM coding loop with sandboxed human review.

A task names a file under STACKS_DIR and describes a change.  The background
loop asks the coder model for a unified diff, applies it to a scratch copy,
syntax-checks the result and parks it in a sandbox as ``pending_review``.
Nothing touches the real file until ``/approve/{job_id}`` is called.  After
MAX_ATTEMPTS failures the job is escalated: a handoff note is written and a
notification is posted to RAVEN_URL.
"""
import asyncio
import difflib
import html
import json
import logging
import os
import subprocess
import tempfile
import traceback
import uuid
from datetime import datetime, timezone
from pathlib import Path

import httpx
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s [qyburn] %(message)s")
logger = logging.getLogger("qyburn")

HODOR_URL = os.getenv("HODOR_URL", "http://hodor-gateway:8200")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://172.27.40.20:11434")
RAVEN_URL = os.getenv("RAVEN_URL", "http://raven-notify:8400/notify")
CODER_MODEL = os.getenv("CODER_MODEL", "qwen2.5-coder:7b")
STACKS_DIR = Path(os.getenv("STACKS_DIR", "/opt/stacks"))
SITES_DIR = Path(os.getenv("SITES_DIR", "/opt/sites"))
AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os"))
SANDBOX_DIR = STACKS_DIR / "qyburn-coder" / "sandbox"
JOBS_FILE = STACKS_DIR / "qyburn-coder" / "jobs.json"
# Single source of truth for the handoff location, so log lines, notifications
# and the status page stay correct when SITES_DIR is overridden.
HANDOFF_FILE = SITES_DIR / "qyburn" / "handoff.md"
MAX_ATTEMPTS = 3


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _save_jobs():
    """Persist the in-memory job table to JOBS_FILE (best effort).

    The data is written to a sibling temp file and then renamed over the real
    file so an interrupted write cannot leave a truncated jobs.json behind.
    """
    tmp = JOBS_FILE.with_name(JOBS_FILE.name + ".tmp")
    try:
        tmp.write_text(json.dumps(jobs, indent=2))
        os.replace(tmp, JOBS_FILE)
    except Exception as e:
        logger.warning(f"failed to persist jobs: {e}")


def _load_jobs():
    """Load persisted jobs from JOBS_FILE into ``jobs`` (best effort).

    Jobs that were ``queued`` or ``running`` when the file was written have no
    background task after a restart, so they are marked ``failed``; that status
    is accepted by ``/retry``.
    """
    try:
        data = json.loads(JOBS_FILE.read_text())
    except FileNotFoundError:
        return
    except Exception as e:
        logger.warning(f"failed to load jobs from disk: {e}")
        return
    if not isinstance(data, dict):
        logger.warning(f"failed to load jobs from disk: {JOBS_FILE} is not a JSON object")
        return
    for job in data.values():
        if isinstance(job, dict) and job.get("status") in ("queued", "running"):
            job["status"] = "failed"
            job["error"] = "interrupted by service restart"
            job["cancelled"] = False
    jobs.update(data)
    logger.info(f"loaded {len(data)} job(s) from {JOBS_FILE}")


jobs: dict[str, dict] = {}
_load_jobs()

app = FastAPI(title="qyburn-coder", version="0.1.0")


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------

class TaskRequest(BaseModel):
    """Body of ``POST /task``."""

    description: str
    # Path of the file to modify, relative to STACKS_DIR.
    target_path: str
    constraints: str | None = None


# ---------------------------------------------------------------------------
# Coding loop helpers
# ---------------------------------------------------------------------------

def _resolve_target(target_path: str) -> Path:
    """Resolve ``target_path`` under STACKS_DIR.

    Raises:
        ValueError: if the path is absolute, climbs out with ``..``, follows a
            symlink out of STACKS_DIR, or names STACKS_DIR itself.
    """
    root = STACKS_DIR.resolve()
    # Joining with an absolute path discards ``root``; the containment check
    # below rejects that case along with ``..`` traversal.
    candidate = (root / target_path).resolve()
    if candidate == root or not candidate.is_relative_to(root):
        raise ValueError(f"target path must stay inside {STACKS_DIR}: {target_path}")
    return candidate


def _fail_job(job: dict, error: str):
    """Mark ``job`` as failed with ``error`` and bump its timestamp."""
    job["status"] = "failed"
    job["error"] = error
    job["updated_at"] = _now()


def _extract_diff(response: str) -> str:
    """Strip a leading and/or trailing markdown fence line from a model reply."""
    lines = response.strip().splitlines()
    if lines and lines[0].strip().startswith("```"):
        lines = lines[1:]
    if lines and lines[-1].strip() == "```":
        lines = lines[:-1]
    return "\n".join(lines)


def _apply_diff(original: str, diff_text: str) -> tuple[str | None, str | None]:
    """Apply a unified diff to original. Returns (patched_content, error_or_None).

    The work happens on a temp copy using the external ``patch`` program.
    Failures (non-zero exit, timeout, ``patch`` not installed) are reported
    through the error slot rather than raised.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write(original)
        tmp_path = f.name
    # _extract_diff joins lines without a final newline; give patch a
    # properly terminated last line.
    patch_input = diff_text if diff_text.endswith("\n") else diff_text + "\n"
    try:
        try:
            result = subprocess.run(
                ["patch", "--no-backup-if-mismatch", tmp_path],
                input=patch_input,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            return None, "patch failed:\ntimed out after 30s"
        except OSError as e:
            return None, f"patch failed:\ncould not run patch: {e}"
        if result.returncode != 0:
            return None, f"patch failed:\n{(result.stderr or result.stdout).strip()[:400]}"
        return Path(tmp_path).read_text(), None
    finally:
        # patch may leave .orig / .rej siblings next to the temp file.
        for suffix in ("", ".orig", ".rej"):
            Path(tmp_path + suffix).unlink(missing_ok=True)


def _syntax_check(code: str, filename: str) -> str | None:
    """Return a formatted error if ``code`` does not compile as Python, else None.

    The code is compiled in-process and never executed.  ``filename`` is only
    used to label the error message.
    """
    try:
        compile(code, filename, "exec", dont_inherit=True)
    except (SyntaxError, ValueError, RecursionError) as e:
        # ValueError: source contains null bytes; RecursionError: nesting too deep.
        return "".join(traceback.format_exception_only(type(e), e))
    return None


def _make_diff(original: str, modified: str, filename: str) -> str:
    """Return a unified diff from ``original`` to ``modified``, or "(no changes)"."""
    a = original.splitlines(keepends=True)
    b = modified.splitlines(keepends=True)
    return "".join(
        difflib.unified_diff(a, b, fromfile=f"a/{filename}", tofile=f"b/{filename}")
    ) or "(no changes)"


def _build_prompt(job: dict, original: str, attempt: int, last_error: str | None) -> str:
    """Build the user prompt for the coder model.

    From the second attempt on, the previous failure is included so the model
    can correct itself.
    """
    lines = [f"Task: {job['description']}"]
    if job.get("constraints"):
        lines.append(f"Constraints: {job['constraints']}")
    if attempt > 1 and last_error:
        lines.append(f"\nPrevious attempt failed:\n{last_error}")
        lines.append("Fix the diff so it applies cleanly.")
    lines += [
        "",
        f"File to modify ({job['target_path']}):",
        "---",
        original,
        "---",
        "",
        "Output the unified diff:",
    ]
    return "\n".join(lines)


async def _notify_raven(title: str, message: str, level: str = "warning"):
    """POST a notification to RAVEN_URL; failures are logged, never raised."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.post(RAVEN_URL, json={"title": title, "message": message, "level": level})
            logger.info(f"Raven notified: {r.status_code}")
    except Exception as e:
        logger.warning(f"Raven notification failed: {e}")


async def _warm_coder_model():
    """Send a trivial chat request so the model is loaded before real work.

    Best effort: any failure is logged and the caller continues.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            logger.info(f"warming coder model {CODER_MODEL}...")
            await client.post(f"{OLLAMA_URL}/api/chat", json={
                "model": CODER_MODEL,
                "messages": [{"role": "user", "content": "ready"}],
                "stream": False,
            })
            logger.info("coder model warm")
    except Exception as e:
        logger.warning(f"model warm-up failed (continuing anyway): {e}")


_CODER_SYSTEM = (
    "You are a Python code editor. "
    "When given a task and a file, output ONLY a unified diff showing the changes needed. "
    "Use standard unified diff format with --- and +++ headers and @@ hunk markers. "
    "No markdown fences, no explanations, no comments. "
    "Output only the diff lines: ---, +++, @@, context lines (space prefix), "
    "+ added lines, - removed lines."
)


async def _ask_coder(task_prompt: str) -> str:
    """Send ``task_prompt`` to the coder model and return the reply text.

    Raises on transport errors, non-2xx responses, or an unexpected response
    shape; the coding loop treats any exception as a failed attempt.
    """
    async with httpx.AsyncClient(timeout=300.0) as client:
        r = await client.post(f"{OLLAMA_URL}/api/chat", json={
            "model": CODER_MODEL,
            "messages": [
                {"role": "system", "content": _CODER_SYSTEM},
                {"role": "user", "content": task_prompt},
            ],
            "stream": False,
            "options": {"temperature": 0.1, "num_predict": 4096},
        })
        r.raise_for_status()
        return r.json()["message"]["content"]


def _write_handoff(job: dict, original: str, last_diff: str):
    """Write the escalation note for ``job`` to HANDOFF_FILE (overwrites).

    ``job["error"]`` must already hold the last failure; it is copied into
    the note.
    """
    HANDOFF_FILE.parent.mkdir(parents=True, exist_ok=True)
    content = (
        f"# Qyburn Escalation Handoff\n\n"
        f"**Job:** {job['job_id']}\n"
        f"**Task:** {job['description']}\n"
        f"**Target:** {job['target_path']}\n"
        f"**Attempts:** {job['attempts']}\n"
        f"**Last error:** {job.get('error') or 'unknown'}\n\n"
        f"---\n\n"
        f"## Last Diff Attempt\n\n```diff\n{last_diff}\n```\n\n"
        f"## Instructions\n\n"
        f"The diff above failed to apply cleanly or produced a syntax error.\n"
        f"Edit the target file directly at `{job['target_path']}`, then POST the corrected complete file to `/handoff/{job['job_id']}`.\n"
    )
    HANDOFF_FILE.write_text(content)
    logger.info(f"[{job['job_id'][:8]}] handoff written → {HANDOFF_FILE}")


def _write_last_run():
    """Write a status summary to the agent-os log dir, then persist all jobs.

    Both writes are best effort so a full disk or bad permissions cannot turn
    an otherwise successful request into a 500.
    """
    counts: dict[str, int] = {}
    for j in jobs.values():
        counts[j["status"]] = counts.get(j["status"], 0) + 1
    log_dir = AGENT_OS_DIR / "logs" / "qyburn-coder"
    try:
        log_dir.mkdir(parents=True, exist_ok=True)
        (log_dir / "last-run.json").write_text(json.dumps({
            "agent": "qyburn-coder",
            "timestamp": _now(),
            "status": "running",
            "result": f"{len(jobs)} jobs — " + ", ".join(f"{v} {k}" for k, v in counts.items()),
        }, indent=2))
    except OSError as e:
        logger.warning(f"failed to write last-run summary: {e}")
    _save_jobs()


# ---------------------------------------------------------------------------
# Coding loop (background task)
# ---------------------------------------------------------------------------

async def run_coding_loop(job_id: str):
    """Background task: drive one job to a terminal or review state.

    Any unexpected exception marks the job ``failed`` instead of leaving it
    stuck in ``running``.  The job table is persisted on every exit path.
    """
    job = jobs.get(job_id)
    if job is None:
        logger.error(f"[{job_id[:8]}] unknown job, nothing to run")
        return
    try:
        await _run_attempts(job_id, job)
    except Exception as e:
        logger.exception(f"[{job_id[:8]}] coding loop crashed")
        _fail_job(job, f"internal error: {e}")
    _write_last_run()


def _mark_cancelled(job: dict, tag: str, when: str):
    """Move ``job`` to ``cancelled`` and log where the cancel was noticed."""
    job["status"] = "cancelled"
    job["updated_at"] = _now()
    logger.info(f"[{tag}] cancelled {when}")


async def _run_attempts(job_id: str, job: dict):
    """Run up to MAX_ATTEMPTS generate → patch → syntax-check cycles for ``job``."""
    tag = job_id[:8]
    job["status"] = "running"
    job["updated_at"] = _now()

    try:
        target = _resolve_target(job["target_path"])
    except ValueError as e:
        _fail_job(job, str(e))
        logger.error(f"[{tag}] {job['error']}")
        return
    if not target.is_file():
        _fail_job(job, f"target file not found: {job['target_path']}")
        logger.error(f"[{tag}] {job['error']}")
        return

    await _warm_coder_model()

    original = target.read_text()
    file_name = Path(job["target_path"]).name
    sandbox_dir = SANDBOX_DIR / job_id
    sandbox_dir.mkdir(parents=True, exist_ok=True)
    sandbox_file = sandbox_dir / file_name

    last_diff = ""
    last_error = None

    for attempt in range(1, MAX_ATTEMPTS + 1):
        if job.get("cancelled"):
            _mark_cancelled(job, tag, "between attempts")
            return

        job["attempts"] = attempt
        job["updated_at"] = _now()
        logger.info(f"[{tag}] attempt {attempt}/{MAX_ATTEMPTS}")

        try:
            response = await _ask_coder(_build_prompt(job, original, attempt, last_error))
        except Exception as e:
            last_error = f"Ollama error: {e}"
            logger.warning(f"[{tag}] attempt {attempt} — LLM error: {e}")
            continue

        # The model call can take minutes; honour a cancel that arrived meanwhile.
        if job.get("cancelled"):
            _mark_cancelled(job, tag, "after model response")
            return

        # Rough estimate: about four characters per token.
        job["tokens_estimated"] = job.get("tokens_estimated", 0) + len(response) // 4
        diff_text = _extract_diff(response)
        last_diff = diff_text

        # patch is an external process; keep the event loop free while it runs.
        patched, patch_error = await asyncio.to_thread(_apply_diff, original, diff_text)
        if patch_error:
            last_error = patch_error
            logger.warning(f"[{tag}] attempt {attempt} patch error: {patch_error[:120]}")
            continue

        syntax_error = _syntax_check(patched, file_name)
        if syntax_error is None:
            sandbox_file.write_text(patched)
            job["status"] = "pending_review"
            job["diff"] = diff_text
            job["sandbox_file"] = str(sandbox_file)
            job["original"] = original
            job["error"] = None
            job["updated_at"] = _now()
            logger.info(f"[{tag}] patch OK on attempt {attempt} — pending review")
            return

        last_error = syntax_error
        logger.warning(f"[{tag}] attempt {attempt} syntax error: {syntax_error[:120]}")

    if job.get("cancelled"):
        _mark_cancelled(job, tag, "after final attempt")
        return

    # Record the error before writing the handoff: the note quotes job["error"].
    job["status"] = "escalated"
    job["error"] = last_error
    job["updated_at"] = _now()
    try:
        _write_handoff(job, original, last_diff)
    except OSError as e:
        logger.warning(f"[{tag}] failed to write handoff: {e}")
    logger.info(f"[{tag}] escalated after {MAX_ATTEMPTS} attempts")

    await _notify_raven(
        title="Qyburn handoff ready",
        message=(
            f"Job {tag} escalated after {MAX_ATTEMPTS} attempts.\n"
            f"Task: {job['description']}\n"
            f"Target: {job['target_path']}\n"
            f"Last error: {str(last_error)[:200]}\n"
            f"Read handoff: {HANDOFF_FILE}"
        ),
        level="warning",
    )


# ---------------------------------------------------------------------------
# Status page
# ---------------------------------------------------------------------------
# NOTE(review): the HTML below was rebuilt; the visible text matches the
# previous page but tag structure and CSS should be compared with the
# deployed template before release.

_STATUS_COLORS = {
    "queued": "#8b949e",
    "running": "#58a6ff",
    "pending_review": "#f0883e",
    "applied": "#56d364",
    "escalated": "#ff7b72",
    "failed": "#ff7b72",
    "cancelled": "#8b949e",
    "rejected": "#8b949e",
}

_PAGE_CSS = """
body { background: #0d1117; color: #c9d1d9; font-family: system-ui, sans-serif; margin: 2rem auto; max-width: 60rem; padding: 0 1rem; }
h1 { margin-bottom: 0.25rem; }
.sub, .meta, .hint, .empty { color: #8b949e; }
.card { border: 1px solid #30363d; border-radius: 6px; margin: 1rem 0; padding: 1rem; }
.badge { border-radius: 10px; color: #0d1117; font-size: 0.8rem; padding: 0.1rem 0.6rem; }
pre { background: #161b22; border-radius: 6px; overflow-x: auto; padding: 0.75rem; }
pre.error { color: #ff7b72; }
.actions form { display: inline; margin-right: 0.5rem; }
.btn { border: 1px solid #30363d; border-radius: 6px; cursor: pointer; padding: 0.3rem 0.8rem; }
.btn.approve { background: #238636; color: #fff; }
.btn.danger { background: #da3633; color: #fff; }
.btn.secondary { background: #21262d; color: #c9d1d9; }
"""

# One POST form per action; the route handlers redirect back to "/" when the
# request's Accept header contains text/html.
_BTN = (
    '<form method="post" action="/{action}/{jid}">'
    '<button type="submit" class="btn {cls}">{label}</button>'
    '</form>'
)


def _buttons(*specs) -> str:
    """Render a row of action buttons from (action, job_id, css_class, label) tuples."""
    return '<div class="actions">' + "".join(
        _BTN.format(
            action=html.escape(action, quote=True),
            jid=html.escape(jid, quote=True),
            cls=html.escape(cls, quote=True),
            label=html.escape(label),
        )
        for action, jid, cls, label in specs
    ) + "</div>"


def _error_block(job: dict) -> str:
    """Render the job's error text, escaped, or an empty string if none."""
    if not job.get("error"):
        return ""
    return f'<pre class="error">{html.escape(str(job["error"]))}</pre>'


def _rebuild_hint(job: dict, lead: str) -> str:
    """Render the rebuild instructions for the stack that owns the target file."""
    stack_path = html.escape(str(STACKS_DIR / Path(job["target_path"]).parent))
    return (
        f'<p class="hint">{lead} the <code>docker_rebuild</code> tool '
        f'or run: <code>cd {stack_path} &amp;&amp; docker compose up -d --build</code></p>'
    )


def _job_extra(job_id: str, job: dict) -> str:
    """Render the status-specific part of a job card (diff, buttons, hints)."""
    status = job["status"]
    if status == "pending_review" and job.get("diff"):
        return (
            f'<pre class="diff">{html.escape(job["diff"])}</pre>'
            + _buttons(
                ("approve", job_id, "approve", "Approve & Apply"),
                ("reject", job_id, "danger", "Reject"),
                ("retry", job_id, "secondary", "Retry"),
            )
            + _rebuild_hint(job, "After approving, use")
        )
    if status in ("queued", "running"):
        return _buttons(("cancel", job_id, "danger", "Cancel"))
    # "cancelled" is included because /retry accepts it.
    if status in ("escalated", "rejected", "failed", "cancelled"):
        extra = _buttons(("retry", job_id, "secondary", "Retry"))
        if status == "escalated":
            extra += (
                '<p class="hint">Handoff written to '
                f'<code>{html.escape(str(HANDOFF_FILE))}</code> — paste to Claude Code.</p>'
            )
        return extra + _error_block(job)
    if status == "applied":
        return _rebuild_hint(job, "Applied. Use")
    return _error_block(job)


def _render_page() -> str:
    """Render the HTML status page listing all jobs, newest first.

    Every job-supplied value is HTML-escaped.  The page auto-refreshes while
    any job is queued or running.
    """
    timestamp = _now()
    active = any(j["status"] in ("queued", "running") for j in jobs.values())
    refresh = '<meta http-equiv="refresh" content="5">' if active else ""

    cards = ""
    for job_id, job in sorted(jobs.items(), key=lambda x: x[1].get("created_at", ""), reverse=True):
        color = _STATUS_COLORS.get(job["status"], "#8b949e")
        status_text = html.escape(str(job["status"]))
        badge = f'<span class="badge" style="background:{color}">{status_text}</span>'
        short_id = html.escape(job_id[:8])
        description = html.escape(str(job["description"]))
        target = html.escape(str(job["target_path"]))
        created = html.escape(str(job.get("created_at", "")))
        attempts = job.get("attempts", 0)
        extra = _job_extra(job_id, job)
        cards += f"""
<div class="card">
  <div><code>{short_id}</code> {badge} <span class="meta">attempt {attempts}/{MAX_ATTEMPTS}</span></div>
  <p>{description}</p>
  <p class="meta">Target: <code>{target}</code> &nbsp;·&nbsp; {created}</p>
  {extra}
</div>
"""

    body = cards if cards else (
        '<p class="empty">No jobs yet. POST to <code>/task</code> to submit one.</p>'
    )
    model = html.escape(CODER_MODEL)

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Qyburn — Coder Agent</title>
{refresh}
<style>{_PAGE_CSS}</style>
</head>
<body>
<h1>Qyburn — Coder Agent</h1>
<p class="sub">Local LLM coding loop · {model} · sandbox review before apply</p>
<p class="meta">Updated {timestamp} &nbsp;·&nbsp; {len(jobs)} job(s)</p>
{body}
</body>
</html>
"""


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

def _done(request: Request, payload: dict):
    """Redirect browsers to the status page; return ``payload`` as JSON otherwise."""
    if "text/html" in request.headers.get("accept", ""):
        return RedirectResponse(url="/", status_code=303)
    return payload


def _get_job_or_404(job_id: str) -> dict:
    """Return the job for ``job_id`` or raise a 404."""
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="job not found")
    return job


@app.get("/health")
def health():
    """Liveness probe with basic configuration info."""
    return {"status": "ok", "agent": "qyburn-coder", "model": CODER_MODEL,
            "hodor_url": HODOR_URL, "jobs": len(jobs)}


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the HTML status page."""
    return _render_page()


@app.post("/task")
async def submit_task(req: TaskRequest, background_tasks: BackgroundTasks):
    """Queue a new coding job and start its background loop.

    Returns 400 if ``target_path`` does not stay inside STACKS_DIR.
    """
    try:
        _resolve_target(req.target_path)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    job_id = str(uuid.uuid4())
    now = _now()
    jobs[job_id] = {
        "job_id": job_id,
        "status": "queued",
        "description": req.description,
        "target_path": req.target_path,
        "constraints": req.constraints,
        "attempts": 0,
        "created_at": now,
        "updated_at": now,
        "cancelled": False,
        "error": None,
        "diff": None,
        "sandbox_file": None,
        "original": None,
        "tokens_estimated": 0,
    }
    background_tasks.add_task(run_coding_loop, job_id)
    logger.info(f"[{job_id[:8]}] queued: {req.description[:80]}")
    _write_last_run()
    return {"job_id": job_id, "status": "queued"}


@app.get("/status/{job_id}")
def get_status(job_id: str):
    """Return one job, minus the bulky/internal fields."""
    job = _get_job_or_404(job_id)
    return {k: v for k, v in job.items() if k not in ("original", "sandbox_file", "cancelled")}


@app.get("/jobs")
def list_jobs():
    """Return a summary of every job, newest first."""
    return [
        {
            "job_id": j["job_id"],
            "description": j["description"],
            "status": j["status"],
            "attempts": j.get("attempts", 0),
            "tokens_estimated": j.get("tokens_estimated", 0),
            "created_at": j["created_at"],
            "updated_at": j["updated_at"],
        }
        for j in sorted(jobs.values(), key=lambda x: x["created_at"], reverse=True)
    ]


@app.get("/metrics")
def get_metrics():
    """Return job counts by status and estimated token totals."""
    total = len(jobs)
    total_tokens = sum(j.get("tokens_estimated", 0) for j in jobs.values())
    by_status: dict[str, int] = {}
    for j in jobs.values():
        by_status[j["status"]] = by_status.get(j["status"], 0) + 1
    return {
        "total_jobs": total,
        "total_tokens_estimated": total_tokens,
        "avg_tokens_per_job": total_tokens // total if total else 0,
        "jobs_by_status": by_status,
    }


@app.post("/approve/{job_id}")
async def approve_job(job_id: str, request: Request):
    """Copy the reviewed sandbox file over the real target.

    The pre-change content is saved next to the target with a
    ``.qyburn-bak`` suffix before the target is overwritten.
    """
    job = _get_job_or_404(job_id)
    if job["status"] != "pending_review":
        raise HTTPException(status_code=400, detail=f"job is '{job['status']}', expected 'pending_review'")

    sandbox_ref = job.get("sandbox_file")
    if not sandbox_ref or not Path(sandbox_ref).exists():
        raise HTTPException(status_code=500, detail="sandbox file missing")
    if job.get("original") is None:
        raise HTTPException(status_code=500, detail="original content missing")
    try:
        target = _resolve_target(job["target_path"])
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    backup = target.with_suffix(target.suffix + ".qyburn-bak")
    backup.write_text(job["original"])
    target.write_text(Path(sandbox_ref).read_text())

    job["status"] = "applied"
    job["updated_at"] = _now()
    logger.info(f"[{job_id[:8]}] applied → {target} (backup: {backup.name})")
    _write_last_run()

    return _done(request, {"status": "applied", "target": str(target), "backup": str(backup)})


@app.post("/reject/{job_id}")
async def reject_job(job_id: str, request: Request):
    """Discard a pending diff; the job can still be retried."""
    job = _get_job_or_404(job_id)
    if job["status"] != "pending_review":
        raise HTTPException(status_code=400, detail=f"job is '{job['status']}', expected 'pending_review'")

    job["status"] = "rejected"
    job["diff"] = None
    job["updated_at"] = _now()
    logger.info(f"[{job_id[:8]}] rejected")
    _write_last_run()

    return _done(request, {"status": "rejected", "job_id": job_id})


@app.post("/cancel/{job_id}")
async def cancel_job(job_id: str, request: Request):
    """Request cancellation of a queued or running job.

    This only sets a flag; the coding loop moves the job to ``cancelled`` the
    next time it checks.
    """
    job = _get_job_or_404(job_id)
    if job["status"] not in ("queued", "running"):
        raise HTTPException(status_code=400, detail=f"job is '{job['status']}', can only cancel queued/running")

    job["cancelled"] = True
    job["updated_at"] = _now()
    logger.info(f"[{job_id[:8]}] cancel requested")
    _write_last_run()

    return _done(request, {"status": "cancelling", "job_id": job_id})


@app.post("/retry/{job_id}")
async def retry_job(job_id: str, request: Request, background_tasks: BackgroundTasks):
    """Reset a finished job and run the coding loop again from attempt 1."""
    job = _get_job_or_404(job_id)
    if job["status"] not in ("pending_review", "rejected", "escalated", "failed", "cancelled"):
        raise HTTPException(status_code=400, detail=f"job is '{job['status']}', cannot retry")

    job["status"] = "queued"
    job["attempts"] = 0
    job["cancelled"] = False
    job["error"] = None
    job["diff"] = None
    job["sandbox_file"] = None
    job["original"] = None
    job["updated_at"] = _now()

    background_tasks.add_task(run_coding_loop, job_id)
    logger.info(f"[{job_id[:8]}] retrying")
    _write_last_run()

    return _done(request, {"status": "queued", "job_id": job_id})


@app.post("/rebuild/{stack_name}")
async def rebuild_stack(stack_name: str):
    """Run docker compose up -d --build for a named stack under STACKS_DIR.

    The command runs in a worker thread so the (up to five minute) build does
    not block other requests.  Returns 504 on timeout and 500 if docker
    cannot be started.
    """
    if (
        not stack_name
        or stack_name == "."
        or "/" in stack_name
        or "\\" in stack_name
        or ".." in stack_name
    ):
        raise HTTPException(status_code=400, detail="invalid stack name")

    stack_path = STACKS_DIR / stack_name
    if not stack_path.is_dir():
        raise HTTPException(status_code=404, detail=f"stack '{stack_name}' not found in {STACKS_DIR}/")

    compose_file = stack_path / "docker-compose.yml"
    if not compose_file.exists():
        raise HTTPException(status_code=404, detail=f"no docker-compose.yml in {stack_name}")

    logger.info(f"rebuilding {stack_name}")
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            ["docker", "compose", "up", "-d", "--build"],
            cwd=str(stack_path),
            capture_output=True,
            text=True,
            timeout=300,
        )
    except subprocess.TimeoutExpired as e:
        logger.warning(f"rebuild {stack_name}: timed out")
        raise HTTPException(status_code=504, detail=f"rebuild of '{stack_name}' timed out after 300s") from e
    except OSError as e:
        logger.warning(f"rebuild {stack_name}: could not run docker: {e}")
        raise HTTPException(status_code=500, detail=f"could not run docker: {e}") from e

    success = result.returncode == 0
    logger.info(f"rebuild {stack_name}: {'ok' if success else 'failed'}")
    return {
        "stack": stack_name,
        "success": success,
        "stdout": result.stdout,
        "stderr": result.stderr,
    }


@app.post("/handoff/{job_id}")
async def receive_handoff(job_id: str, request: Request):
    """Accept a complete, manually corrected file for a job.

    The request body is the full new file content.  It is syntax-checked,
    stored in the sandbox and the job moves to ``pending_review``; nothing is
    applied until ``/approve``.
    """
    job = _get_job_or_404(job_id)
    # A live coding loop would overwrite whatever is stored here.
    if job["status"] in ("queued", "running"):
        raise HTTPException(status_code=400, detail=f"job is '{job['status']}', cancel it or wait before handing off")

    body = await request.body()
    try:
        code = body.decode()
    except UnicodeDecodeError as e:
        raise HTTPException(status_code=400, detail=f"body is not valid UTF-8: {e}") from e

    file_name = Path(job["target_path"]).name
    error = _syntax_check(code, file_name)
    if error:
        raise HTTPException(status_code=400, detail=f"syntax error in submitted code: {error}")

    original = job.get("original")
    if original is None:
        try:
            original = _resolve_target(job["target_path"]).read_text()
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
        except OSError as e:
            raise HTTPException(status_code=400, detail=f"cannot read target file: {e}") from e

    sandbox_dir = SANDBOX_DIR / job_id
    sandbox_dir.mkdir(parents=True, exist_ok=True)
    sandbox_file = sandbox_dir / file_name
    sandbox_file.write_text(code)

    job["status"] = "pending_review"
    job["diff"] = _make_diff(original, code, file_name)
    job["sandbox_file"] = str(sandbox_file)
    job["original"] = original
    job["error"] = None
    job["updated_at"] = _now()
    logger.info(f"[{job_id[:8]}] handoff received — pending review")
    _write_last_run()
    return {"status": "pending_review", "job_id": job_id}