fire-planner/fire_planner/db.py

288 lines
16 KiB
Python
Raw Permalink Normal View History

2026-05-07 17:06:19 +00:00
import os
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import JSON, TIMESTAMP, Boolean, Date, Integer, Numeric, String, func, text
2026-05-07 17:06:19 +00:00
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
SCHEMA_NAME = "fire_planner"
class Base(DeclarativeBase):
pass
# JSONB on Postgres, plain JSON on SQLite — tests use SQLite, prod uses Postgres.
JSON_TYPE = JSONB().with_variant(JSON(), "sqlite")
class AccountSnapshot(Base):
"""Daily NW per account from Wealthfolio (filled by ingest).
`external_id` is `wealthfolio:{account_id}:{date}` so re-runs on the same
day are idempotent Wealthfolio keeps one snapshot per account per day.
"""
__tablename__ = "account_snapshot"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
external_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
snapshot_date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
account_id: Mapped[str] = mapped_column(String, nullable=False, index=True)
account_name: Mapped[str] = mapped_column(String, nullable=False)
account_type: Mapped[str] = mapped_column(String, nullable=False)
currency: Mapped[str] = mapped_column(String(3), nullable=False, server_default="GBP")
market_value: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
market_value_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
cost_basis_gbp: Mapped[Decimal | None] = mapped_column(Numeric(14, 2), nullable=True)
raw_extraction: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class Scenario(Base):
"""A simulation scenario.
Two kinds:
- `kind='cartesian'` auto-generated from `scenarios.py` Cartesian
product; rebuilt every recompute, upserted on `external_id`.
- `kind='user'` user-defined (named, optionally cloned from a base);
survives recomputes; `parent_scenario_id` points at the source if any.
2026-05-07 17:06:19 +00:00
"""
__tablename__ = "scenario"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
external_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
kind: Mapped[str] = mapped_column(String(16),
nullable=False,
server_default=text("'cartesian'"))
name: Mapped[str | None] = mapped_column(String, nullable=True)
description: Mapped[str | None] = mapped_column(String, nullable=True)
parent_scenario_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
2026-05-07 17:06:19 +00:00
jurisdiction: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
strategy: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
leave_uk_year: Mapped[int] = mapped_column(Integer, nullable=False)
glide_path: Mapped[str] = mapped_column(String(32), nullable=False)
spending_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
horizon_years: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("60"))
nw_seed_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
savings_per_year_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
config_json: Mapped[dict[str, Any]] = mapped_column(JSON_TYPE, nullable=False)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class McRun(Base):
"""One MC execution per (scenario, run_at). Stores execution metadata +
summary statistics enough to populate a Grafana cell without touching
the per-path tables."""
__tablename__ = "mc_run"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
run_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
n_paths: Mapped[int] = mapped_column(Integer, nullable=False)
seed: Mapped[int] = mapped_column(Integer, nullable=False)
success_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
p10_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
median_lifetime_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
median_years_to_ruin: Mapped[Decimal | None] = mapped_column(Numeric(6, 2), nullable=True)
elapsed_seconds: Mapped[Decimal] = mapped_column(Numeric(8, 3), nullable=False)
sequence_risk_correlation: Mapped[Decimal | None] = mapped_column(Numeric(6, 4), nullable=True)
extra: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
class McPath(Base):
"""Sparse per-path storage: top decile, bottom decile, and median paths
fully stored enough for a fan chart, not 10k×60 600k rows."""
__tablename__ = "mc_path"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
path_idx: Mapped[int] = mapped_column(Integer, nullable=False)
bucket: Mapped[str] = mapped_column(String(16), nullable=False)
year_idx: Mapped[int] = mapped_column(Integer, nullable=False)
portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
withdrawal_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
tax_paid_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
real_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
class ProjectionYearly(Base):
"""Deterministic point projection per scenario — per-year point estimates
that drive fan charts and the per-year Grafana table. One row per
(scenario, year)."""
__tablename__ = "projection_yearly"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
year_idx: Mapped[int] = mapped_column(Integer, nullable=False)
p10_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p25_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p75_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_portfolio_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_withdrawal_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
p50_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
survival_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
class ScenarioSummary(Base):
"""Denormalised fast-read for Grafana — one row per (scenario, latest run)."""
__tablename__ = "scenario_summary"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, unique=True, nullable=False)
mc_run_id: Mapped[int] = mapped_column(Integer, nullable=False)
jurisdiction: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
strategy: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
leave_uk_year: Mapped[int] = mapped_column(Integer, nullable=False)
glide_path: Mapped[str] = mapped_column(String(32), nullable=False)
spending_gbp: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
success_rate: Mapped[Decimal] = mapped_column(Numeric(6, 4), nullable=False)
p10_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p50_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
p90_ending_gbp: Mapped[Decimal] = mapped_column(Numeric(16, 2), nullable=False)
median_lifetime_tax_gbp: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False)
median_years_to_ruin: Mapped[Decimal | None] = mapped_column(Numeric(6, 2), nullable=True)
updated_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class LifeEvent(Base):
"""A timed event in a user's plan: retirement, kid born, mortgage payoff,
sabbatical, etc. Attached to a scenario.
`year_start` and `year_end` are offsets from the scenario start year
(year 0 = today). For one-time events, leave `year_end` = `year_start`.
`delta_gbp_per_year` is the annual cashflow change while the event is
active (negative = expense, positive = income; 0 for events that just
mark a milestone like "retire").
Free-form `payload` carries event-kind-specific config that the
simulator hasn't yet learned to consume — graceful forward-compat.
"""
__tablename__ = "life_event"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
year_start: Mapped[int] = mapped_column(Integer, nullable=False)
year_end: Mapped[int | None] = mapped_column(Integer, nullable=True)
delta_gbp_per_year: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
one_time_amount_gbp: Mapped[Decimal | None] = mapped_column(Numeric(14, 2), nullable=True)
fire-planner: Wave 2 chart-first — flex spending, categorised life events, interactive Visx Gantt + spending-profile chart Charts are now the primary editor for life events. The Plan-tab body re-orders to make charts ~80% of viewport real-estate; legacy form sections are collapsed into a drawer. Backend: - alembic 0004: life_event.category enum (essential / discretionary / not_spending). Defaults to essential so existing rows keep their full spending impact. - Simulator gains discretionary_outflows + flex_rules params. Tracks per-path running ATH, applies the deepest applicable cut to discretionary outflows when portfolio drops vs ATH (PLab-style flex spending). Cut amount stays in the portfolio (refund pattern). - New flex_spending module with FlexRule + applicable_cut + cuts_per_year (vectorised). Sortable rules; "deepest cut wins" so users specify cumulative cuts at each tier. - New /scenarios/{id}/spending-profile endpoint returning per-year base / essential / discretionary / flex_cut / total breakdown. - SimulateRequest gains flex_rules + life_event.category roundtrip. - 8 new tests; 246 total pytest pass; mypy + ruff clean. Frontend (Visx + ECharts): - Installed @visx/{scale,shape,group,axis,event,responsive,tooltip} for native SVG drag interactions. - New <SpendingProfileChart> — Visx stacked-area of base/essential/ discretionary with red flex-cut overlay, hover tooltip, click-to- scrub-year. - New <EventGantt> — interactive Visx Gantt: * Click empty space → popover create at that year (default essential spending event) * Click a bar → inline edit popover (name, kind, range, £/y, category) with delete button * Drag bar middle → moves the whole event (year-resolution snap) * Drag bar edges → resizes year_start / year_end * All gestures persist via PATCH /life-events/{id} - New <FlexRulesEditor> — list of {from_ath_pct, cut} tiers, save-on- change to scenario.config_json.flex_rules. - Plan-tab redesign: NW fan dominant top with floating stat badges (Year/Age/NW/Δ NW/Spending/Eff. tax) over the chart; spending- profile chart middle; Gantt bottom; flex-rules editor; legacy form sections in a collapsed <details> drawer. - Frontend typecheck + 7 vitest tests + production build all clean.
2026-05-10 16:49:04 +00:00
# Spending category for flex-spending classification:
# essential — never trimmed by flex rules (mortgage, food, kids)
# discretionary — trimmed when portfolio drops vs ATH (travel, dining)
# not_spending — informational only (a milestone marker that doesn't
# change cashflow, e.g. a kid graduating)
# Default is `essential` so existing rows keep their full spending impact.
category: Mapped[str] = mapped_column(String(16),
nullable=False,
server_default=text("'essential'"))
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
fire-planner: ProjectionLab parity Wave 1 — tabbed shell, year stats, goals, income streams, Sankey cashflow, progress overlay, settings sub-pages Wave 1 (9 features across 4 streams): Stream A — dashboard skeleton 1.A.1 ScenarioShell with top tabs (Plan/Cash Flow/Tax Analytics/Compare/ Reports/Estate/Settings) + left Sidebar with Plans switcher. 1.A.2 GET /scenarios/{id}/year-stats?year=N returning per-year metrics (NW, Δ NW, taxable income, taxes, eff. rate, spending, contribs, investment growth). YearScrubber + YearStatsPanel render the right-hand sidebar; URL ?year= preserves selection. 1.A.3 FanChart gains optional `milestones` prop (lib/milestone.ts maps life_event.kind → emoji) + selectedYear marker line. Stream B — goals + progress 1.B.1 New goals_eval module: target_nw_by_year / never_run_out / target_real_income probability evaluation. Wired into POST /simulate (exact, per-path) and GET /scenarios/{id}/projection (approximated from persisted fan via percentile interpolation). GoalsSection renders pass/fail badges. 1.B.2 GET /scenarios/{id}/progress overlays AccountSnapshot totals on the projection fan; ProgressPage shows variance side-panel. Stream C — income + cashflow 1.C.1 New IncomeStream model + alembic 0003 + CRUD endpoints. Engine aggregates streams into per-year inflows + taxable arrays; income tax routes through the jurisdiction tax engine. IncomeStreamsSection on Plan tab. 1.C.2 GET /scenarios/{id}/cashflow?year=N returns sources/sinks for an ECharts Sankey (sums conserve). CashflowTab body. Stream D — settings 1.D.1 SettingsTab + sub-nav (Milestones/Rates/Dividends/Bonds/Tax/ Metrics/Other/Notes); placeholder cards for unbuilt sub-pages. 1.D.2 LifeEventsSection relocated to /scenarios/:id/settings. 1.D.3 RatesSettings (Fixed/Historical/Advanced segmented + per-asset cards). SimulateRequest gains rates_mode, inflation_pct, stocks/bonds growth + dividend, stocks_allocation. New build_fixed_paths() in simulator. Real-return arithmetic verified against (1+g+d)/(1+i)−1 ≈ 5.4%. 1.D.4 NotesSettings — markdown textarea, save-on-blur, stored in scenario.config_json.notes. Backend: 238 pytest pass (+19 new), mypy + ruff clean. Frontend: typecheck + 7 unit tests + production build clean. Roadmap for Wave 2-N is documented in the implementation plan.
2026-05-10 12:49:44 +00:00
class IncomeStream(Base):
"""A typed, recurring source of income — first-class income object.
Modelled as a per-scenario row so a user can stack salary, dividends,
rental, pensions, RSUs, etc. The simulator routes the after-tax
amount through the jurisdiction's tax engine using `tax_treatment`
as the bucket hint (income / dividend / cgt / tax_free).
`start_year` / `end_year` are offsets from the scenario start year.
`growth_pct` is real growth; the simulator applies it geometrically.
"""
__tablename__ = "income_stream"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
start_year: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("0"))
end_year: Mapped[int | None] = mapped_column(Integer, nullable=True)
amount_gbp_per_year: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
growth_pct: Mapped[Decimal] = mapped_column(Numeric(6, 4),
nullable=False,
server_default=text("0"))
tax_treatment: Mapped[str] = mapped_column(String(16),
nullable=False,
server_default=text("'income'"))
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
class RetirementGoal(Base):
"""A user-defined success criterion for a scenario.
Examples:
- target_nw: "have ≥£2M real GBP at year 25" kind=target_nw,
target_amount_gbp=2_000_000, target_year=25, comparator='>='
- never_run_out: "never run out before age 95" kind=never_run_out,
target_year=65 (years from start), no amount
- inheritance: "leave ≥£500k to heirs" kind=inheritance,
target_amount_gbp=500_000, target_year=horizon, comparator='>='
`success_threshold` is the probability bar that counts as "passing"
(e.g. 0.95 = 95% of MC paths must satisfy the comparator).
"""
__tablename__ = "retirement_goal"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
scenario_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
kind: Mapped[str] = mapped_column(String(32), nullable=False)
name: Mapped[str] = mapped_column(String, nullable=False)
target_amount_gbp: Mapped[Decimal | None] = mapped_column(Numeric(16, 2), nullable=True)
target_year: Mapped[int | None] = mapped_column(Integer, nullable=True)
comparator: Mapped[str] = mapped_column(String(4), nullable=False, server_default=text("'>='"))
success_threshold: Mapped[Decimal] = mapped_column(Numeric(4, 3),
nullable=False,
server_default=text("0.95"))
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
payload: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now())
2026-05-07 17:06:19 +00:00
def create_engine_from_env() -> AsyncEngine:
url = os.environ["DB_CONNECTION_STRING"]
return create_async_engine(url, pool_pre_ping=True)
def make_session_factory(engine: AsyncEngine) -> async_sessionmaker[Any]:
return async_sessionmaker(engine, expire_on_commit=False)