ingest: switch wealthfolio to pg-sync mirror reads
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

The previous SQLite-direct reader queried `holdings_snapshot` (singular)
and `accounts.type` — both wrong against the live wealthfolio schema
(plural `holdings_snapshots`, column `account_type`). It silently
returned [] via the OperationalError fallback, leaving fire-planner with
stale account snapshots.

Switch to reading from the wealthfolio_sync PG mirror. The pg-sync
sidecar (defined in infra/stacks/wealthfolio) hourly mirrors SQLite to
Postgres with a clean schema. We read from `daily_account_valuation`
which already has total_value, cost_basis, and explicit fx_rate_to_base
per row — no JSON-decoding of position blobs.

CLI ingest no longer takes --db-path (no kubectl-exec gymnastics);
reads WEALTHFOLIO_SYNC_DB_CONNECTION_STRING from env. Falls back to
DB_CONNECTION_STRING for single-DB local dev.

13 new tests covering: latest-per-account, multi-currency FX, explicit
as-of, empty mirror, null cost_basis, full pipeline through upsert.
140 tests pass; mypy strict + ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-05-09 21:33:48 +00:00
parent 8880bd377f
commit 23d11bdf6d
5 changed files with 432 additions and 203 deletions

View file

@ -2,7 +2,7 @@
Sub-commands:
- migrate alembic upgrade head
- ingest [wealthfolio] load wealthfolio sqlite into account_snapshot
- ingest [wealthfolio] pull from wealthfolio_sync PG mirror into account_snapshot
- simulate run a single scenario, pretty-print
- recompute-all run the 120-scenario Cartesian, persist all
- serve run the FastAPI on-demand /recompute server
@ -21,10 +21,15 @@ from pathlib import Path
import click
import numpy as np
from sqlalchemy.ext.asyncio import async_sessionmaker
from fire_planner.db import create_engine_from_env, make_session_factory
from fire_planner.glide_path import get as get_glide
from fire_planner.ingest import wealthfolio as wf_ingest
from fire_planner.ingest.wealthfolio import upsert_snapshots
from fire_planner.ingest.wealthfolio_pg import (
create_wf_sync_engine_from_env,
read_account_snapshots_from_pg,
)
from fire_planner.reporters.cli import format_scenario
from fire_planner.reporters.pg import write_run
from fire_planner.returns.bootstrap import block_bootstrap
@ -57,32 +62,38 @@ def migrate() -> None:
type=click.Choice(["wealthfolio"]),
default="wealthfolio",
help="Data source — currently only wealthfolio is wired.")
@click.option("--db-path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
required=False,
help="Local sqlite path (after kubectl exec). Required for --source=wealthfolio.")
@click.option("--as-of",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="Snapshot date to read; defaults to MAX(snapshot_date) in the sqlite.")
def ingest(source: str, db_path: Path | None, as_of: date | None) -> None:
help="Valuation date to read; defaults to MAX(valuation_date) in the mirror.")
def ingest(source: str, as_of: date | None) -> None:
"""Pull external state into fire_planner.account_snapshot."""
if source == "wealthfolio":
if db_path is None:
raise click.UsageError("--db-path is required for --source=wealthfolio")
asyncio.run(_ingest_wealthfolio(db_path, as_of))
asyncio.run(_ingest_wealthfolio(as_of))
async def _ingest_wealthfolio(db_path: Path, as_of: date | None) -> None:
rows = wf_ingest.read_account_snapshots(db_path, as_of=as_of)
async def _ingest_wealthfolio(as_of: date | None) -> None:
"""Read account snapshots from wealthfolio_sync PG mirror, upsert."""
wf_engine = create_wf_sync_engine_from_env()
try:
wf_factory = async_sessionmaker(wf_engine, expire_on_commit=False)
async with wf_factory() as wf_sess:
rows = await read_account_snapshots_from_pg(wf_sess, as_of=as_of)
finally:
await wf_engine.dispose()
if not rows:
click.echo("warning: no rows read — wealthfolio sqlite empty or schema unrecognised",
err=True)
click.echo(
"warning: no rows read — wealthfolio_sync mirror is empty or "
"no rows on the requested date",
err=True,
)
engine = create_engine_from_env()
factory = make_session_factory(engine)
try:
async with factory() as sess:
n = await wf_ingest.upsert_snapshots(sess, rows)
n = await upsert_snapshots(sess, rows)
await sess.commit()
click.echo(f"wealthfolio ingest: {n} rows upserted")
finally:

View file

