fire-planner: lazy-refresh /networth from wf_sync (default TTL 1d)
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

The account_snapshot cache fed /networth, /networth/history, and
/scenarios/{id}/progress. No CronJob populated it, so the cache had
drifted ~18 days behind the wealthfolio_sync mirror (last refresh
2026-05-09 via manual kubectl exec; Grafana reads wf_sync directly
and stayed fresh).

Switch to lazy refresh on read: each request to those endpoints now
checks MAX(account_snapshot.snapshot_date) — if it's older than
NETWORTH_CACHE_TTL_DAYS (default 1), pull fresh rows from wf_sync via
read_account_snapshots_from_pg and upsert. Idempotent under
concurrency (existing ON CONFLICT DO UPDATE).

Plumbing:
- Add get_wf_sync_session dependency that yields None when the wf_sync
  factory isn't wired (keeps existing tests' behaviour: no refresh
  attempted, they continue to seed account_snapshot directly).
- Wire wf_sync engine + session_factory in app.lifespan when
  WEALTHFOLIO_SYNC_DB_CONNECTION_STRING is set.
- Centralise the staleness check in refresh_account_snapshots_if_stale.

Tests:
- 271 existing tests still green.
- Three new tests in test_api_networth_refresh.py covering: empty cache
  triggers refresh, stale cache triggers refresh, fresh cache skips
  refresh (asserts the wf_sync value is NOT served).
This commit is contained in:
Viktor Barzin 2026-05-27 18:21:12 +00:00
parent e72fd22a17
commit 4da58fe56e
6 changed files with 317 additions and 9 deletions

View file

@ -16,3 +16,22 @@ async def get_session(request: Request) -> AsyncIterator[AsyncSession]:
factory: async_sessionmaker[AsyncSession] = request.app.state.session_factory
async with factory() as session:
yield session
async def get_wf_sync_session(request: Request) -> AsyncIterator[AsyncSession | None]:
"""Yield an AsyncSession bound to the wealthfolio_sync mirror.
Powers the lazy refresh of `account_snapshot` (the disk cache for
/networth). Yields `None` when the factory is not wired so endpoints
can fall back to whatever is already cached relevant for the in-memory
SQLite test runs that don't wire wf_sync. Tests that DO want to exercise
the refresh override this with their own factory via
`app.dependency_overrides`.
"""
factory: async_sessionmaker[AsyncSession] | None = getattr(
request.app.state, "wf_sync_session_factory", None)
if factory is None:
yield None
return
async with factory() as session:
yield session

View file

