"""End-to-end smoke: scenario builder → simulator → reporter → SQLite. Exercises the same pipeline `recompute-all` runs in production, but on SQLite (no Postgres needed). Catches integration breakage early. """ from decimal import Decimal import numpy as np from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.db import McRun, Scenario, ScenarioSummary from fire_planner.glide_path import get as get_glide from fire_planner.reporters.pg import write_run from fire_planner.returns.bootstrap import block_bootstrap from fire_planner.returns.shiller import synthetic_returns from fire_planner.scenarios import build_regime_schedule, build_strategy, cartesian_scenarios from fire_planner.simulator import simulate async def test_full_pipeline_persists_summary_per_scenario(session: AsyncSession) -> None: """Run a tiny Cartesian (2 jurisdictions × 1 strategy × 1 leave × 1 glide = 2 scenarios) end-to-end. Verifies scenario, mc_run, and scenario_summary all populate.""" bundle = synthetic_returns(seed=1, n_years=120) paths = block_bootstrap(bundle, n_paths=200, n_years=20, block_size=5, rng=np.random.default_rng(0)) specs = cartesian_scenarios( spending_gbp=Decimal("80000"), nw_seed_gbp=Decimal("1500000"), horizon_years=20, jurisdictions=("uk", "cyprus"), strategies=("trinity", ), leave_years=(2, ), glides=("rising", ), ) assert len(specs) == 2 for spec in specs: result = simulate( paths=paths, initial_portfolio=float(spec.nw_seed_gbp), spending_target=float(spec.spending_gbp), glide=get_glide(spec.glide_path), strategy=build_strategy(spec.strategy), regime=build_regime_schedule(spec.jurisdiction, spec.leave_uk_year), horizon_years=spec.horizon_years, ) await write_run(session, spec, result, seed=42, elapsed_seconds=0.5) await session.commit() scenarios = (await session.execute(select(Scenario))).scalars().all() assert {s.external_id for s in scenarios} == { "uk-trinity-leave-y2-glide-rising", "cyprus-trinity-leave-y2-glide-rising", } runs = (await session.execute(select(McRun))).scalars().all() assert len(runs) == 2 summaries = (await session.execute(select(ScenarioSummary))).scalars().all() assert len(summaries) == 2 # Cyprus median_lifetime_tax should be lower than UK's for the same # scenario shape — the canonical Phase 8 sanity test. by_jur = {s.jurisdiction: s for s in summaries} assert by_jur["cyprus"].median_lifetime_tax_gbp < by_jur["uk"].median_lifetime_tax_gbp async def test_pipeline_handles_recompute_idempotency(session: AsyncSession) -> None: """Running the same scenario twice must result in 1 scenario row, 2 mc_run rows, and 1 scenario_summary row pointing at the latest run.""" bundle = synthetic_returns(seed=2, n_years=60) paths = block_bootstrap(bundle, n_paths=100, n_years=15, block_size=5, rng=np.random.default_rng(0)) spec = next( iter( cartesian_scenarios( spending_gbp=Decimal("100000"), nw_seed_gbp=Decimal("1000000"), horizon_years=15, jurisdictions=("bulgaria", ), strategies=("vpw", ), leave_years=(1, ), glides=("static_60_40", ), ))) for run in range(2): result = simulate( paths=paths, initial_portfolio=float(spec.nw_seed_gbp), spending_target=float(spec.spending_gbp), glide=get_glide(spec.glide_path), strategy=build_strategy(spec.strategy), regime=build_regime_schedule(spec.jurisdiction, spec.leave_uk_year), horizon_years=spec.horizon_years, ) await write_run(session, spec, result, seed=run, elapsed_seconds=0.2) await session.commit() scenarios = (await session.execute(select(Scenario))).scalars().all() assert len(scenarios) == 1 runs = (await session.execute(select(McRun))).scalars().all() assert len(runs) == 2 summaries = (await session.execute(select(ScenarioSummary))).scalars().all() assert len(summaries) == 1