From f4a4c8892f8db0f6785c14dad78a9e75e1528172 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Thu, 7 May 2026 22:47:37 +0000 Subject: [PATCH 1/5] trigger pipeline From 5adc4a7ba4851e2c91c2c5c512731a444eb95906 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Thu, 7 May 2026 23:25:28 +0000 Subject: [PATCH 2/5] =?UTF-8?q?[ci]=20deploy.yml:=20manual-only=20?= =?UTF-8?q?=E2=80=94=20push=20events=20don't=20set=20IMAGE=5FTAG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .woodpecker/deploy.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.woodpecker/deploy.yml b/.woodpecker/deploy.yml index 9002f1c..731f409 100644 --- a/.woodpecker/deploy.yml +++ b/.woodpecker/deploy.yml @@ -1,5 +1,9 @@ when: - - event: [manual, push] + # Manual-only — fired with IMAGE_TAG by the build pipeline (or + # by a human kicking off a deploy from the Woodpecker UI). + # The earlier `[manual, push]` would fire on every push and fail + # at check-vars because IMAGE_TAG is unset on push events. + - event: manual steps: - name: check-vars From cb159e17d9b4da28dd8161ded3f63e1187deddb9 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 16 May 2026 13:56:25 +0000 Subject: [PATCH 3/5] fidelity: push per-fund manual snapshot instead of gains-offset DEPOSIT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PlanViewer's DisplayValuation.action JSON already gives us current fund units + unit price; we were parsing it and throwing it away, emitting only a single 'unrealised-gains-offset' DEPOSIT to make Wealthfolio's totals match the dashboard. That hack double-counted the gain as a cash contribution, hiding £35k of pension growth from every contribution/growth/ROI panel. New flow: - FidelityPlanViewerProvider exposes last_holdings + last_total_contribution after fetch() drains. - fidelity-ingest CLI converts to a ManualSnapshotPayload (cost basis allocated proportionally by current fund value share) and posts to WF /api/v1/snapshots/import. WF auto-creates unknown fund symbols with kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=GBP. - The gains-offset emission is removed entirely. Historical offset rows already in WF are corrected at the dashboard layer by the dav_corrected view shipped in infra@2841347e. WealthfolioSink gains push_manual_snapshots() + ManualSnapshotPayload / SnapshotPosition wire types. 11 sink tests (3 new) + 9 fidelity provider tests (2 changed, 1 new) all green; mypy + ruff clean. --- broker_sync/cli.py | 30 ++++- broker_sync/providers/fidelity_planviewer.py | 121 ++++++++++++------- broker_sync/sinks/wealthfolio.py | 85 ++++++++++++- tests/providers/test_fidelity_planviewer.py | 62 +++++++--- tests/sinks/test_wealthfolio.py | 101 +++++++++++++++- 5 files changed, 339 insertions(+), 60 deletions(-) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index b5ce4c2..385fd01 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -438,6 +438,15 @@ def fidelity_ingest( sys.exit(2) async def _run() -> None: + from datetime import date as _date_t + + from broker_sync.providers.fidelity_planviewer import ( + ACCOUNT_ID as FID_ACCOUNT_ID, + ) + from broker_sync.providers.fidelity_planviewer import ( + fidelity_holdings_to_snapshot, + ) + sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, @@ -455,12 +464,31 @@ def fidelity_ingest( result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) + # PlanViewer has no historical per-fund unit-price feed, so + # the Activity stream above only carries cash flows. The + # current-pot fund positions captured in the same scrape get + # pushed via /api/v1/snapshots/import so per-fund quantity + + # cost basis land in WF (and propagate to the wealth + # dashboard's Positions table via pg-sync). + snapshot_imported = 0 + if provider.last_holdings: + snapshot = fidelity_holdings_to_snapshot( + holdings=provider.last_holdings, + total_real_contribution=provider.last_total_contribution, + as_of=_date_t.today(), + ) + if snapshot is not None: + push_result = await sink.push_manual_snapshots( + account_id=FID_ACCOUNT_ID, snapshots=[snapshot], + ) + snapshot_imported = int(push_result.get("snapshotsImported", 0)) finally: await sink.close() typer.echo(f"fidelity-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " - f"failed={result.failed}") + f"failed={result.failed} " + f"snapshots={snapshot_imported}") if result.failed > 0: sys.exit(1) diff --git a/broker_sync/providers/fidelity_planviewer.py b/broker_sync/providers/fidelity_planviewer.py index e201ac8..4658dcf 100644 --- a/broker_sync/providers/fidelity_planviewer.py +++ b/broker_sync/providers/fidelity_planviewer.py @@ -16,21 +16,28 @@ We keep a Playwright-maintained session via ``storage_state.json``: fund holdings. On 401/idle-timeout we raise :class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed. -## Emitted Activity shape +## Emitted Activity / snapshot shape - One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single Premium, rebate, etc.). ``external_id = fidelity:tx:``. -- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches - the Fidelity dashboard. ``external_id = - fidelity:gains:``. - Bulk Switches / Fund Switches are skipped (no cash movement). +- After the activity stream drains, the ``fidelity-ingest`` CLI calls + ``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload`` + per fund holding (today's date, units + cost basis allocated + proportionally to fund value share). This sets per-fund quantity and + cost basis in WF so the dashboard Positions table shows the pension + funds alongside the brokerage assets. +- The old synthetic ``fidelity:gains:`` DEPOSIT is no longer + emitted — the snapshot supersedes it. Old offset rows that landed + before this change are corrected at the dashboard layer by the + ``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``). """ from __future__ import annotations import contextlib import logging from collections.abc import AsyncIterator -from datetime import UTC, datetime +from datetime import date, datetime from decimal import Decimal from pathlib import Path from typing import Any, NamedTuple @@ -42,6 +49,7 @@ from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, parse_valuation_json, ) +from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition log = logging.getLogger(__name__) @@ -86,37 +94,6 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity: ) -def _gains_offset_activity( - holdings: list[FidelityHolding], - transactions: list[FidelityCashTx], - as_of: datetime, -) -> Activity | None: - """Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the - Fidelity dashboard's reported pot value. - - The offset carries a date-derived external_id so monthly runs refresh - the same synthetic entry rather than stacking duplicates. - """ - if not holdings: - return None - total_value = sum((h.total_value for h in holdings), Decimal(0)) - total_contrib = sum((t.amount for t in transactions), Decimal(0)) - gains = total_value - total_contrib - if gains == 0: - return None - return Activity( - external_id=f"fidelity:gains:{as_of.date().isoformat()}", - account_id=ACCOUNT_ID, - account_type=AccountType.WORKPLACE_PENSION, - date=as_of, - activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL, - currency=_CCY, - amount=abs(gains), - notes=(f"fidelity-planviewer:unrealised-gains-offset " - f"(pot=£{total_value}, contrib=£{total_contrib})"), - ) - - class FidelityPlanViewerProvider: """Read-only provider against Fidelity UK PlanViewer. @@ -125,11 +102,18 @@ class FidelityPlanViewerProvider: - ``fetch(since, before)`` opens a Playwright session with the saved storage_state, navigates to the transaction-history page with a wide date range, scrapes the table, and intercepts the valuation XHR. + - After ``fetch()`` completes, ``last_holdings`` holds the per-fund + unit positions captured in the same scrape — used by the + ``fidelity-ingest`` CLI to push a manual snapshot to Wealthfolio + so per-fund quantities + cost basis land in the Positions table + (the activity stream alone only carries cash flows). """ name = "fidelity-planviewer" def __init__(self, creds: FidelityCreds) -> None: self._creds = creds + self.last_holdings: list[FidelityHolding] = [] + self.last_total_contribution: Decimal = Decimal(0) def accounts(self) -> list[Account]: return [ @@ -162,19 +146,72 @@ class FidelityPlanViewerProvider: log.info("fidelity: parsed %d transactions, %d holdings", len(transactions), len(holdings)) + # Snapshot the per-fund holdings for the CLI to push as a manual + # holdings_snapshot after this generator drains. Wealthfolio's + # activity model can't represent pension fund unit purchases (no + # per-purchase price feed from PlanViewer), so we record current + # state via /api/v1/snapshots/import instead. + self.last_holdings = holdings + self.last_total_contribution = sum( + (t.amount for t in transactions), Decimal(0) + ) + for tx in transactions: if since is not None and tx.date < since: continue if before is not None and tx.date >= before: continue yield _tx_to_activity(tx) + # NB: the gains-offset DEPOSIT we used to emit here is superseded + # by the manual snapshot push the CLI does after fetch() drains. + # The snapshot sets per-fund quantity + cost basis directly, so + # Wealthfolio computes growth from positions instead of needing a + # fake cash entry. Old offset rows still in WF are corrected at + # the dashboard layer by the dav_corrected view. - # The gains offset is always "as of now" so it reflects today's pot. - # Only emit when the caller isn't windowing (full state). - if since is None and before is None: - offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC)) - if offset is not None: - yield offset + +def fidelity_holdings_to_snapshot( + holdings: list[FidelityHolding], + total_real_contribution: Decimal, + as_of: date, +) -> ManualSnapshotPayload | None: + """Convert scraped holdings into a Wealthfolio manual snapshot payload. + + Cost-basis allocation: PlanViewer doesn't expose historical purchase + prices for individual fund unit buys, so we approximate per-fund + cost basis by allocating the cumulative cash contribution + proportionally to each fund's share of the current pot value. For + the typical single-fund Meta scheme this is exact; if Viktor's plan + later splits into multiple funds the proportional split is the + least-wrong allocation we can compute from monthly snapshots. + + cashBalances is set to zero — pension contributions flow straight + into funds, the synthetic Wealthfolio "cash balance" only existed + because of the old gains-offset DEPOSIT hack. + """ + if not holdings: + return None + total_value = sum((h.total_value for h in holdings), Decimal(0)) + if total_value <= 0: + return None + positions: list[SnapshotPosition] = [] + for h in holdings: + share = h.total_value / total_value + cost = (total_real_contribution * share).quantize(Decimal("0.01")) + avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0) + positions.append(SnapshotPosition( + symbol=h.fund_code, + quantity=h.units, + average_cost=avg_cost, + total_cost_basis=cost, + currency=h.currency, + )) + return ManualSnapshotPayload( + date=as_of, + currency=_CCY, + positions=positions, + cash_balances={_CCY: Decimal(0)}, + ) async def _scrape_live_session( diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index efbd50c..cb6ea45 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -2,7 +2,9 @@ from __future__ import annotations import json from collections.abc import Iterable -from datetime import UTC +from dataclasses import dataclass +from datetime import UTC, date +from decimal import Decimal from pathlib import Path from typing import Any @@ -14,6 +16,7 @@ _LOGIN_PATH = "/api/v1/auth/login" _ACCOUNTS_PATH = "/api/v1/accounts" _IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_REAL = "/api/v1/activities/import" +_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import" class WealthfolioError(Exception): @@ -262,3 +265,83 @@ class WealthfolioSink: f"First warning: {first_warn}") assert isinstance(got, list) return [r for r in got if isinstance(r, dict)] + + # -- manual holdings snapshots -- + + async def push_manual_snapshots( + self, + account_id: str, + snapshots: list[ManualSnapshotPayload], + ) -> dict[str, Any]: + """Push manual holdings snapshots to /api/v1/snapshots/import. + + Each snapshot carries a date + per-fund positions + cash balances. + Wealthfolio auto-creates any unknown asset symbol with + ``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=`` and uses + the snapshot to derive holdings + valuation for that date — bypassing + the activity-ledger derivation entirely for the targeted day. + + Used by the Fidelity provider since PlanViewer exposes current + fund units + price but no per-trade history. Re-imports for the + same (account, date) overwrite in place. + """ + if not snapshots: + return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []} + body = { + "accountId": account_id, + "snapshots": [_snapshot_to_payload(s) for s in snapshots], + } + resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body) + if resp.status_code >= 400: + try: + payload = resp.json() + except Exception: + payload = {"raw": resp.text} + raise WealthfolioError( + f"Wealthfolio /snapshots/import rejected: {payload}") + result = resp.json() + assert isinstance(result, dict) + failed = int(result.get("snapshotsFailed", 0)) + if failed > 0: + raise WealthfolioError( + f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; " + f"errors={result.get('errors')}") + return result + + +@dataclass(frozen=True) +class SnapshotPosition: + """A per-fund position row in a Wealthfolio manual snapshot.""" + symbol: str + quantity: Decimal + average_cost: Decimal + total_cost_basis: Decimal + currency: str + + +@dataclass(frozen=True) +class ManualSnapshotPayload: + """Sink-facing snapshot row. Mirrors the JSON shape WF expects.""" + date: date + currency: str + positions: list[SnapshotPosition] + cash_balances: dict[str, Decimal] + + +def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]: + """Serialise a ManualSnapshotPayload into WF's import wire format.""" + return { + "date": s.date.isoformat(), + "currency": s.currency, + "positions": [ + { + "symbol": p.symbol, + "quantity": format(p.quantity, "f"), + "averageCost": format(p.average_cost, "f"), + "totalCostBasis": format(p.total_cost_basis, "f"), + "currency": p.currency, + } + for p in s.positions + ], + "cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()}, + } diff --git a/tests/providers/test_fidelity_planviewer.py b/tests/providers/test_fidelity_planviewer.py index 55b069e..a030ac3 100644 --- a/tests/providers/test_fidelity_planviewer.py +++ b/tests/providers/test_fidelity_planviewer.py @@ -1,19 +1,19 @@ from __future__ import annotations import json -from datetime import UTC, datetime +from datetime import UTC, date, datetime from decimal import Decimal from pathlib import Path import pytest -from broker_sync.models import Account, AccountType, ActivityType +from broker_sync.models import Account, AccountType from broker_sync.providers.fidelity_planviewer import ( ACCOUNT_ID, FidelityCreds, FidelityPlanViewerProvider, FidelityProviderConfigError, - _gains_offset_activity, + fidelity_holdings_to_snapshot, ) from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, @@ -96,21 +96,53 @@ def test_parse_valuation_fixture() -> None: assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"} -def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None: +def test_holdings_to_snapshot_real_fixture() -> None: html = (_FIXTURES / "transactions-full.html").read_text() valuation = json.loads((_FIXTURES / "valuation.json").read_text()) - txs = parse_transactions_html(html) holdings = parse_valuation_json(valuation) - as_of = datetime(2026, 4, 18, tzinfo=UTC) - offset = _gains_offset_activity(holdings, txs, as_of) - assert offset is not None - assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL) - assert offset.amount is not None and offset.amount > 0 - assert offset.external_id == "fidelity:gains:2026-04-18" + total_contrib = sum((tx.amount for tx in parse_transactions_html(html)), + Decimal(0)) + + snapshot = fidelity_holdings_to_snapshot( + holdings=holdings, + total_real_contribution=total_contrib, + as_of=date(2026, 4, 18), + ) + assert snapshot is not None + assert snapshot.date == date(2026, 4, 18) + assert snapshot.currency == "GBP" + # Cost basis sums to the cash contributions (allocated by fund value share) + sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0)) + assert abs(sum_cost - total_contrib) < Decimal("1") + # Meta scheme had KDOA + LAFC + one other at fixture time; the + # dominant fund must be KDOA. + symbols = [p.symbol for p in snapshot.positions] + assert "KDOA" in symbols + kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA") + assert kdoa.quantity > 0 + # Proportional cost-basis allocation: KDOA holds nearly the whole pot + # so it should get the lion's share of cost + kdoa_share = kdoa.total_cost_basis / sum_cost + assert kdoa_share > Decimal("0.9") + # cashBalances zero — pension contributions flow straight into funds + assert snapshot.cash_balances == {"GBP": Decimal(0)} -def test_gains_offset_none_when_no_holdings() -> None: - assert _gains_offset_activity( - holdings=[], transactions=[], - as_of=datetime(2026, 4, 18, tzinfo=UTC), +def test_holdings_to_snapshot_none_when_no_holdings() -> None: + assert fidelity_holdings_to_snapshot( + holdings=[], total_real_contribution=Decimal("100"), + as_of=date(2026, 4, 18), ) is None + + +def test_provider_caches_holdings_for_cli_snapshot_push() -> None: + """The CLI reads `last_holdings` after fetch() drains to push the + manual snapshot. This guards the contract that fetch() populates the + attribute even when no Activity is yielded (e.g., backfill window + cut-off).""" + prov = FidelityPlanViewerProvider(FidelityCreds( + storage_state_path="/tmp/x", plan_id="META", + )) + # Pre-fetch state: empty + assert prov.last_holdings == [] + assert prov.last_total_contribution == Decimal(0) diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index 210b915..436e52b 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from datetime import UTC, datetime +from datetime import UTC, date, datetime from decimal import Decimal from pathlib import Path from typing import Any @@ -12,6 +12,9 @@ import pytest from broker_sync.models import Account, AccountType, Activity, ActivityType from broker_sync.sinks.wealthfolio import ( ImportValidationError, + ManualSnapshotPayload, + SnapshotPosition, + WealthfolioError, WealthfolioSink, WealthfolioUnauthorizedError, ) @@ -274,3 +277,99 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None: with pytest.raises(ImportValidationError, match="unknown symbol"): await sink.import_activities([_buy()]) assert calls == ["/api/v1/activities/import/check"] # real import never hit + + +# -- Manual snapshot import (Fidelity path) -- + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + seen: dict[str, Any] = {} + + async def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v1/snapshots/import": + seen["body"] = json.loads(req.content) + return httpx.Response( + 200, + json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []}, + ) + return httpx.Response(404) + + sink = _client(httpx.MockTransport(handler), sp) + snapshot = ManualSnapshotPayload( + date=date(2026, 5, 16), + currency="GBP", + positions=[ + SnapshotPosition( + symbol="KDOA", + quantity=Decimal("4200.5"), + average_cost=Decimal("24.29"), + total_cost_basis=Decimal("102004.15"), + currency="GBP", + ), + ], + cash_balances={"GBP": Decimal(0)}, + ) + result = await sink.push_manual_snapshots( + account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234", + snapshots=[snapshot], + ) + assert result["snapshotsImported"] == 1 + # Wire format: numeric fields are STRINGS (Decimal.__format__('f')) + body = seen["body"] + assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234" + pos = body["snapshots"][0]["positions"][0] + assert pos == { + "symbol": "KDOA", + "quantity": "4200.5", + "averageCost": "24.29", + "totalCostBasis": "102004.15", + "currency": "GBP", + } + assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"} + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_raises_on_partial_failure( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + async def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "snapshotsImported": 0, + "snapshotsFailed": 1, + "errors": [{"row": 0, "msg": "bad symbol"}], + }, + ) + + sink = _client(httpx.MockTransport(handler), sp) + snapshot = ManualSnapshotPayload( + date=date(2026, 5, 16), currency="GBP", + positions=[], cash_balances={}, + ) + with pytest.raises(WealthfolioError, match="bad symbol"): + await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot]) + + +@pytest.mark.asyncio +async def test_push_manual_snapshots_short_circuits_on_empty( + tmp_path: Path, +) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + async def handler(req: httpx.Request) -> httpx.Response: + raise AssertionError(f"unexpected request: {req.method} {req.url.path}") + + sink = _client(httpx.MockTransport(handler), sp) + result = await sink.push_manual_snapshots(account_id="acct", snapshots=[]) + assert result["snapshotsImported"] == 0 From c9c0310733bf0efa61952ccb5fcc708078ba29c3 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 16 May 2026 23:47:49 +0000 Subject: [PATCH 4/5] fidelity: snapshot push needs WF account UUID, not logical id /api/v1/snapshots/import lookups the account by Wealthfolio's own UUID; passing our provider-side logical id ('fidelity-workplace-pension') returns 400 'Database operation failed: Record not found'. Resolve via sink.ensure_account() which the pipeline already runs idempotently, then pass the returned UUID into push_manual_snapshots(). --- broker_sync/cli.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index 385fd01..879c3a2 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -440,9 +440,6 @@ def fidelity_ingest( async def _run() -> None: from datetime import date as _date_t - from broker_sync.providers.fidelity_planviewer import ( - ACCOUNT_ID as FID_ACCOUNT_ID, - ) from broker_sync.providers.fidelity_planviewer import ( fidelity_holdings_to_snapshot, ) @@ -478,8 +475,14 @@ def fidelity_ingest( as_of=_date_t.today(), ) if snapshot is not None: + # /api/v1/snapshots/import wants WF's own account UUID, + # not our logical provider id — look it up via the same + # match the pipeline used (provider+providerAccountId). + wf_account_id = await sink.ensure_account( + provider.accounts()[0], + ) push_result = await sink.push_manual_snapshots( - account_id=FID_ACCOUNT_ID, snapshots=[snapshot], + account_id=wf_account_id, snapshots=[snapshot], ) snapshot_imported = int(push_result.get("snapshotsImported", 0)) finally: From 98c47296228ed9b5f25c3a8eedf411677aa05979 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 17 May 2026 00:35:17 +0000 Subject: [PATCH 5/5] fidelity: replace snapshot-push with delta gains-offset DEPOSITs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-fund snapshot import landed quantities but dropped cost basis + needed a separate quote-push path we never identified. Snapshotting also collided with WF's own TOTAL aggregation and ZEROED the Fidelity cash balance. Simpler plan: each monthly scrape emits a single DEPOSIT (or WITHDRAWAL on a market drop) sized to the delta between the live PlanViewer pot value and Wealthfolio's running total. dav_corrected PG view continues to subtract these offsets from net_contribution so the dashboard Growth/ROI math stays right. - New gains_offset_delta_activity() — current_gain - prior_offset. - New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs in WF so we know what's already been emitted. - CLI runs sync_provider_to_wealthfolio first (cash flows), then computes + emits the delta via import_activities. - 4 new provider tests for the delta logic; full suite (144 + 1 skipped) green; mypy + ruff clean. The old fidelity_holdings_to_snapshot helper + push_manual_snapshots sink method stay for future use but are no longer called. --- broker_sync/cli.py | 45 +++++++------- broker_sync/providers/fidelity_planviewer.py | 61 +++++++++++++++---- broker_sync/sinks/wealthfolio.py | 49 +++++++++++++++ tests/providers/test_fidelity_planviewer.py | 63 +++++++++++++++++++- 4 files changed, 183 insertions(+), 35 deletions(-) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index 879c3a2..6e08eb8 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -438,10 +438,8 @@ def fidelity_ingest( sys.exit(2) async def _run() -> None: - from datetime import date as _date_t - from broker_sync.providers.fidelity_planviewer import ( - fidelity_holdings_to_snapshot, + gains_offset_delta_activity, ) sink = WealthfolioSink( @@ -461,37 +459,36 @@ def fidelity_ingest( result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) - # PlanViewer has no historical per-fund unit-price feed, so - # the Activity stream above only carries cash flows. The - # current-pot fund positions captured in the same scrape get - # pushed via /api/v1/snapshots/import so per-fund quantity + - # cost basis land in WF (and propagate to the wealth - # dashboard's Positions table via pg-sync). - snapshot_imported = 0 + # PlanViewer doesn't expose per-fund unit prices in any feed + # WF can consume, so the only way to keep WF's pension total in + # line with the live PlanViewer pot value is to emit a small + # DEPOSIT (or WITHDRAWAL on a market drop) each run sized to + # the growth since the last scrape. The dav_corrected PG view + # subtracts these offsets from net_contribution so the + # dashboard's Growth/ROI panels stay accurate. + gains_delta_emitted = 0 if provider.last_holdings: - snapshot = fidelity_holdings_to_snapshot( + wf_account_id = await sink.ensure_account(provider.accounts()[0]) + prior_offset = await sink.cumulative_amount_with_notes_prefix( + account_id=wf_account_id, + notes_prefix="fidelity-planviewer:unrealised-gains-offset", + ) + delta = gains_offset_delta_activity( holdings=provider.last_holdings, total_real_contribution=provider.last_total_contribution, - as_of=_date_t.today(), + prior_offset_cumulative=prior_offset, + as_of=datetime.now(UTC), ) - if snapshot is not None: - # /api/v1/snapshots/import wants WF's own account UUID, - # not our logical provider id — look it up via the same - # match the pipeline used (provider+providerAccountId). - wf_account_id = await sink.ensure_account( - provider.accounts()[0], - ) - push_result = await sink.push_manual_snapshots( - account_id=wf_account_id, snapshots=[snapshot], - ) - snapshot_imported = int(push_result.get("snapshotsImported", 0)) + if delta is not None: + await sink.import_activities([delta]) + gains_delta_emitted = 1 finally: await sink.close() typer.echo(f"fidelity-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed} " - f"snapshots={snapshot_imported}") + f"gains_delta={gains_delta_emitted}") if result.failed > 0: sys.exit(1) diff --git a/broker_sync/providers/fidelity_planviewer.py b/broker_sync/providers/fidelity_planviewer.py index 4658dcf..b5b4e33 100644 --- a/broker_sync/providers/fidelity_planviewer.py +++ b/broker_sync/providers/fidelity_planviewer.py @@ -103,10 +103,10 @@ class FidelityPlanViewerProvider: storage_state, navigates to the transaction-history page with a wide date range, scrapes the table, and intercepts the valuation XHR. - After ``fetch()`` completes, ``last_holdings`` holds the per-fund - unit positions captured in the same scrape — used by the - ``fidelity-ingest`` CLI to push a manual snapshot to Wealthfolio - so per-fund quantities + cost basis land in the Positions table - (the activity stream alone only carries cash flows). + unit positions and ``last_total_contribution`` the cumulative cash + contribution — used by the ``fidelity-ingest`` CLI to emit a + delta-shaped DEPOSIT that nudges WF's net worth to match the + PlanViewer reported pot value (see ``gains_offset_delta_activity``). """ name = "fidelity-planviewer" @@ -162,12 +162,53 @@ class FidelityPlanViewerProvider: if before is not None and tx.date >= before: continue yield _tx_to_activity(tx) - # NB: the gains-offset DEPOSIT we used to emit here is superseded - # by the manual snapshot push the CLI does after fetch() drains. - # The snapshot sets per-fund quantity + cost basis directly, so - # Wealthfolio computes growth from positions instead of needing a - # fake cash entry. Old offset rows still in WF are corrected at - # the dashboard layer by the dav_corrected view. + # Gains-offset DEPOSITs are emitted by the CLI (which has the + # prior cumulative offset from WF). See `gains_offset_delta_activity`. + + +def gains_offset_delta_activity( + holdings: list[FidelityHolding], + total_real_contribution: Decimal, + prior_offset_cumulative: Decimal, + as_of: datetime, + min_delta: Decimal = Decimal("0.5"), +) -> Activity | None: + """Compute the gains-offset DELTA since the last scrape and shape it + as a DEPOSIT (or WITHDRAWAL on a market drop). + + The pension's per-fund prices aren't trackable in WF directly (no + public quote feed for these institutional life-fund share classes). + Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL + sized to ``(current_pot - real_contributions) - prior_cumulative_offset`` + — i.e., the growth (or loss) accrued since the last run. + + Wealthfolio's net_contribution then incorrectly includes all these + offsets; the ``dav_corrected`` PG view subtracts them back out so the + dashboard's Growth/ROI panels remain accurate. The deterministic + external_id (per scrape date) lets re-runs of the same day overwrite + rather than stack duplicates. + """ + if not holdings: + return None + current_pot = sum((h.total_value for h in holdings), Decimal(0)) + current_gain = current_pot - total_real_contribution + delta = current_gain - prior_offset_cumulative + if abs(delta) < min_delta: + return None + return Activity( + external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}", + account_id=ACCOUNT_ID, + account_type=AccountType.WORKPLACE_PENSION, + date=as_of, + activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL, + currency=_CCY, + amount=abs(delta), + notes=( + f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} " + f"(pot=£{current_pot}, contrib=£{total_real_contribution}, " + f"prior_offset=£{prior_offset_cumulative})" + ), + ) def fidelity_holdings_to_snapshot( diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index cb6ea45..7144f6f 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -17,6 +17,7 @@ _ACCOUNTS_PATH = "/api/v1/accounts" _IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_REAL = "/api/v1/activities/import" _SNAPSHOTS_IMPORT = "/api/v1/snapshots/import" +_ACTIVITIES_SEARCH = "/api/v1/activities/search" class WealthfolioError(Exception): @@ -266,6 +267,54 @@ class WealthfolioSink: assert isinstance(got, list) return [r for r in got if isinstance(r, dict)] + # -- activity lookups -- + + async def cumulative_amount_with_notes_prefix( + self, + account_id: str, + notes_prefix: str, + ) -> Decimal: + """Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start + with ``notes_prefix``, signed (deposits positive, withdrawals negative). + + Used by the Fidelity provider to compute the delta gains-offset: + ``current_gain - cumulative_existing_offset`` becomes the new + DEPOSIT to emit on each monthly run. + """ + try: + resp = await self._request( + "POST", _ACTIVITIES_SEARCH, + json={"accountIds": [account_id], "page": 1, "pageSize": 500}, + ) + except Exception: + return Decimal(0) + if resp.status_code >= 400: + return Decimal(0) + payload = resp.json() + rows = payload.get("data", payload) if isinstance(payload, dict) else payload + if not isinstance(rows, list): + return Decimal(0) + total = Decimal(0) + for r in rows: + if not isinstance(r, dict): + continue + notes = r.get("comment") or r.get("notes") or "" + if not isinstance(notes, str) or not notes.startswith(notes_prefix): + continue + amt_raw = r.get("amount") + if amt_raw is None: + continue + try: + amt = Decimal(str(amt_raw)) + except Exception: + continue + atype = (r.get("activityType") or r.get("activity_type") or "").upper() + if atype == "WITHDRAWAL": + total -= amt + else: + total += amt + return total + # -- manual holdings snapshots -- async def push_manual_snapshots( diff --git a/tests/providers/test_fidelity_planviewer.py b/tests/providers/test_fidelity_planviewer.py index a030ac3..acfccbc 100644 --- a/tests/providers/test_fidelity_planviewer.py +++ b/tests/providers/test_fidelity_planviewer.py @@ -7,13 +7,14 @@ from pathlib import Path import pytest -from broker_sync.models import Account, AccountType +from broker_sync.models import Account, AccountType, ActivityType from broker_sync.providers.fidelity_planviewer import ( ACCOUNT_ID, FidelityCreds, FidelityPlanViewerProvider, FidelityProviderConfigError, fidelity_holdings_to_snapshot, + gains_offset_delta_activity, ) from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, @@ -146,3 +147,63 @@ def test_provider_caches_holdings_for_cli_snapshot_push() -> None: # Pre-fetch state: empty assert prov.last_holdings == [] assert prov.last_total_contribution == Decimal(0) + + +# -- delta-shaped gains offset (the monthly accumulation mechanism) -- + + +def _holdings_summing_to(total: Decimal) -> list: + from broker_sync.providers.parsers.fidelity import FidelityHolding + return [FidelityHolding( + fund_code="KDOA", fund_name="Test", units=Decimal("100"), + unit_price=total / Decimal("100"), currency="GBP", total_value=total, + units_by_source={}, + )] + + +def test_gains_delta_emits_deposit_when_gain_exceeds_prior_offset() -> None: + # pot £145k, real contrib £102k → current gain £43k; prior offset £35k + # → delta = +£8k + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("145000")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is not None + assert activity.activity_type == ActivityType.DEPOSIT + assert activity.amount == Decimal("8000") + assert activity.external_id == "fidelity:gains-delta:2026-05-17" + assert "unrealised-gains-offset" in (activity.notes or "") + + +def test_gains_delta_emits_withdrawal_on_market_drop() -> None: + # pot dropped: current gain £30k, prior offset £35k → delta = -£5k + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("132000")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is not None + assert activity.activity_type == ActivityType.WITHDRAWAL + assert activity.amount == Decimal("5000") + + +def test_gains_delta_suppressed_below_minimum() -> None: + # delta ~£0.20, below the £0.50 min — skip emission to avoid noise. + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("137000.20")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is None + + +def test_gains_delta_none_when_no_holdings() -> None: + assert gains_offset_delta_activity( + holdings=[], total_real_contribution=Decimal("0"), + prior_offset_cumulative=Decimal("0"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) is None