diff --git a/fire_planner/api/dependencies.py b/fire_planner/api/dependencies.py index d44dc13..cc0a997 100644 --- a/fire_planner/api/dependencies.py +++ b/fire_planner/api/dependencies.py @@ -16,3 +16,22 @@ async def get_session(request: Request) -> AsyncIterator[AsyncSession]: factory: async_sessionmaker[AsyncSession] = request.app.state.session_factory async with factory() as session: yield session + + +async def get_wf_sync_session(request: Request) -> AsyncIterator[AsyncSession | None]: + """Yield an AsyncSession bound to the wealthfolio_sync mirror. + + Powers the lazy refresh of `account_snapshot` (the disk cache for + /networth). Yields `None` when the factory is not wired so endpoints + can fall back to whatever is already cached — relevant for the in-memory + SQLite test runs that don't wire wf_sync. Tests that DO want to exercise + the refresh override this with their own factory via + `app.dependency_overrides`. + """ + factory: async_sessionmaker[AsyncSession] | None = getattr( + request.app.state, "wf_sync_session_factory", None) + if factory is None: + yield None + return + async with factory() as session: + yield session diff --git a/fire_planner/api/networth.py b/fire_planner/api/networth.py index f3f92e8..9323b07 100644 --- a/fire_planner/api/networth.py +++ b/fire_planner/api/networth.py @@ -1,7 +1,8 @@ """Net-worth read endpoints. -Reads from `fire_planner.account_snapshot` (populated hourly by the -wealthfolio ingest). Two views: +Reads from `fire_planner.account_snapshot`, which is lazily refreshed +from the wealthfolio_sync PG mirror on each request when older than +`NETWORTH_CACHE_TTL_DAYS` (default 1). Two views: - GET /networth → latest snapshot per account, totals - GET /networth/history → daily totals + per-account series, for charts """ @@ -15,7 +16,7 @@ from fastapi import APIRouter, Depends, Query from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from fire_planner.api.dependencies import get_session +from fire_planner.api.dependencies import get_session, get_wf_sync_session from fire_planner.api.schemas import ( AccountSnapshotOut, NetWorthCurrent, @@ -23,13 +24,18 @@ from fire_planner.api.schemas import ( NetWorthHistoryPoint, ) from fire_planner.db import AccountSnapshot +from fire_planner.ingest.wealthfolio import refresh_account_snapshots_if_stale router = APIRouter(prefix="/networth", tags=["networth"]) @router.get("", response_model=NetWorthCurrent) -async def current_networth(session: AsyncSession = Depends(get_session)) -> NetWorthCurrent: +async def current_networth( + session: AsyncSession = Depends(get_session), + wf_sync: AsyncSession | None = Depends(get_wf_sync_session), +) -> NetWorthCurrent: """Latest snapshot per account + GBP total.""" + await refresh_account_snapshots_if_stale(session, wf_sync) latest_date = (await session.execute( select(AccountSnapshot.snapshot_date).order_by( AccountSnapshot.snapshot_date.desc()).limit(1))).scalar() @@ -46,6 +52,7 @@ async def current_networth(session: AsyncSession = Depends(get_session)) -> NetW @router.get("/history", response_model=NetWorthHistory) async def networth_history( session: AsyncSession = Depends(get_session), + wf_sync: AsyncSession | None = Depends(get_wf_sync_session), days: int = Query(default=365, ge=1, le=3650, description="Look-back window."), ) -> NetWorthHistory: """Daily NW total + per-account breakdown for a stacked area chart. @@ -53,6 +60,7 @@ async def networth_history( Picks one row per (account_id, snapshot_date) — wealthfolio ingest upserts daily so this is already de-duped, but we group defensively. """ + await refresh_account_snapshots_if_stale(session, wf_sync) rows = (await session.execute( select( AccountSnapshot.snapshot_date, diff --git a/fire_planner/api/progress.py b/fire_planner/api/progress.py index 99cb83a..a7151b2 100644 --- a/fire_planner/api/progress.py +++ b/fire_planner/api/progress.py @@ -16,7 +16,7 @@ from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from fire_planner.api.dependencies import get_session +from fire_planner.api.dependencies import get_session, get_wf_sync_session from fire_planner.api.schemas import ( ProgressActualPoint, ProgressProjectedPoint, @@ -24,6 +24,7 @@ from fire_planner.api.schemas import ( ProgressVariancePoint, ) from fire_planner.db import AccountSnapshot, McRun, ProjectionYearly, Scenario +from fire_planner.ingest.wealthfolio import refresh_account_snapshots_if_stale router = APIRouter(prefix="/scenarios", tags=["progress"]) @@ -32,11 +33,13 @@ router = APIRouter(prefix="/scenarios", tags=["progress"]) async def get_progress( scenario_id: int, session: AsyncSession = Depends(get_session), + wf_sync: AsyncSession | None = Depends(get_wf_sync_session), ) -> ProgressResponse: scen = await session.get(Scenario, scenario_id) if scen is None: raise HTTPException(status_code=404, detail="Scenario not found") + await refresh_account_snapshots_if_stale(session, wf_sync) actuals_rows = (await session.execute( select( AccountSnapshot.snapshot_date, diff --git a/fire_planner/app.py b/fire_planner/app.py index 4f8a769..cf6164e 100644 --- a/fire_planner/app.py +++ b/fire_planner/app.py @@ -53,6 +53,7 @@ from fire_planner.api.spending import router as spending_router from fire_planner.api.spending_profile import router as spending_profile_router from fire_planner.api.year_stats import router as year_stats_router from fire_planner.db import create_engine_from_env, make_session_factory +from fire_planner.ingest.wealthfolio_pg import create_wf_sync_engine_from_env log = logging.getLogger(__name__) @@ -77,6 +78,17 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: # Tests inject these via dependency_overrides; nothing to wire. log.warning("DB_CONNECTION_STRING unset; skipping engine init") + # wealthfolio_sync engine — powers the lazy refresh of /networth's + # account_snapshot cache. Optional: when unset we serve whatever is + # already cached in account_snapshot (still useful for tests + dev). + if os.environ.get("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING"): + wf_engine = create_wf_sync_engine_from_env() + app.state.wf_sync_engine = wf_engine + app.state.wf_sync_session_factory = make_session_factory(wf_engine) + else: + log.info("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING unset; " + "lazy refresh of /networth will no-op") + worker = asyncio.create_task(_drain_queue(app)) app.state._worker = worker try: @@ -88,6 +100,9 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: eng = getattr(app.state, "engine", None) if eng is not None: await eng.dispose() + wf_eng = getattr(app.state, "wf_sync_engine", None) + if wf_eng is not None: + await wf_eng.dispose() async def _drain_queue(app: FastAPI) -> None: diff --git a/fire_planner/ingest/wealthfolio.py b/fire_planner/ingest/wealthfolio.py index 77dcaa9..554504e 100644 --- a/fire_planner/ingest/wealthfolio.py +++ b/fire_planner/ingest/wealthfolio.py @@ -1,21 +1,37 @@ -"""Upsert helper for wealthfolio account snapshots. +"""Upsert helper + lazy-refresh for the wealthfolio account snapshot cache. -The actual read happens in `wealthfolio_pg.py` (against the -`wealthfolio_sync` PG mirror). This module keeps the upsert helper that -both prod and tests use, so callers can: +`account_snapshot` is a disk cache of the live wealthfolio_sync mirror +(`daily_account_valuation` JOIN `accounts`). It's populated on demand by +`refresh_account_snapshots_if_stale` — invoked from the `/networth`, +`/networth/history`, and `/scenarios/{id}/progress` endpoints on every +request. If the cache is fresher than the TTL, the call is a no-op; if +not, we read from wf_sync, upsert, and serve the fresh data. + +Prior to 2026-05-27 the cache was populated by a manual CLI ingest with +no automation, so it drifted up to 18 days behind reality (see +post-mortem of that day). Lazy refresh-on-read removes the manual step. rows = await read_account_snapshots_from_pg(wf_session) await upsert_snapshots(session, rows) """ from __future__ import annotations +import logging +import os +from datetime import date from typing import Any +from sqlalchemy import func, select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.db import AccountSnapshot +from fire_planner.ingest.wealthfolio_pg import read_account_snapshots_from_pg + +log = logging.getLogger(__name__) + +_DEFAULT_TTL_DAYS = 1 def _dialect_insert(session: AsyncSession) -> Any: @@ -43,3 +59,49 @@ async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols) await session.execute(stmt) return len(rows) + + +def _ttl_days() -> int: + raw = os.environ.get("NETWORTH_CACHE_TTL_DAYS", "") + if not raw.strip(): + return _DEFAULT_TTL_DAYS + try: + return max(0, int(raw)) + except ValueError: + log.warning("NETWORTH_CACHE_TTL_DAYS=%r is not an int; using default %d", + raw, _DEFAULT_TTL_DAYS) + return _DEFAULT_TTL_DAYS + + +async def refresh_account_snapshots_if_stale( + session: AsyncSession, + wf_sync: AsyncSession | None, + *, + ttl_days: int | None = None, + now: date | None = None, +) -> bool: + """Refresh `account_snapshot` from wf_sync if the cache is older than TTL. + + Returns True if a refresh ran (rows were upserted), False if the cache + was already fresh or `wf_sync` was None (e.g. unconfigured environment). + + The check uses `MAX(snapshot_date)` — if no rows exist yet the cache is + considered stale and a refresh fires. The upsert is idempotent so + concurrent requests racing on the same stale window don't conflict. + """ + if wf_sync is None: + return False + ttl = _ttl_days() if ttl_days is None else ttl_days + today = now or date.today() + latest = (await session.execute(select(func.max(AccountSnapshot.snapshot_date)))).scalar() + if latest is not None and (today - latest).days < ttl: + return False + rows = await read_account_snapshots_from_pg(wf_sync) + if not rows: + log.warning("refresh: wf_sync returned no rows; cache left as-is " + "(latest cached date=%s)", latest) + return False + n = await upsert_snapshots(session, rows) + await session.commit() + log.info("refresh: upserted %d snapshot row(s) from wf_sync (was latest=%s)", n, latest) + return True diff --git a/tests/test_api_networth_refresh.py b/tests/test_api_networth_refresh.py new file mode 100644 index 0000000..8532e16 --- /dev/null +++ b/tests/test_api_networth_refresh.py @@ -0,0 +1,201 @@ +"""Lazy-refresh behaviour of /networth against the wealthfolio_sync mirror. + +The /networth, /networth/history, and /scenarios/{id}/progress endpoints +read from `account_snapshot`. When that cache is older than +`NETWORTH_CACHE_TTL_DAYS` (default 1) and a wf_sync session is available, +they refresh it from the live mirror before returning. These tests pin +that contract. +""" +from __future__ import annotations + +from collections.abc import AsyncIterator +from datetime import date, timedelta +from decimal import Decimal + +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from fire_planner.api.dependencies import get_session, get_wf_sync_session +from fire_planner.app import app +from fire_planner.db import AccountSnapshot + + +@pytest_asyncio.fixture +async def wf_sync_engine() -> AsyncIterator[AsyncEngine]: + """In-memory aiosqlite engine standing in for the wealthfolio_sync mirror. + + Schema mirrors the real wealthfolio_sync PG tables (defined in + `infra/stacks/wealthfolio/main.tf`) — we only need the subset that + `read_account_snapshots_from_pg` reads. + """ + eng = create_async_engine("sqlite+aiosqlite:///:memory:") + async with eng.begin() as conn: + await conn.exec_driver_sql( + """ + CREATE TABLE accounts ( + id TEXT PRIMARY KEY, + name TEXT, + account_type TEXT, + currency TEXT, + is_active BOOLEAN + ) + """) + await conn.exec_driver_sql( + """ + CREATE TABLE daily_account_valuation ( + id TEXT PRIMARY KEY, + account_id TEXT NOT NULL, + valuation_date DATE NOT NULL, + account_currency TEXT, + base_currency TEXT, + fx_rate_to_base NUMERIC, + cash_balance NUMERIC, + investment_market_value NUMERIC, + total_value NUMERIC, + cost_basis NUMERIC, + net_contribution NUMERIC + ) + """) + yield eng + await eng.dispose() + + +async def _seed_wf_sync(wf_engine: AsyncEngine, valuation_date: date) -> None: + factory = async_sessionmaker(wf_engine, expire_on_commit=False) + async with factory() as sess: + await sess.execute( + text("INSERT INTO accounts (id, name, account_type, currency, is_active) " + "VALUES ('acc-isa', 'ISA', 'ISA', 'GBP', 1)")) + await sess.execute( + text(""" + INSERT INTO daily_account_valuation + (id, account_id, valuation_date, account_currency, base_currency, + fx_rate_to_base, total_value, cost_basis) + VALUES + (:id, 'acc-isa', :d, 'GBP', 'GBP', 1.0, :tv, :cb) + """), + {"id": "dav-isa", "d": valuation_date.isoformat(), + "tv": 1_234_567, "cb": 1_000_000}) + await sess.commit() + + +@pytest_asyncio.fixture +async def client( + engine: AsyncEngine, + session: AsyncSession, + wf_sync_engine: AsyncEngine, +) -> AsyncIterator[AsyncClient]: + fp_factory = async_sessionmaker(engine, expire_on_commit=False) + wf_factory = async_sessionmaker(wf_sync_engine, expire_on_commit=False) + + async def _override_fp() -> AsyncIterator[AsyncSession]: + async with fp_factory() as s: + yield s + + async def _override_wf() -> AsyncIterator[AsyncSession]: + async with wf_factory() as s: + yield s + + app.dependency_overrides[get_session] = _override_fp + app.dependency_overrides[get_wf_sync_session] = _override_wf + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + app.dependency_overrides.clear() + + +async def test_networth_refreshes_empty_cache_from_wf_sync( + client: AsyncClient, + session: AsyncSession, + wf_sync_engine: AsyncEngine, +) -> None: + """Cold cache → first /networth call pulls from wf_sync and upserts.""" + today = date.today() + await _seed_wf_sync(wf_sync_engine, today) + + # Cache starts empty. + pre = (await session.execute(select(AccountSnapshot))).scalars().all() + assert pre == [] + + resp = await client.get("/networth") + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["snapshot_date"] == today.isoformat() + assert Decimal(body["total_gbp"]) == Decimal("1234567.00") + + # Cache populated. + persisted = (await session.execute(select(AccountSnapshot))).scalars().all() + assert len(persisted) == 1 + assert persisted[0].account_id == "acc-isa" + assert persisted[0].snapshot_date == today + + +async def test_networth_refreshes_stale_cache_from_wf_sync( + client: AsyncClient, + session: AsyncSession, + wf_sync_engine: AsyncEngine, +) -> None: + """Cache older than TTL → /networth refreshes from wf_sync.""" + today = date.today() + stale_date = today - timedelta(days=5) + + # Pre-seed account_snapshot with stale data. + session.add( + AccountSnapshot( + external_id="wealthfolio:acc-isa:stale", + snapshot_date=stale_date, + account_id="acc-isa", + account_name="ISA", + account_type="ISA", + currency="GBP", + market_value=Decimal("999"), + market_value_gbp=Decimal("999"), + )) + await session.commit() + + # wf_sync has today's row. + await _seed_wf_sync(wf_sync_engine, today) + + resp = await client.get("/networth") + assert resp.status_code == 200 + body = resp.json() + assert body["snapshot_date"] == today.isoformat() + assert Decimal(body["total_gbp"]) == Decimal("1234567.00") + + +async def test_networth_skips_refresh_when_cache_is_fresh( + client: AsyncClient, + session: AsyncSession, + wf_sync_engine: AsyncEngine, +) -> None: + """Cache fresh (today's date) → no refresh, returns cached value.""" + today = date.today() + + # Cache has today's row with one value. + session.add( + AccountSnapshot( + external_id="wealthfolio:acc-isa:fresh", + snapshot_date=today, + account_id="acc-isa", + account_name="ISA", + account_type="ISA", + currency="GBP", + market_value=Decimal("42"), + market_value_gbp=Decimal("42"), + )) + await session.commit() + + # wf_sync has a DIFFERENT value; if refresh fires the response will + # carry 1234567, not 42. + await _seed_wf_sync(wf_sync_engine, today) + + resp = await client.get("/networth") + assert resp.status_code == 200 + assert Decimal(resp.json()["total_gbp"]) == Decimal("42")