126 lines
4.8 KiB
Python
126 lines
4.8 KiB
Python
"""Wealthfolio ingest — kubectl exec into the wealthfolio pod, read the
|
|
SQLite DB read-only, parse account snapshots, upsert into
|
|
`fire_planner.account_snapshot`.
|
|
|
|
Wealthfolio stores every account's NW + holdings in
|
|
`/data/app.db` (SQLite). The published schema (post-2025) keeps a
|
|
`holdings_snapshot` table per (account_id, date). For the planner we
|
|
fold to total NW per account per day.
|
|
|
|
Phase 0 prerequisite: `wealthfolio-sync` must record a snapshot for
|
|
every active account every day. Until that lands the Schwab and
|
|
InvestEngine accounts read as stale snapshots from years ago and the
|
|
planner anchors on £154k instead of the real ~£1M. See
|
|
`fire-planner/README.md` and the parent CLAUDE.md project memory.
|
|
|
|
This module does NOT shell out to kubectl — that's the operator's job.
|
|
Instead, callers pass an already-fetched local SQLite file path
|
|
(typically `/tmp/wealthfolio.db`). The CLI wraps the kubectl exec.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from fire_planner.db import AccountSnapshot
|
|
|
|
|
|
def _dialect_insert(session: AsyncSession) -> Any:
|
|
bind = session.get_bind()
|
|
if bind.dialect.name == "sqlite":
|
|
return sqlite_insert
|
|
return pg_insert
|
|
|
|
|
|
def read_account_snapshots(db_path: str | Path, as_of: date | None = None) -> list[dict[str, Any]]:
|
|
"""Read the latest snapshot row per account.
|
|
|
|
Returns a list of dicts ready for upsert into `account_snapshot`.
|
|
Each dict has: external_id, snapshot_date, account_id, account_name,
|
|
account_type, currency, market_value, market_value_gbp.
|
|
"""
|
|
db_path = Path(db_path)
|
|
if not db_path.exists():
|
|
raise FileNotFoundError(f"Wealthfolio sqlite db not found: {db_path}")
|
|
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
rows = list(_query_snapshots(conn, as_of))
|
|
finally:
|
|
conn.close()
|
|
return rows
|
|
|
|
|
|
def _query_snapshots(conn: sqlite3.Connection, as_of: date | None) -> list[dict[str, Any]]:
|
|
"""Wealthfolio's actual schema is opaque (different versions ship
|
|
different tables). We try the v1 layout first (`accounts` +
|
|
`holdings_snapshot`); if that fails, return empty and let the CLI
|
|
surface the error to the operator.
|
|
"""
|
|
cur = conn.cursor()
|
|
try:
|
|
if as_of is None:
|
|
cur.execute("SELECT MAX(snapshot_date) FROM holdings_snapshot", )
|
|
row = cur.fetchone()
|
|
as_of_str = row[0] if row and row[0] else date.today().isoformat()
|
|
else:
|
|
as_of_str = as_of.isoformat()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT a.id AS account_id,
|
|
a.name AS account_name,
|
|
a.type AS account_type,
|
|
a.currency AS currency,
|
|
SUM(h.market_value) AS market_value,
|
|
SUM(h.market_value_gbp) AS market_value_gbp,
|
|
? AS snapshot_date
|
|
FROM holdings_snapshot h
|
|
JOIN accounts a ON a.id = h.account_id
|
|
WHERE h.snapshot_date = ?
|
|
GROUP BY a.id
|
|
""",
|
|
(as_of_str, as_of_str),
|
|
)
|
|
except sqlite3.OperationalError:
|
|
# Fallback: empty list — the operator should run wealthfolio-sync
|
|
# to populate snapshots and try again.
|
|
return []
|
|
rows = []
|
|
for row in cur.fetchall():
|
|
snap_date = date.fromisoformat(row["snapshot_date"])
|
|
rows.append({
|
|
"external_id": f"wealthfolio:{row['account_id']}:{row['snapshot_date']}",
|
|
"snapshot_date": snap_date,
|
|
"account_id": str(row["account_id"]),
|
|
"account_name": row["account_name"] or "",
|
|
"account_type": row["account_type"] or "unknown",
|
|
"currency": row["currency"] or "GBP",
|
|
"market_value": Decimal(str(row["market_value"] or 0)),
|
|
"market_value_gbp": Decimal(str(row["market_value_gbp"] or 0)),
|
|
})
|
|
return rows
|
|
|
|
|
|
async def upsert_snapshots(session: AsyncSession, rows: list[dict[str, Any]]) -> int:
|
|
if not rows:
|
|
return 0
|
|
insert_ = _dialect_insert(session)
|
|
stmt = insert_(AccountSnapshot).values(rows)
|
|
update_cols = {
|
|
"market_value": stmt.excluded.market_value,
|
|
"market_value_gbp": stmt.excluded.market_value_gbp,
|
|
"snapshot_date": stmt.excluded.snapshot_date,
|
|
"account_name": stmt.excluded.account_name,
|
|
"account_type": stmt.excluded.account_type,
|
|
}
|
|
stmt = stmt.on_conflict_do_update(index_elements=["external_id"], set_=update_cols)
|
|
await session.execute(stmt)
|
|
return len(rows)
|