fire-planner/fire_planner/db.py
Viktor Barzin 9cc781a8d6
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
fire-planner: ProjectionLab parity Wave 1 — tabbed shell, year stats, goals,
income streams, Sankey cashflow, progress overlay, settings sub-pages

Wave 1 (9 features across 4 streams):

Stream A — dashboard skeleton
  1.A.1 ScenarioShell with top tabs (Plan/Cash Flow/Tax Analytics/Compare/
        Reports/Estate/Settings) + left Sidebar with Plans switcher.
  1.A.2 GET /scenarios/{id}/year-stats?year=N returning per-year metrics
        (NW, Δ NW, taxable income, taxes, eff. rate, spending, contribs,
        investment growth). YearScrubber + YearStatsPanel render the
        right-hand sidebar; URL ?year= preserves selection.
  1.A.3 FanChart gains optional `milestones` prop (lib/milestone.ts maps
        life_event.kind → emoji) + selectedYear marker line.

Stream B — goals + progress
  1.B.1 New goals_eval module: target_nw_by_year / never_run_out /
        target_real_income probability evaluation. Wired into POST
        /simulate (exact, per-path) and GET /scenarios/{id}/projection
        (approximated from persisted fan via percentile interpolation).
        GoalsSection renders pass/fail badges.
  1.B.2 GET /scenarios/{id}/progress overlays AccountSnapshot totals on
        the projection fan; ProgressPage shows variance side-panel.

Stream C — income + cashflow
  1.C.1 New IncomeStream model + alembic 0003 + CRUD endpoints. Engine
        aggregates streams into per-year inflows + taxable arrays;
        income tax routes through the jurisdiction tax engine.
        IncomeStreamsSection on Plan tab.
  1.C.2 GET /scenarios/{id}/cashflow?year=N returns sources/sinks for
        an ECharts Sankey (sums conserve). CashflowTab body.

Stream D — settings
  1.D.1 SettingsTab + sub-nav (Milestones/Rates/Dividends/Bonds/Tax/
        Metrics/Other/Notes); placeholder cards for unbuilt sub-pages.
  1.D.2 LifeEventsSection relocated to /scenarios/:id/settings.
  1.D.3 RatesSettings (Fixed/Historical/Advanced segmented + per-asset
        cards). SimulateRequest gains rates_mode, inflation_pct,
        stocks/bonds growth + dividend, stocks_allocation. New
        build_fixed_paths() in simulator. Real-return arithmetic
        verified against (1+g+d)/(1+i)−1 ≈ 5.4%.
  1.D.4 NotesSettings — markdown textarea, save-on-blur, stored in
        scenario.config_json.notes.

Backend: 238 pytest pass (+19 new), mypy + ruff clean.
Frontend: typecheck + 7 unit tests + production build clean.

Roadmap for Wave 2-N is documented in the implementation plan.
2026-05-10 12:49:44 +00:00

