fidelity: push per-fund manual snapshot instead of gains-offset DEPOSIT

PlanViewer's DisplayValuation.action JSON already gives us current
fund units + unit price; we were parsing it and throwing it away,
emitting only a single 'unrealised-gains-offset' DEPOSIT to make
Wealthfolio's totals match the dashboard. That hack double-counted
the gain as a cash contribution, hiding £35k of pension growth from
every contribution/growth/ROI panel.

New flow:
- FidelityPlanViewerProvider exposes last_holdings + last_total_contribution
  after fetch() drains.
- fidelity-ingest CLI converts to a ManualSnapshotPayload (cost basis
  allocated proportionally by current fund value share) and posts to
  WF /api/v1/snapshots/import. WF auto-creates unknown fund symbols
  with kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=GBP.
- The gains-offset emission is removed entirely. Historical offset
  rows already in WF are corrected at the dashboard layer by the
  dav_corrected view shipped in infra@2841347e.

WealthfolioSink gains push_manual_snapshots() + ManualSnapshotPayload /
SnapshotPosition wire types. 11 sink tests (3 new) + 9 fidelity
provider tests (2 changed, 1 new) all green; mypy + ruff clean.
This commit is contained in:
Viktor Barzin 2026-05-16 13:56:25 +00:00
parent 5adc4a7ba4
commit cb159e17d9
5 changed files with 339 additions and 60 deletions

View file

