259 lines
9.6 KiB
Python
259 lines
9.6 KiB
Python
"""click CLI entrypoint.

Sub-commands:
  - migrate — alembic upgrade head
  - ingest [wealthfolio] — load wealthfolio sqlite into account_snapshot
  - simulate — run a single scenario, pretty-print
  - recompute-all — run the 120-scenario Cartesian, persist all
  - serve — run the FastAPI on-demand /recompute server
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
import click
|
|
import numpy as np
|
|
|
|
from fire_planner.db import create_engine_from_env, make_session_factory
|
|
from fire_planner.glide_path import get as get_glide
|
|
from fire_planner.ingest import wealthfolio as wf_ingest
|
|
from fire_planner.reporters.cli import format_scenario
|
|
from fire_planner.reporters.pg import write_run
|
|
from fire_planner.returns.bootstrap import block_bootstrap
|
|
from fire_planner.returns.shiller import load_from_csv, synthetic_returns
|
|
from fire_planner.scenarios import (
|
|
ScenarioSpec,
|
|
build_regime_schedule,
|
|
build_strategy,
|
|
cartesian_scenarios,
|
|
)
|
|
from fire_planner.simulator import simulate
|
|
|
|
# Module-level logger named after this module; the root logger it reports to
# is configured in the `cli` group callback.
log = logging.getLogger(__name__)
|
|
|
|
|
|
@click.group()
def cli() -> None:
    # Root command group: configures logging once, before any sub-command
    # body runs. The level comes from the LOG_LEVEL environment variable and
    # defaults to INFO.
    #
    # logging.basicConfig only accepts the upper-case level names, so the
    # value is normalised first; without this, LOG_LEVEL=debug (or an empty
    # LOG_LEVEL) aborts every command with "ValueError: Unknown level".
    level_name = os.environ.get("LOG_LEVEL", "INFO").strip().upper() or "INFO"
    logging.basicConfig(level=level_name)
|
|
|
|
|
|
@cli.command()
def migrate() -> None:
    """Run `alembic upgrade head`."""
    alembic_cmd = ["alembic", "upgrade", "head"]
    # check=False: a failing alembic run is not raised as an exception here;
    # its status is forwarded below as this process's own exit code.
    completed = subprocess.run(alembic_cmd, check=False)
    sys.exit(completed.returncode)
|
|
|
|
|
|
@cli.command("ingest")
@click.option("--source",
              type=click.Choice(["wealthfolio"]),
              default="wealthfolio",
              help="Data source — currently only wealthfolio is wired.")
@click.option("--db-path",
              type=click.Path(exists=True, dir_okay=False, path_type=Path),
              required=False,
              help="Local sqlite path (after kubectl exec). Required for --source=wealthfolio.")
@click.option("--as-of",
              type=click.DateTime(formats=["%Y-%m-%d"]),
              default=None,
              help="Snapshot date to read; defaults to MAX(snapshot_date) in the sqlite.")
def ingest(source: str, db_path: Path | None, as_of: date | None) -> None:
    """Pull external state into fire_planner.account_snapshot."""
    # Parameters (all supplied by click):
    #   source  - which ingester to run; only "wealthfolio" is accepted.
    #   db_path - existing sqlite file to read; mandatory for wealthfolio.
    #   as_of   - snapshot date to read, or None to let the reader pick.
    # Raises click.UsageError when --db-path is missing for wealthfolio.
    if source == "wealthfolio":
        if db_path is None:
            raise click.UsageError("--db-path is required for --source=wealthfolio")
        # click.DateTime parses --as-of into a datetime.datetime (at midnight),
        # not a plain date. Collapse it to a calendar date so the ingester
        # receives exactly the `date | None` its signature declares and never
        # compares or formats a value that carries a time component.
        snapshot_date = None if as_of is None else date(as_of.year, as_of.month, as_of.day)
        asyncio.run(_ingest_wealthfolio(db_path, snapshot_date))
|
|
|
|
|
|
async def _ingest_wealthfolio(db_path: Path, as_of: date | None) -> None:
    """Read snapshots from the wealthfolio sqlite file and upsert them.

    An empty read only produces a warning on stderr; the upsert is still
    attempted with the empty result. The engine is disposed whether or not
    the upsert succeeds.
    """
    snapshots = wf_ingest.read_account_snapshots(db_path, as_of=as_of)
    if not snapshots:
        click.echo("warning: no rows read — wealthfolio sqlite empty or schema unrecognised",
                   err=True)

    engine = create_engine_from_env()
    session_factory = make_session_factory(engine)
    try:
        async with session_factory() as session:
            upserted = await wf_ingest.upsert_snapshots(session, snapshots)
            await session.commit()
        click.echo(f"wealthfolio ingest: {upserted} rows upserted")
    finally:
        # Release the engine even when the upsert or commit raised.
        await engine.dispose()
|
|
|
|
|
|
def _build_paths(seed: int, n_paths: int, n_years: int, returns_csv: Path | None) -> np.ndarray:
    """Load returns from CSV (production) or synthetic (smoke tests).

    Args:
        seed: Seed for the NumPy generator that drives the block bootstrap.
        n_paths: Number of bootstrapped paths to request.
        n_years: Length of each path in years.
        returns_csv: Historical-returns CSV. When it is None or does not
            exist, synthetic returns are used instead.

    Returns:
        Whatever `block_bootstrap` produces for the chosen return bundle.
    """
    if returns_csv is not None and returns_csv.exists():
        bundle = load_from_csv(returns_csv)
    else:
        if returns_csv is not None:
            # An explicitly requested file is missing. Keep the existing
            # fallback, but say so: otherwise a mistyped --returns-csv
            # silently yields results based on synthetic data.
            log.warning("returns CSV %s not found; falling back to synthetic returns",
                        returns_csv)
        # NOTE(review): the synthetic history is always generated with seed 42,
        # independent of `seed` — confirm that is intentional.
        bundle = synthetic_returns(seed=42)
    rng = np.random.default_rng(seed)
    return block_bootstrap(bundle, n_paths=n_paths, n_years=n_years, block_size=5, rng=rng)
|
|
|
|
|
|
def _parse_scenario_id(scenario: str) -> tuple[str, str, int, str]:
    """Split an external_id into (jurisdiction, strategy, leave_year, glide_name).

    The expected shape is ``jurisdiction-strategy-leave-yN-glide-NAME``.
    Strategy names may contain underscores (e.g. guyton_klinger) but not
    hyphens; everything after the ``glide`` token is the glide-path name, so
    that name may itself contain hyphens.

    Raises:
        click.UsageError: if the id does not have that shape or the leave
            year is not an integer.
    """
    parts = scenario.split("-")
    if len(parts) < 6 or parts[2] != "leave" or parts[4] != "glide":
        raise click.UsageError(f"bad scenario id: {scenario!r} "
                               "(expected jurisdiction-strategy-leave-yN-glide-NAME)")
    jurisdiction, strategy_name, _leave_kw, year_token = parts[:4]
    try:
        leave_year = int(year_token.lstrip("y"))
    except ValueError:
        # Report a malformed year as a usage error, not a raw traceback.
        raise click.UsageError(f"bad leave year {year_token!r} in scenario id {scenario!r} "
                               "(expected yN, e.g. y3)") from None
    # Re-join the tail so a hyphenated glide name is kept intact.
    glide_name = "-".join(parts[5:])
    return jurisdiction, strategy_name, leave_year, glide_name


@cli.command("simulate")
@click.option("--scenario",
              required=True,
              help="external_id, e.g. cyprus-vpw-leave-y3-glide-rising")
@click.option("--n-paths", type=int, default=10_000)
@click.option("--horizon", type=int, default=60)
@click.option("--spending", type=float, default=100_000.0)
@click.option("--nw-seed", type=float, default=1_000_000.0)
@click.option("--savings", type=float, default=0.0)
@click.option("--floor",
              type=float,
              default=None,
              help="Real-GBP floor for vpw_floor strategy (e.g. 40000).")
@click.option("--returns-csv", type=click.Path(path_type=Path), default=None)
@click.option("--seed", type=int, default=42)
@click.option("--write-db/--no-write-db", default=False, help="Persist results to fire_planner DB.")
def simulate_cmd(
    scenario: str,
    n_paths: int,
    horizon: int,
    spending: float,
    nw_seed: float,
    savings: float,
    floor: float | None,
    returns_csv: Path | None,
    seed: int,
    write_db: bool,
) -> None:
    """Run one scenario by external_id and pretty-print the result."""
    # Raises click.UsageError (via _parse_scenario_id) on a malformed id.
    jurisdiction, strategy_name, leave_year, glide_name = _parse_scenario_id(scenario)

    spec = ScenarioSpec(
        jurisdiction=jurisdiction,
        strategy=strategy_name,
        leave_uk_year=leave_year,
        glide_path=glide_name,
        # Go through str() so the Decimal holds the float's printed value.
        spending_gbp=Decimal(str(spending)),
        nw_seed_gbp=Decimal(str(nw_seed)),
        horizon_years=horizon,
        savings_per_year_gbp=Decimal(str(savings)),
    )
    paths = _build_paths(seed, n_paths, horizon, returns_csv)
    # A zero savings figure is passed to the simulator as "no savings" (None).
    annual_savings = (np.full(horizon, savings, dtype=np.float64) if savings else None)
    started = time.perf_counter()
    result = simulate(
        paths=paths,
        initial_portfolio=nw_seed,
        spending_target=spending,
        glide=get_glide(glide_name),
        strategy=build_strategy(strategy_name, floor=floor),
        regime=build_regime_schedule(jurisdiction, leave_year),
        horizon_years=horizon,
        annual_savings=annual_savings,
    )
    # Only the simulation is timed; path building and persistence are excluded.
    elapsed = time.perf_counter() - started
    click.echo(format_scenario(spec, result))
    if write_db:
        asyncio.run(_persist(spec, result, seed=seed, elapsed_seconds=elapsed))
    click.echo(f"simulate: elapsed={elapsed:.2f}s")
|
|
|
|
|
|
async def _persist(spec: ScenarioSpec, result: object, *, seed: int,
                   elapsed_seconds: float) -> None:
    """Write a single simulation run through `write_run` and commit it.

    `result` is typed as `object` and checked at run time to be a
    `SimulationResult`. The engine is disposed whether or not the write
    succeeds.
    """
    engine = create_engine_from_env()
    session_factory = make_session_factory(engine)
    try:
        async with session_factory() as session:
            from fire_planner.simulator import SimulationResult  # local to avoid cycle

            assert isinstance(result, SimulationResult)
            await write_run(
                session,
                spec,
                result,
                seed=seed,
                elapsed_seconds=elapsed_seconds,
            )
            await session.commit()
    finally:
        await engine.dispose()
|
|
|
|
|
|
@cli.command("recompute-all")
@click.option("--n-paths", type=int, default=10_000)
@click.option("--horizon", type=int, default=60)
@click.option("--spending", type=float, default=100_000.0)
@click.option("--nw-seed", type=float, default=1_000_000.0)
@click.option("--savings", type=float, default=0.0)
@click.option("--floor",
              type=float,
              default=None,
              help="Real-GBP floor — applied to vpw_floor scenarios in the sweep.")
@click.option("--returns-csv", type=click.Path(path_type=Path), default=None)
@click.option("--seed", type=int, default=42)
def recompute_all(n_paths: int, horizon: int, spending: float, nw_seed: float, savings: float,
                  floor: float | None, returns_csv: Path | None, seed: int) -> None:
    """Run the full Cartesian (default 120 scenarios) and persist."""
    # Synchronous click callback: all work happens in the async
    # implementation, driven to completion on a fresh event loop.
    sweep = _recompute_all(
        n_paths=n_paths,
        horizon=horizon,
        spending=spending,
        nw_seed=nw_seed,
        savings=savings,
        floor=floor,
        returns_csv=returns_csv,
        seed=seed,
    )
    asyncio.run(sweep)
|
|
|
|
|
|
async def _recompute_all(
    n_paths: int,
    horizon: int,
    spending: float,
    nw_seed: float,
    savings: float,
    floor: float | None,
    returns_csv: Path | None,
    seed: int,
) -> None:
    """Simulate every scenario from `cartesian_scenarios` and persist each run.

    All scenarios share one set of bootstrapped return paths and one database
    session. A line is echoed per scenario, followed by a final summary. The
    engine is disposed whether or not the sweep succeeds.
    """
    paths = _build_paths(seed, n_paths, horizon, returns_csv)
    specs = cartesian_scenarios(
        spending_gbp=Decimal(str(spending)),
        nw_seed_gbp=Decimal(str(nw_seed)),
        savings_per_year_gbp=Decimal(str(savings)),
        horizon_years=horizon,
    )
    # A zero savings figure is passed to the simulator as "no savings" (None).
    if savings:
        annual_savings = np.full(horizon, savings, dtype=np.float64)
    else:
        annual_savings = None

    engine = create_engine_from_env()
    session_factory = make_session_factory(engine)
    written = 0
    try:
        async with session_factory() as session:
            for spec in specs:
                t0 = time.perf_counter()
                outcome = simulate(
                    paths=paths,
                    initial_portfolio=nw_seed,
                    spending_target=spending,
                    glide=get_glide(spec.glide_path),
                    strategy=build_strategy(spec.strategy, floor=floor),
                    regime=build_regime_schedule(spec.jurisdiction, spec.leave_uk_year),
                    horizon_years=horizon,
                    annual_savings=annual_savings,
                )
                # Only the simulation itself is timed, not the DB write.
                took = time.perf_counter() - t0
                await write_run(session, spec, outcome, seed=seed, elapsed_seconds=took)
                written += 1
                click.echo(f"{spec.external_id}: success={outcome.success_rate*100:.1f}% "
                           f"elapsed={took:.2f}s")
            # NOTE(review): one commit covers the whole sweep — confirm that
            # all-or-nothing persistence is intended.
            await session.commit()
    finally:
        await engine.dispose()
    click.echo(f"recompute-all done: {written}/{len(specs)} scenarios written")
|
|
|
|
|
|
@cli.command()
@click.option("--host", default="0.0.0.0", show_default=True,
              help="Interface address to bind.")
@click.option("--port", type=int, default=8080, show_default=True,
              help="TCP port to listen on.")
def serve(host: str = "0.0.0.0", port: int = 8080) -> None:
    """Run the FastAPI on-demand /recompute server."""
    # host/port default to the previously hard-coded 0.0.0.0:8080, so an
    # invocation without options behaves exactly as before.
    # uvicorn is imported here rather than at module level; presumably so the
    # other sub-commands do not require it — confirm before hoisting.
    import uvicorn

    uvicorn.run("fire_planner.app:app", host=host, port=port)
|
|
|
|
|
|
# Script entry: dispatch to the click group only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    cli()
|