fire-planner/fire_planner/__main__.py
2026-05-07 17:06:19 +00:00

259 lines
9.6 KiB
Python

"""click CLI entrypoint.
Sub-commands:
- migrate — alembic upgrade head
- ingest [wealthfolio] — load wealthfolio sqlite into account_snapshot
- simulate — run a single scenario, pretty-print
- recompute-all — run the 120-scenario Cartesian, persist all
- serve — run the FastAPI on-demand /recompute server
"""
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
import sys
import time
from datetime import date
from decimal import Decimal
from pathlib import Path
import click
import numpy as np
from fire_planner.db import create_engine_from_env, make_session_factory
from fire_planner.glide_path import get as get_glide
from fire_planner.ingest import wealthfolio as wf_ingest
from fire_planner.reporters.cli import format_scenario
from fire_planner.reporters.pg import write_run
from fire_planner.returns.bootstrap import block_bootstrap
from fire_planner.returns.shiller import load_from_csv, synthetic_returns
from fire_planner.scenarios import (
ScenarioSpec,
build_regime_schedule,
build_strategy,
cartesian_scenarios,
)
from fire_planner.simulator import simulate
log = logging.getLogger(__name__)
@click.group()
def cli() -> None:
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
@cli.command()
def migrate() -> None:
"""Run `alembic upgrade head`."""
rc = subprocess.run(["alembic", "upgrade", "head"], check=False)
sys.exit(rc.returncode)
@cli.command("ingest")
@click.option("--source",
type=click.Choice(["wealthfolio"]),
default="wealthfolio",
help="Data source — currently only wealthfolio is wired.")
@click.option("--db-path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
required=False,
help="Local sqlite path (after kubectl exec). Required for --source=wealthfolio.")
@click.option("--as-of",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="Snapshot date to read; defaults to MAX(snapshot_date) in the sqlite.")
def ingest(source: str, db_path: Path | None, as_of: date | None) -> None:
"""Pull external state into fire_planner.account_snapshot."""
if source == "wealthfolio":
if db_path is None:
raise click.UsageError("--db-path is required for --source=wealthfolio")
asyncio.run(_ingest_wealthfolio(db_path, as_of))
async def _ingest_wealthfolio(db_path: Path, as_of: date | None) -> None:
rows = wf_ingest.read_account_snapshots(db_path, as_of=as_of)
if not rows:
click.echo("warning: no rows read — wealthfolio sqlite empty or schema unrecognised",
err=True)
engine = create_engine_from_env()
factory = make_session_factory(engine)
try:
async with factory() as sess:
n = await wf_ingest.upsert_snapshots(sess, rows)
await sess.commit()
click.echo(f"wealthfolio ingest: {n} rows upserted")
finally:
await engine.dispose()
def _build_paths(seed: int, n_paths: int, n_years: int, returns_csv: Path | None) -> np.ndarray:
"""Load returns from CSV (production) or synthetic (smoke tests)."""
if returns_csv and returns_csv.exists():
bundle = load_from_csv(returns_csv)
else:
bundle = synthetic_returns(seed=42)
rng = np.random.default_rng(seed)
return block_bootstrap(bundle, n_paths=n_paths, n_years=n_years, block_size=5, rng=rng)
@cli.command("simulate")
@click.option("--scenario",
required=True,
help="external_id, e.g. cyprus-vpw-leave-y3-glide-rising")
@click.option("--n-paths", type=int, default=10_000)
@click.option("--horizon", type=int, default=60)
@click.option("--spending", type=float, default=100_000.0)
@click.option("--nw-seed", type=float, default=1_000_000.0)
@click.option("--savings", type=float, default=0.0)
@click.option("--floor",
type=float,
default=None,
help="Real-GBP floor for vpw_floor strategy (e.g. 40000).")
@click.option("--returns-csv", type=click.Path(path_type=Path), default=None)
@click.option("--seed", type=int, default=42)
@click.option("--write-db/--no-write-db", default=False, help="Persist results to fire_planner DB.")
def simulate_cmd(
scenario: str,
n_paths: int,
horizon: int,
spending: float,
nw_seed: float,
savings: float,
floor: float | None,
returns_csv: Path | None,
seed: int,
write_db: bool,
) -> None:
"""Run one scenario by external_id and pretty-print the result."""
parts = scenario.split("-")
if len(parts) < 6 or parts[2] != "leave" or parts[4] != "glide":
raise click.UsageError(f"bad scenario id: {scenario!r} "
"(expected jurisdiction-strategy-leave-yN-glide-NAME)")
jurisdiction = parts[0]
# strategy may include underscore (e.g. guyton_klinger), so rebuild
strategy_end = scenario.index("-leave-")
strategy_name = scenario[len(jurisdiction) + 1:strategy_end]
leave_year = int(parts[parts.index("leave") + 1].lstrip("y"))
glide_name = scenario.split("-glide-")[1]
spec = ScenarioSpec(
jurisdiction=jurisdiction,
strategy=strategy_name,
leave_uk_year=leave_year,
glide_path=glide_name,
spending_gbp=Decimal(str(spending)),
nw_seed_gbp=Decimal(str(nw_seed)),
horizon_years=horizon,
savings_per_year_gbp=Decimal(str(savings)),
)
paths = _build_paths(seed, n_paths, horizon, returns_csv)
annual_savings = (np.full(horizon, savings, dtype=np.float64) if savings else None)
started = time.perf_counter()
result = simulate(
paths=paths,
initial_portfolio=nw_seed,
spending_target=spending,
glide=get_glide(glide_name),
strategy=build_strategy(strategy_name, floor=floor),
regime=build_regime_schedule(jurisdiction, leave_year),
horizon_years=horizon,
annual_savings=annual_savings,
)
elapsed = time.perf_counter() - started
click.echo(format_scenario(spec, result))
if write_db:
asyncio.run(_persist(spec, result, seed=seed, elapsed_seconds=elapsed))
click.echo(f"simulate: elapsed={elapsed:.2f}s")
async def _persist(spec: ScenarioSpec, result: object, *, seed: int,
elapsed_seconds: float) -> None:
engine = create_engine_from_env()
factory = make_session_factory(engine)
try:
async with factory() as sess:
from fire_planner.simulator import SimulationResult # local to avoid cycle
assert isinstance(result, SimulationResult)
await write_run(sess, spec, result, seed=seed, elapsed_seconds=elapsed_seconds)
await sess.commit()
finally:
await engine.dispose()
@cli.command("recompute-all")
@click.option("--n-paths", type=int, default=10_000)
@click.option("--horizon", type=int, default=60)
@click.option("--spending", type=float, default=100_000.0)
@click.option("--nw-seed", type=float, default=1_000_000.0)
@click.option("--savings", type=float, default=0.0)
@click.option("--floor",
type=float,
default=None,
help="Real-GBP floor — applied to vpw_floor scenarios in the sweep.")
@click.option("--returns-csv", type=click.Path(path_type=Path), default=None)
@click.option("--seed", type=int, default=42)
def recompute_all(n_paths: int, horizon: int, spending: float, nw_seed: float, savings: float,
floor: float | None, returns_csv: Path | None, seed: int) -> None:
"""Run the full Cartesian (default 120 scenarios) and persist."""
asyncio.run(
_recompute_all(n_paths, horizon, spending, nw_seed, savings, floor, returns_csv, seed))
async def _recompute_all(
n_paths: int,
horizon: int,
spending: float,
nw_seed: float,
savings: float,
floor: float | None,
returns_csv: Path | None,
seed: int,
) -> None:
paths = _build_paths(seed, n_paths, horizon, returns_csv)
specs = cartesian_scenarios(
spending_gbp=Decimal(str(spending)),
nw_seed_gbp=Decimal(str(nw_seed)),
savings_per_year_gbp=Decimal(str(savings)),
horizon_years=horizon,
)
annual_savings = (np.full(horizon, savings, dtype=np.float64) if savings else None)
engine = create_engine_from_env()
factory = make_session_factory(engine)
successes = 0
try:
async with factory() as sess:
for spec in specs:
started = time.perf_counter()
result = simulate(
paths=paths,
initial_portfolio=nw_seed,
spending_target=spending,
glide=get_glide(spec.glide_path),
strategy=build_strategy(spec.strategy, floor=floor),
regime=build_regime_schedule(spec.jurisdiction, spec.leave_uk_year),
horizon_years=horizon,
annual_savings=annual_savings,
)
elapsed = time.perf_counter() - started
await write_run(sess, spec, result, seed=seed, elapsed_seconds=elapsed)
successes += 1
click.echo(f"{spec.external_id}: success={result.success_rate*100:.1f}% "
f"elapsed={elapsed:.2f}s")
await sess.commit()
finally:
await engine.dispose()
click.echo(f"recompute-all done: {successes}/{len(specs)} scenarios written")
@cli.command()
def serve() -> None:
"""Run the FastAPI on-demand /recompute server."""
import uvicorn
uvicorn.run("fire_planner.app:app", host="0.0.0.0", port=8080)
if __name__ == "__main__":
cli()