claude-agent-service/app/breakglass/server.py
Viktor Barzin 4f361d91eb
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
breakglass: in-cluster emergency-recovery UI for the devvm
Viktor wanted a web UI on the claude service to act as his breakglass when
the devvm is down: open it, have Claude SSH in to diagnose/repair, and
power-cycle the VM via the Proxmox host if needed. This is the app half
(the infra stack + host bootstrap live in the infra repo).

New, ISOLATED ASGI app under app/breakglass/ (never imports app.main, so the
untrusted-input agents — recruiter-triage, nextcloud-todos — can't share a
process with the root-on-devvm / PVE-reset SSH key):
- pve.py: the LLM-independent power-verb path (status|forensics|reset|stop|
  start|cycle on VM 102), whitelist-validated client-side, executed over the
  forced-command SSH key (list argv, no shell).
- agent_session.py: multi-turn streamed chat — claude -p --session-id /
  --resume with --output-format stream-json, translated to a small SSE
  vocabulary (session/text/tool/result/error/done).
- auth.py: edge Authentik header OR bearer; fail-closed.
- server.py: FastAPI (session/chat-SSE/pve-verb routes) + serves the Svelte UI.
- Svelte SPA (frontend/, built into app/breakglass/static/ and committed — no
  in-cluster build, per ADR-0002): streamed chat + danger-styled manual VM
  controls with confirm-on-mutate.
- agents/breakglass.md: narrow tools (Bash/Read/Grep/Glob, no web), taught the
  ssh devvm / ssh pve aliases and cycle-vs-reset.
- docker-entrypoint-breakglass.sh: ssh-agent bootstrap from the mounted key +
  ssh aliases, then uvicorn app.breakglass.server. The breakglass Deployment
  overrides the image CMD with this; the existing service is untouched.

26 new tests (verb whitelist incl. injection attempts, stream-json→SSE
translation, auth gating, route behaviour); full suite 58 green.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 21:36:05 +00:00

96 lines
3.3 KiB
Python

"""Breakglass FastAPI app — the in-cluster emergency recovery UI.
Routes:
GET /health — liveness (no auth)
GET / — the single-page UI (static)
POST /api/session — open a chat session, returns {session_id}
POST /api/chat — run one turn, streams SSE events (text/tool/result)
POST /api/pve/{verb} — LLM-independent PVE power verb (manual buttons)
GET /api/pve/verbs — list allowed verbs + which mutate
Everything under /api requires auth (edge Authentik header or bearer token).
"""
import json
import os
import uuid
from fastapi import Depends, FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from . import agent_session, config, pve
from .auth import require_auth
app = FastAPI(title="Claude Breakglass")
_STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
class SessionResponse(BaseModel):
session_id: str
class ChatRequest(BaseModel):
session_id: str
prompt: str = Field(..., min_length=1)
model: str | None = None
@app.get("/health")
async def health():
return {"status": "ok", "service": "claude-breakglass"}
@app.post("/api/session", response_model=SessionResponse)
async def open_session(_identity: str = Depends(require_auth)):
# Claude wants a UUID for --session-id.
return SessionResponse(session_id=str(uuid.uuid4()))
@app.post("/api/chat")
async def chat(req: ChatRequest, _identity: str = Depends(require_auth)):
"""Stream one chat turn as Server-Sent Events. The browser reads the
response body incrementally (fetch + ReadableStream)."""
async def _sse():
try:
async for ev in agent_session.run_turn(req.session_id, req.prompt, req.model):
yield f"data: {json.dumps(ev)}\n\n"
except Exception as exc: # noqa: BLE001 — surface any failure to the UI
yield f"data: {json.dumps({'kind': 'error', 'error': str(exc)[:500]})}\n\n"
yield f"data: {json.dumps({'kind': 'done'})}\n\n"
return StreamingResponse(
_sse(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@app.get("/api/pve/verbs")
async def pve_verbs(_identity: str = Depends(require_auth)):
return {
"verbs": sorted(pve.ALLOWED_VERBS),
"mutating": sorted(pve.MUTATING_VERBS),
}
@app.post("/api/pve/{verb}")
async def pve_verb(verb: str, _identity: str = Depends(require_auth)):
"""Run a PVE power verb directly (no LLM in the path). Mutating verbs
capture forensics first on the host, unconditionally."""
if not pve.is_allowed(verb):
raise HTTPException(status_code=400, detail=f"unknown verb '{verb}'")
result = await pve.run_verb(verb)
status = 200 if result.get("exit_code") == 0 else 502
return JSONResponse(status_code=status, content=result)
# Serve the SPA. Mounted last so it doesn't shadow /api or /health.
if os.path.isdir(_STATIC_DIR):
@app.get("/")
async def index():
return FileResponse(os.path.join(_STATIC_DIR, "index.html"))
app.mount("/", StaticFiles(directory=_STATIC_DIR, html=True), name="static")