278 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import JSON, TIMESTAMP, Boolean, Date, Integer, Numeric, String, func, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
SCHEMA_NAME = "fire_planner"
class Base(DeclarativeBase):
pass
# JSONB on Postgres, plain JSON on SQLite — tests use SQLite, prod uses Postgres.
JSON_TYPE = JSONB().with_variant(JSON(), "sqlite")
class AccountSnapshot(Base):
"""Daily NW per account from Wealthfolio (filled by ingest).
`external_id` is `wealthfolio:{account_id}:{date}` so re-runs on the same
day are idempotent — Wealthfolio keeps one snapshot per account per day.
"""
__tablename__ = "account_snapshot"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
external_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
snapshot_date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
account_id: Mapped[str] = mapped_column(String, nullable=False, index=True)
account_name: Mapped[str] = mapped_column(String, nullable=False)
account_type: Mapped[str] = mapped_column(String, nullable=False)
currency: Mapped[str] = mapped_column(String(3), nullable=False, server_default="GBP")
market_value: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
market_value_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
cost_basis_gbp: Mapped[Decimal | None] = mapped_column(Numeric(14, 2), nullable=True)
raw_extraction: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class Scenario(Base):
"""A simulation scenario.
Two kinds:
- `kind='cartesian'` — auto-generated from `scenarios.py` Cartesian
product; rebuilt every recompute, upserted on `external_id`.
- `kind='user'` — user-defined (named, optionally cloned from a base);
survives recomputes; `parent_scenario_id` points at the source if any.
"""
__tablename__ = "scenario"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
external_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
kind: Mapped[str] = mapped_column(String(16),
nullable=False,
server_default=text("'cartesian'"))
name: Mapped[str | None] = mapped_column(String, nullable=True)
description: Mapped[str | None] = mapped_column(String, nullable=True)
parent_scenario_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
jurisdiction: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
strategy: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
leave_uk_year: Mapped[int] = mapped_column(Integer, nullable=False)
glide_path: Mapped[str] = mapped_column(String(32), nullable=False)
spending_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
horizon_years: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("60"))
nw_seed_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
savings_per_year_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
config_json: Mapped[dict[str, Any]] = mapped_column(JSON_TYPE, nullable=False)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class McRun(Base):
"""One MC execution per (scenario, run_at). Stores execution metadata +
summary statistics — enough to populate a Grafana cell without touching
the per-path tables."""
__tablename__ = "mc_run"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
run_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
n_paths: Mapped[int] = mapped_column(Integer, nullable=False)
seed: Mapped[int] = mapped_column(Integer, nullable=False)
success_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
p10_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
median_lifetime_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
median_years_to_ruin: Mapped[Decimal | None] = mapped_column(Numeric(6, 2), nullable=True)
elapsed_seconds: Mapped[Decimal] = mapped_column(Numeric(8, 3), nullable=False)
sequence_risk_correlation: Mapped[Decimal | None] = mapped_column(Numeric(6, 4), nullable=True)
extra: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
class McPath(Base):
"""Sparse per-path storage: top decile, bottom decile, and median paths
fully stored — enough for a fan chart, not 10k×60 ≈ 600k rows."""
__tablename__ = "mc_path"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
path_idx: Mapped[int] = mapped_column(Integer, nullable=False)
bucket: Mapped[str] = mapped_column(String(16), nullable=False)
year_idx: Mapped[int] = mapped_column(Integer, nullable=False)
portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
withdrawal_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
tax_paid_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
real_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
class ProjectionYearly(Base):
"""Deterministic point projection per scenario — per-year point estimates
that drive fan charts and the per-year Grafana table. One row per
(scenario, year)."""
__tablename__ = "projection_yearly"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
year_idx: Mapped[int] = mapped_column(Integer, nullable=False)
p10_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p25_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p75_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_withdrawal_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
p50_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
survival_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
class ScenarioSummary(Base):
"""Denormalised fast-read for Grafana — one row per (scenario, latest run)."""
__tablename__ = "scenario_summary"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, unique=True, nullable=False)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False)
jurisdiction: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
strategy: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
leave_uk_year: Mapped[int] = mapped_column(Integer, nullable=False)
glide_path: Mapped[str] = mapped_column(String(32), nullable=False)
spending_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
success_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
p10_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
median_lifetime_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
median_years_to_ruin: Mapped[Decimal | None] = mapped_column(Numeric(6, 2), nullable=True)
updated_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class LifeEvent(Base):
"""A timed event in a user's plan: retirement, kid born, mortgage payoff,
sabbatical, etc. Attached to a scenario.
`year_start` and `year_end` are offsets from the scenario start year
(year 0 = today). For one-time events, leave `year_end` = `year_start`.
`delta_gbp_per_year` is the annual cashflow change while the event is
active (negative = expense, positive = income; 0 for events that just
mark a milestone like "retire").
Free-form `payload` carries event-kind-specific config that the
simulator hasn't yet learned to consume — graceful forward-compat.
"""
__tablename__ = "life_event"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
year_start: Mapped[int] = mapped_column(Integer, nullable=False)
year_end: Mapped[int | None] = mapped_column(Integer, nullable=True)
delta_gbp_per_year: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
one_time_amount_gbp: Mapped[Decimal | None] = mapped_column(Numeric(14, 2), nullable=True)
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class IncomeStream(Base):
"""A typed, recurring source of income — first-class income object.
Modelled as a per-scenario row so a user can stack salary, dividends,
rental, pensions, RSUs, etc. The simulator routes the after-tax
amount through the jurisdiction's tax engine using `tax_treatment`
as the bucket hint (income / dividend / cgt / tax_free).
`start_year` / `end_year` are offsets from the scenario start year.
`growth_pct` is real growth; the simulator applies it geometrically.
"""
__tablename__ = "income_stream"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
start_year: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("0"))
end_year: Mapped[int | None] = mapped_column(Integer, nullable=True)
amount_gbp_per_year: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
growth_pct: Mapped[Decimal] = mapped_column(Numeric(6, 4),
nullable=False,
server_default=text("0"))
tax_treatment: Mapped[str] = mapped_column(String(16),
nullable=False,
server_default=text("'income'"))
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class RetirementGoal(Base):
"""A user-defined success criterion for a scenario.
Examples:
- target_nw: "have ≥£2M real GBP at year 25" → kind=target_nw,
target_amount_gbp=2_000_000, target_year=25, comparator='>='
- never_run_out: "never run out before age 95" → kind=never_run_out,
target_year=65 (years from start), no amount
- inheritance: "leave ≥£500k to heirs" → kind=inheritance,
target_amount_gbp=500_000, target_year=horizon, comparator='>='
`success_threshold` is the probability bar that counts as "passing"
(e.g. 0.95 = 95% of MC paths must satisfy the comparator).
"""
__tablename__ = "retirement_goal"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
target_amount_gbp: Mapped[Decimal | None] = mapped_column(Numeric(16, 2), nullable=True)
target_year: Mapped[int | None] = mapped_column(Integer, nullable=True)
comparator: Mapped[str] = mapped_column(String(4), nullable=False, server_default=text("'>='"))
success_threshold: Mapped[Decimal] = mapped_column(Numeric(4, 3),
nullable=False,
server_default=text("0.95"))
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
def create_engine_from_env() -> AsyncEngine:
url = os.environ["DB_CONNECTION_STRING"]
return create_async_engine(url, pool_pre_ping=True)
def make_session_factory(engine: AsyncEngine) -> async_sessionmaker[Any]:
return async_sessionmaker(engine, expire_on_commit=False)