broker-sync/broker_sync/providers/fidelity_planviewer.py
Viktor Barzin 98c4729622
Some checks failed
ci/woodpecker/push/build Pipeline failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
fidelity: replace snapshot-push with delta gains-offset DEPOSITs
Per-fund snapshot import landed quantities but dropped cost basis +
needed a separate quote-push path we never identified. Snapshotting
also collided with WF's own TOTAL aggregation and ZEROED the Fidelity
cash balance.

Simpler plan: each monthly scrape emits a single DEPOSIT (or
WITHDRAWAL on a market drop) sized to the delta between the live
PlanViewer pot value and Wealthfolio's running total. dav_corrected
PG view continues to subtract these offsets from net_contribution so
the dashboard Growth/ROI math stays right.

- New gains_offset_delta_activity() — current_gain - prior_offset.
- New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums
  the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs
  in WF so we know what's already been emitted.
- CLI runs sync_provider_to_wealthfolio first (cash flows), then
  computes + emits the delta via import_activities.
- 4 new provider tests for the delta logic; full suite (144 + 1
  skipped) green; mypy + ruff clean.

The old fidelity_holdings_to_snapshot helper + push_manual_snapshots
sink method stay for future use but are no longer called.
2026-05-17 00:35:17 +00:00

331 lines
14 KiB
Python