@ -1,7 +1,8 @@
"""Net-worth read endpoints.
Reads from `fire_planner.account_snapshot` (populated hourly by the
wealthfolio ingest). Two views:
Reads from `fire_planner.account_snapshot`, which is lazily refreshed
from the wealthfolio_sync PG mirror on each request when older than
`NETWORTH_CACHE_TTL_DAYS` (default 1). Two views:
- GET /networth latest snapshot per account, totals
- GET /networth/history daily totals + per-account series, for charts
"""
@ -15,7 +16,7 @@ from fastapi import APIRouter, Depends, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fire_planner.api.dependencies import get_session
from fire_planner.api.dependencies import get_session, get_wf_sync_session
from fire_planner.api.schemas import (
AccountSnapshotOut,
NetWorthCurrent,
@ -23,13 +24,18 @@ from fire_planner.api.schemas import (
NetWorthHistoryPoint,
)
from fire_planner.db import AccountSnapshot
from fire_planner.ingest.wealthfolio import refresh_account_snapshots_if_stale
router = APIRouter(prefix="/networth", tags=["networth"])
@router.get("", response_model=NetWorthCurrent)
async def current_networth(session: AsyncSession = Depends(get_session)) -> NetWorthCurrent:
async def current_networth(
session: AsyncSession = Depends(get_session),
wf_sync: AsyncSession | None = Depends(get_wf_sync_session),
) -> NetWorthCurrent:
"""Latest snapshot per account + GBP total."""
await refresh_account_snapshots_if_stale(session, wf_sync)
latest_date = (await session.execute(
select(AccountSnapshot.snapshot_date).order_by(
AccountSnapshot.snapshot_date.desc()).limit(1))).scalar()
@ -46,6 +52,7 @@ async def current_networth(session: AsyncSession = Depends(get_session)) -> NetW
@router.get("/history", response_model=NetWorthHistory)
async def networth_history(
session: AsyncSession = Depends(get_session),
wf_sync: AsyncSession | None = Depends(get_wf_sync_session),
days: int = Query(default=365, ge=1, le=3650, description="Look-back window."),
) -> NetWorthHistory:
"""Daily NW total + per-account breakdown for a stacked area chart.
@ -53,6 +60,7 @@ async def networth_history(
Picks one row per (account_id, snapshot_date) wealthfolio ingest
upserts daily so this is already de-duped, but we group defensively.
"""
await refresh_account_snapshots_if_stale(session, wf_sync)
rows = (await session.execute(
select(
AccountSnapshot.snapshot_date,

View file

@ -16,7 +16,7 @@ from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fire_planner.api.dependencies import get_session
from fire_planner.api.dependencies import get_session, get_wf_sync_session
from fire_planner.api.schemas import (
ProgressActualPoint,
ProgressProjectedPoint,
@ -24,6 +24,7 @@ from fire_planner.api.schemas import (
ProgressVariancePoint,
)
from fire_planner.db import AccountSnapshot, McRun, ProjectionYearly, Scenario
from fire_planner.ingest.wealthfolio import refresh_account_snapshots_if_stale
router = APIRouter(prefix="/scenarios", tags=["progress"])
@ -32,11 +33,13 @@ router = APIRouter(prefix="/scenarios", tags=["progress"])
async def get_progress(
scenario_id: int,
session: AsyncSession = Depends(get_session),
wf_sync: AsyncSession | None = Depends(get_wf_sync_session),
) -> ProgressResponse:
scen = await session.get(Scenario, scenario_id)
if scen is None:
raise HTTPException(status_code=404, detail="Scenario not found")
await refresh_account_snapshots_if_stale(session, wf_sync)
actuals_rows = (await session.execute(
select(
AccountSnapshot.snapshot_date,

View file

@ -53,6 +53,7 @@ from fire_planner.api.spending import router as spending_router
from fire_planner.api.spending_profile import router as spending_profile_router
from fire_planner.api.year_stats import router as year_stats_router
from fire_planner.db import create_engine_from_env, make_session_factory
from fire_planner.ingest.wealthfolio_pg import create_wf_sync_engine_from_env
log = logging.getLogger(__name__)
@ -77,6 +78,17 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Tests inject these via dependency_overrides; nothing to wire.
log.warning("DB_CONNECTION_STRING unset; skipping engine init")
# wealthfolio_sync engine — powers the lazy refresh of /networth's
# account_snapshot cache. Optional: when unset we serve whatever is
# already cached in account_snapshot (still useful for tests + dev).
if os.environ.get("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING"):
wf_engine = create_wf_sync_engine_from_env()
app.state.wf_sync_engine = wf_engine
app.state.wf_sync_session_factory = make_session_factory(wf_engine)
else:
log.info("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING unset; "
"lazy refresh of /networth will no-op")
worker = asyncio.create_task(_drain_queue(app))
app.state._worker = worker
try:
@ -88,6 +100,9 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
eng = getattr(app.state, "engine", None)
if eng is not None:
await eng.dispose()
wf_eng = getattr(app.state, "wf_sync_engine", None)
if wf_eng is not None:
await wf_eng.dispose()
async def _drain_queue(app: FastAPI) -> None:

View file

@ -1,21 +1,37 @@
"""Upsert helper for wealthfolio account snapshots.
"""Upsert helper + lazy-refresh for the wealthfolio account snapshot cache.
The actual read happens in `wealthfolio_pg.py` (against the
`wealthfolio_sync` PG mirror). This module keeps the upsert helper that
both prod and tests use, so callers can:
`account_snapshot` is a disk cache of the live wealthfolio_sync mirror
(`daily_account_valuation` JOIN `accounts`). It's populated on demand by
`refresh_account_snapshots_if_stale` invoked from the `/networth`,
`/networth/history`, and `/scenarios/{id}/progress` endpoints on every
request. If the cache is fresher than the TTL, the call is a no-op; if
not, we read from wf_sync, upsert, and serve the fresh data.
Prior to 2026-05-27 the cache was populated by a manual CLI ingest with
no automation, so it drifted up to 18 days behind reality (see
post-mortem of that day). Lazy refresh-on-read removes the manual step.
rows = await read_account_snapshots_from_pg(wf_session)
await upsert_snapshots(session, rows)
"""
from __future__ import annotations
import logging
import os
from datetime import date
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncSession
from fire_planner.db import AccountSnapshot
from fire_planner.ingest.wealthfolio_pg import read_account_snapshots_from_pg
log = logging.getLogger(__name__)
_DEFAULT_TTL_DAYS = 1
def _dialect_insert(session: AsyncSession) -> Any:
@ -43,3 +59,49 @@ async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) ->
stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols)
await session.execute(stmt)
return len(rows)
def _ttl_days() -> int:
raw = os.environ.get("NETWORTH_CACHE_TTL_DAYS", "")
if not raw.strip():
return _DEFAULT_TTL_DAYS
try:
return max(0, int(raw))
except ValueError:
log.warning("NETWORTH_CACHE_TTL_DAYS=%r is not an int; using default %d",
raw, _DEFAULT_TTL_DAYS)
return _DEFAULT_TTL_DAYS
async def refresh_account_snapshots_if_stale(
session: AsyncSession,
wf_sync: AsyncSession | None,
*,
ttl_days: int | None = None,
now: date | None = None,
) -> bool:
"""Refresh `account_snapshot` from wf_sync if the cache is older than TTL.
Returns True if a refresh ran (rows were upserted), False if the cache
was already fresh or `wf_sync` was None (e.g. unconfigured environment).
The check uses `MAX(snapshot_date)` if no rows exist yet the cache is
considered stale and a refresh fires. The upsert is idempotent so
concurrent requests racing on the same stale window don't conflict.
"""
if wf_sync is None:
return False
ttl = _ttl_days() if ttl_days is None else ttl_days
today = now or date.today()
latest = (await session.execute(select(func.max(AccountSnapshot.snapshot_date)))).scalar()
if latest is not None and (today - latest).days < ttl:
return False
rows = await read_account_snapshots_from_pg(wf_sync)
if not rows:
log.warning("refresh: wf_sync returned no rows; cache left as-is "
"(latest cached date=%s)", latest)
return False
n = await upsert_snapshots(session, rows)
await session.commit()
log.info("refresh: upserted %d snapshot row(s) from wf_sync (was latest=%s)", n, latest)
return True