@ -438,6 +438,15 @@ def fidelity_ingest(
sys.exit(2)
async def _run() -> None:
from datetime import date as _date_t
from broker_sync.providers.fidelity_planviewer import (
ACCOUNT_ID as FID_ACCOUNT_ID,
)
from broker_sync.providers.fidelity_planviewer import (
fidelity_holdings_to_snapshot,
)
sink = WealthfolioSink(
base_url=wf_base_url,
username=wf_username,
@ -455,12 +464,31 @@ def fidelity_ingest(
result = await sync_provider_to_wealthfolio(
provider=provider, sink=sink, dedup=dedup, since=since,
)
# PlanViewer has no historical per-fund unit-price feed, so
# the Activity stream above only carries cash flows. The
# current-pot fund positions captured in the same scrape get
# pushed via /api/v1/snapshots/import so per-fund quantity +
# cost basis land in WF (and propagate to the wealth
# dashboard's Positions table via pg-sync).
snapshot_imported = 0
if provider.last_holdings:
snapshot = fidelity_holdings_to_snapshot(
holdings=provider.last_holdings,
total_real_contribution=provider.last_total_contribution,
as_of=_date_t.today(),
)
if snapshot is not None:
push_result = await sink.push_manual_snapshots(
account_id=FID_ACCOUNT_ID, snapshots=[snapshot],
)
snapshot_imported = int(push_result.get("snapshotsImported", 0))
finally:
await sink.close()
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
f"new={result.new_after_dedup} "
f"imported={result.imported} "
f"failed={result.failed}")
f"failed={result.failed} "
f"snapshots={snapshot_imported}")
if result.failed > 0:
sys.exit(1)

View file

@ -16,21 +16,28 @@ We keep a Playwright-maintained session via ``storage_state.json``:
fund holdings. On 401/idle-timeout we raise
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
## Emitted Activity shape
## Emitted Activity / snapshot shape
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches
the Fidelity dashboard. ``external_id =
fidelity:gains:<YYYY-MM-DD>``.
- Bulk Switches / Fund Switches are skipped (no cash movement).
- After the activity stream drains, the ``fidelity-ingest`` CLI calls
``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload``
per fund holding (today's date, units + cost basis allocated
proportionally to fund value share). This sets per-fund quantity and
cost basis in WF so the dashboard Positions table shows the pension
funds alongside the brokerage assets.
- The old synthetic ``fidelity:gains:<date>`` DEPOSIT is no longer
emitted the snapshot supersedes it. Old offset rows that landed
before this change are corrected at the dashboard layer by the
``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``).
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any, NamedTuple
@ -42,6 +49,7 @@ from broker_sync.providers.parsers.fidelity import (
parse_transactions_html,
parse_valuation_json,
)
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition
log = logging.getLogger(__name__)
@ -86,37 +94,6 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity:
)
def _gains_offset_activity(
holdings: list[FidelityHolding],
transactions: list[FidelityCashTx],
as_of: datetime,
) -> Activity | None:
"""Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the
Fidelity dashboard's reported pot value.
The offset carries a date-derived external_id so monthly runs refresh
the same synthetic entry rather than stacking duplicates.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
total_contrib = sum((t.amount for t in transactions), Decimal(0))
gains = total_value - total_contrib
if gains == 0:
return None
return Activity(
external_id=f"fidelity:gains:{as_of.date().isoformat()}",
account_id=ACCOUNT_ID,
account_type=AccountType.WORKPLACE_PENSION,
date=as_of,
activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL,
currency=_CCY,
amount=abs(gains),
notes=(f"fidelity-planviewer:unrealised-gains-offset "
f"(pot=£{total_value}, contrib=£{total_contrib})"),
)
class FidelityPlanViewerProvider:
"""Read-only provider against Fidelity UK PlanViewer.
@ -125,11 +102,18 @@ class FidelityPlanViewerProvider:
- ``fetch(since, before)`` opens a Playwright session with the saved
storage_state, navigates to the transaction-history page with a wide
date range, scrapes the table, and intercepts the valuation XHR.
- After ``fetch()`` completes, ``last_holdings`` holds the per-fund
unit positions captured in the same scrape used by the
``fidelity-ingest`` CLI to push a manual snapshot to Wealthfolio
so per-fund quantities + cost basis land in the Positions table
(the activity stream alone only carries cash flows).
"""
name = "fidelity-planviewer"
def __init__(self, creds: FidelityCreds) -> None:
self._creds = creds
self.last_holdings: list[FidelityHolding] = []
self.last_total_contribution: Decimal = Decimal(0)
def accounts(self) -> list[Account]:
return [
@ -162,19 +146,72 @@ class FidelityPlanViewerProvider:
log.info("fidelity: parsed %d transactions, %d holdings",
len(transactions), len(holdings))
# Snapshot the per-fund holdings for the CLI to push as a manual
# holdings_snapshot after this generator drains. Wealthfolio's
# activity model can't represent pension fund unit purchases (no
# per-purchase price feed from PlanViewer), so we record current
# state via /api/v1/snapshots/import instead.
self.last_holdings = holdings
self.last_total_contribution = sum(
(t.amount for t in transactions), Decimal(0)
)
for tx in transactions:
if since is not None and tx.date < since:
continue
if before is not None and tx.date >= before:
continue
yield _tx_to_activity(tx)
# NB: the gains-offset DEPOSIT we used to emit here is superseded
# by the manual snapshot push the CLI does after fetch() drains.
# The snapshot sets per-fund quantity + cost basis directly, so
# Wealthfolio computes growth from positions instead of needing a
# fake cash entry. Old offset rows still in WF are corrected at
# the dashboard layer by the dav_corrected view.
# The gains offset is always "as of now" so it reflects today's pot.
# Only emit when the caller isn't windowing (full state).
if since is None and before is None:
offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC))
if offset is not None:
yield offset
def fidelity_holdings_to_snapshot(
holdings: list[FidelityHolding],
total_real_contribution: Decimal,
as_of: date,
) -> ManualSnapshotPayload | None:
"""Convert scraped holdings into a Wealthfolio manual snapshot payload.
Cost-basis allocation: PlanViewer doesn't expose historical purchase
prices for individual fund unit buys, so we approximate per-fund
cost basis by allocating the cumulative cash contribution
proportionally to each fund's share of the current pot value. For
the typical single-fund Meta scheme this is exact; if Viktor's plan
later splits into multiple funds the proportional split is the
least-wrong allocation we can compute from monthly snapshots.
cashBalances is set to zero pension contributions flow straight
into funds, the synthetic Wealthfolio "cash balance" only existed
because of the old gains-offset DEPOSIT hack.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
if total_value <= 0:
return None
positions: list[SnapshotPosition] = []
for h in holdings:
share = h.total_value / total_value
cost = (total_real_contribution * share).quantize(Decimal("0.01"))
avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0)
positions.append(SnapshotPosition(
symbol=h.fund_code,
quantity=h.units,
average_cost=avg_cost,
total_cost_basis=cost,
currency=h.currency,
))
return ManualSnapshotPayload(
date=as_of,
currency=_CCY,
positions=positions,
cash_balances={_CCY: Decimal(0)},
)
async def _scrape_live_session(

View file

@ -2,7 +2,9 @@ from __future__ import annotations
import json
from collections.abc import Iterable
from datetime import UTC
from dataclasses import dataclass
from datetime import UTC, date
from decimal import Decimal
from pathlib import Path
from typing import Any
@ -14,6 +16,7 @@ _LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts"
_IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import"
_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import"
class WealthfolioError(Exception):
@ -262,3 +265,83 @@ class WealthfolioSink:
f"First warning: {first_warn}")
assert isinstance(got, list)
return [r for r in got if isinstance(r, dict)]
# -- manual holdings snapshots --
async def push_manual_snapshots(
self,
account_id: str,
snapshots: list[ManualSnapshotPayload],
) -> dict[str, Any]:
"""Push manual holdings snapshots to /api/v1/snapshots/import.
Each snapshot carries a date + per-fund positions + cash balances.
Wealthfolio auto-creates any unknown asset symbol with
``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=<currency>`` and uses
the snapshot to derive holdings + valuation for that date bypassing
the activity-ledger derivation entirely for the targeted day.
Used by the Fidelity provider since PlanViewer exposes current
fund units + price but no per-trade history. Re-imports for the
same (account, date) overwrite in place.
"""
if not snapshots:
return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []}
body = {
"accountId": account_id,
"snapshots": [_snapshot_to_payload(s) for s in snapshots],
}
resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body)
if resp.status_code >= 400:
try:
payload = resp.json()
except Exception:
payload = {"raw": resp.text}
raise WealthfolioError(
f"Wealthfolio /snapshots/import rejected: {payload}")
result = resp.json()
assert isinstance(result, dict)
failed = int(result.get("snapshotsFailed", 0))
if failed > 0:
raise WealthfolioError(
f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; "
f"errors={result.get('errors')}")
return result
@dataclass(frozen=True)
class SnapshotPosition:
"""A per-fund position row in a Wealthfolio manual snapshot."""
symbol: str
quantity: Decimal
average_cost: Decimal
total_cost_basis: Decimal
currency: str
@dataclass(frozen=True)
class ManualSnapshotPayload:
"""Sink-facing snapshot row. Mirrors the JSON shape WF expects."""
date: date
currency: str
positions: list[SnapshotPosition]
cash_balances: dict[str, Decimal]
def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]:
"""Serialise a ManualSnapshotPayload into WF's import wire format."""
return {
"date": s.date.isoformat(),
"currency": s.currency,
"positions": [
{
"symbol": p.symbol,
"quantity": format(p.quantity, "f"),
"averageCost": format(p.average_cost, "f"),
"totalCostBasis": format(p.total_cost_basis, "f"),
"currency": p.currency,
}
for p in s.positions
],
"cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()},
}