"""Fidelity UK PlanViewer provider — workplace pension backfill + monthly sync.
PlanViewer has no public individual-member API. The SPA (at
``pv.planviewer.fidelity.co.uk``) and the legacy HTML app (at
``www.planviewer.fidelity.co.uk``) share session cookies via PingFederate
OAuth at ``id.fidelity.co.uk``.
We keep a Playwright-maintained session via ``storage_state.json``:
1. **One-off seed** (``broker-sync fidelity-seed``): Viktor runs a headed
Chromium, logs in (password + memorable word + SMS MFA), clicks
"Remember device". The storage_state is persisted to Vault.
2. **Monthly cron**: loads storage_state, boots headless Chromium, navigates
to the transaction-history page with a wide date range, parses the HTML
table, and intercepts the ``DisplayValuation`` XHR for the current
fund holdings. On 401/idle-timeout we raise
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
## Emitted Activity / snapshot shape
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
- Bulk Switches / Fund Switches are skipped (no cash movement).
- After the activity stream drains, the ``fidelity-ingest`` CLI reads the
cumulative gains offset already recorded in Wealthfolio and emits at most
one delta activity built by ``gains_offset_delta_activity``: a ``DEPOSIT``
(or a ``WITHDRAWAL`` on a market drop) sized to
``(current pot - real contributions) - prior cumulative offset``.
``external_id = fidelity:gains-delta:<date>``.
- Wealthfolio's net contribution therefore includes these offsets; they
are corrected at the dashboard layer by the ``dav_corrected`` PG view
(``infra/stacks/wealthfolio/main.tf``), which subtracts them back out.
- The per-fund snapshot path (``fidelity_holdings_to_snapshot`` plus
``WealthfolioSink.push_manual_snapshots``, one ``ManualSnapshotPayload``
with units + cost basis allocated proportionally to fund value share)
is kept for future use but is no longer called.
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import AsyncIterator
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any, NamedTuple
from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.providers.parsers.fidelity import (
FidelityCashTx,
FidelityHolding,
parse_transactions_html,
parse_valuation_json,
)
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Account id stamped on every Activity / Account this provider emits.
ACCOUNT_ID = "fidelity-workplace-pension"
# Currency used for the account and for every emitted activity.
_CCY = "GBP"

# PlanViewer host plus the paths the scraper visits.
_PV_BASE = "https://www.planviewer.fidelity.co.uk"
_PV_LANDING = f"{_PV_BASE}/"
_PV_TX_PATH = "/planviewer/DisplayMyPlanMemberTransHist.action"
_PV_VALUATION_PATH = "/planviewer/DisplayValuation.action"

# A wide backfill cap; scheme can't predate 1990. Typed verbatim into the
# transaction-history form's start-date field.
_BACKFILL_START = "01 Jan 1990"
class FidelityCreds(NamedTuple):
    """Settings the PlanViewer provider needs to open a browser session."""

    # Filesystem path of the Playwright storage_state JSON file.
    storage_state_path: str
    # Plan identifier supplied by the caller.
    # NOTE(review): not read by any code visible in this module — confirm use.
    plan_id: str
    # Launch Chromium without a visible window unless explicitly disabled.
    headless: bool = True
class FidelitySessionError(Exception):
    """The saved PlanViewer session was rejected; a fresh seed is required."""
class FidelityProviderConfigError(Exception):
    """Provider configuration is absent or plainly invalid."""
def _tx_to_activity(tx: FidelityCashTx) -> Activity:
    """Build the canonical DEPOSIT activity for one Fidelity cash transaction.

    The transaction's id, date and amount are carried over unchanged; the
    Fidelity transaction type is recorded in the notes behind a
    ``fidelity-planviewer:`` prefix.
    """
    note = f"fidelity-planviewer:{tx.tx_type}"
    return Activity(
        activity_type=ActivityType.DEPOSIT,
        account_type=AccountType.WORKPLACE_PENSION,
        account_id=ACCOUNT_ID,
        currency=_CCY,
        external_id=tx.external_id,
        date=tx.date,
        amount=tx.amount,
        notes=note,
    )
class FidelityPlanViewerProvider:
    """Read-only provider against Fidelity UK PlanViewer.

    Lifecycle:

    - ``accounts()`` advertises the single WF workplace-pension account.
    - ``fetch(since, before)`` opens a Playwright session with the saved
      storage_state, navigates to the transaction-history page with a wide
      date range, scrapes the table, and intercepts the valuation XHR.
    - After ``fetch()`` completes, ``last_holdings`` holds the per-fund
      unit positions and ``last_total_contribution`` the cumulative cash
      contribution — used by the ``fidelity-ingest`` CLI to emit a
      delta-shaped DEPOSIT that nudges WF's net worth to match the
      PlanViewer reported pot value (see ``gains_offset_delta_activity``).
    """

    name = "fidelity-planviewer"

    def __init__(self, creds: FidelityCreds) -> None:
        """Store the credentials; no browser work happens until ``fetch()``."""
        self._creds = creds
        # Both attributes are overwritten on every ``fetch()`` run.
        self.last_holdings: list[FidelityHolding] = []
        self.last_total_contribution: Decimal = Decimal(0)

    def accounts(self) -> list[Account]:
        """Return the single workplace-pension account this provider feeds."""
        return [
            Account(
                id=ACCOUNT_ID,
                name="Fidelity UK Pension",
                account_type=AccountType.WORKPLACE_PENSION,
                currency=_CCY,
                provider=self.name,
            ),
        ]

    async def fetch(
        self,
        *,
        since: datetime | None = None,
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        """Scrape PlanViewer and yield one DEPOSIT per cash transaction.

        This is an async generator, so nothing runs (and nothing is raised)
        until the first item is requested.

        Args:
            since: When given, transactions dated before it are skipped.
            before: When given, transactions dated on or after it are skipped.

        Yields:
            One ``Activity`` per parsed transaction inside the window.

        Raises:
            FidelityProviderConfigError: The storage_state file is missing.
            FidelitySessionError: Propagated from ``_scrape_live_session``
                when PlanViewer rejects the saved session.
        """
        state_path = self._creds.storage_state_path
        if not Path(state_path).exists():
            # The separator matters: without it the two literals run together
            # and the path fuses with the remediation hint.
            raise FidelityProviderConfigError(
                f"storage_state not found at {state_path} — "
                "run `broker-sync fidelity-seed` first"
            )
        tx_html, valuation_json = await _scrape_live_session(
            state_path=state_path, headless=self._creds.headless,
        )
        transactions = parse_transactions_html(tx_html)
        holdings = parse_valuation_json(valuation_json)
        log.info("fidelity: parsed %d transactions, %d holdings",
                 len(transactions), len(holdings))
        # Keep the scraped state for the CLI to read once this generator
        # drains. The contribution total deliberately covers every parsed
        # transaction, not only those inside the since/before window.
        self.last_holdings = holdings
        self.last_total_contribution = sum(
            (t.amount for t in transactions), Decimal(0)
        )
        for tx in transactions:
            if since is not None and tx.date < since:
                continue
            if before is not None and tx.date >= before:
                continue
            yield _tx_to_activity(tx)
        # Gains-offset DEPOSITs are emitted by the CLI (which has the
        # prior cumulative offset from WF). See `gains_offset_delta_activity`.
def gains_offset_delta_activity(
    holdings: list[FidelityHolding],
    total_real_contribution: Decimal,
    prior_offset_cumulative: Decimal,
    as_of: datetime,
    min_delta: Decimal = Decimal("0.5"),
) -> Activity | None:
    """Shape the gains-offset change since the previous scrape as an activity.

    PlanViewer fund prices cannot be tracked inside Wealthfolio (there is no
    public quote feed for these institutional life-fund share classes), so
    each scrape instead emits one DEPOSIT — or WITHDRAWAL when the market
    fell — sized to
    ``(current_pot - real_contributions) - prior_cumulative_offset``.

    Wealthfolio's net_contribution ends up including these offsets; the
    ``dav_corrected`` PG view removes them again so the dashboard's
    Growth/ROI panels stay accurate. The external_id is derived from the
    scrape date only, so a re-run on the same day overwrites instead of
    stacking a duplicate.

    Returns ``None`` when there are no holdings or when the absolute change
    is below ``min_delta``.
    """
    if not holdings:
        return None

    pot_value = Decimal(0)
    for holding in holdings:
        pot_value = pot_value + holding.total_value

    unrealised_gain = pot_value - total_real_contribution
    change = unrealised_gain - prior_offset_cumulative
    if abs(change) < min_delta:
        # Too small to be worth recording.
        return None

    if change > 0:
        direction = ActivityType.DEPOSIT
    else:
        direction = ActivityType.WITHDRAWAL

    summary = (
        f"fidelity-planviewer:unrealised-gains-offset delta=£{change} "
        f"(pot=£{pot_value}, contrib=£{total_real_contribution}, "
        f"prior_offset=£{prior_offset_cumulative})"
    )
    return Activity(
        external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}",
        account_id=ACCOUNT_ID,
        account_type=AccountType.WORKPLACE_PENSION,
        date=as_of,
        activity_type=direction,
        currency=_CCY,
        amount=abs(change),
        notes=summary,
    )
def fidelity_holdings_to_snapshot(
    holdings: list[FidelityHolding],
    total_real_contribution: Decimal,
    as_of: date,
) -> ManualSnapshotPayload | None:
    """Turn scraped holdings into a Wealthfolio manual snapshot payload.

    Cost basis is approximated: PlanViewer exposes no historical purchase
    prices for individual unit buys, so the cumulative cash contribution is
    split across funds in proportion to each fund's share of the current
    pot value. With a single fund the split is exact; with several it is
    the least-wrong allocation available from monthly snapshots.

    The cash balance is reported as zero because pension contributions go
    straight into funds.

    Returns ``None`` when there are no holdings or the pot value is not
    positive.
    """
    if not holdings:
        return None

    pot_value = Decimal(0)
    for fund in holdings:
        pot_value = pot_value + fund.total_value
    if pot_value <= 0:
        return None

    penny = Decimal("0.01")
    unit_cost_step = Decimal("0.0001")

    def to_position(fund: FidelityHolding) -> SnapshotPosition:
        # Allocate contributions by this fund's share of the pot.
        weight = fund.total_value / pot_value
        allocated = (total_real_contribution * weight).quantize(penny)
        if fund.units > 0:
            per_unit = (allocated / fund.units).quantize(unit_cost_step)
        else:
            per_unit = Decimal(0)
        return SnapshotPosition(
            symbol=fund.fund_code,
            quantity=fund.units,
            average_cost=per_unit,
            total_cost_basis=allocated,
            currency=fund.currency,
        )

    return ManualSnapshotPayload(
        date=as_of,
        currency=_CCY,
        positions=[to_position(fund) for fund in holdings],
        cash_balances={_CCY: Decimal(0)},
    )
def _session_looks_stale(html: str, url: str) -> bool:
    """Return True when a page shows one of the known dead-session markers."""
    return "idle for more than 15 minutes" in html or "id.fidelity.co.uk" in url


async def _scrape_live_session(
    *,
    state_path: str,
    headless: bool,
) -> tuple[str, dict[str, Any]]:
    """Load storage_state, navigate the transaction + valuation pages,
    return (transactions HTML, valuation JSON payload).

    The valuation payload is captured from the ``DisplayValuation`` response
    seen while browsing; if none was seen it is requested directly. When
    neither attempt yields JSON an empty dict is returned and a warning is
    logged.

    Args:
        state_path: Playwright storage_state file; read at start and
            rewritten at the end of a successful run.
        headless: Whether Chromium is launched headless.

    Raises:
        FidelitySessionError: The session is dead (15-min idle, cookie
            expiry, etc.) — Viktor must re-seed. Checked after the main
            page, after opening the transaction page, and after submitting
            the date range.
    """
    from playwright.async_api import async_playwright

    captured_valuation: dict[str, dict[str, Any]] = {}

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=headless)
        try:
            ctx = await browser.new_context(
                storage_state=state_path,
                user_agent=("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/147.0.0.0 Safari/537.36"),
                viewport={"width": 1280, "height": 900},
            )
            page = await ctx.new_page()

            async def on_response(resp: Any) -> None:
                # Best-effort capture: a non-JSON body is simply ignored.
                if _PV_VALUATION_PATH in resp.url and resp.status < 400:
                    with contextlib.suppress(Exception):
                        captured_valuation["payload"] = await resp.json()

            async def require_live_session() -> str:
                # Return the current page HTML, or raise if the session died.
                html = await page.content()
                if _session_looks_stale(html, page.url):
                    raise FidelitySessionError(
                        "PlanViewer session stale — run `broker-sync fidelity-seed`")
                return html

            page.on("response", on_response)

            # Trigger session + capture valuation by navigating through landing
            # → main page. The SPA fires DisplayValuation on the main page.
            await page.goto(_PV_LANDING, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(2000)
            main_url = f"{_PV_BASE}/planviewer/DisplayMainPage.action"
            await page.goto(main_url, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(3000)
            await require_live_session()

            # Now pull the transactions page with a wide date range.
            await page.goto(f"{_PV_BASE}{_PV_TX_PATH}",
                            wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(1500)
            # Re-check before touching the form: on a login/idle page the
            # fill below would otherwise fail with an opaque timeout instead
            # of the re-seed signal.
            await require_live_session()
            await page.fill('input[name="startDate"]', _BACKFILL_START)
            # NOTE(review): recent ICU data renders September as "Sept" for
            # en-GB; confirm the form accepts that or format the date here.
            today = await page.evaluate(
                "new Date().toLocaleDateString('en-GB',"
                "{day:'2-digit',month:'short',year:'numeric'}).replace(/,/g,'')")
            await page.fill('input[name="endDate"]', today)
            await page.focus('input[name="endDate"]')
            await page.keyboard.press("Enter")
            # The submit may not trigger a full navigation; don't fail on it.
            with contextlib.suppress(Exception):
                await page.wait_for_load_state("networkidle", timeout=15000)
            await page.wait_for_timeout(2000)
            # Never hand a stale-session page to the transaction parser.
            tx_html = await require_live_session()

            # If valuation wasn't picked up on the main page, request directly.
            if "payload" not in captured_valuation:
                r = await page.request.get(f"{_PV_BASE}{_PV_VALUATION_PATH}")
                if r.ok:
                    with contextlib.suppress(Exception):
                        captured_valuation["payload"] = await r.json()

            # Roll the storage_state so the next run benefits from any refresh.
            await ctx.storage_state(path=state_path)
        finally:
            await browser.close()

    valuation: dict[str, Any] = captured_valuation.get("payload") or {}
    if not valuation:
        log.warning("fidelity: DisplayValuation payload was not captured")
    return tx_html, valuation