From 83a933ea1a00709a093efec839f820760ee176ce Mon Sep 17 00:00:00 2001 From: Jaco Bezuidenhout Date: Sat, 30 May 2026 14:20:47 +0000 Subject: [PATCH] feat: LLM task extraction, token tracking, direct Claude brain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - brain.py: prefers direct Anthropic API (ANTHROPIC_API_KEY) over Hermes for all LLM calls — ~22x cheaper (122 tokens vs 5600+ Hermes overhead). Falls back to Hermes then Ollama if key unavailable. extract_task_fields(): non-streaming call returns clean {title, project} from any natural language phrasing — no more regex whack-a-mole. - token_log.py: appends every LLM call to token-usage.jsonl with intent, in/out token counts, and USD cost. get_summary() aggregates all-time, today, and per-intent breakdowns. - main.py: task handler uses extract_task_fields() with regex fallback; streaming handler captures usage from final chunk; GET /usage endpoint returns live cost summary. Co-Authored-By: Claude Sonnet 4.6 --- app/brain.py | 105 ++++++++++++++++++++++++++++++++++------------- app/main.py | 40 ++++++++++++------ app/token_log.py | 79 +++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 40 deletions(-) create mode 100644 app/token_log.py diff --git a/app/brain.py b/app/brain.py index 8d7569e..fd52146 100644 --- a/app/brain.py +++ b/app/brain.py @@ -1,3 +1,4 @@ +import json import logging import os @@ -6,41 +7,89 @@ import litellm logger = logging.getLogger("jon-snow.brain") litellm.set_verbose = False -FAST_MODEL = os.getenv("FAST_MODEL", "ollama/gemma4") -SMART_MODEL = os.getenv("SMART_MODEL", "ollama/gemma4") +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-sonnet-4-6") + +FAST_MODEL = os.getenv("FAST_MODEL", "ollama/llama3.1:8b") OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://172.27.40.20:11434") + HERMES_URL = os.getenv("HERMES_URL", "") HERMES_API_KEY = os.getenv("HERMES_API_KEY", "none") +EXTRACT_SYSTEM = ( + "Extract the task title and destination project from the user message.\n" + "Rules:\n" + "- title: the actual task to be done, stripped of all filler " + "(no 'please add', 'a work item', 'a job item', 'we need to', etc.)\n" + "- project: the client or project name if mentioned, otherwise null\n" + "Reply with JSON only, no other text: " + "{\"title\": \"...\", \"project\": \"...\" or null}" +) + + +def _anthropic_kwargs() -> dict: + return {"api_key": ANTHROPIC_API_KEY, "model": f"anthropic/{CLAUDE_MODEL}"} + + +def _hermes_kwargs() -> dict: + return {"model": "openai/hermes-agent", "api_base": HERMES_URL, "api_key": HERMES_API_KEY} + + +def _ollama_kwargs() -> dict: + return {"model": FAST_MODEL, "api_base": OLLAMA_BASE_URL} + + +def _primary_kwargs() -> dict: + if ANTHROPIC_API_KEY: + return _anthropic_kwargs() + if HERMES_URL: + return _hermes_kwargs() + return _ollama_kwargs() + + +async def extract_task_fields(message: str) -> tuple[dict, dict]: + """Returns (fields, usage). + fields = {"title": str, "project": str | None} + usage = {"prompt_tokens": int, "completion_tokens": int} + """ + prompt = [ + {"role": "system", "content": EXTRACT_SYSTEM}, + {"role": "user", "content": message}, + ] + try: + resp = await litellm.acompletion(stream=False, messages=prompt, **_primary_kwargs()) + usage = { + "prompt_tokens": resp.usage.prompt_tokens if resp.usage else 0, + "completion_tokens": resp.usage.completion_tokens if resp.usage else 0, + } + content = resp.choices[0].message.content.strip() + # Strip markdown code fences if model wraps the JSON + if content.startswith("```"): + content = content.split("```")[1] + if content.startswith("json"): + content = content[4:] + fields = json.loads(content.strip()) + return fields, usage + except Exception as e: + logger.warning(f"extract_task_fields failed: {e}") + return {"title": None, "project": None}, {"prompt_tokens": 0, "completion_tokens": 0} + async def stream_completion(messages: list[dict], use_smart: bool = False): - if HERMES_URL: - logger.info("Brain: routing to Hermes cloud (claude-sonnet-4-6)") - return await litellm.acompletion( - model="openai/hermes-agent", - messages=messages, - stream=True, - api_base=HERMES_URL, - api_key=HERMES_API_KEY, - ) + """Streaming LLM call. Returns (stream, usage_future) where usage is captured + from the final chunk when stream_options include_usage is supported.""" + kwargs = _primary_kwargs() + extra = {} + # Request usage in final streaming chunk (supported by Anthropic + OpenAI) + if ANTHROPIC_API_KEY or HERMES_URL: + extra["stream_options"] = {"include_usage": True} - model = SMART_MODEL if use_smart else FAST_MODEL - logger.info(f"Brain: model={model} smart={use_smart}") + logger.info(f"Brain: model={kwargs.get('model')} smart={use_smart}") try: - return await litellm.acompletion( - model=model, - messages=messages, - stream=True, - api_base=OLLAMA_BASE_URL if model.startswith("ollama/") else None, - ) + return await litellm.acompletion(stream=True, messages=messages, **kwargs, **extra) except Exception as e: - logger.error(f"Brain error ({model}): {e}") - if use_smart and model != FAST_MODEL: - logger.info("Falling back to FAST_MODEL") - return await litellm.acompletion( - model=FAST_MODEL, - messages=messages, - stream=True, - api_base=OLLAMA_BASE_URL if FAST_MODEL.startswith("ollama/") else None, - ) + logger.error(f"Brain error: {e}") + if HERMES_URL and not ANTHROPIC_API_KEY: + logger.info("Falling back to Ollama") + return await litellm.acompletion(stream=True, messages=messages, **_ollama_kwargs()) raise diff --git a/app/main.py b/app/main.py index d954517..28ffb84 100644 --- a/app/main.py +++ b/app/main.py @@ -14,7 +14,9 @@ from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from pydantic import BaseModel from .approval import cleanup_expired, generate_token, pop_action, queue_action, verify_token -from .brain import stream_completion +from .brain import extract_task_fields, stream_completion +from .token_log import get_summary as get_token_summary +from .token_log import log_usage from .intent import ( classify_intent, extract_agent_name, @@ -158,24 +160,29 @@ async def _stream_text(text: str) -> AsyncGenerator[str, None]: yield _sse_done(chunk_id) -async def _stream_llm(messages: list[dict], use_smart: bool = False) -> AsyncGenerator[str, None]: +async def _stream_llm( + messages: list[dict], use_smart: bool = False, intent: str = "planning" +) -> AsyncGenerator[str, None]: chunk_id = f"chatcmpl-{int(time.time())}" - collected = [] + prompt_tokens = completion_tokens = 0 try: response = await stream_completion(messages, use_smart=use_smart) async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content - collected.append(content) yield _sse_chunk(content, chunk_id) + # Capture usage from final chunk (stream_options include_usage) + if hasattr(chunk, "usage") and chunk.usage: + prompt_tokens = chunk.usage.prompt_tokens or 0 + completion_tokens = chunk.usage.completion_tokens or 0 yield _sse_done(chunk_id) except Exception as e: logger.error(f"LLM stream error: {e}") - error_msg = f"Error reaching LLM: {e}" - async for part in _stream_text(error_msg): + async for part in _stream_text(f"Error reaching LLM: {e}"): yield part - collected.append(error_msg) - return + finally: + if prompt_tokens or completion_tokens: + log_usage(intent, prompt_tokens, completion_tokens) # --- Routes --- @@ -185,6 +192,11 @@ async def health(): return {"status": "ok", "agent": "jon-snow", "version": "0.3.0"} +@app.get("/usage") +async def usage(): + return get_token_summary() + + @app.get("/v1/models") async def list_models(): return { @@ -503,9 +515,13 @@ async def chat_completions(req: ChatRequest): yield chunk elif intent == "task": - # Live destination match first, static keyword map as fallback - project_hint = extract_task_destination(user_message) or extract_project_name(user_message) - title = extract_task_title(user_message) + # LLM extraction — handles any natural language phrasing + fields, usage = await extract_task_fields(user_message) + log_usage("task_extract", usage["prompt_tokens"], usage["completion_tokens"]) + + title = fields.get("title") or extract_task_title(user_message) + project_hint = fields.get("project") or extract_project_name(user_message) + try: issue = await create_plane_issue(title, project_hint) response_text = ( @@ -521,7 +537,7 @@ async def chat_completions(req: ChatRequest): yield chunk else: # planning / general - async for chunk in _stream_llm(messages, use_smart=True): + async for chunk in _stream_llm(messages, use_smart=True, intent="planning"): yield chunk summary = f"Planning query: {user_message[:100]}" diff --git a/app/token_log.py b/app/token_log.py new file mode 100644 index 0000000..a094e8f --- /dev/null +++ b/app/token_log.py @@ -0,0 +1,79 @@ +import json +import os +from datetime import datetime, timezone +from pathlib import Path + +AGENT_OS_DIR = Path(os.getenv("AGENT_OS_DIR", "/opt/agent-os")) +TOKEN_LOG = AGENT_OS_DIR / "logs" / "jon-snow" / "token-usage.jsonl" + +# claude-sonnet-4-6 pricing (USD per token) +INPUT_COST_PER_TOKEN = 3.00 / 1_000_000 +OUTPUT_COST_PER_TOKEN = 15.00 / 1_000_000 + + +def log_usage(intent: str, prompt_tokens: int, completion_tokens: int) -> None: + TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True) + cost = (prompt_tokens * INPUT_COST_PER_TOKEN) + (completion_tokens * OUTPUT_COST_PER_TOKEN) + entry = { + "ts": datetime.now(timezone.utc).isoformat(), + "intent": intent, + "in": prompt_tokens, + "out": completion_tokens, + "cost_usd": round(cost, 6), + } + with TOKEN_LOG.open("a") as f: + f.write(json.dumps(entry) + "\n") + + +def get_summary() -> dict: + if not TOKEN_LOG.exists(): + return _empty() + + entries = [] + for line in TOKEN_LOG.read_text().splitlines(): + line = line.strip() + if line: + try: + entries.append(json.loads(line)) + except Exception: + pass + + if not entries: + return _empty() + + today = datetime.now(timezone.utc).date().isoformat() + today_entries = [e for e in entries if e["ts"][:10] == today] + + by_intent: dict = {} + for e in entries: + b = by_intent.setdefault(e["intent"], {"calls": 0, "in": 0, "out": 0, "cost_usd": 0.0}) + b["calls"] += 1 + b["in"] += e["in"] + b["out"] += e["out"] + b["cost_usd"] = round(b["cost_usd"] + e["cost_usd"], 6) + + return { + "all_time": { + "calls": len(entries), + "tokens_in": sum(e["in"] for e in entries), + "tokens_out": sum(e["out"] for e in entries), + "cost_usd": round(sum(e["cost_usd"] for e in entries), 6), + }, + "today": { + "calls": len(today_entries), + "tokens_in": sum(e["in"] for e in today_entries), + "tokens_out": sum(e["out"] for e in today_entries), + "cost_usd": round(sum(e["cost_usd"] for e in today_entries), 6), + }, + "by_intent": by_intent, + "recent": entries[-20:], + } + + +def _empty() -> dict: + return { + "all_time": {"calls": 0, "tokens_in": 0, "tokens_out": 0, "cost_usd": 0.0}, + "today": {"calls": 0, "tokens_in": 0, "tokens_out": 0, "cost_usd": 0.0}, + "by_intent": {}, + "recent": [], + }