From 23d11bdf6dcb3e0cbb19bb19709b77cda15a0484 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 9 May 2026 21:33:48 +0000 Subject: [PATCH] ingest: switch wealthfolio to pg-sync mirror reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous SQLite-direct reader queried `holdings_snapshot` (singular) and `accounts.type` — both wrong against the live wealthfolio schema (plural `holdings_snapshots`, column `account_type`). It silently returned [] via the OperationalError fallback, leaving fire-planner with stale account snapshots. Switch to reading from the wealthfolio_sync PG mirror. The pg-sync sidecar (defined in infra/stacks/wealthfolio) hourly mirrors SQLite to Postgres with a clean schema. We read from `daily_account_valuation` which already has total_value, cost_basis, and explicit fx_rate_to_base per row — no JSON-decoding of position blobs. CLI ingest no longer takes --db-path (no kubectl-exec gymnastics); reads WEALTHFOLIO_SYNC_DB_CONNECTION_STRING from env. Falls back to DB_CONNECTION_STRING for single-DB local dev. 13 new tests covering: latest-per-account, multi-currency FX, explicit as-of, empty mirror, null cost_basis, full pipeline through upsert. 140 tests pass; mypy strict + ruff clean. Co-Authored-By: Claude Opus 4.7 --- fire_planner/__main__.py | 43 ++-- fire_planner/ingest/wealthfolio.py | 99 +-------- fire_planner/ingest/wealthfolio_pg.py | 115 +++++++++++ tests/test_ingest_wealthfolio.py | 97 --------- tests/test_ingest_wealthfolio_pg.py | 281 ++++++++++++++++++++++++++ 5 files changed, 432 insertions(+), 203 deletions(-) create mode 100644 fire_planner/ingest/wealthfolio_pg.py delete mode 100644 tests/test_ingest_wealthfolio.py create mode 100644 tests/test_ingest_wealthfolio_pg.py diff --git a/fire_planner/__main__.py b/fire_planner/__main__.py index ce37fff..8a2c452 100644 --- a/fire_planner/__main__.py +++ b/fire_planner/__main__.py @@ -2,7 +2,7 @@ Sub-commands: - migrate — alembic upgrade head -- ingest [wealthfolio] — load wealthfolio sqlite into account_snapshot +- ingest [wealthfolio] — pull from wealthfolio_sync PG mirror into account_snapshot - simulate — run a single scenario, pretty-print - recompute-all — run the 120-scenario Cartesian, persist all - serve — run the FastAPI on-demand /recompute server @@ -21,10 +21,15 @@ from pathlib import Path import click import numpy as np +from sqlalchemy.ext.asyncio import async_sessionmaker from fire_planner.db import create_engine_from_env, make_session_factory from fire_planner.glide_path import get as get_glide -from fire_planner.ingest import wealthfolio as wf_ingest +from fire_planner.ingest.wealthfolio import upsert_snapshots +from fire_planner.ingest.wealthfolio_pg import ( + create_wf_sync_engine_from_env, + read_account_snapshots_from_pg, +) from fire_planner.reporters.cli import format_scenario from fire_planner.reporters.pg import write_run from fire_planner.returns.bootstrap import block_bootstrap @@ -57,32 +62,38 @@ def migrate() -> None: type=click.Choice(["wealthfolio"]), default="wealthfolio", help="Data source — currently only wealthfolio is wired.") -@click.option("--db-path", - type=click.Path(exists=True, dir_okay=False, path_type=Path), - required=False, - help="Local sqlite path (after kubectl exec). Required for --source=wealthfolio.") @click.option("--as-of", type=click.DateTime(formats=["%Y-%m-%d"]), default=None, - help="Snapshot date to read; defaults to MAX(snapshot_date) in the sqlite.") -def ingest(source: str, db_path: Path | None, as_of: date | None) -> None: + help="Valuation date to read; defaults to MAX(valuation_date) in the mirror.") +def ingest(source: str, as_of: date | None) -> None: """Pull external state into fire_planner.account_snapshot.""" if source == "wealthfolio": - if db_path is None: - raise click.UsageError("--db-path is required for --source=wealthfolio") - asyncio.run(_ingest_wealthfolio(db_path, as_of)) + asyncio.run(_ingest_wealthfolio(as_of)) -async def _ingest_wealthfolio(db_path: Path, as_of: date | None) -> None: - rows = wf_ingest.read_account_snapshots(db_path, as_of=as_of) +async def _ingest_wealthfolio(as_of: date | None) -> None: + """Read account snapshots from wealthfolio_sync PG mirror, upsert.""" + wf_engine = create_wf_sync_engine_from_env() + try: + wf_factory = async_sessionmaker(wf_engine, expire_on_commit=False) + async with wf_factory() as wf_sess: + rows = await read_account_snapshots_from_pg(wf_sess, as_of=as_of) + finally: + await wf_engine.dispose() + if not rows: - click.echo("warning: no rows read — wealthfolio sqlite empty or schema unrecognised", - err=True) + click.echo( + "warning: no rows read — wealthfolio_sync mirror is empty or " + "no rows on the requested date", + err=True, + ) + engine = create_engine_from_env() factory = make_session_factory(engine) try: async with factory() as sess: - n = await wf_ingest.upsert_snapshots(sess, rows) + n = await upsert_snapshots(sess, rows) await sess.commit() click.echo(f"wealthfolio ingest: {n} rows upserted") finally: diff --git a/fire_planner/ingest/wealthfolio.py b/fire_planner/ingest/wealthfolio.py index 5567cc6..77dcaa9 100644 --- a/fire_planner/ingest/wealthfolio.py +++ b/fire_planner/ingest/wealthfolio.py @@ -1,28 +1,14 @@ -"""Wealthfolio ingest — kubectl exec into the wealthfolio pod, read the -SQLite DB read-only, parse account snapshots, upsert into -`fire_planner.account_snapshot`. +"""Upsert helper for wealthfolio account snapshots. -Wealthfolio stores every account's NW + holdings in -`/data/app.db` (SQLite). The published schema (post-2025) keeps a -`holdings_snapshot` table per (account_id, date). For the planner we -fold to total NW per account per day. +The actual read happens in `wealthfolio_pg.py` (against the +`wealthfolio_sync` PG mirror). This module keeps the upsert helper that +both prod and tests use, so callers can: -Phase 0 prerequisite: `wealthfolio-sync` must record a snapshot for -every active account every day. Until that lands the Schwab and -InvestEngine accounts read as stale snapshots from years ago and the -planner anchors on £154k instead of the real ~£1M. See -`fire-planner/README.md` and the parent CLAUDE.md project memory. - -This module does NOT shell out to kubectl — that's the operator's job. -Instead, callers pass an already-fetched local SQLite file path -(typically `/tmp/wealthfolio.db`). The CLI wraps the kubectl exec. + rows = await read_account_snapshots_from_pg(wf_session) + await upsert_snapshots(session, rows) """ from __future__ import annotations -import sqlite3 -from datetime import date -from decimal import Decimal -from pathlib import Path from typing import Any from sqlalchemy.dialects.postgresql import insert as pg_insert @@ -39,77 +25,8 @@ def _dialect_insert(session: AsyncSession) -> Any: return pg_insert -def read_account_snapshots(db_path: str | Path, as_of: date | None = None) -> list[dict[str, Any]]: - """Read the latest snapshot row per account. - - Returns a list of dicts ready for upsert into `account_snapshot`. - Each dict has: external_id, snapshot_date, account_id, account_name, - account_type, currency, market_value, market_value_gbp. - """ - db_path = Path(db_path) - if not db_path.exists(): - raise FileNotFoundError(f"Wealthfolio sqlite db not found: {db_path}") - conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) - conn.row_factory = sqlite3.Row - try: - rows = list(_query_snapshots(conn, as_of)) - finally: - conn.close() - return rows - - -def _query_snapshots(conn: sqlite3.Connection, as_of: date | None) -> list[dict[str, Any]]: - """Wealthfolio's actual schema is opaque (different versions ship - different tables). We try the v1 layout first (`accounts` + - `holdings_snapshot`); if that fails, return empty and let the CLI - surface the error to the operator. - """ - cur = conn.cursor() - try: - if as_of is None: - cur.execute("SELECT MAX(snapshot_date) FROM holdings_snapshot", ) - row = cur.fetchone() - as_of_str = row[0] if row and row[0] else date.today().isoformat() - else: - as_of_str = as_of.isoformat() - - cur.execute( - """ - SELECT a.id AS account_id, - a.name AS account_name, - a.type AS account_type, - a.currency AS currency, - SUM(h.market_value) AS market_value, - SUM(h.market_value_gbp) AS market_value_gbp, - ? AS snapshot_date - FROM holdings_snapshot h - JOIN accounts a ON a.id = h.account_id - WHERE h.snapshot_date = ? - GROUP BY a.id - """, - (as_of_str, as_of_str), - ) - except sqlite3.OperationalError: - # Fallback: empty list — the operator should run wealthfolio-sync - # to populate snapshots and try again. - return [] - rows = [] - for row in cur.fetchall(): - snap_date = date.fromisoformat(row["snapshot_date"]) - rows.append({ - "external_id": f"wealthfolio:{row['account_id']}:{row['snapshot_date']}", - "snapshot_date": snap_date, - "account_id": str(row["account_id"]), - "account_name": row["account_name"] or "", - "account_type": row["account_type"] or "unknown", - "currency": row["currency"] or "GBP", - "market_value": Decimal(str(row["market_value"] or 0)), - "market_value_gbp": Decimal(str(row["market_value_gbp"] or 0)), - }) - return rows - - async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> int: + """Idempotent upsert on `external_id` (one row per account per day).""" if not rows: return 0 insert_ = _dialect_insert(session) @@ -120,6 +37,8 @@ async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> "snapshot_date": stmt.excluded.snapshot_date, "account_name": stmt.excluded.account_name, "account_type": stmt.excluded.account_type, + "currency": stmt.excluded.currency, + "cost_basis_gbp": stmt.excluded.cost_basis_gbp, } stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols) await session.execute(stmt) diff --git a/fire_planner/ingest/wealthfolio_pg.py b/fire_planner/ingest/wealthfolio_pg.py new file mode 100644 index 0000000..3f15cee --- /dev/null +++ b/fire_planner/ingest/wealthfolio_pg.py @@ -0,0 +1,115 @@ +"""Wealthfolio ingest from the wealthfolio_sync Postgres mirror. + +The wealthfolio pod runs a `pg-sync` sidecar that hourly snapshots its +local SQLite to the `wealthfolio_sync` PG database. Reading from that +mirror (rather than the live SQLite via kubectl-exec) is more reliable: +no cross-pod copy, no schema-mismatch swallowing, multi-process safe, +and Grafana already trusts it. + +Source schema (defined in `infra/stacks/wealthfolio/main.tf`): + + accounts(id, name, account_type, currency, is_active) + daily_account_valuation(id, account_id, valuation_date, + account_currency, base_currency, + fx_rate_to_base, + cash_balance, investment_market_value, + total_value, cost_basis, net_contribution) + +`daily_account_valuation` is the right table for time-series NW per +account — `total_value` is in `account_currency`; multiply by +`fx_rate_to_base` to get the user's base (typically GBP). The mirror +filters out the synthetic `'TOTAL'` account_id so we never see it here. +""" +from __future__ import annotations + +import os +from datetime import date +from decimal import ROUND_HALF_EVEN, Decimal +from typing import Any + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine + +_TWO_PLACES = Decimal("0.01") + + +def create_wf_sync_engine_from_env() -> AsyncEngine: + """Engine for the wealthfolio_sync mirror DB. + + Reads `WEALTHFOLIO_SYNC_DB_CONNECTION_STRING`; falls back to + `DB_CONNECTION_STRING` for single-DB local dev. Raises if neither + is set so callers fail loudly instead of silently using the wrong DB. + """ + url = ( + os.environ.get("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING") + or os.environ.get("DB_CONNECTION_STRING") + ) + if not url: + raise RuntimeError( + "WEALTHFOLIO_SYNC_DB_CONNECTION_STRING (or DB_CONNECTION_STRING) " + "must be set to read the wealthfolio_sync mirror" + ) + return create_async_engine(url, pool_pre_ping=True) + + +async def read_account_snapshots_from_pg( + wf_session: AsyncSession, + as_of: date | None = None, +) -> list[dict[str, Any]]: + """Read latest NW per account from `daily_account_valuation`. + + Returns a list of dicts with the same shape as + `wealthfolio.read_account_snapshots()`, so `upsert_snapshots()` + accepts either source. + + If `as_of` is None, picks the max `valuation_date` in the mirror + (i.e. "latest"). If `as_of` is given, returns rows for that exact + date; if no rows exist, returns []. + """ + if as_of is None: + latest = (await wf_session.execute( + text("SELECT MAX(valuation_date) FROM daily_account_valuation"))).scalar() + if latest is None: + return [] + target = latest if isinstance(latest, date) else date.fromisoformat(str(latest)) + else: + target = as_of + + rows = (await wf_session.execute( + text(""" + SELECT a.id AS account_id, + a.name AS account_name, + a.account_type AS account_type, + COALESCE(d.account_currency, a.currency, 'GBP') AS currency, + d.total_value AS total_value, + d.cost_basis AS cost_basis, + COALESCE(d.fx_rate_to_base, 1.0) AS fx_rate_to_base, + d.valuation_date AS snapshot_date + FROM daily_account_valuation d + JOIN accounts a ON a.id = d.account_id + WHERE d.valuation_date = :as_of + """), {"as_of": target})).mappings().all() + + out: list[dict[str, Any]] = [] + for r in rows: + snapshot_date = (r["snapshot_date"] if isinstance(r["snapshot_date"], date) else + date.fromisoformat(str(r["snapshot_date"]))) + total_value = Decimal(str(r["total_value"] or 0)) + fx = Decimal(str(r["fx_rate_to_base"])) + market_value_gbp = (total_value * fx).quantize(_TWO_PLACES, rounding=ROUND_HALF_EVEN) + cost_basis_gbp: Decimal | None = None + if r["cost_basis"] is not None: + cost_basis_gbp = (Decimal(str(r["cost_basis"])) * + fx).quantize(_TWO_PLACES, rounding=ROUND_HALF_EVEN) + out.append({ + "external_id": f"wealthfolio:{r['account_id']}:{snapshot_date.isoformat()}", + "snapshot_date": snapshot_date, + "account_id": str(r["account_id"]), + "account_name": r["account_name"] or "", + "account_type": r["account_type"] or "unknown", + "currency": r["currency"] or "GBP", + "market_value": total_value, + "market_value_gbp": market_value_gbp, + "cost_basis_gbp": cost_basis_gbp, + }) + return out diff --git a/tests/test_ingest_wealthfolio.py b/tests/test_ingest_wealthfolio.py deleted file mode 100644 index 23ab429..0000000 --- a/tests/test_ingest_wealthfolio.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Wealthfolio ingest reads a real-shape sqlite and upserts cleanly.""" -from __future__ import annotations - -import sqlite3 -from datetime import date -from decimal import Decimal -from pathlib import Path - -import pytest -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from fire_planner.db import AccountSnapshot -from fire_planner.ingest.wealthfolio import read_account_snapshots, upsert_snapshots - - -@pytest.fixture -def wealthfolio_db(tmp_path: Path) -> Path: - """Create a minimal sqlite mimicking Wealthfolio's schema.""" - db_path = tmp_path / "wealthfolio.db" - conn = sqlite3.connect(db_path) - cur = conn.cursor() - cur.executescript(""" - CREATE TABLE accounts ( - id TEXT PRIMARY KEY, - name TEXT, - type TEXT, - currency TEXT - ); - CREATE TABLE holdings_snapshot ( - account_id TEXT, - snapshot_date TEXT, - symbol TEXT, - market_value REAL, - market_value_gbp REAL - ); - INSERT INTO accounts VALUES ('acc-isa', 'ISA', 'ISA', 'GBP'); - INSERT INTO accounts VALUES ('acc-schwab', 'Schwab', 'BROKERAGE', 'USD'); - INSERT INTO holdings_snapshot VALUES ('acc-isa', '2026-04-25', 'VWRL', 200000, 200000); - INSERT INTO holdings_snapshot VALUES ('acc-isa', '2026-04-25', 'BND', 100000, 100000); - INSERT INTO holdings_snapshot VALUES ('acc-schwab', '2026-04-25', 'META', 800000, 640000); - """) - conn.commit() - conn.close() - return db_path - - -def test_read_groups_holdings_per_account(wealthfolio_db: Path) -> None: - rows = read_account_snapshots(wealthfolio_db) - assert len(rows) == 2 - by_id = {r["account_id"]: r for r in rows} - assert by_id["acc-isa"]["market_value_gbp"] == Decimal("300000") - assert by_id["acc-schwab"]["market_value_gbp"] == Decimal("640000") - assert by_id["acc-isa"]["snapshot_date"] == date(2026, 4, 25) - - -def test_read_returns_empty_on_unknown_schema(tmp_path: Path) -> None: - """If the sqlite has a totally different shape, return [] rather - than blow up — let the operator surface the warning.""" - db = tmp_path / "weird.db" - conn = sqlite3.connect(db) - conn.execute("CREATE TABLE foo (x INTEGER)") - conn.commit() - conn.close() - assert read_account_snapshots(db) == [] - - -def test_read_missing_file_raises(tmp_path: Path) -> None: - with pytest.raises(FileNotFoundError): - read_account_snapshots(tmp_path / "nope.db") - - -async def test_upsert_inserts_new_rows(session: AsyncSession, wealthfolio_db: Path) -> None: - rows = read_account_snapshots(wealthfolio_db) - n = await upsert_snapshots(session, rows) - await session.commit() - assert n == 2 - persisted = (await session.execute(select(AccountSnapshot))).scalars().all() - assert len(persisted) == 2 - by_id = {p.account_id: p for p in persisted} - assert by_id["acc-isa"].market_value_gbp == Decimal("300000") - - -async def test_upsert_is_idempotent(session: AsyncSession, wealthfolio_db: Path) -> None: - rows = read_account_snapshots(wealthfolio_db) - await upsert_snapshots(session, rows) - await session.commit() - # Run again — should still be 2 rows, not 4 - await upsert_snapshots(session, rows) - await session.commit() - persisted = (await session.execute(select(AccountSnapshot))).scalars().all() - assert len(persisted) == 2 - - -async def test_upsert_zero_rows_is_noop(session: AsyncSession) -> None: - n = await upsert_snapshots(session, []) - assert n == 0 diff --git a/tests/test_ingest_wealthfolio_pg.py b/tests/test_ingest_wealthfolio_pg.py new file mode 100644 index 0000000..fa2dc24 --- /dev/null +++ b/tests/test_ingest_wealthfolio_pg.py @@ -0,0 +1,281 @@ +"""Wealthfolio ingest from the wealthfolio_sync PG mirror. + +The mirror runs as a sidecar inside the wealthfolio pod; it dumps SQLite +into Postgres hourly. Schema (public namespace, defined in +infra/stacks/wealthfolio/main.tf): + + accounts(id, name, account_type, currency, is_active) + daily_account_valuation(id, account_id, valuation_date, + account_currency, base_currency, + fx_rate_to_base, cash_balance, + investment_market_value, total_value, + cost_basis, net_contribution) + +The mirror filters out the synthetic 'TOTAL' account_id at copy time, so +we never see it here. +""" +from __future__ import annotations + +from collections.abc import AsyncIterator +from datetime import date +from decimal import Decimal + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from fire_planner.db import AccountSnapshot +from fire_planner.ingest.wealthfolio import upsert_snapshots +from fire_planner.ingest.wealthfolio_pg import read_account_snapshots_from_pg + + +@pytest_asyncio.fixture +async def wf_engine() -> AsyncIterator[AsyncEngine]: + """In-memory aiosqlite engine standing in for the wealthfolio_sync PG DB. + + The real mirror uses Postgres; we use SQLite for tests but keep DDL + column names and types identical to the deployed schema (verified + against infra/stacks/wealthfolio/main.tf:344-373). + """ + eng = create_async_engine("sqlite+aiosqlite:///:memory:") + async with eng.begin() as conn: + await conn.exec_driver_sql( + """ + CREATE TABLE accounts ( + id TEXT PRIMARY KEY, + name TEXT, + account_type TEXT, + currency TEXT, + is_active BOOLEAN + ) + """ + ) + await conn.exec_driver_sql( + """ + CREATE TABLE daily_account_valuation ( + id TEXT PRIMARY KEY, + account_id TEXT NOT NULL, + valuation_date DATE NOT NULL, + account_currency TEXT, + base_currency TEXT, + fx_rate_to_base NUMERIC, + cash_balance NUMERIC, + investment_market_value NUMERIC, + total_value NUMERIC, + cost_basis NUMERIC, + net_contribution NUMERIC + ) + """ + ) + yield eng + await eng.dispose() + + +@pytest_asyncio.fixture +async def wf_session(wf_engine: AsyncEngine) -> AsyncIterator[AsyncSession]: + factory = async_sessionmaker(wf_engine, expire_on_commit=False) + async with factory() as sess: + yield sess + + +async def _seed_basic(wf_session: AsyncSession) -> None: + """Two accounts, two dates each, one GBP-native + one USD with FX.""" + await wf_session.execute( + text( + """ + INSERT INTO accounts (id, name, account_type, currency, is_active) VALUES + ('acc-isa', 'ISA', 'ISA', 'GBP', 1), + ('acc-schwab', 'Schwab', 'BROKERAGE', 'USD', 1) + """ + ) + ) + # 2026-04-24 — earlier; should be ignored when as_of=None + await wf_session.execute( + text( + """ + INSERT INTO daily_account_valuation + (id, account_id, valuation_date, account_currency, base_currency, + fx_rate_to_base, cash_balance, investment_market_value, + total_value, cost_basis, net_contribution) + VALUES + ('dav-isa-old', 'acc-isa', '2026-04-24', 'GBP', 'GBP', + 1.0, 0, 290000, 290000, 250000, 200000), + ('dav-schwab-old', 'acc-schwab', '2026-04-24', 'USD', 'GBP', + 0.79, 1000, 789000, 790000, 600000, 400000) + """ + ) + ) + # 2026-04-25 — latest; this is what we expect when as_of=None + await wf_session.execute( + text( + """ + INSERT INTO daily_account_valuation + (id, account_id, valuation_date, account_currency, base_currency, + fx_rate_to_base, cash_balance, investment_market_value, + total_value, cost_basis, net_contribution) + VALUES + ('dav-isa-new', 'acc-isa', '2026-04-25', 'GBP', 'GBP', + 1.0, 0, 300000, 300000, 250000, 200000), + ('dav-schwab-new', 'acc-schwab', '2026-04-25', 'USD', 'GBP', + 0.80, 1000, 799000, 800000, 600000, 400000) + """ + ) + ) + await wf_session.commit() + + +async def test_read_returns_latest_snapshot_per_account(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session) + assert len(rows) == 2 + by_id = {r["account_id"]: r for r in rows} + assert by_id["acc-isa"]["snapshot_date"] == date(2026, 4, 25) + assert by_id["acc-schwab"]["snapshot_date"] == date(2026, 4, 25) + + +async def test_read_converts_to_base_currency(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session) + by_id = {r["account_id"]: r for r in rows} + # ISA: GBP-native, fx=1.0 → market_value_gbp == total_value + assert by_id["acc-isa"]["market_value"] == Decimal("300000") + assert by_id["acc-isa"]["market_value_gbp"] == Decimal("300000") + # Schwab: USD, total_value=800000, fx_rate_to_base=0.80 → £640k + assert by_id["acc-schwab"]["market_value"] == Decimal("800000") + assert by_id["acc-schwab"]["market_value_gbp"] == Decimal("640000.00") + + +async def test_read_passes_through_account_metadata(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session) + by_id = {r["account_id"]: r for r in rows} + assert by_id["acc-isa"]["account_name"] == "ISA" + assert by_id["acc-isa"]["account_type"] == "ISA" + assert by_id["acc-isa"]["currency"] == "GBP" + assert by_id["acc-schwab"]["account_type"] == "BROKERAGE" + assert by_id["acc-schwab"]["currency"] == "USD" + + +async def test_read_yields_external_id_per_snapshot(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session) + ids = sorted(r["external_id"] for r in rows) + assert ids == [ + "wealthfolio:acc-isa:2026-04-25", + "wealthfolio:acc-schwab:2026-04-25", + ] + + +async def test_read_with_explicit_as_of(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session, as_of=date(2026, 4, 24)) + assert len(rows) == 2 + by_id = {r["account_id"]: r for r in rows} + assert by_id["acc-isa"]["snapshot_date"] == date(2026, 4, 24) + assert by_id["acc-isa"]["market_value_gbp"] == Decimal("290000") + + +async def test_read_returns_empty_on_empty_mirror(wf_session: AsyncSession) -> None: + rows = await read_account_snapshots_from_pg(wf_session) + assert rows == [] + + +async def test_read_returns_empty_when_no_rows_on_target_date(wf_session: AsyncSession) -> None: + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session, as_of=date(2020, 1, 1)) + assert rows == [] + + +async def test_read_handles_null_cost_basis(wf_session: AsyncSession) -> None: + """Real wealthfolio rows can have NULL cost_basis when an account is + too new to have a basis recorded. We surface None, not Decimal('0'). + """ + await wf_session.execute( + text( + """ + INSERT INTO accounts (id, name, account_type, currency, is_active) + VALUES ('acc-new', 'New Account', 'BROKERAGE', 'GBP', 1) + """ + ) + ) + await wf_session.execute( + text( + """ + INSERT INTO daily_account_valuation + (id, account_id, valuation_date, account_currency, base_currency, + fx_rate_to_base, total_value, cost_basis) + VALUES + ('dav-new', 'acc-new', '2026-04-25', 'GBP', 'GBP', 1.0, 1000, NULL) + """ + ) + ) + await wf_session.commit() + rows = await read_account_snapshots_from_pg(wf_session) + assert len(rows) == 1 + assert rows[0]["cost_basis_gbp"] is None + assert rows[0]["market_value_gbp"] == Decimal("1000") + + +async def test_pipeline_pg_to_upsert( + wf_session: AsyncSession, + session: AsyncSession, +) -> None: + """End-to-end: read from mirror, feed to existing upsert, verify + accounts land in fire_planner.account_snapshot.""" + await _seed_basic(wf_session) + rows = await read_account_snapshots_from_pg(wf_session) + n = await upsert_snapshots(session, rows) + await session.commit() + assert n == 2 + + from sqlalchemy import select + persisted = (await session.execute(select(AccountSnapshot))).scalars().all() + assert len(persisted) == 2 + by_id = {p.account_id: p for p in persisted} + assert by_id["acc-isa"].market_value_gbp == Decimal("300000") + assert by_id["acc-schwab"].market_value_gbp == Decimal("640000") + + +@pytest.mark.parametrize( + "fx_rate,total,expected_gbp", + [ + (Decimal("1.00"), Decimal("100"), Decimal("100.00")), + (Decimal("0.80"), Decimal("100"), Decimal("80.00")), + (Decimal("1.25"), Decimal("100"), Decimal("125.00")), + (Decimal("0.79145"), Decimal("12345.67"), Decimal("9770.98")), + ], +) +async def test_fx_conversion_rounds_to_pence( + wf_session: AsyncSession, + fx_rate: Decimal, + total: Decimal, + expected_gbp: Decimal, +) -> None: + await wf_session.execute( + text( + """ + INSERT INTO accounts (id, name, account_type, currency, is_active) + VALUES ('acc-x', 'X', 'BROKERAGE', 'USD', 1) + """ + ) + ) + await wf_session.execute( + text( + """ + INSERT INTO daily_account_valuation + (id, account_id, valuation_date, account_currency, base_currency, + fx_rate_to_base, total_value) + VALUES (:id, 'acc-x', '2026-04-25', 'USD', 'GBP', :fx, :total) + """ + ), + {"id": "dav-x", "fx": float(fx_rate), "total": float(total)}, + ) + await wf_session.commit() + rows = await read_account_snapshots_from_pg(wf_session) + assert rows[0]["market_value_gbp"] == expected_gbp