@ -1,28 +1,14 @@
"""Wealthfolio ingest — kubectl exec into the wealthfolio pod, read the
SQLite DB read-only, parse account snapshots, upsert into
`fire_planner.account_snapshot`.
"""Upsert helper for wealthfolio account snapshots.
Wealthfolio stores every account's NW + holdings in
`/data/app.db` (SQLite). The published schema (post-2025) keeps a
`holdings_snapshot` table per (account_id, date). For the planner we
fold to total NW per account per day.
The actual read happens in `wealthfolio_pg.py` (against the
`wealthfolio_sync` PG mirror). This module keeps the upsert helper that
both prod and tests use, so callers can:
Phase 0 prerequisite: `wealthfolio-sync` must record a snapshot for
every active account every day. Until that lands the Schwab and
InvestEngine accounts read as stale snapshots from years ago and the
planner anchors on £154k instead of the real ~£1M. See
`fire-planner/README.md` and the parent CLAUDE.md project memory.
This module does NOT shell out to kubectl that's the operator's job.
Instead, callers pass an already-fetched local SQLite file path
(typically `/tmp/wealthfolio.db`). The CLI wraps the kubectl exec.
rows = await read_account_snapshots_from_pg(wf_session)
await upsert_snapshots(session, rows)
"""
from __future__ import annotations
import sqlite3
from datetime import date
from decimal import Decimal
from pathlib import Path
from typing import Any
from sqlalchemy.dialects.postgresql import insert as pg_insert
@ -39,77 +25,8 @@ def _dialect_insert(session: AsyncSession) -> Any:
return pg_insert
def read_account_snapshots(db_path: str | Path, as_of: date | None = None) -> list[dict[str, Any]]:
"""Read the latest snapshot row per account.
Returns a list of dicts ready for upsert into `account_snapshot`.
Each dict has: external_id, snapshot_date, account_id, account_name,
account_type, currency, market_value, market_value_gbp.
"""
db_path = Path(db_path)
if not db_path.exists():
raise FileNotFoundError(f"Wealthfolio sqlite db not found: {db_path}")
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
try:
rows = list(_query_snapshots(conn, as_of))
finally:
conn.close()
return rows
def _query_snapshots(conn: sqlite3.Connection, as_of: date | None) -> list[dict[str, Any]]:
"""Wealthfolio's actual schema is opaque (different versions ship
different tables). We try the v1 layout first (`accounts` +
`holdings_snapshot`); if that fails, return empty and let the CLI
surface the error to the operator.
"""
cur = conn.cursor()
try:
if as_of is None:
cur.execute("SELECT MAX(snapshot_date) FROM holdings_snapshot", )
row = cur.fetchone()
as_of_str = row[0] if row and row[0] else date.today().isoformat()
else:
as_of_str = as_of.isoformat()
cur.execute(
"""
SELECT a.id AS account_id,
a.name AS account_name,
a.type AS account_type,
a.currency AS currency,
SUM(h.market_value) AS market_value,
SUM(h.market_value_gbp) AS market_value_gbp,
? AS snapshot_date
FROM holdings_snapshot h
JOIN accounts a ON a.id = h.account_id
WHERE h.snapshot_date = ?
GROUP BY a.id
""",
(as_of_str, as_of_str),
)
except sqlite3.OperationalError:
# Fallback: empty list — the operator should run wealthfolio-sync
# to populate snapshots and try again.
return []
rows = []
for row in cur.fetchall():
snap_date = date.fromisoformat(row["snapshot_date"])
rows.append({
"external_id": f"wealthfolio:{row['account_id']}:{row['snapshot_date']}",
"snapshot_date": snap_date,
"account_id": str(row["account_id"]),
"account_name": row["account_name"] or "",
"account_type": row["account_type"] or "unknown",
"currency": row["currency"] or "GBP",
"market_value": Decimal(str(row["market_value"] or 0)),
"market_value_gbp": Decimal(str(row["market_value_gbp"] or 0)),
})
return rows
async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> int:
"""Idempotent upsert on `external_id` (one row per account per day)."""
if not rows:
return 0
insert_ = _dialect_insert(session)
@ -120,6 +37,8 @@ async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) ->
"snapshot_date": stmt.excluded.snapshot_date,
"account_name": stmt.excluded.account_name,
"account_type": stmt.excluded.account_type,
"currency": stmt.excluded.currency,
"cost_basis_gbp": stmt.excluded.cost_basis_gbp,
}
stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols)
await session.execute(stmt)

View file

@ -0,0 +1,115 @@
"""Wealthfolio ingest from the wealthfolio_sync Postgres mirror.
The wealthfolio pod runs a `pg-sync` sidecar that hourly snapshots its
local SQLite to the `wealthfolio_sync` PG database. Reading from that
mirror (rather than the live SQLite via kubectl-exec) is more reliable:
no cross-pod copy, no schema-mismatch swallowing, multi-process safe,
and Grafana already trusts it.
Source schema (defined in `infra/stacks/wealthfolio/main.tf`):
accounts(id, name, account_type, currency, is_active)
daily_account_valuation(id, account_id, valuation_date,
account_currency, base_currency,
fx_rate_to_base,
cash_balance, investment_market_value,
total_value, cost_basis, net_contribution)
`daily_account_valuation` is the right table for time-series NW per
account `total_value` is in `account_currency`; multiply by
`fx_rate_to_base` to get the user's base (typically GBP). The mirror
filters out the synthetic `'TOTAL'` account_id so we never see it here.
"""
from __future__ import annotations
import os
from datetime import date
from decimal import ROUND_HALF_EVEN, Decimal
from typing import Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
_TWO_PLACES = Decimal("0.01")
def create_wf_sync_engine_from_env() -> AsyncEngine:
"""Engine for the wealthfolio_sync mirror DB.
Reads `WEALTHFOLIO_SYNC_DB_CONNECTION_STRING`; falls back to
`DB_CONNECTION_STRING` for single-DB local dev. Raises if neither
is set so callers fail loudly instead of silently using the wrong DB.
"""
url = (
os.environ.get("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING")
or os.environ.get("DB_CONNECTION_STRING")
)
if not url:
raise RuntimeError(
"WEALTHFOLIO_SYNC_DB_CONNECTION_STRING (or DB_CONNECTION_STRING) "
"must be set to read the wealthfolio_sync mirror"
)
return create_async_engine(url, pool_pre_ping=True)
async def read_account_snapshots_from_pg(
wf_session: AsyncSession,
as_of: date | None = None,
) -> list[dict[str, Any]]:
"""Read latest NW per account from `daily_account_valuation`.
Returns a list of dicts with the same shape as
`wealthfolio.read_account_snapshots()`, so `upsert_snapshots()`
accepts either source.
If `as_of` is None, picks the max `valuation_date` in the mirror
(i.e. "latest"). If `as_of` is given, returns rows for that exact
date; if no rows exist, returns [].
"""
if as_of is None:
latest = (await wf_session.execute(
text("SELECT MAX(valuation_date) FROM daily_account_valuation"))).scalar()
if latest is None:
return []
target = latest if isinstance(latest, date) else date.fromisoformat(str(latest))
else:
target = as_of
rows = (await wf_session.execute(
text("""
SELECT a.id AS account_id,
a.name AS account_name,
a.account_type AS account_type,
COALESCE(d.account_currency, a.currency, 'GBP') AS currency,
d.total_value AS total_value,
d.cost_basis AS cost_basis,
COALESCE(d.fx_rate_to_base, 1.0) AS fx_rate_to_base,
d.valuation_date AS snapshot_date
FROM daily_account_valuation d
JOIN accounts a ON a.id = d.account_id
WHERE d.valuation_date = :as_of
"""), {"as_of": target})).mappings().all()
out: list[dict[str, Any]] = []
for r in rows:
snapshot_date = (r["snapshot_date"] if isinstance(r["snapshot_date"], date) else
date.fromisoformat(str(r["snapshot_date"])))
total_value = Decimal(str(r["total_value"] or 0))
fx = Decimal(str(r["fx_rate_to_base"]))
market_value_gbp = (total_value * fx).quantize(_TWO_PLACES, rounding=ROUND_HALF_EVEN)
cost_basis_gbp: Decimal | None = None
if r["cost_basis"] is not None:
cost_basis_gbp = (Decimal(str(r["cost_basis"])) *
fx).quantize(_TWO_PLACES, rounding=ROUND_HALF_EVEN)
out.append({
"external_id": f"wealthfolio:{r['account_id']}:{snapshot_date.isoformat()}",
"snapshot_date": snapshot_date,
"account_id": str(r["account_id"]),
"account_name": r["account_name"] or "",
"account_type": r["account_type"] or "unknown",
"currency": r["currency"] or "GBP",
"market_value": total_value,
"market_value_gbp": market_value_gbp,
"cost_basis_gbp": cost_basis_gbp,
})
return out