From 98c47296228ed9b5f25c3a8eedf411677aa05979 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 17 May 2026 00:35:17 +0000 Subject: [PATCH] fidelity: replace snapshot-push with delta gains-offset DEPOSITs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-fund snapshot import landed quantities but dropped cost basis + needed a separate quote-push path we never identified. Snapshotting also collided with WF's own TOTAL aggregation and ZEROED the Fidelity cash balance. Simpler plan: each monthly scrape emits a single DEPOSIT (or WITHDRAWAL on a market drop) sized to the delta between the live PlanViewer pot value and Wealthfolio's running total. dav_corrected PG view continues to subtract these offsets from net_contribution so the dashboard Growth/ROI math stays right. - New gains_offset_delta_activity() — current_gain - prior_offset. - New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs in WF so we know what's already been emitted. - CLI runs sync_provider_to_wealthfolio first (cash flows), then computes + emits the delta via import_activities. - 4 new provider tests for the delta logic; full suite (144 + 1 skipped) green; mypy + ruff clean. The old fidelity_holdings_to_snapshot helper + push_manual_snapshots sink method stay for future use but are no longer called. --- broker_sync/cli.py | 45 +++++++------- broker_sync/providers/fidelity_planviewer.py | 61 +++++++++++++++---- broker_sync/sinks/wealthfolio.py | 49 +++++++++++++++ tests/providers/test_fidelity_planviewer.py | 63 +++++++++++++++++++- 4 files changed, 183 insertions(+), 35 deletions(-) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index 879c3a2..6e08eb8 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -438,10 +438,8 @@ def fidelity_ingest( sys.exit(2) async def _run() -> None: - from datetime import date as _date_t - from broker_sync.providers.fidelity_planviewer import ( - fidelity_holdings_to_snapshot, + gains_offset_delta_activity, ) sink = WealthfolioSink( @@ -461,37 +459,36 @@ def fidelity_ingest( result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) - # PlanViewer has no historical per-fund unit-price feed, so - # the Activity stream above only carries cash flows. The - # current-pot fund positions captured in the same scrape get - # pushed via /api/v1/snapshots/import so per-fund quantity + - # cost basis land in WF (and propagate to the wealth - # dashboard's Positions table via pg-sync). - snapshot_imported = 0 + # PlanViewer doesn't expose per-fund unit prices in any feed + # WF can consume, so the only way to keep WF's pension total in + # line with the live PlanViewer pot value is to emit a small + # DEPOSIT (or WITHDRAWAL on a market drop) each run sized to + # the growth since the last scrape. The dav_corrected PG view + # subtracts these offsets from net_contribution so the + # dashboard's Growth/ROI panels stay accurate. + gains_delta_emitted = 0 if provider.last_holdings: - snapshot = fidelity_holdings_to_snapshot( + wf_account_id = await sink.ensure_account(provider.accounts()[0]) + prior_offset = await sink.cumulative_amount_with_notes_prefix( + account_id=wf_account_id, + notes_prefix="fidelity-planviewer:unrealised-gains-offset", + ) + delta = gains_offset_delta_activity( holdings=provider.last_holdings, total_real_contribution=provider.last_total_contribution, - as_of=_date_t.today(), + prior_offset_cumulative=prior_offset, + as_of=datetime.now(UTC), ) - if snapshot is not None: - # /api/v1/snapshots/import wants WF's own account UUID, - # not our logical provider id — look it up via the same - # match the pipeline used (provider+providerAccountId). - wf_account_id = await sink.ensure_account( - provider.accounts()[0], - ) - push_result = await sink.push_manual_snapshots( - account_id=wf_account_id, snapshots=[snapshot], - ) - snapshot_imported = int(push_result.get("snapshotsImported", 0)) + if delta is not None: + await sink.import_activities([delta]) + gains_delta_emitted = 1 finally: await sink.close() typer.echo(f"fidelity-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed} " - f"snapshots={snapshot_imported}") + f"gains_delta={gains_delta_emitted}") if result.failed > 0: sys.exit(1) diff --git a/broker_sync/providers/fidelity_planviewer.py b/broker_sync/providers/fidelity_planviewer.py index 4658dcf..b5b4e33 100644 --- a/broker_sync/providers/fidelity_planviewer.py +++ b/broker_sync/providers/fidelity_planviewer.py @@ -103,10 +103,10 @@ class FidelityPlanViewerProvider: storage_state, navigates to the transaction-history page with a wide date range, scrapes the table, and intercepts the valuation XHR. - After ``fetch()`` completes, ``last_holdings`` holds the per-fund - unit positions captured in the same scrape — used by the - ``fidelity-ingest`` CLI to push a manual snapshot to Wealthfolio - so per-fund quantities + cost basis land in the Positions table - (the activity stream alone only carries cash flows). + unit positions and ``last_total_contribution`` the cumulative cash + contribution — used by the ``fidelity-ingest`` CLI to emit a + delta-shaped DEPOSIT that nudges WF's net worth to match the + PlanViewer reported pot value (see ``gains_offset_delta_activity``). """ name = "fidelity-planviewer" @@ -162,12 +162,53 @@ class FidelityPlanViewerProvider: if before is not None and tx.date >= before: continue yield _tx_to_activity(tx) - # NB: the gains-offset DEPOSIT we used to emit here is superseded - # by the manual snapshot push the CLI does after fetch() drains. - # The snapshot sets per-fund quantity + cost basis directly, so - # Wealthfolio computes growth from positions instead of needing a - # fake cash entry. Old offset rows still in WF are corrected at - # the dashboard layer by the dav_corrected view. + # Gains-offset DEPOSITs are emitted by the CLI (which has the + # prior cumulative offset from WF). See `gains_offset_delta_activity`. + + +def gains_offset_delta_activity( + holdings: list[FidelityHolding], + total_real_contribution: Decimal, + prior_offset_cumulative: Decimal, + as_of: datetime, + min_delta: Decimal = Decimal("0.5"), +) -> Activity | None: + """Compute the gains-offset DELTA since the last scrape and shape it + as a DEPOSIT (or WITHDRAWAL on a market drop). + + The pension's per-fund prices aren't trackable in WF directly (no + public quote feed for these institutional life-fund share classes). + Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL + sized to ``(current_pot - real_contributions) - prior_cumulative_offset`` + — i.e., the growth (or loss) accrued since the last run. + + Wealthfolio's net_contribution then incorrectly includes all these + offsets; the ``dav_corrected`` PG view subtracts them back out so the + dashboard's Growth/ROI panels remain accurate. The deterministic + external_id (per scrape date) lets re-runs of the same day overwrite + rather than stack duplicates. + """ + if not holdings: + return None + current_pot = sum((h.total_value for h in holdings), Decimal(0)) + current_gain = current_pot - total_real_contribution + delta = current_gain - prior_offset_cumulative + if abs(delta) < min_delta: + return None + return Activity( + external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}", + account_id=ACCOUNT_ID, + account_type=AccountType.WORKPLACE_PENSION, + date=as_of, + activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL, + currency=_CCY, + amount=abs(delta), + notes=( + f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} " + f"(pot=£{current_pot}, contrib=£{total_real_contribution}, " + f"prior_offset=£{prior_offset_cumulative})" + ), + ) def fidelity_holdings_to_snapshot( diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index cb6ea45..7144f6f 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -17,6 +17,7 @@ _ACCOUNTS_PATH = "/api/v1/accounts" _IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_REAL = "/api/v1/activities/import" _SNAPSHOTS_IMPORT = "/api/v1/snapshots/import" +_ACTIVITIES_SEARCH = "/api/v1/activities/search" class WealthfolioError(Exception): @@ -266,6 +267,54 @@ class WealthfolioSink: assert isinstance(got, list) return [r for r in got if isinstance(r, dict)] + # -- activity lookups -- + + async def cumulative_amount_with_notes_prefix( + self, + account_id: str, + notes_prefix: str, + ) -> Decimal: + """Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start + with ``notes_prefix``, signed (deposits positive, withdrawals negative). + + Used by the Fidelity provider to compute the delta gains-offset: + ``current_gain - cumulative_existing_offset`` becomes the new + DEPOSIT to emit on each monthly run. + """ + try: + resp = await self._request( + "POST", _ACTIVITIES_SEARCH, + json={"accountIds": [account_id], "page": 1, "pageSize": 500}, + ) + except Exception: + return Decimal(0) + if resp.status_code >= 400: + return Decimal(0) + payload = resp.json() + rows = payload.get("data", payload) if isinstance(payload, dict) else payload + if not isinstance(rows, list): + return Decimal(0) + total = Decimal(0) + for r in rows: + if not isinstance(r, dict): + continue + notes = r.get("comment") or r.get("notes") or "" + if not isinstance(notes, str) or not notes.startswith(notes_prefix): + continue + amt_raw = r.get("amount") + if amt_raw is None: + continue + try: + amt = Decimal(str(amt_raw)) + except Exception: + continue + atype = (r.get("activityType") or r.get("activity_type") or "").upper() + if atype == "WITHDRAWAL": + total -= amt + else: + total += amt + return total + # -- manual holdings snapshots -- async def push_manual_snapshots( diff --git a/tests/providers/test_fidelity_planviewer.py b/tests/providers/test_fidelity_planviewer.py index a030ac3..acfccbc 100644 --- a/tests/providers/test_fidelity_planviewer.py +++ b/tests/providers/test_fidelity_planviewer.py @@ -7,13 +7,14 @@ from pathlib import Path import pytest -from broker_sync.models import Account, AccountType +from broker_sync.models import Account, AccountType, ActivityType from broker_sync.providers.fidelity_planviewer import ( ACCOUNT_ID, FidelityCreds, FidelityPlanViewerProvider, FidelityProviderConfigError, fidelity_holdings_to_snapshot, + gains_offset_delta_activity, ) from broker_sync.providers.parsers.fidelity import ( parse_transactions_html, @@ -146,3 +147,63 @@ def test_provider_caches_holdings_for_cli_snapshot_push() -> None: # Pre-fetch state: empty assert prov.last_holdings == [] assert prov.last_total_contribution == Decimal(0) + + +# -- delta-shaped gains offset (the monthly accumulation mechanism) -- + + +def _holdings_summing_to(total: Decimal) -> list: + from broker_sync.providers.parsers.fidelity import FidelityHolding + return [FidelityHolding( + fund_code="KDOA", fund_name="Test", units=Decimal("100"), + unit_price=total / Decimal("100"), currency="GBP", total_value=total, + units_by_source={}, + )] + + +def test_gains_delta_emits_deposit_when_gain_exceeds_prior_offset() -> None: + # pot £145k, real contrib £102k → current gain £43k; prior offset £35k + # → delta = +£8k + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("145000")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is not None + assert activity.activity_type == ActivityType.DEPOSIT + assert activity.amount == Decimal("8000") + assert activity.external_id == "fidelity:gains-delta:2026-05-17" + assert "unrealised-gains-offset" in (activity.notes or "") + + +def test_gains_delta_emits_withdrawal_on_market_drop() -> None: + # pot dropped: current gain £30k, prior offset £35k → delta = -£5k + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("132000")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is not None + assert activity.activity_type == ActivityType.WITHDRAWAL + assert activity.amount == Decimal("5000") + + +def test_gains_delta_suppressed_below_minimum() -> None: + # delta ~£0.20, below the £0.50 min — skip emission to avoid noise. + activity = gains_offset_delta_activity( + holdings=_holdings_summing_to(Decimal("137000.20")), + total_real_contribution=Decimal("102000"), + prior_offset_cumulative=Decimal("35000"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) + assert activity is None + + +def test_gains_delta_none_when_no_holdings() -> None: + assert gains_offset_delta_activity( + holdings=[], total_real_contribution=Decimal("0"), + prior_offset_cumulative=Decimal("0"), + as_of=datetime(2026, 5, 17, tzinfo=UTC), + ) is None