fire-planner/fire_planner/api/simulate.py
Viktor Barzin 2fc92c12f5
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
engine+api: plumb life events into the simulator
Until now life events were stored but ignored by the engine — pure
metadata. Now they actually move portfolios.

Engine:
- simulator.simulate() takes optional cashflow_adjustments: a (n_years,)
  real-GBP array applied each year *after* savings + return but
  *before* withdrawal. Positive = inflow, negative = outflow.
- New fire_planner/life_events.py with EventInput dataclass +
  events_to_cashflow_array(events, horizon). Handles ranged deltas,
  one-time amounts, disabled events, year clipping past horizon,
  negative year_start (clipped to 0), and summing multiple events.

API:
- /simulate accepts optional life_events list. Server converts each
  to EventInput, builds cashflow_adjustments, passes to simulate().
- Frontend Run-now on scenario detail now fetches the scenario's
  life events and includes them in the request — projections finally
  reflect "retire at 50, kid born at y3, inheritance at y22".

Tests: 11 events helper + 4 end-to-end engine + 1 API integration =
16 new tests. 188 total (was 172). mypy strict + ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-09 22:30:33 +00:00

142 lines
6.2 KiB
Python

"""Sync simulate + multi-scenario compare.
Unlike the persisted Cartesian recompute (`/recompute`), these run a
single scenario inline and return the result immediately. The React UI
uses these for what-if exploration — no DB write.
Returns a fan-chart series in the same shape as
`GET /scenarios/{id}/projection`, so frontend chart code is shared.
"""
from __future__ import annotations
import asyncio
import time
from decimal import Decimal
from pathlib import Path
import numpy as np
from fastapi import APIRouter, Depends, HTTPException
from fire_planner.api.auth import require_bearer
from fire_planner.api.schemas import (
CompareRequest,
CompareResult,
ProjectionPoint,
SimulateRequest,
SimulateResult,
)
from fire_planner.glide_path import get as get_glide
from fire_planner.life_events import EventInput, events_to_cashflow_array
from fire_planner.returns.bootstrap import block_bootstrap
from fire_planner.returns.shiller import load_from_csv, synthetic_returns
from fire_planner.scenarios import build_regime_schedule, build_strategy
from fire_planner.simulator import SimulationResult, simulate
router = APIRouter(tags=["simulate"], dependencies=[Depends(require_bearer)])
_RETURNS_CSV = Path("/data/shiller_returns.csv")
def _load_paths(seed: int, n_paths: int, n_years: int) -> np.ndarray:
bundle = (load_from_csv(_RETURNS_CSV) if _RETURNS_CSV.exists() else synthetic_returns(seed=42))
rng = np.random.default_rng(seed)
return block_bootstrap(bundle, n_paths=n_paths, n_years=n_years, block_size=5, rng=rng)
def _project(req: SimulateRequest) -> tuple[SimulationResult, float]:
paths = _load_paths(req.seed, req.n_paths, req.horizon_years)
annual_savings = (np.full(req.horizon_years, float(req.savings_per_year_gbp), dtype=np.float64)
if req.savings_per_year_gbp > 0 else None)
floor = float(req.floor_gbp) if req.floor_gbp is not None else None
cashflow_adjustments = None
if req.life_events:
engine_events = [
EventInput(
year_start=ev.year_start,
year_end=ev.year_end,
delta_gbp_per_year=float(ev.delta_gbp_per_year),
one_time_amount_gbp=(float(ev.one_time_amount_gbp)
if ev.one_time_amount_gbp is not None else None),
enabled=ev.enabled,
) for ev in req.life_events
]
cashflow_adjustments = events_to_cashflow_array(engine_events, req.horizon_years)
started = time.perf_counter()
result = simulate(
paths=paths,
initial_portfolio=float(req.nw_seed_gbp),
spending_target=float(req.spending_gbp),
glide=get_glide(req.glide_path),
strategy=build_strategy(req.strategy, floor=floor),
regime=build_regime_schedule(req.jurisdiction, req.leave_uk_year),
horizon_years=req.horizon_years,
annual_savings=annual_savings,
cashflow_adjustments=cashflow_adjustments,
)
elapsed = time.perf_counter() - started
return result, elapsed
def _to_response(result: SimulationResult, elapsed: float) -> SimulateResult:
# portfolio_real has n_years+1 columns (year 0 = seed, year k = end-of-year k).
# withdrawal_real / tax_real have n_years columns (year k = withdrawn in year k+1).
# Yearly point k describes "end of year k+1": portfolio after withdrawal & growth.
pcts = [10, 25, 50, 75, 90]
portfolio_quantiles = {p: np.percentile(result.portfolio_real, p, axis=0) for p in pcts}
median_wd = np.percentile(result.withdrawal_real, 50, axis=0)
median_tax = np.percentile(result.tax_real, 50, axis=0)
n_years = result.n_years
survival_path = (result.success_mask.astype(np.float64).mean(axis=0) if
result.success_mask.ndim == 2 else np.ones(n_years))
yearly = [
ProjectionPoint(
year_idx=y,
p10_portfolio_gbp=Decimal(str(round(float(portfolio_quantiles[10][y + 1]), 2))),
p25_portfolio_gbp=Decimal(str(round(float(portfolio_quantiles[25][y + 1]), 2))),
p50_portfolio_gbp=Decimal(str(round(float(portfolio_quantiles[50][y + 1]), 2))),
p75_portfolio_gbp=Decimal(str(round(float(portfolio_quantiles[75][y + 1]), 2))),
p90_portfolio_gbp=Decimal(str(round(float(portfolio_quantiles[90][y + 1]), 2))),
p50_withdrawal_gbp=Decimal(str(round(float(median_wd[y]), 2))),
p50_tax_gbp=Decimal(str(round(float(median_tax[y]), 2))),
survival_rate=Decimal(str(round(float(survival_path[y]), 4))),
) for y in range(n_years)
]
median_ytr = result.median_years_to_ruin()
return SimulateResult(
success_rate=Decimal(str(round(float(result.success_rate), 4))),
p10_ending_gbp=Decimal(str(round(float(result.ending_percentile(10)), 2))),
p50_ending_gbp=Decimal(str(round(float(result.ending_percentile(50)), 2))),
p90_ending_gbp=Decimal(str(round(float(result.ending_percentile(90)), 2))),
median_lifetime_tax_gbp=Decimal(str(round(float(result.median_lifetime_tax()), 2))),
median_years_to_ruin=(Decimal(str(round(float(median_ytr), 2)))
if median_ytr is not None else None),
elapsed_seconds=Decimal(str(round(elapsed, 3))),
yearly=yearly,
)
@router.post("/simulate", response_model=SimulateResult)
async def simulate_one(req: SimulateRequest) -> SimulateResult:
"""Run one scenario synchronously, no DB write. ~1-3s for 5k paths."""
try:
result, elapsed = await asyncio.to_thread(_project, req)
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Unknown name: {e}") from None
return _to_response(result, elapsed)
@router.post("/compare", response_model=CompareResult)
async def compare_scenarios(req: CompareRequest) -> CompareResult:
"""Run 2-5 scenarios in parallel, return all results."""
async def one(s: SimulateRequest) -> SimulateResult:
result, elapsed = await asyncio.to_thread(_project, s)
return _to_response(result, elapsed)
try:
results = await asyncio.gather(*(one(s) for s in req.scenarios))
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Unknown name: {e}") from None
return CompareResult(results=results)