"""Net-worth read endpoints. - GET /networth → latest snapshot per account, totals. Backed by the `fire_planner.account_snapshot` cache, lazily refreshed from the wealthfolio_sync PG mirror on each request when older than `NETWORTH_CACHE_TTL_DAYS` (default 1). - GET /networth/history → daily NW totals + per-account series. Reads LIVE from wealthfolio_sync's `daily_account_valuation` table so the chart always reflects the full daily series (the cache only holds the latest snapshot). Falls back to `account_snapshot` when wf_sync is unavailable (tests, dev). """ from __future__ import annotations from collections import defaultdict from collections.abc import Sequence from datetime import date, timedelta from decimal import Decimal from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.api.dependencies import get_session, get_wf_sync_session from fire_planner.api.schemas import ( AccountSnapshotOut, NetWorthCurrent, NetWorthHistory, NetWorthHistoryPoint, ) from fire_planner.db import AccountSnapshot from fire_planner.ingest.wealthfolio import refresh_account_snapshots_if_stale router = APIRouter(prefix="/networth", tags=["networth"]) @router.get("", response_model=NetWorthCurrent) async def current_networth( session: AsyncSession = Depends(get_session), wf_sync: AsyncSession | None = Depends(get_wf_sync_session), ) -> NetWorthCurrent: """Latest snapshot per account + GBP total.""" await refresh_account_snapshots_if_stale(session, wf_sync) latest_date = (await session.execute( select(AccountSnapshot.snapshot_date).order_by( AccountSnapshot.snapshot_date.desc()).limit(1))).scalar() if latest_date is None: return NetWorthCurrent(snapshot_date=date.today(), total_gbp=Decimal("0"), accounts=[]) rows = (await session.execute( select(AccountSnapshot).where( AccountSnapshot.snapshot_date == latest_date))).scalars().all() accounts = [AccountSnapshotOut.model_validate(r) for r in rows] total = sum((a.market_value_gbp for a in accounts), Decimal("0")) return NetWorthCurrent(snapshot_date=latest_date, total_gbp=total, accounts=accounts) @router.get("/history", response_model=NetWorthHistory) async def networth_history( session: AsyncSession = Depends(get_session), wf_sync: AsyncSession | None = Depends(get_wf_sync_session), days: int = Query(default=365, ge=1, le=3650, description="Look-back window in days."), from_date: date | None = Query(default=None, alias="from", description="Inclusive start date (YYYY-MM-DD). " "Overrides `days` when set together with `to`."), to_date: date | None = Query(default=None, alias="to", description="Inclusive end date (YYYY-MM-DD). " "Defaults to today when only `from` is set."), ) -> NetWorthHistory: """Daily NW total + per-account breakdown for a stacked area chart. Window selection: - If `from` is set, use [from, to or today]. - Otherwise use the last `days` days ending today. Reads live from wf_sync's `daily_account_valuation` JOIN `accounts` so the chart spans the broker's full daily series. Falls back to the `account_snapshot` cache only when wf_sync isn't wired (tests, dev). """ today = date.today() if from_date is not None: if to_date is None: to_date = today if from_date > to_date: raise HTTPException(status_code=422, detail="`from` must be on or before `to`") window_start, window_end = from_date, to_date else: window_end = to_date or today window_start = window_end - timedelta(days=max(days - 1, 0)) points = await _history_from_wf_sync(wf_sync, window_start, window_end) if points is None: # wf_sync not wired — fall back to the cache. Useful for tests # and for environments without the mirror. points = await _history_from_cache(session, window_start, window_end) return NetWorthHistory(points=points) async def _history_from_wf_sync( wf_sync: AsyncSession | None, window_start: date, window_end: date, ) -> list[NetWorthHistoryPoint] | None: if wf_sync is None: return None rows = (await wf_sync.execute( text(""" SELECT d.valuation_date, a.name, d.total_value * COALESCE(d.fx_rate_to_base, 1.0) AS market_value_gbp FROM daily_account_valuation d JOIN accounts a ON a.id = d.account_id WHERE d.valuation_date BETWEEN :start AND :end ORDER BY d.valuation_date """), {"start": window_start, "end": window_end})).all() if not rows: return [] return _group_by_date(rows) async def _history_from_cache( session: AsyncSession, window_start: date, window_end: date, ) -> list[NetWorthHistoryPoint]: rows = (await session.execute( select( AccountSnapshot.snapshot_date, AccountSnapshot.account_name, AccountSnapshot.market_value_gbp, ).where(AccountSnapshot.snapshot_date.between(window_start, window_end)) .order_by(AccountSnapshot.snapshot_date))).all() return _group_by_date(rows) def _group_by_date(rows: Sequence[Any]) -> list[NetWorthHistoryPoint]: by_date: dict[date, dict[str, Decimal]] = defaultdict(lambda: defaultdict(lambda: Decimal("0"))) for snap_date, name, value in rows: by_date[snap_date][name] += Decimal(str(value)) return [ NetWorthHistoryPoint( snapshot_date=d, total_gbp=sum(by_date[d].values(), Decimal("0")), by_account=dict(by_date[d]), ) for d in sorted(by_date.keys()) ]