fire-planner/fire_planner/ingest/wealthfolio.py
Viktor Barzin 23d11bdf6d
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ingest: switch wealthfolio to pg-sync mirror reads
The previous SQLite-direct reader queried `holdings_snapshot` (singular)
and `accounts.type` — both wrong against the live wealthfolio schema
(plural `holdings_snapshots`, column `account_type`). It silently
returned [] via the OperationalError fallback, leaving fire-planner with
stale account snapshots.

Switch to reading from the wealthfolio_sync PG mirror. The pg-sync
sidecar (defined in infra/stacks/wealthfolio) hourly mirrors SQLite to
Postgres with a clean schema. We read from `daily_account_valuation`
which already has total_value, cost_basis, and explicit fx_rate_to_base
per row — no JSON-decoding of position blobs.

CLI ingest no longer takes --db-path (no kubectl-exec gymnastics);
reads WEALTHFOLIO_SYNC_DB_CONNECTION_STRING from env. Falls back to
DB_CONNECTION_STRING for single-DB local dev.

13 new tests covering: latest-per-account, multi-currency FX, explicit
as-of, empty mirror, null cost_basis, full pipeline through upsert.
140 tests pass; mypy strict + ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-09 21:33:48 +00:00

45 lines
1.6 KiB
Python

"""Upsert helper for wealthfolio account snapshots.
The actual read happens in `wealthfolio_pg.py` (against the
`wealthfolio_sync` PG mirror). This module keeps the upsert helper that
both prod and tests use, so callers can:
rows = await read_account_snapshots_from_pg(wf_session)
await upsert_snapshots(session, rows)
"""
from __future__ import annotations
from typing import Any
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncSession
from fire_planner.db import AccountSnapshot
def _dialect_insert(session: AsyncSession) -> Any:
bind = session.get_bind()
if bind.dialect.name == "sqlite":
return sqlite_insert
return pg_insert
async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> int:
"""Idempotent upsert on `external_id` (one row per account per day)."""
if not rows:
return 0
insert_ = _dialect_insert(session)
stmt = insert_(AccountSnapshot).values(rows)
update_cols = {
"market_value": stmt.excluded.market_value,
"market_value_gbp": stmt.excluded.market_value_gbp,
"snapshot_date": stmt.excluded.snapshot_date,
"account_name": stmt.excluded.account_name,
"account_type": stmt.excluded.account_type,
"currency": stmt.excluded.currency,
"cost_basis_gbp": stmt.excluded.cost_basis_gbp,
}
stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols)
await session.execute(stmt)
return len(rows)