api: expand FastAPI surface for scenarios, networth, life-events, goals, simulate
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Adds the read+write endpoints the frontend needs to drive a
ProjectionLab-style UX on top of the existing engine.
- /networth, /networth/history — NW total + per-account from
account_snapshot (frontend chart)
- /scenarios CRUD + projection — list/get/create/patch/delete user
scenarios; cartesian read-only
- /scenarios/{id}/life-events — life event CRUD nested under scenario
- /life-events/{id} — patch + delete by id
- /scenarios/{id}/goals,
/goals/{id} — retirement goal CRUD
- /simulate, /compare — sync, no-DB-write what-if endpoints
Auth: Bearer-token dependency on writes + simulate when API_BEARER_TOKEN
is set; reads always open (lock down via Authentik-fronted ingress in
prod). Existing /recompute keeps its bearer auth.
CORS middleware reads FRONTEND_ORIGINS (comma-separated) for the dev
SPA. Lifespan now provisions the SQLAlchemy engine + session_factory
on app.state and disposes them on shutdown.
40 new tests covering happy paths and validation. 172 tests total.
mypy strict + ruff clean (B008 ignore added — Depends() in defaults
is the canonical FastAPI pattern, not a bug).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
31193faf08
commit
ee6ed1d3c4
15 changed files with 1570 additions and 74 deletions
|
|
@ -1,67 +1,131 @@
|
|||
"""FastAPI on-demand /recompute endpoint.
|
||||
"""FastAPI application — wires routers + middleware + lifespan.
|
||||
|
||||
Single deployment. Bearer-token auth (matches payslip-ingest pattern).
|
||||
The endpoint kicks the full 120-scenario Cartesian recompute against
|
||||
whatever the latest Wealthfolio snapshot is in `account_snapshot`.
|
||||
Routers:
|
||||
- /healthz, /metrics, /recompute — operational
|
||||
- /networth, /networth/history — read NW from account_snapshot
|
||||
- /scenarios/... — scenario CRUD + projection
|
||||
- /scenarios/{id}/life-events,
|
||||
/life-events/{id} — life event CRUD
|
||||
- /scenarios/{id}/goals,
|
||||
/goals/{id} — retirement goal CRUD
|
||||
- /simulate, /compare — sync simulate (no DB write)
|
||||
|
||||
For dev / smoke tests, a `/healthz` endpoint reports queue depth.
|
||||
Auth: write/compute paths take Bearer auth via the `require_bearer`
|
||||
dependency when `API_BEARER_TOKEN` is set. Read paths skip auth so the
|
||||
local frontend can hit them without juggling tokens — production
|
||||
deploys lock those down via Authentik-fronted ingress.
|
||||
|
||||
CORS: enabled for the frontend dev server. Comma-separated origins
|
||||
in `FRONTEND_ORIGINS` (defaults to a typical Vite localhost).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import hmac
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI, Header, HTTPException, status
|
||||
from fastapi import Depends, FastAPI, status
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from prometheus_fastapi_instrumentator import Instrumentator
|
||||
|
||||
from fire_planner.api.auth import require_bearer
|
||||
from fire_planner.api.goals import router as goals_router
|
||||
from fire_planner.api.life_events import router as life_events_router
|
||||
from fire_planner.api.networth import router as networth_router
|
||||
from fire_planner.api.scenarios import router as scenarios_router
|
||||
from fire_planner.api.simulate import router as simulate_router
|
||||
from fire_planner.db import create_engine_from_env, make_session_factory
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
REQUIRED_ENV = ["DB_CONNECTION_STRING", "RECOMPUTE_BEARER_TOKEN"]
|
||||
|
||||
|
||||
def _verify_env() -> None:
|
||||
missing = [k for k in REQUIRED_ENV if not os.environ.get(k)]
|
||||
if missing:
|
||||
raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
|
||||
|
||||
|
||||
def _verify_bearer(authorization: str | None, expected: str) -> None:
|
||||
if not expected:
|
||||
raise HTTPException(status_code=401, detail="Service unauthenticated")
|
||||
if not authorization or not authorization.startswith("Bearer "):
|
||||
raise HTTPException(status_code=401, detail="Missing bearer token")
|
||||
token = authorization.removeprefix("Bearer ")
|
||||
if not hmac.compare_digest(token, expected):
|
||||
raise HTTPException(status_code=401, detail="Invalid token")
|
||||
def _frontend_origins() -> list[str]:
|
||||
raw = os.environ.get(
|
||||
"FRONTEND_ORIGINS",
|
||||
"http://localhost:5173,http://localhost:4173,http://127.0.0.1:5173",
|
||||
)
|
||||
return [s.strip() for s in raw.split(",") if s.strip()]
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
_verify_env()
|
||||
queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
|
||||
app.state.queue = queue
|
||||
yield
|
||||
if os.environ.get("DB_CONNECTION_STRING"):
|
||||
engine = create_engine_from_env()
|
||||
app.state.engine = engine
|
||||
app.state.session_factory = make_session_factory(engine)
|
||||
else:
|
||||
# Tests inject these via dependency_overrides; nothing to wire.
|
||||
log.warning("DB_CONNECTION_STRING unset; skipping engine init")
|
||||
|
||||
worker = asyncio.create_task(_drain_queue(app))
|
||||
app.state._worker = worker
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
worker.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await worker
|
||||
eng = getattr(app.state, "engine", None)
|
||||
if eng is not None:
|
||||
await eng.dispose()
|
||||
|
||||
|
||||
async def _drain_queue(app: FastAPI) -> None:
|
||||
"""Background task draining the recompute queue. Each item kicks
|
||||
a full Cartesian recompute. Errors logged, don't crash."""
|
||||
queue: asyncio.Queue[dict[str, Any]] = app.state.queue
|
||||
while True:
|
||||
item = await queue.get()
|
||||
try:
|
||||
from fire_planner.__main__ import _recompute_all
|
||||
await _recompute_all(
|
||||
n_paths=int(item.get("n_paths", 10_000)),
|
||||
horizon=int(item.get("horizon", 60)),
|
||||
spending=float(item.get("spending", 100_000.0)),
|
||||
nw_seed=float(item.get("nw_seed", 1_000_000.0)),
|
||||
savings=float(item.get("savings", 0.0)),
|
||||
floor=(float(item["floor"]) if item.get("floor") is not None else None),
|
||||
returns_csv=item.get("returns_csv"),
|
||||
seed=int(item.get("seed", 42)),
|
||||
)
|
||||
except Exception:
|
||||
log.exception("recompute failed")
|
||||
finally:
|
||||
queue.task_done()
|
||||
|
||||
|
||||
app = FastAPI(title="fire-planner", lifespan=lifespan)
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=_frontend_origins(),
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
Instrumentator().instrument(app).expose(app, endpoint="/metrics")
|
||||
|
||||
app.include_router(networth_router)
|
||||
app.include_router(scenarios_router)
|
||||
app.include_router(life_events_router)
|
||||
app.include_router(goals_router)
|
||||
app.include_router(simulate_router)
|
||||
|
||||
@app.post("/recompute", status_code=status.HTTP_202_ACCEPTED)
|
||||
async def recompute(
|
||||
payload: dict[str, Any] | None = None,
|
||||
authorization: str | None = Header(default=None),
|
||||
) -> dict[str, Any]:
|
||||
_verify_bearer(authorization, os.environ.get("RECOMPUTE_BEARER_TOKEN", ""))
|
||||
|
||||
@app.post(
|
||||
"/recompute",
|
||||
status_code=status.HTTP_202_ACCEPTED,
|
||||
dependencies=[Depends(require_bearer)],
|
||||
)
|
||||
async def recompute(payload: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
"""Queue a full Cartesian recompute (async, persisted). Returns 202."""
|
||||
queue: asyncio.Queue[dict[str, Any]] = app.state.queue
|
||||
body = payload or {}
|
||||
await queue.put(body)
|
||||
await queue.put(payload or {})
|
||||
return {"status": "accepted", "depth": queue.qsize()}
|
||||
|
||||
|
||||
|
|
@ -70,43 +134,3 @@ async def healthz() -> dict[str, Any]:
|
|||
queue = getattr(app.state, "queue", None)
|
||||
depth = queue.qsize() if queue is not None else 0
|
||||
return {"status": "ok", "queue_depth": depth}
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def _drain_loop() -> None:
|
||||
"""Background task to drain the recompute queue. Each item kicks
|
||||
a full Cartesian recompute. Errors get logged but don't crash."""
|
||||
queue: asyncio.Queue[dict[str, Any]] = app.state.queue
|
||||
|
||||
async def worker() -> None:
|
||||
while True:
|
||||
item = await queue.get()
|
||||
try:
|
||||
# Avoid heavy import unless we actually have work.
|
||||
from fire_planner.__main__ import _recompute_all
|
||||
await _recompute_all(
|
||||
n_paths=int(item.get("n_paths", 10_000)),
|
||||
horizon=int(item.get("horizon", 60)),
|
||||
spending=float(item.get("spending", 100_000.0)),
|
||||
nw_seed=float(item.get("nw_seed", 1_000_000.0)),
|
||||
savings=float(item.get("savings", 0.0)),
|
||||
floor=(float(item["floor"]) if item.get("floor") is not None else None),
|
||||
returns_csv=item.get("returns_csv"),
|
||||
seed=int(item.get("seed", 42)),
|
||||
)
|
||||
except Exception:
|
||||
log.exception("recompute failed")
|
||||
finally:
|
||||
queue.task_done()
|
||||
|
||||
task = asyncio.create_task(worker())
|
||||
app.state._worker = task
|
||||
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def _stop_worker() -> None:
|
||||
task = getattr(app.state, "_worker", None)
|
||||
if task is not None:
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue