Per-fund snapshot import landed quantities but dropped cost basis + needed a separate quote-push path we never identified. Snapshotting also collided with WF's own TOTAL aggregation and ZEROED the Fidelity cash balance.

Simpler plan: each monthly scrape emits a single DEPOSIT (or WITHDRAWAL on a market drop) sized to the delta between the live PlanViewer pot value and Wealthfolio's running total. The dav_corrected PG view continues to subtract these offsets from net_contribution so the dashboard Growth/ROI math stays right.

- New gains_offset_delta_activity(): current_gain - prior_offset.
- New WealthfolioSink.cumulative_amount_with_notes_prefix(): sums the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs in WF so we know what's already been emitted.
- CLI runs sync_provider_to_wealthfolio first (cash flows), then computes + emits the delta via import_activities.
- 4 new provider tests for the delta logic; full suite (144 + 1 skipped) green; mypy + ruff clean.

The old fidelity_holdings_to_snapshot helper + push_manual_snapshots sink method stay for future use but are no longer called.
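The delta arithmetic described above can be sketched in isolation. This is a minimal illustration, not the code in this commit: `Row`, `prior_offset` and `gains_delta` are hypothetical names, the figures are made up, and treating earlier WITHDRAWAL offsets as negative when summing is an assumption about how the running total should behave.

```python
from decimal import Decimal
from typing import NamedTuple

OFFSET_PREFIX = "fidelity-planviewer:unrealised-gains-offset"


class Row(NamedTuple):
    """Hypothetical stand-in for an activity row already stored in Wealthfolio."""

    activity_type: str  # "DEPOSIT" or "WITHDRAWAL"
    amount: Decimal
    notes: str


def prior_offset(rows: list[Row]) -> Decimal:
    """Signed sum of earlier offset rows, matched by notes prefix.

    Assumption: WITHDRAWAL offsets count as negative so the total tracks
    the net gain already booked.
    """
    total = Decimal(0)
    for row in rows:
        if not row.notes.startswith(OFFSET_PREFIX):
            continue  # real cash flows are not part of the offset total
        total += row.amount if row.activity_type == "DEPOSIT" else -row.amount
    return total


def gains_delta(pot: Decimal, contributions: Decimal, prior: Decimal) -> Decimal:
    """Growth accrued since the last run: (pot - contributions) - prior."""
    return (pot - contributions) - prior


rows = [
    Row("DEPOSIT", Decimal("1000.00"), "fidelity-planviewer:Regular Premium"),
    Row("DEPOSIT", Decimal("150.00"), f"{OFFSET_PREFIX} delta=£150.00"),
    Row("WITHDRAWAL", Decimal("30.00"), f"{OFFSET_PREFIX} delta=£-30.00"),
]
prior = prior_offset(rows)
delta = gains_delta(Decimal("1185.50"), Decimal("1000.00"), prior)
kind = "DEPOSIT" if delta > 0 else "WITHDRAWAL"
print(kind, abs(delta))
```

With these made-up numbers the current gain is 185.50 and 120.00 has already been booked, so the run would emit a DEPOSIT of 65.50. A pot below contributions plus prior offsets would flip the sign and produce a WITHDRAWAL instead.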
331 lines
14 KiB
Python
"""Fidelity UK PlanViewer provider: workplace pension backfill + monthly sync.

PlanViewer has no public individual-member API. The SPA (at
``pv.planviewer.fidelity.co.uk``) and the legacy HTML app (at
``www.planviewer.fidelity.co.uk``) share session cookies via PingFederate
OAuth at ``id.fidelity.co.uk``.

We keep a Playwright-maintained session via ``storage_state.json``:

1. **One-off seed** (``broker-sync fidelity-seed``): Viktor runs a headed
   Chromium, logs in (password + memorable word + SMS MFA), clicks
   "Remember device". The storage_state is persisted to Vault.
2. **Monthly cron**: loads storage_state, boots headless Chromium, navigates
   to the transaction-history page with a wide date range, parses the HTML
   table, and intercepts the ``DisplayValuation`` XHR for the current
   fund holdings. On 401/idle-timeout we raise
   :class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.

## Emitted Activity / snapshot shape

- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
  Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
- Bulk Switches / Fund Switches are skipped (no cash movement).
- After the activity stream drains, the ``fidelity-ingest`` CLI emits a
  single gains-offset ``DEPOSIT`` (or ``WITHDRAWAL`` on a market drop)
  built by ``gains_offset_delta_activity``. It is sized to the change in
  unrealised gain since the last run, so WF's total tracks the PlanViewer
  pot value. ``external_id = fidelity:gains-delta:<date>``.
- These offsets inflate WF's net_contribution; the ``dav_corrected`` PG
  view (``infra/stacks/wealthfolio/main.tf``) subtracts them again at the
  dashboard layer.
- ``fidelity_holdings_to_snapshot`` and
  ``WealthfolioSink.push_manual_snapshots`` are kept for future use but
  are no longer called.
"""

from __future__ import annotations

import contextlib
import logging
from collections.abc import AsyncIterator
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any, NamedTuple

from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.providers.parsers.fidelity import (
    FidelityCashTx,
    FidelityHolding,
    parse_transactions_html,
    parse_valuation_json,
)
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition

log = logging.getLogger(__name__)

ACCOUNT_ID = "fidelity-workplace-pension"
_CCY = "GBP"

_PV_BASE = "https://www.planviewer.fidelity.co.uk"
_PV_TX_PATH = "/planviewer/DisplayMyPlanMemberTransHist.action"
_PV_VALUATION_PATH = "/planviewer/DisplayValuation.action"
_PV_LANDING = "https://www.planviewer.fidelity.co.uk/"

# A wide backfill cap; scheme can't predate 1990.
_BACKFILL_START = "01 Jan 1990"


class FidelityCreds(NamedTuple):
    """Paths needed to run the provider."""
    storage_state_path: str
    plan_id: str
    headless: bool = True


class FidelitySessionError(Exception):
    """Raised when PlanViewer rejects the saved session; re-seed required."""


class FidelityProviderConfigError(Exception):
    """Raised when provider config is missing or obviously wrong."""


def _tx_to_activity(tx: FidelityCashTx) -> Activity:
    """Map a Fidelity cash transaction to a canonical DEPOSIT."""
    return Activity(
        external_id=tx.external_id,
        account_id=ACCOUNT_ID,
        account_type=AccountType.WORKPLACE_PENSION,
        date=tx.date,
        activity_type=ActivityType.DEPOSIT,
        currency=_CCY,
        amount=tx.amount,
        notes=f"fidelity-planviewer:{tx.tx_type}",
    )


class FidelityPlanViewerProvider:
    """Read-only provider against Fidelity UK PlanViewer.

    Lifecycle:
    - ``accounts()`` advertises the single WF workplace-pension account.
    - ``fetch(since, before)`` opens a Playwright session with the saved
      storage_state, navigates to the transaction-history page with a wide
      date range, scrapes the table, and intercepts the valuation XHR.
    - After ``fetch()`` completes, ``last_holdings`` holds the per-fund
      unit positions and ``last_total_contribution`` the cumulative cash
      contribution. The ``fidelity-ingest`` CLI uses both to emit a
      delta-shaped DEPOSIT that nudges WF's net worth to match the
      PlanViewer reported pot value (see ``gains_offset_delta_activity``).
    """
    name = "fidelity-planviewer"

    def __init__(self, creds: FidelityCreds) -> None:
        self._creds = creds
        self.last_holdings: list[FidelityHolding] = []
        self.last_total_contribution: Decimal = Decimal(0)

    def accounts(self) -> list[Account]:
        return [
            Account(
                id=ACCOUNT_ID,
                name="Fidelity UK Pension",
                account_type=AccountType.WORKPLACE_PENSION,
                currency=_CCY,
                provider=self.name,
            ),
        ]

    async def fetch(
        self,
        *,
        since: datetime | None = None,
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        state_path = self._creds.storage_state_path
        if not Path(state_path).exists():
            raise FidelityProviderConfigError(
                f"storage_state not found at {state_path} — "
                "run `broker-sync fidelity-seed` first")

        tx_html, valuation_json = await _scrape_live_session(
            state_path=state_path, headless=self._creds.headless,
        )
        transactions = parse_transactions_html(tx_html)
        holdings = parse_valuation_json(valuation_json)
        log.info("fidelity: parsed %d transactions, %d holdings",
                 len(transactions), len(holdings))

        # Stash the per-fund holdings and the cumulative contribution for
        # the CLI, which reads them after this generator drains to build
        # the gains-offset delta. Wealthfolio's activity model can't
        # represent pension fund unit purchases (no per-purchase price
        # feed from PlanViewer), so growth is booked as a cash delta.
        self.last_holdings = holdings
        self.last_total_contribution = sum(
            (t.amount for t in transactions), Decimal(0)
        )

        for tx in transactions:
            if since is not None and tx.date < since:
                continue
            if before is not None and tx.date >= before:
                continue
            yield _tx_to_activity(tx)
        # Gains-offset DEPOSITs are emitted by the CLI (which has the
        # prior cumulative offset from WF). See `gains_offset_delta_activity`.


def gains_offset_delta_activity(
    holdings: list[FidelityHolding],
    total_real_contribution: Decimal,
    prior_offset_cumulative: Decimal,
    as_of: datetime,
    min_delta: Decimal = Decimal("0.5"),
) -> Activity | None:
    """Compute the gains-offset DELTA since the last scrape and shape it
    as a DEPOSIT (or WITHDRAWAL on a market drop).

    The pension's per-fund prices aren't trackable in WF directly (no
    public quote feed for these institutional life-fund share classes).
    Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL
    sized to ``(current_pot - real_contributions) - prior_cumulative_offset``,
    i.e. the growth (or loss) accrued since the last run.

    Wealthfolio's net_contribution then incorrectly includes all these
    offsets; the ``dav_corrected`` PG view subtracts them back out so the
    dashboard's Growth/ROI panels remain accurate. The deterministic
    external_id (per scrape date) lets re-runs of the same day overwrite
    rather than stack duplicates.
    """
    if not holdings:
        return None
    current_pot = sum((h.total_value for h in holdings), Decimal(0))
    current_gain = current_pot - total_real_contribution
    delta = current_gain - prior_offset_cumulative
    if abs(delta) < min_delta:
        return None
    return Activity(
        external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}",
        account_id=ACCOUNT_ID,
        account_type=AccountType.WORKPLACE_PENSION,
        date=as_of,
        activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL,
        currency=_CCY,
        amount=abs(delta),
        notes=(
            f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} "
            f"(pot=£{current_pot}, contrib=£{total_real_contribution}, "
            f"prior_offset=£{prior_offset_cumulative})"
        ),
    )


def fidelity_holdings_to_snapshot(
    holdings: list[FidelityHolding],
    total_real_contribution: Decimal,
    as_of: date,
) -> ManualSnapshotPayload | None:
    """Convert scraped holdings into a Wealthfolio manual snapshot payload.

    No longer called by the ``fidelity-ingest`` CLI; kept for future use.

    Cost-basis allocation: PlanViewer doesn't expose historical purchase
    prices for individual fund unit buys, so we approximate per-fund
    cost basis by allocating the cumulative cash contribution
    proportionally to each fund's share of the current pot value. For
    the typical single-fund Meta scheme this is exact; if Viktor's plan
    later splits into multiple funds the proportional split is the
    least-wrong allocation we can compute from monthly snapshots.

    cashBalances is set to zero: pension contributions flow straight
    into funds, and the synthetic Wealthfolio "cash balance" only exists
    because of the gains-offset DEPOSITs.
    """
    if not holdings:
        return None
    total_value = sum((h.total_value for h in holdings), Decimal(0))
    if total_value <= 0:
        return None
    positions: list[SnapshotPosition] = []
    for h in holdings:
        share = h.total_value / total_value
        cost = (total_real_contribution * share).quantize(Decimal("0.01"))
        avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0)
        positions.append(SnapshotPosition(
            symbol=h.fund_code,
            quantity=h.units,
            average_cost=avg_cost,
            total_cost_basis=cost,
            currency=h.currency,
        ))
    return ManualSnapshotPayload(
        date=as_of,
        currency=_CCY,
        positions=positions,
        cash_balances={_CCY: Decimal(0)},
    )


async def _scrape_live_session(
    *,
    state_path: str,
    headless: bool,
) -> tuple[str, dict[str, Any]]:
    """Load storage_state, navigate the transaction + valuation pages,
    return (transactions HTML, valuation JSON payload).

    Raises :class:`FidelitySessionError` if the session is dead (15-min idle,
    cookie expiry, etc.); Viktor must re-seed.
    """
    from playwright.async_api import async_playwright

    captured_valuation: dict[str, dict[str, Any]] = {}
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=headless)
        try:
            ctx = await browser.new_context(
                storage_state=state_path,
                user_agent=("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/147.0.0.0 Safari/537.36"),
                viewport={"width": 1280, "height": 900},
            )
            page = await ctx.new_page()

            async def on_response(resp: Any) -> None:
                if _PV_VALUATION_PATH in resp.url and resp.status < 400:
                    with contextlib.suppress(Exception):
                        captured_valuation["payload"] = await resp.json()
            page.on("response", on_response)

            # Trigger session + capture valuation by navigating through landing
            # → main page. The SPA fires DisplayValuation on the main page.
            await page.goto(_PV_LANDING, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(2000)
            main_url = f"{_PV_BASE}/planviewer/DisplayMainPage.action"
            await page.goto(main_url, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(3000)
            if "idle for more than 15 minutes" in (await page.content()) \
                    or "id.fidelity.co.uk" in page.url:
                raise FidelitySessionError(
                    "PlanViewer session stale — run `broker-sync fidelity-seed`")

            # Now pull the transactions page with a wide date range.
            await page.goto(f"{_PV_BASE}{_PV_TX_PATH}",
                            wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(1500)
            await page.fill('input[name="startDate"]', _BACKFILL_START)
            today = await page.evaluate(
                "new Date().toLocaleDateString('en-GB',"
                "{day:'2-digit',month:'short',year:'numeric'}).replace(/,/g,'')")
            await page.fill('input[name="endDate"]', today)
            await page.focus('input[name="endDate"]')
            await page.keyboard.press("Enter")
            with contextlib.suppress(Exception):
                await page.wait_for_load_state("networkidle", timeout=15000)
            await page.wait_for_timeout(2000)
            tx_html = await page.content()

            # If valuation wasn't picked up on the main page, request directly.
            if "payload" not in captured_valuation:
                r = await page.request.get(f"{_PV_BASE}{_PV_VALUATION_PATH}")
                if r.ok:
                    with contextlib.suppress(Exception):
                        captured_valuation["payload"] = await r.json()

            # Roll the storage_state so the next run benefits from any refresh.
            await ctx.storage_state(path=state_path)
        finally:
            await browser.close()

    valuation: dict[str, Any] = captured_valuation.get("payload") or {}
    return tx_html, valuation