View file

@ -1,19 +1,19 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from datetime import UTC, date, datetime
from decimal import Decimal
from pathlib import Path
import pytest
from broker_sync.models import Account, AccountType, ActivityType
from broker_sync.models import Account, AccountType
from broker_sync.providers.fidelity_planviewer import (
ACCOUNT_ID,
FidelityCreds,
FidelityPlanViewerProvider,
FidelityProviderConfigError,
_gains_offset_activity,
fidelity_holdings_to_snapshot,
)
from broker_sync.providers.parsers.fidelity import (
parse_transactions_html,
@ -96,21 +96,53 @@ def test_parse_valuation_fixture() -> None:
assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"}
def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None:
def test_holdings_to_snapshot_real_fixture() -> None:
html = (_FIXTURES / "transactions-full.html").read_text()
valuation = json.loads((_FIXTURES / "valuation.json").read_text())
txs = parse_transactions_html(html)
holdings = parse_valuation_json(valuation)
as_of = datetime(2026, 4, 18, tzinfo=UTC)
offset = _gains_offset_activity(holdings, txs, as_of)
assert offset is not None
assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL)
assert offset.amount is not None and offset.amount > 0
assert offset.external_id == "fidelity:gains:2026-04-18"
total_contrib = sum((tx.amount for tx in parse_transactions_html(html)),
Decimal(0))
snapshot = fidelity_holdings_to_snapshot(
holdings=holdings,
total_real_contribution=total_contrib,
as_of=date(2026, 4, 18),
)
assert snapshot is not None
assert snapshot.date == date(2026, 4, 18)
assert snapshot.currency == "GBP"
# Cost basis sums to the cash contributions (allocated by fund value share)
sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0))
assert abs(sum_cost - total_contrib) < Decimal("1")
# Meta scheme had KDOA + LAFC + one other at fixture time; the
# dominant fund must be KDOA.
symbols = [p.symbol for p in snapshot.positions]
assert "KDOA" in symbols
kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA")
assert kdoa.quantity > 0
# Proportional cost-basis allocation: KDOA holds nearly the whole pot
# so it should get the lion's share of cost
kdoa_share = kdoa.total_cost_basis / sum_cost
assert kdoa_share > Decimal("0.9")
# cashBalances zero — pension contributions flow straight into funds
assert snapshot.cash_balances == {"GBP": Decimal(0)}
def test_gains_offset_none_when_no_holdings() -> None:
assert _gains_offset_activity(
holdings=[], transactions=[],
as_of=datetime(2026, 4, 18, tzinfo=UTC),
def test_holdings_to_snapshot_none_when_no_holdings() -> None:
assert fidelity_holdings_to_snapshot(
holdings=[], total_real_contribution=Decimal("100"),
as_of=date(2026, 4, 18),
) is None
def test_provider_caches_holdings_for_cli_snapshot_push() -> None:
"""The CLI reads `last_holdings` after fetch() drains to push the
manual snapshot. This guards the contract that fetch() populates the
attribute even when no Activity is yielded (e.g., backfill window
cut-off)."""
prov = FidelityPlanViewerProvider(FidelityCreds(
storage_state_path="/tmp/x", plan_id="META",
))
# Pre-fetch state: empty
assert prov.last_holdings == []
assert prov.last_total_contribution == Decimal(0)

View file

@ -1,7 +1,7 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from datetime import UTC, date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any
@ -12,6 +12,9 @@ import pytest
from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.sinks.wealthfolio import (
ImportValidationError,
ManualSnapshotPayload,
SnapshotPosition,
WealthfolioError,
WealthfolioSink,
WealthfolioUnauthorizedError,
)
@ -274,3 +277,99 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None:
with pytest.raises(ImportValidationError, match="unknown symbol"):
await sink.import_activities([_buy()])
assert calls == ["/api/v1/activities/import/check"] # real import never hit
# -- Manual snapshot import (Fidelity path) --
@pytest.mark.asyncio
async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
seen: dict[str, Any] = {}
async def handler(req: httpx.Request) -> httpx.Response:
if req.url.path == "/api/v1/snapshots/import":
seen["body"] = json.loads(req.content)
return httpx.Response(
200,
json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []},
)
return httpx.Response(404)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16),
currency="GBP",
positions=[
SnapshotPosition(
symbol="KDOA",
quantity=Decimal("4200.5"),
average_cost=Decimal("24.29"),
total_cost_basis=Decimal("102004.15"),
currency="GBP",
),
],
cash_balances={"GBP": Decimal(0)},
)
result = await sink.push_manual_snapshots(
account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234",
snapshots=[snapshot],
)
assert result["snapshotsImported"] == 1
# Wire format: numeric fields are STRINGS (Decimal.__format__('f'))
body = seen["body"]
assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234"
pos = body["snapshots"][0]["positions"][0]
assert pos == {
"symbol": "KDOA",
"quantity": "4200.5",
"averageCost": "24.29",
"totalCostBasis": "102004.15",
"currency": "GBP",
}
assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"}
@pytest.mark.asyncio
async def test_push_manual_snapshots_raises_on_partial_failure(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
return httpx.Response(
200,
json={
"snapshotsImported": 0,
"snapshotsFailed": 1,
"errors": [{"row": 0, "msg": "bad symbol"}],
},
)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16), currency="GBP",
positions=[], cash_balances={},
)
with pytest.raises(WealthfolioError, match="bad symbol"):
await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot])
@pytest.mark.asyncio
async def test_push_manual_snapshots_short_circuits_on_empty(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.push_manual_snapshots(account_id="acct", snapshots=[])
assert result["snapshotsImported"] == 0