All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
The previous SQLite-direct reader queried `holdings_snapshot` (singular) and `accounts.type` — both wrong against the live wealthfolio schema (plural `holdings_snapshots`, column `account_type`). It silently returned [] via the OperationalError fallback, leaving fire-planner with stale account snapshots. Switch to reading from the wealthfolio_sync PG mirror. The pg-sync sidecar (defined in infra/stacks/wealthfolio) hourly mirrors SQLite to Postgres with a clean schema. We read from `daily_account_valuation` which already has total_value, cost_basis, and explicit fx_rate_to_base per row — no JSON-decoding of position blobs. CLI ingest no longer takes --db-path (no kubectl-exec gymnastics); reads WEALTHFOLIO_SYNC_DB_CONNECTION_STRING from env. Falls back to DB_CONNECTION_STRING for single-DB local dev. 13 new tests covering: latest-per-account, multi-currency FX, explicit as-of, empty mirror, null cost_basis, full pipeline through upsert. 140 tests pass; mypy strict + ruff clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
115 lines
4.7 KiB
Python
115 lines
4.7 KiB
Python
"""Wealthfolio ingest from the wealthfolio_sync Postgres mirror.
|
|
|
|
The wealthfolio pod runs a `pg-sync` sidecar that hourly snapshots its
|
|
local SQLite to the `wealthfolio_sync` PG database. Reading from that
|
|
mirror (rather than the live SQLite via kubectl-exec) is more reliable:
|
|
no cross-pod copy, no schema-mismatch swallowing, multi-process safe,
|
|
and Grafana already trusts it.
|
|
|
|
Source schema (defined in `infra/stacks/wealthfolio/main.tf`):
|
|
|
|
accounts(id, name, account_type, currency, is_active)
|
|
daily_account_valuation(id, account_id, valuation_date,
|
|
account_currency, base_currency,
|
|
fx_rate_to_base,
|
|
cash_balance, investment_market_value,
|
|
total_value, cost_basis, net_contribution)
|
|
|
|
`daily_account_valuation` is the right table for time-series NW per
|
|
account — `total_value` is in `account_currency`; multiply by
|
|
`fx_rate_to_base` to convert it to the user's base currency (typically GBP). The mirror
|
|
filters out the synthetic `'TOTAL'` account_id so we never see it here.
|
|
"""
|
|
from __future__ import annotations

import os
from datetime import date, datetime
from decimal import ROUND_HALF_EVEN, Decimal
from typing import Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
|
|
|
|
# Quantum passed to `Decimal.quantize` to round converted money amounts to
# exactly two decimal places.
_TWO_PLACES = Decimal("0.01")
|
|
|
|
|
|
def create_wf_sync_engine_from_env() -> AsyncEngine:
    """Build an async engine pointed at the wealthfolio_sync mirror DB.

    The connection URL is taken from the first of these environment
    variables that holds a non-empty value:

      1. ``WEALTHFOLIO_SYNC_DB_CONNECTION_STRING``
      2. ``DB_CONNECTION_STRING`` (single-DB local development)

    The engine is created with ``pool_pre_ping=True``.

    Raises:
        RuntimeError: if neither variable is set to a non-empty value, so a
            misconfigured caller fails loudly rather than reading the wrong DB.
    """
    for env_var in ("WEALTHFOLIO_SYNC_DB_CONNECTION_STRING", "DB_CONNECTION_STRING"):
        candidate_url = os.environ.get(env_var)
        # An empty string counts as "unset" and falls through to the next var.
        if candidate_url:
            return create_async_engine(candidate_url, pool_pre_ping=True)
    raise RuntimeError(
        "WEALTHFOLIO_SYNC_DB_CONNECTION_STRING (or DB_CONNECTION_STRING) "
        "must be set to read the wealthfolio_sync mirror"
    )
