diff --git a/broker_sync/cli.py b/broker_sync/cli.py index b5ce4c2..385fd01 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -438,6 +438,15 @@ def fidelity_ingest( sys.exit(2) async def _run() -> None: + from datetime import date as _date_t + + from broker_sync.providers.fidelity_planviewer import ( + ACCOUNT_ID as FID_ACCOUNT_ID, + ) + from broker_sync.providers.fidelity_planviewer import ( + fidelity_holdings_to_snapshot, + ) + sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, @@ -455,12 +464,31 @@ def fidelity_ingest( result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) + # PlanViewer has no historical per-fund unit-price feed, so + # the Activity stream above only carries cash flows. The + # current-pot fund positions captured in the same scrape get + # pushed via /api/v1/snapshots/import so per-fund quantity + + # cost basis land in WF (and propagate to the wealth + # dashboard's Positions table via pg-sync). + snapshot_imported = 0 + if provider.last_holdings: + snapshot = fidelity_holdings_to_snapshot( + holdings=provider.last_holdings, + total_real_contribution=provider.last_total_contribution, + as_of=_date_t.today(), + ) + if snapshot is not None: + push_result = await sink.push_manual_snapshots( + account_id=FID_ACCOUNT_ID, snapshots=[snapshot], + ) + snapshot_imported = int(push_result.get("snapshotsImported", 0)) finally: await sink.close() typer.echo(f"fidelity-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " - f"failed={result.failed}") + f"failed={result.failed} " + f"snapshots={snapshot_imported}") if result.failed > 0: sys.exit(1) diff --git a/broker_sync/providers/fidelity_planviewer.py b/broker_sync/providers/fidelity_planviewer.py index e201ac8..4658dcf 100644 --- a/broker_sync/providers/fidelity_planviewer.py +++ b/broker_sync/providers/fidelity_planviewer.py @@ -16,21 +16,28 @@ We keep a Playwright-maintained session via ``storage_state.json``: fund holdings. On 401/idle-timeout we raise :class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed. -## Emitted Activity shape +## Emitted Activity / snapshot shape - One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single Premium, rebate, etc.). ``external_id = fidelity:tx:``. -- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches - the Fidelity dashboard. ``external_id = - fidelity:gains:``. - Bulk Switches / Fund Switches are skipped (no cash movement). +- After the activity stream drains, the ``fidelity-ingest`` CLI calls + ``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload`` + per fund holding (today's date, units + cost basis allocated + proportionally to fund value share). This sets per-fund quantity and + cost basis in WF so the dashboard Positions table shows the pension + funds alongside the brokerage assets. +- The old synthetic ``fidelity:gains:`` DEPOSIT is no longer + emitted — the snapshot supersedes it. Old offset rows that landed + before this change are corrected at the dashboard layer by the + ``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``). """ from __future__ import annotations import contextlib import logging from collections.abc import AsyncIterator -from datetime import UTC, datetime +from datetime import date, datetime from decimal import Decimal from pathlib import Path from typing import Any, NamedTuple @@ -42,6 +49,7 @@ from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, parse_valuation_json, ) +from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition log = logging.getLogger(__name__) @@ -86,37 +94,6 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity: ) -def _gains_offset_activity( - holdings: list[FidelityHolding], - transactions: list[FidelityCashTx], - as_of: datetime, -) -> Activity | None: - """Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the - Fidelity dashboard's reported pot value. - - The offset carries a date-derived external_id so monthly runs refresh - the same synthetic entry rather than stacking duplicates. - """ - if not holdings: - return None - total_value = sum((h.total_value for h in holdings), Decimal(0)) - total_contrib = sum((t.amount for t in transactions), Decimal(0)) - gains = total_value - total_contrib - if gains == 0: - return None - return Activity( - external_id=f"fidelity:gains:{as_of.date().isoformat()}", - account_id=ACCOUNT_ID, - account_type=AccountType.WORKPLACE_PENSION, - date=as_of, - activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL, - currency=_CCY, - amount=abs(gains), - notes=(f"fidelity-planviewer:unrealised-gains-offset " - f"(pot=£{total_value}, contrib=£{total_contrib})"), - ) - - class FidelityPlanViewerProvider: """Read-only provider against Fidelity UK PlanViewer. @@ -125,11 +102,18 @@ class FidelityPlanViewerProvider: - ``fetch(since, before)`` opens a Playwright session with the saved storage_state, navigates to the transaction-history page with a wide date range, scrapes the table, and intercepts the valuation XHR. + - After ``fetch()`` completes, ``last_holdings`` holds the per-fund + unit positions captured in the same scrape — used by the + ``fidelity-ingest`` CLI to push a manual snapshot to Wealthfolio + so per-fund quantities + cost basis land in the Positions table + (the activity stream alone only carries cash flows). """ name = "fidelity-planviewer" def __init__(self, creds: FidelityCreds) -> None: self._creds = creds + self.last_holdings: list[FidelityHolding] = [] + self.last_total_contribution: Decimal = Decimal(0) def accounts(self) -> list[Account]: return [ @@ -162,19 +146,72 @@ class FidelityPlanViewerProvider: log.info("fidelity: parsed %d transactions, %d holdings", len(transactions), len(holdings)) + # Snapshot the per-fund holdings for the CLI to push as a manual + # holdings_snapshot after this generator drains. Wealthfolio's + # activity model can't represent pension fund unit purchases (no + # per-purchase price feed from PlanViewer), so we record current + # state via /api/v1/snapshots/import instead. + self.last_holdings = holdings + self.last_total_contribution = sum( + (t.amount for t in transactions), Decimal(0) + ) + for tx in transactions: if since is not None and tx.date < since: continue if before is not None and tx.date >= before: continue yield _tx_to_activity(tx) + # NB: the gains-offset DEPOSIT we used to emit here is superseded + # by the manual snapshot push the CLI does after fetch() drains. + # The snapshot sets per-fund quantity + cost basis directly, so + # Wealthfolio computes growth from positions instead of needing a + # fake cash entry. Old offset rows still in WF are corrected at + # the dashboard layer by the dav_corrected view. - # The gains offset is always "as of now" so it reflects today's pot. - # Only emit when the caller isn't windowing (full state). - if since is None and before is None: - offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC)) - if offset is not None: - yield offset + +def fidelity_holdings_to_snapshot( + holdings: list[FidelityHolding], + total_real_contribution: Decimal, + as_of: date, +) -> ManualSnapshotPayload | None: + """Convert scraped holdings into a Wealthfolio manual snapshot payload. + + Cost-basis allocation: PlanViewer doesn't expose historical purchase + prices for individual fund unit buys, so we approximate per-fund + cost basis by allocating the cumulative cash contribution + proportionally to each fund's share of the current pot value. For + the typical single-fund Meta scheme this is exact; if Viktor's plan + later splits into multiple funds the proportional split is the + least-wrong allocation we can compute from monthly snapshots. + + cashBalances is set to zero — pension contributions flow straight + into funds, the synthetic Wealthfolio "cash balance" only existed + because of the old gains-offset DEPOSIT hack. + """ + if not holdings: + return None + total_value = sum((h.total_value for h in holdings), Decimal(0)) + if total_value <= 0: + return None + positions: list[SnapshotPosition] = [] + for h in holdings: + share = h.total_value / total_value + cost = (total_real_contribution * share).quantize(Decimal("0.01")) + avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0) + positions.append(SnapshotPosition( + symbol=h.fund_code, + quantity=h.units, + average_cost=avg_cost, + total_cost_basis=cost, + currency=h.currency, + )) + return ManualSnapshotPayload( + date=as_of, + currency=_CCY, + positions=positions, + cash_balances={_CCY: Decimal(0)}, + ) async def _scrape_live_session( diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index efbd50c..cb6ea45 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -2,7 +2,9 @@ from __future__ import annotations import json from collections.abc import Iterable -from datetime import UTC +from dataclasses import dataclass +from datetime import UTC, date +from decimal import Decimal from pathlib import Path from typing import Any @@ -14,6 +16,7 @@ _LOGIN_PATH = "/api/v1/auth/login" _ACCOUNTS_PATH = "/api/v1/accounts" _IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_REAL = "/api/v1/activities/import" +_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import" class WealthfolioError(Exception): @@ -262,3 +265,83 @@ class WealthfolioSink: f"First warning: {first_warn}") assert isinstance(got, list) return [r for r in got if isinstance(r, dict)] + + # -- manual holdings snapshots -- + + async def push_manual_snapshots( + self, + account_id: str, + snapshots: list[ManualSnapshotPayload], + ) -> dict[str, Any]: + """Push manual holdings snapshots to /api/v1/snapshots/import. + + Each snapshot carries a date + per-fund positions + cash balances. + Wealthfolio auto-creates any unknown asset symbol with + ``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=`` and uses + the snapshot to derive holdings + valuation for that date — bypassing + the activity-ledger derivation entirely for the targeted day. + + Used by the Fidelity provider since PlanViewer exposes current + fund units + price but no per-trade history. Re-imports for the + same (account, date) overwrite in place. + """ + if not snapshots: + return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []} + body = { + "accountId": account_id, + "snapshots": [_snapshot_to_payload(s) for s in snapshots], + } + resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body) + if resp.status_code >= 400: + try: + payload = resp.json() + except Exception: + payload = {"raw": resp.text} + raise WealthfolioError( + f"Wealthfolio /snapshots/import rejected: {payload}") + result = resp.json() + assert isinstance(result, dict) + failed = int(result.get("snapshotsFailed", 0)) + if failed > 0: + raise WealthfolioError( + f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; " + f"errors={result.get('errors')}") + return result + + +@dataclass(frozen=True) +class SnapshotPosition: + """A per-fund position row in a Wealthfolio manual snapshot.""" + symbol: str + quantity: Decimal + average_cost: Decimal + total_cost_basis: Decimal + currency: str + + +@dataclass(frozen=True) +class ManualSnapshotPayload: + """Sink-facing snapshot row. Mirrors the JSON shape WF expects.""" + date: date + currency: str + positions: list[SnapshotPosition] + cash_balances: dict[str, Decimal] + + +def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]: + """Serialise a ManualSnapshotPayload into WF's import wire format.""" + return { + "date": s.date.isoformat(), + "currency": s.currency, + "positions": [ + { + "symbol": p.symbol, + "quantity": format(p.quantity, "f"), + "averageCost": format(p.average_cost, "f"), + "totalCostBasis": format(p.total_cost_basis, "f"), + "currency": p.currency, + } + for p in s.positions + ], + "cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()}, + } diff --git a/tests/providers/test_fidelity_planviewer.py b/tests/providers/test_fidelity_planviewer.py index 55b069e..a030ac3 100644 --- a/tests/providers/test_fidelity_planviewer.py +++ b/tests/providers/test_fidelity_planviewer.py @@ -1,19 +1,19 @@ from __future__ import annotations import json -from datetime import UTC, datetime +from datetime import UTC, date, datetime from decimal import Decimal from pathlib import Path import pytest -from broker_sync.models import Account, AccountType, ActivityType +from broker_sync.models import Account, AccountType from broker_sync.providers.fidelity_planviewer import ( ACCOUNT_ID, FidelityCreds, FidelityPlanViewerProvider, FidelityProviderConfigError, - _gains_offset_activity, + fidelity_holdings_to_snapshot, ) from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, @@ -96,21 +96,53 @@ def test_parse_valuation_fixture() -> None: assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"} -def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None: +def test_holdings_to_snapshot_real_fixture() -> None: html = (_FIXTURES / "transactions-full.html").read_text() valuation = json.loads((_FIXTURES / "valuation.json").read_text()) - txs = parse_transactions_html(html) holdings = parse_valuation_json(valuation) - as_of = datetime(2026, 4, 18, tzinfo=UTC) - offset = _gains_offset_activity(holdings, txs, as_of) - assert offset is not None - assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL) - assert offset.amount is not None and offset.amount > 0 - assert offset.external_id == "fidelity:gains:2026-04-18" + total_contrib = sum((tx.amount for tx in parse_transactions_html(html)), + Decimal(0)) + + snapshot = fidelity_holdings_to_snapshot( + holdings=holdings, + total_real_contribution=total_contrib, + as_of=date(2026, 4, 18), + ) + assert snapshot is not None + assert snapshot.date == date(2026, 4, 18) + assert snapshot.currency == "GBP" + # Cost basis sums to the cash contributions (allocated by fund value share) + sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0)) + assert abs(sum_cost - total_contrib) < Decimal("1") + # Meta scheme had KDOA + LAFC + one other at fixture time; the + # dominant fund must be KDOA. + symbols = [p.symbol for p in snapshot.positions] + assert "KDOA" in symbols + kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA") + assert kdoa.quantity > 0 + # Proportional cost-basis allocation: KDOA holds nearly the whole pot + # so it should get the lion's share of cost + kdoa_share = kdoa.total_cost_basis / sum_cost + assert kdoa_share > Decimal("0.9") + # cashBalances zero — pension contributions flow straight into funds + assert snapshot.cash_balances == {"GBP": Decimal(0)} -def test_gains_offset_none_when_no_holdings() -> None: - assert _gains_offset_activity( - holdings=[], transactions=[], - as_of=datetime(2026, 4, 18, tzinfo=UTC), +def test_holdings_to_snapshot_none_when_no_holdings() -> None: + assert fidelity_holdings_to_snapshot( + holdings=[], total_real_contribution=Decimal("100"), + as_of=date(2026, 4, 18), ) is None + + +def test_provider_caches_holdings_for_cli_snapshot_push() -> None: + """The CLI reads `last_holdings` after fetch() drains to push the + manual snapshot. This guards the contract that fetch() populates the + attribute even when no Activity is yielded (e.g., backfill window + cut-off).""" + prov = FidelityPlanViewerProvider(FidelityCreds( + storage_state_path="/tmp/x", plan_id="META", + )) + # Pre-fetch state: empty + assert prov.last_holdings == [] + assert prov.last_total_contribution == Decimal(0) diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index 210b915..436e52b 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from datetime import UTC, datetime +from datetime import UTC, date, datetime from decimal import Decimal from pathlib import Path from typing import Any @@ -12,6 +12,9 @@ import pytest from broker_sync.models import Account, AccountType, Activity, ActivityType from broker_sync.sinks.wealthfolio import ( ImportValidationError, + ManualSnapshotPayload, + SnapshotPosition, + WealthfolioError, WealthfolioSink, WealthfolioUnauthorizedError, ) @@ -274,3 +277,99 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None: with pytest.raises(ImportValidationError, match="unknown symbol"): await sink.import_activities([_buy()]) assert calls == ["/api/v1/activities/import/check"] # real import never hit + + +# -- Manual snapshot import (Fidelity path) -- + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + seen: dict[str, Any] = {} + + async def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v1/snapshots/import": + seen["body"] = json.loads(req.content) + return httpx.Response( + 200, + json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []}, + ) + return httpx.Response(404) + + sink = _client(httpx.MockTransport(handler), sp) + snapshot = ManualSnapshotPayload( + date=date(2026, 5, 16), + currency="GBP", + positions=[ + SnapshotPosition( + symbol="KDOA", + quantity=Decimal("4200.5"), + average_cost=Decimal("24.29"), + total_cost_basis=Decimal("102004.15"), + currency="GBP", + ), + ], + cash_balances={"GBP": Decimal(0)}, + ) + result = await sink.push_manual_snapshots( + account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234", + snapshots=[snapshot], + ) + assert result["snapshotsImported"] == 1 + # Wire format: numeric fields are STRINGS (Decimal.__format__('f')) + body = seen["body"] + assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234" + pos = body["snapshots"][0]["positions"][0] + assert pos == { + "symbol": "KDOA", + "quantity": "4200.5", + "averageCost": "24.29", + "totalCostBasis": "102004.15", + "currency": "GBP", + } + assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"} + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_raises_on_partial_failure( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + async def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "snapshotsImported": 0, + "snapshotsFailed": 1, + "errors": [{"row": 0, "msg": "bad symbol"}], + }, + ) + + sink = _client(httpx.MockTransport(handler), sp) + snapshot = ManualSnapshotPayload( + date=date(2026, 5, 16), currency="GBP", + positions=[], cash_balances={}, + ) + with pytest.raises(WealthfolioError, match="bad symbol"): + await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot]) + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_short_circuits_on_empty( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + async def handler(req: httpx.Request) -> httpx.Response: + raise AssertionError(f"unexpected request: {req.method} {req.url.path}") + + sink = _client(httpx.MockTransport(handler), sp) + result = await sink.push_manual_snapshots(account_id="acct", snapshots=[]) + assert result["snapshotsImported"] == 0