"""Upsert helper + lazy-refresh for the wealthfolio account snapshot cache. `account_snapshot` is a disk cache of the live wealthfolio_sync mirror (`daily_account_valuation` JOIN `accounts`). It's populated on demand by `refresh_account_snapshots_if_stale` — invoked from the `/networth`, `/networth/history`, and `/scenarios/{id}/progress` endpoints on every request. If the cache is fresher than the TTL, the call is a no-op; if not, we read from wf_sync, upsert, and serve the fresh data. Prior to 2026-05-27 the cache was populated by a manual CLI ingest with no automation, so it drifted up to 18 days behind reality (see post-mortem of that day). Lazy refresh-on-read removes the manual step. rows = await read_account_snapshots_from_pg(wf_session) await upsert_snapshots(session, rows) """ from __future__ import annotations import logging import os from datetime import date from typing import Any from sqlalchemy import func, select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.db import AccountSnapshot from fire_planner.ingest.wealthfolio_pg import read_account_snapshots_from_pg log = logging.getLogger(__name__) _DEFAULT_TTL_DAYS = 1 def _dialect_insert(session: AsyncSession) -> Any: bind = session.get_bind() if bind.dialect.name == "sqlite": return sqlite_insert return pg_insert async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> int: """Idempotent upsert on `external_id` (one row per account per day).""" if not rows: return 0 insert_ = _dialect_insert(session) stmt = insert_(AccountSnapshot).values(rows) update_cols = { "market_value": stmt.excluded.market_value, "market_value_gbp": stmt.excluded.market_value_gbp, "snapshot_date": stmt.excluded.snapshot_date, "account_name": stmt.excluded.account_name, "account_type": stmt.excluded.account_type, "currency": stmt.excluded.currency, "cost_basis_gbp": stmt.excluded.cost_basis_gbp, } stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols) await session.execute(stmt) return len(rows) def _ttl_days() -> int: raw = os.environ.get("NETWORTH_CACHE_TTL_DAYS", "") if not raw.strip(): return _DEFAULT_TTL_DAYS try: return max(0, int(raw)) except ValueError: log.warning("NETWORTH_CACHE_TTL_DAYS=%r is not an int; using default %d", raw, _DEFAULT_TTL_DAYS) return _DEFAULT_TTL_DAYS async def refresh_account_snapshots_if_stale( session: AsyncSession, wf_sync: AsyncSession | None, *, ttl_days: int | None = None, now: date | None = None, ) -> bool: """Refresh `account_snapshot` from wf_sync if the cache is older than TTL. Returns True if a refresh ran (rows were upserted), False if the cache was already fresh or `wf_sync` was None (e.g. unconfigured environment). The check uses `MAX(snapshot_date)` — if no rows exist yet the cache is considered stale and a refresh fires. The upsert is idempotent so concurrent requests racing on the same stale window don't conflict. """ if wf_sync is None: return False ttl = _ttl_days() if ttl_days is None else ttl_days today = now or date.today() latest = (await session.execute(select(func.max(AccountSnapshot.snapshot_date)))).scalar() if latest is not None and (today - latest).days < ttl: return False rows = await read_account_snapshots_from_pg(wf_sync) if not rows: log.warning("refresh: wf_sync returned no rows; cache left as-is " "(latest cached date=%s)", latest) return False n = await upsert_snapshots(session, rows) await session.commit() log.info("refresh: upserted %d snapshot row(s) from wf_sync (was latest=%s)", n, latest) return True