|
|
|
|
|
|
def _coerce_date(value: object) -> date:
    """Normalise a driver-returned date-like value to a plain ``date``.

    Depending on the column type and driver this may arrive as a ``date``,
    a ``datetime`` (timestamp-typed column) or an ISO-8601 string
    (text-typed column).
    """
    # ``datetime`` is a subclass of ``date``, so it must be checked first;
    # otherwise the time component survives into ``snapshot_date`` and
    # ``external_id``.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    # ``datetime.fromisoformat`` accepts both "YYYY-MM-DD" and full timestamps.
    return datetime.fromisoformat(str(value)).date()


def _to_base(amount: Decimal, fx_rate: Decimal) -> Decimal:
    """Convert ``amount`` with ``fx_rate`` and round to 2 dp (banker's rounding)."""
    return (amount * fx_rate).quantize(_TWO_PLACES, rounding=ROUND_HALF_EVEN)


async def read_account_snapshots_from_pg(
    wf_session: AsyncSession,
    as_of: date | None = None,
) -> list[dict[str, Any]]:
    """Read one NW snapshot per account from ``daily_account_valuation``.

    Returns a list of dicts with the same shape as
    ``wealthfolio.read_account_snapshots()``, so ``upsert_snapshots()``
    accepts either source. Each dict has ``external_id``, ``snapshot_date``,
    ``account_id``, ``account_name``, ``account_type``, ``currency``,
    ``market_value`` (account currency, unrounded), and ``market_value_gbp``
    / ``cost_basis_gbp`` (multiplied by ``fx_rate_to_base`` and rounded to
    2 dp; ``cost_basis_gbp`` is ``None`` when the row has no cost basis).

    Args:
        wf_session: Session bound to the wealthfolio_sync mirror.
        as_of: Valuation date to read. ``None`` means the single most recent
            ``valuation_date`` across the whole table, so an account with no
            row on that date is not returned.

    Returns:
        One dict per valuation row on the target date (joined to
        ``accounts``), or ``[]`` if the table is empty or has no rows for
        the requested date.
    """
    target: object
    if as_of is None:
        latest = (
            await wf_session.execute(
                text("SELECT MAX(valuation_date) FROM daily_account_valuation")
            )
        ).scalar()
        if latest is None:
            return []
        # Bind back the exact value the column produced, so the equality
        # below compares like with like whatever the column's declared type
        # (converting it to ``date`` first can mismatch a text column).
        target = latest
    else:
        target = as_of

    rows = (
        await wf_session.execute(
            text("""
                SELECT a.id AS account_id,
                       a.name AS account_name,
                       a.account_type AS account_type,
                       COALESCE(d.account_currency, a.currency, 'GBP') AS currency,
                       d.total_value AS total_value,
                       d.cost_basis AS cost_basis,
                       COALESCE(d.fx_rate_to_base, 1.0) AS fx_rate_to_base,
                       d.valuation_date AS snapshot_date
                FROM daily_account_valuation d
                JOIN accounts a ON a.id = d.account_id
                WHERE d.valuation_date = :as_of
            """),
            {"as_of": target},
        )
    ).mappings().all()

    out: list[dict[str, Any]] = []
    for r in rows:
        snapshot_date = _coerce_date(r["snapshot_date"])
        # A NULL total_value is treated as zero.
        total_value = Decimal(str(r["total_value"] or 0))
        # NOTE(review): a NULL fx_rate_to_base is COALESCEd to 1.0 in SQL,
        # which is only correct when account currency == base currency.
        # Confirm the mirror always populates it for foreign-currency accounts.
        fx = Decimal(str(r["fx_rate_to_base"]))
        raw_cost_basis = r["cost_basis"]
        cost_basis_gbp: Decimal | None = (
            None
            if raw_cost_basis is None
            else _to_base(Decimal(str(raw_cost_basis)), fx)
        )
        out.append({
            "external_id": f"wealthfolio:{r['account_id']}:{snapshot_date.isoformat()}",
            "snapshot_date": snapshot_date,
            "account_id": str(r["account_id"]),
            "account_name": r["account_name"] or "",
            "account_type": r["account_type"] or "unknown",
            "currency": r["currency"] or "GBP",
            "market_value": total_value,
            "market_value_gbp": _to_base(total_value, fx),
            "cost_basis_gbp": cost_basis_gbp,
        })
    return out
|