"""Wealthfolio ingest reads a real-shape sqlite and upserts cleanly.""" from __future__ import annotations import sqlite3 from datetime import date from decimal import Decimal from pathlib import Path import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.db import AccountSnapshot from fire_planner.ingest.wealthfolio import read_account_snapshots, upsert_snapshots @pytest.fixture def wealthfolio_db(tmp_path: Path) -> Path: """Create a minimal sqlite mimicking Wealthfolio's schema.""" db_path = tmp_path / "wealthfolio.db" conn = sqlite3.connect(db_path) cur = conn.cursor() cur.executescript(""" CREATE TABLE accounts ( id TEXT PRIMARY KEY, name TEXT, type TEXT, currency TEXT ); CREATE TABLE holdings_snapshot ( account_id TEXT, snapshot_date TEXT, symbol TEXT, market_value REAL, market_value_gbp REAL ); INSERT INTO accounts VALUES ('acc-isa', 'ISA', 'ISA', 'GBP'); INSERT INTO accounts VALUES ('acc-schwab', 'Schwab', 'BROKERAGE', 'USD'); INSERT INTO holdings_snapshot VALUES ('acc-isa', '2026-04-25', 'VWRL', 200000, 200000); INSERT INTO holdings_snapshot VALUES ('acc-isa', '2026-04-25', 'BND', 100000, 100000); INSERT INTO holdings_snapshot VALUES ('acc-schwab', '2026-04-25', 'META', 800000, 640000); """) conn.commit() conn.close() return db_path def test_read_groups_holdings_per_account(wealthfolio_db: Path) -> None: rows = read_account_snapshots(wealthfolio_db) assert len(rows) == 2 by_id = {r["account_id"]: r for r in rows} assert by_id["acc-isa"]["market_value_gbp"] == Decimal("300000") assert by_id["acc-schwab"]["market_value_gbp"] == Decimal("640000") assert by_id["acc-isa"]["snapshot_date"] == date(2026, 4, 25) def test_read_returns_empty_on_unknown_schema(tmp_path: Path) -> None: """If the sqlite has a totally different shape, return [] rather than blow up — let the operator surface the warning.""" db = tmp_path / "weird.db" conn = sqlite3.connect(db) conn.execute("CREATE TABLE foo (x INTEGER)") conn.commit() conn.close() assert read_account_snapshots(db) == [] def test_read_missing_file_raises(tmp_path: Path) -> None: with pytest.raises(FileNotFoundError): read_account_snapshots(tmp_path / "nope.db") async def test_upsert_inserts_new_rows(session: AsyncSession, wealthfolio_db: Path) -> None: rows = read_account_snapshots(wealthfolio_db) n = await upsert_snapshots(session, rows) await session.commit() assert n == 2 persisted = (await session.execute(select(AccountSnapshot))).scalars().all() assert len(persisted) == 2 by_id = {p.account_id: p for p in persisted} assert by_id["acc-isa"].market_value_gbp == Decimal("300000") async def test_upsert_is_idempotent(session: AsyncSession, wealthfolio_db: Path) -> None: rows = read_account_snapshots(wealthfolio_db) await upsert_snapshots(session, rows) await session.commit() # Run again — should still be 2 rows, not 4 await upsert_snapshots(session, rows) await session.commit() persisted = (await session.execute(select(AccountSnapshot))).scalars().all() assert len(persisted) == 2 async def test_upsert_zero_rows_is_noop(session: AsyncSession) -> None: n = await upsert_snapshots(session, []) assert n == 0