Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
98c4729622 fidelity: replace snapshot-push with delta gains-offset DEPOSITs
Some checks failed
ci/woodpecker/push/build Pipeline failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
Per-fund snapshot import landed quantities but dropped cost basis +
needed a separate quote-push path we never identified. Snapshotting
also collided with WF's own TOTAL aggregation and ZEROED the Fidelity
cash balance.

Simpler plan: each monthly scrape emits a single DEPOSIT (or
WITHDRAWAL on a market drop) sized to the delta between the live
PlanViewer pot value and Wealthfolio's running total. dav_corrected
PG view continues to subtract these offsets from net_contribution so
the dashboard Growth/ROI math stays right.

- New gains_offset_delta_activity() — current_gain - prior_offset.
- New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums
  the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs
  in WF so we know what's already been emitted.
- CLI runs sync_provider_to_wealthfolio first (cash flows), then
  computes + emits the delta via import_activities.
- 4 new provider tests for the delta logic; full suite (144 + 1
  skipped) green; mypy + ruff clean.

The old fidelity_holdings_to_snapshot helper + push_manual_snapshots
sink method stay for future use but are no longer called.
2026-05-17 00:35:17 +00:00
c9c0310733 fidelity: snapshot push needs WF account UUID, not logical id
/api/v1/snapshots/import lookups the account by Wealthfolio's own
UUID; passing our provider-side logical id ('fidelity-workplace-pension')
returns 400 'Database operation failed: Record not found'. Resolve
via sink.ensure_account() which the pipeline already runs idempotently,
then pass the returned UUID into push_manual_snapshots().
2026-05-16 23:47:49 +00:00
cb159e17d9 fidelity: push per-fund manual snapshot instead of gains-offset DEPOSIT
PlanViewer's DisplayValuation.action JSON already gives us current
fund units + unit price; we were parsing it and throwing it away,
emitting only a single 'unrealised-gains-offset' DEPOSIT to make
Wealthfolio's totals match the dashboard. That hack double-counted
the gain as a cash contribution, hiding £35k of pension growth from
every contribution/growth/ROI panel.

New flow:
- FidelityPlanViewerProvider exposes last_holdings + last_total_contribution
  after fetch() drains.
- fidelity-ingest CLI converts to a ManualSnapshotPayload (cost basis
  allocated proportionally by current fund value share) and posts to
  WF /api/v1/snapshots/import. WF auto-creates unknown fund symbols
  with kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=GBP.
- The gains-offset emission is removed entirely. Historical offset
  rows already in WF are corrected at the dashboard layer by the
  dav_corrected view shipped in infra@2841347e.

WealthfolioSink gains push_manual_snapshots() + ManualSnapshotPayload /
SnapshotPosition wire types. 11 sink tests (3 new) + 9 fidelity
provider tests (2 changed, 1 new) all green; mypy + ruff clean.
2026-05-16 13:56:25 +00:00
Viktor Barzin
5adc4a7ba4 [ci] deploy.yml: manual-only — push events don't set IMAGE_TAG
Some checks failed
ci/woodpecker/push/build Pipeline was successful
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
2026-05-07 23:25:28 +00:00
Viktor Barzin
f4a4c8892f trigger pipeline
Some checks are pending
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
2026-05-07 22:47:37 +00:00
6 changed files with 494 additions and 60 deletions

View file

@ -1,5 +1,9 @@
when: when:
- event: [manual, push] # Manual-only — fired with IMAGE_TAG by the build pipeline (or
# by a human kicking off a deploy from the Woodpecker UI).
# The earlier `[manual, push]` would fire on every push and fail
# at check-vars because IMAGE_TAG is unset on push events.
- event: manual
steps: steps:
- name: check-vars - name: check-vars

View file

@ -438,6 +438,10 @@ def fidelity_ingest(
sys.exit(2) sys.exit(2)
async def _run() -> None: async def _run() -> None:
from broker_sync.providers.fidelity_planviewer import (
gains_offset_delta_activity,
)
sink = WealthfolioSink( sink = WealthfolioSink(
base_url=wf_base_url, base_url=wf_base_url,
username=wf_username, username=wf_username,
@ -455,12 +459,36 @@ def fidelity_ingest(
result = await sync_provider_to_wealthfolio( result = await sync_provider_to_wealthfolio(
provider=provider, sink=sink, dedup=dedup, since=since, provider=provider, sink=sink, dedup=dedup, since=since,
) )
# PlanViewer doesn't expose per-fund unit prices in any feed
# WF can consume, so the only way to keep WF's pension total in
# line with the live PlanViewer pot value is to emit a small
# DEPOSIT (or WITHDRAWAL on a market drop) each run sized to
# the growth since the last scrape. The dav_corrected PG view
# subtracts these offsets from net_contribution so the
# dashboard's Growth/ROI panels stay accurate.
gains_delta_emitted = 0
if provider.last_holdings:
wf_account_id = await sink.ensure_account(provider.accounts()[0])
prior_offset = await sink.cumulative_amount_with_notes_prefix(
account_id=wf_account_id,
notes_prefix="fidelity-planviewer:unrealised-gains-offset",
)
delta = gains_offset_delta_activity(
holdings=provider.last_holdings,
total_real_contribution=provider.last_total_contribution,
prior_offset_cumulative=prior_offset,
as_of=datetime.now(UTC),
)
if delta is not None:
await sink.import_activities([delta])
gains_delta_emitted = 1
finally: finally:
await sink.close() await sink.close()
typer.echo(f"fidelity-ingest: fetched={result.fetched} " typer.echo(f"fidelity-ingest: fetched={result.fetched} "
f"new={result.new_after_dedup} " f"new={result.new_after_dedup} "
f"imported={result.imported} " f"imported={result.imported} "
f"failed={result.failed}") f"failed={result.failed} "
f"gains_delta={gains_delta_emitted}")
if result.failed > 0: if result.failed > 0:
sys.exit(1) sys.exit(1)

View file

@ -16,21 +16,28 @@ We keep a Playwright-maintained session via ``storage_state.json``:
fund holdings. On 401/idle-timeout we raise fund holdings. On 401/idle-timeout we raise
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed. :class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
## Emitted Activity shape ## Emitted Activity / snapshot shape
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single - One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``. Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches
the Fidelity dashboard. ``external_id =
fidelity:gains:<YYYY-MM-DD>``.
- Bulk Switches / Fund Switches are skipped (no cash movement). - Bulk Switches / Fund Switches are skipped (no cash movement).
- After the activity stream drains, the ``fidelity-ingest`` CLI calls
``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload``
per fund holding (today's date, units + cost basis allocated
proportionally to fund value share). This sets per-fund quantity and
cost basis in WF so the dashboard Positions table shows the pension
funds alongside the brokerage assets.
- The old synthetic ``fidelity:gains:<date>`` DEPOSIT is no longer
emitted the snapshot supersedes it. Old offset rows that landed
before this change are corrected at the dashboard layer by the
``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``).
""" """
from __future__ import annotations from __future__ import annotations
import contextlib import contextlib
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from datetime import UTC, datetime from datetime import date, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any, NamedTuple from typing import Any, NamedTuple
@ -42,6 +49,7 @@ from broker_sync.providers.parsers.fidelity import (
parse_transactions_html, parse_transactions_html,
parse_valuation_json, parse_valuation_json,
) )
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -86,37 +94,6 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity:
) )
def _gains_offset_activity(
holdings: list[FidelityHolding],
transactions: list[FidelityCashTx],
as_of: datetime,
) -> Activity | None:
"""Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the
Fidelity dashboard's reported pot value.
The offset carries a date-derived external_id so monthly runs refresh
the same synthetic entry rather than stacking duplicates.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
total_contrib = sum((t.amount for t in transactions), Decimal(0))
gains = total_value - total_contrib
if gains == 0:
return None
return Activity(
external_id=f"fidelity:gains:{as_of.date().isoformat()}",
account_id=ACCOUNT_ID,
account_type=AccountType.WORKPLACE_PENSION,
date=as_of,
activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL,
currency=_CCY,
amount=abs(gains),
notes=(f"fidelity-planviewer:unrealised-gains-offset "
f"(pot=£{total_value}, contrib=£{total_contrib})"),
)
class FidelityPlanViewerProvider: class FidelityPlanViewerProvider:
"""Read-only provider against Fidelity UK PlanViewer. """Read-only provider against Fidelity UK PlanViewer.
@ -125,11 +102,18 @@ class FidelityPlanViewerProvider:
- ``fetch(since, before)`` opens a Playwright session with the saved - ``fetch(since, before)`` opens a Playwright session with the saved
storage_state, navigates to the transaction-history page with a wide storage_state, navigates to the transaction-history page with a wide
date range, scrapes the table, and intercepts the valuation XHR. date range, scrapes the table, and intercepts the valuation XHR.
- After ``fetch()`` completes, ``last_holdings`` holds the per-fund
unit positions and ``last_total_contribution`` the cumulative cash
contribution used by the ``fidelity-ingest`` CLI to emit a
delta-shaped DEPOSIT that nudges WF's net worth to match the
PlanViewer reported pot value (see ``gains_offset_delta_activity``).
""" """
name = "fidelity-planviewer" name = "fidelity-planviewer"
def __init__(self, creds: FidelityCreds) -> None: def __init__(self, creds: FidelityCreds) -> None:
self._creds = creds self._creds = creds
self.last_holdings: list[FidelityHolding] = []
self.last_total_contribution: Decimal = Decimal(0)
def accounts(self) -> list[Account]: def accounts(self) -> list[Account]:
return [ return [
@ -162,19 +146,113 @@ class FidelityPlanViewerProvider:
log.info("fidelity: parsed %d transactions, %d holdings", log.info("fidelity: parsed %d transactions, %d holdings",
len(transactions), len(holdings)) len(transactions), len(holdings))
# Snapshot the per-fund holdings for the CLI to push as a manual
# holdings_snapshot after this generator drains. Wealthfolio's
# activity model can't represent pension fund unit purchases (no
# per-purchase price feed from PlanViewer), so we record current
# state via /api/v1/snapshots/import instead.
self.last_holdings = holdings
self.last_total_contribution = sum(
(t.amount for t in transactions), Decimal(0)
)
for tx in transactions: for tx in transactions:
if since is not None and tx.date < since: if since is not None and tx.date < since:
continue continue
if before is not None and tx.date >= before: if before is not None and tx.date >= before:
continue continue
yield _tx_to_activity(tx) yield _tx_to_activity(tx)
# Gains-offset DEPOSITs are emitted by the CLI (which has the
# prior cumulative offset from WF). See `gains_offset_delta_activity`.
# The gains offset is always "as of now" so it reflects today's pot.
# Only emit when the caller isn't windowing (full state). def gains_offset_delta_activity(
if since is None and before is None: holdings: list[FidelityHolding],
offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC)) total_real_contribution: Decimal,
if offset is not None: prior_offset_cumulative: Decimal,
yield offset as_of: datetime,
min_delta: Decimal = Decimal("0.5"),
) -> Activity | None:
"""Compute the gains-offset DELTA since the last scrape and shape it
as a DEPOSIT (or WITHDRAWAL on a market drop).
The pension's per-fund prices aren't trackable in WF directly (no
public quote feed for these institutional life-fund share classes).
Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL
sized to ``(current_pot - real_contributions) - prior_cumulative_offset``
i.e., the growth (or loss) accrued since the last run.
Wealthfolio's net_contribution then incorrectly includes all these
offsets; the ``dav_corrected`` PG view subtracts them back out so the
dashboard's Growth/ROI panels remain accurate. The deterministic
external_id (per scrape date) lets re-runs of the same day overwrite
rather than stack duplicates.
"""
if not holdings:
return None
current_pot = sum((h.total_value for h in holdings), Decimal(0))
current_gain = current_pot - total_real_contribution
delta = current_gain - prior_offset_cumulative
if abs(delta) < min_delta:
return None
return Activity(
external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}",
account_id=ACCOUNT_ID,
account_type=AccountType.WORKPLACE_PENSION,
date=as_of,
activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL,
currency=_CCY,
amount=abs(delta),
notes=(
f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} "
f"(pot=£{current_pot}, contrib=£{total_real_contribution}, "
f"prior_offset=£{prior_offset_cumulative})"
),
)
def fidelity_holdings_to_snapshot(
holdings: list[FidelityHolding],
total_real_contribution: Decimal,
as_of: date,
) -> ManualSnapshotPayload | None:
"""Convert scraped holdings into a Wealthfolio manual snapshot payload.
Cost-basis allocation: PlanViewer doesn't expose historical purchase
prices for individual fund unit buys, so we approximate per-fund
cost basis by allocating the cumulative cash contribution
proportionally to each fund's share of the current pot value. For
the typical single-fund Meta scheme this is exact; if Viktor's plan
later splits into multiple funds the proportional split is the
least-wrong allocation we can compute from monthly snapshots.
cashBalances is set to zero pension contributions flow straight
into funds, the synthetic Wealthfolio "cash balance" only existed
because of the old gains-offset DEPOSIT hack.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
if total_value <= 0:
return None
positions: list[SnapshotPosition] = []
for h in holdings:
share = h.total_value / total_value
cost = (total_real_contribution * share).quantize(Decimal("0.01"))
avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0)
positions.append(SnapshotPosition(
symbol=h.fund_code,
quantity=h.units,
average_cost=avg_cost,
total_cost_basis=cost,
currency=h.currency,
))
return ManualSnapshotPayload(
date=as_of,
currency=_CCY,
positions=positions,
cash_balances={_CCY: Decimal(0)},
)
async def _scrape_live_session( async def _scrape_live_session(

View file

@ -2,7 +2,9 @@ from __future__ import annotations
import json import json
from collections.abc import Iterable from collections.abc import Iterable
from datetime import UTC from dataclasses import dataclass
from datetime import UTC, date
from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -14,6 +16,8 @@ _LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts" _ACCOUNTS_PATH = "/api/v1/accounts"
_IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import" _IMPORT_REAL = "/api/v1/activities/import"
_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import"
_ACTIVITIES_SEARCH = "/api/v1/activities/search"
class WealthfolioError(Exception): class WealthfolioError(Exception):
@ -262,3 +266,131 @@ class WealthfolioSink:
f"First warning: {first_warn}") f"First warning: {first_warn}")
assert isinstance(got, list) assert isinstance(got, list)
return [r for r in got if isinstance(r, dict)] return [r for r in got if isinstance(r, dict)]
# -- activity lookups --
async def cumulative_amount_with_notes_prefix(
self,
account_id: str,
notes_prefix: str,
) -> Decimal:
"""Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start
with ``notes_prefix``, signed (deposits positive, withdrawals negative).
Used by the Fidelity provider to compute the delta gains-offset:
``current_gain - cumulative_existing_offset`` becomes the new
DEPOSIT to emit on each monthly run.
"""
try:
resp = await self._request(
"POST", _ACTIVITIES_SEARCH,
json={"accountIds": [account_id], "page": 1, "pageSize": 500},
)
except Exception:
return Decimal(0)
if resp.status_code >= 400:
return Decimal(0)
payload = resp.json()
rows = payload.get("data", payload) if isinstance(payload, dict) else payload
if not isinstance(rows, list):
return Decimal(0)
total = Decimal(0)
for r in rows:
if not isinstance(r, dict):
continue
notes = r.get("comment") or r.get("notes") or ""
if not isinstance(notes, str) or not notes.startswith(notes_prefix):
continue
amt_raw = r.get("amount")
if amt_raw is None:
continue
try:
amt = Decimal(str(amt_raw))
except Exception:
continue
atype = (r.get("activityType") or r.get("activity_type") or "").upper()
if atype == "WITHDRAWAL":
total -= amt
else:
total += amt
return total
# -- manual holdings snapshots --
async def push_manual_snapshots(
self,
account_id: str,
snapshots: list[ManualSnapshotPayload],
) -> dict[str, Any]:
"""Push manual holdings snapshots to /api/v1/snapshots/import.
Each snapshot carries a date + per-fund positions + cash balances.
Wealthfolio auto-creates any unknown asset symbol with
``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=<currency>`` and uses
the snapshot to derive holdings + valuation for that date bypassing
the activity-ledger derivation entirely for the targeted day.
Used by the Fidelity provider since PlanViewer exposes current
fund units + price but no per-trade history. Re-imports for the
same (account, date) overwrite in place.
"""
if not snapshots:
return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []}
body = {
"accountId": account_id,
"snapshots": [_snapshot_to_payload(s) for s in snapshots],
}
resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body)
if resp.status_code >= 400:
try:
payload = resp.json()
except Exception:
payload = {"raw": resp.text}
raise WealthfolioError(
f"Wealthfolio /snapshots/import rejected: {payload}")
result = resp.json()
assert isinstance(result, dict)
failed = int(result.get("snapshotsFailed", 0))
if failed > 0:
raise WealthfolioError(
f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; "
f"errors={result.get('errors')}")
return result
@dataclass(frozen=True)
class SnapshotPosition:
"""A per-fund position row in a Wealthfolio manual snapshot."""
symbol: str
quantity: Decimal
average_cost: Decimal
total_cost_basis: Decimal
currency: str
@dataclass(frozen=True)
class ManualSnapshotPayload:
"""Sink-facing snapshot row. Mirrors the JSON shape WF expects."""
date: date
currency: str
positions: list[SnapshotPosition]
cash_balances: dict[str, Decimal]
def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]:
"""Serialise a ManualSnapshotPayload into WF's import wire format."""
return {
"date": s.date.isoformat(),
"currency": s.currency,
"positions": [
{
"symbol": p.symbol,
"quantity": format(p.quantity, "f"),
"averageCost": format(p.average_cost, "f"),
"totalCostBasis": format(p.total_cost_basis, "f"),
"currency": p.currency,
}
for p in s.positions
],
"cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()},
}

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import UTC, datetime from datetime import UTC, date, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
@ -13,7 +13,8 @@ from broker_sync.providers.fidelity_planviewer import (
FidelityCreds, FidelityCreds,
FidelityPlanViewerProvider, FidelityPlanViewerProvider,
FidelityProviderConfigError, FidelityProviderConfigError,
_gains_offset_activity, fidelity_holdings_to_snapshot,
gains_offset_delta_activity,
) )
from broker_sync.providers.parsers.fidelity import ( from broker_sync.providers.parsers.fidelity import (
parse_transactions_html, parse_transactions_html,
@ -96,21 +97,113 @@ def test_parse_valuation_fixture() -> None:
assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"} assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"}
def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None: def test_holdings_to_snapshot_real_fixture() -> None:
html = (_FIXTURES / "transactions-full.html").read_text() html = (_FIXTURES / "transactions-full.html").read_text()
valuation = json.loads((_FIXTURES / "valuation.json").read_text()) valuation = json.loads((_FIXTURES / "valuation.json").read_text())
txs = parse_transactions_html(html)
holdings = parse_valuation_json(valuation) holdings = parse_valuation_json(valuation)
as_of = datetime(2026, 4, 18, tzinfo=UTC) total_contrib = sum((tx.amount for tx in parse_transactions_html(html)),
offset = _gains_offset_activity(holdings, txs, as_of) Decimal(0))
assert offset is not None
assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL) snapshot = fidelity_holdings_to_snapshot(
assert offset.amount is not None and offset.amount > 0 holdings=holdings,
assert offset.external_id == "fidelity:gains:2026-04-18" total_real_contribution=total_contrib,
as_of=date(2026, 4, 18),
)
assert snapshot is not None
assert snapshot.date == date(2026, 4, 18)
assert snapshot.currency == "GBP"
# Cost basis sums to the cash contributions (allocated by fund value share)
sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0))
assert abs(sum_cost - total_contrib) < Decimal("1")
# Meta scheme had KDOA + LAFC + one other at fixture time; the
# dominant fund must be KDOA.
symbols = [p.symbol for p in snapshot.positions]
assert "KDOA" in symbols
kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA")
assert kdoa.quantity > 0
# Proportional cost-basis allocation: KDOA holds nearly the whole pot
# so it should get the lion's share of cost
kdoa_share = kdoa.total_cost_basis / sum_cost
assert kdoa_share > Decimal("0.9")
# cashBalances zero — pension contributions flow straight into funds
assert snapshot.cash_balances == {"GBP": Decimal(0)}
def test_gains_offset_none_when_no_holdings() -> None: def test_holdings_to_snapshot_none_when_no_holdings() -> None:
assert _gains_offset_activity( assert fidelity_holdings_to_snapshot(
holdings=[], transactions=[], holdings=[], total_real_contribution=Decimal("100"),
as_of=datetime(2026, 4, 18, tzinfo=UTC), as_of=date(2026, 4, 18),
) is None
def test_provider_caches_holdings_for_cli_snapshot_push() -> None:
"""The CLI reads `last_holdings` after fetch() drains to push the
manual snapshot. This guards the contract that fetch() populates the
attribute even when no Activity is yielded (e.g., backfill window
cut-off)."""
prov = FidelityPlanViewerProvider(FidelityCreds(
storage_state_path="/tmp/x", plan_id="META",
))
# Pre-fetch state: empty
assert prov.last_holdings == []
assert prov.last_total_contribution == Decimal(0)
# -- delta-shaped gains offset (the monthly accumulation mechanism) --
def _holdings_summing_to(total: Decimal) -> list:
from broker_sync.providers.parsers.fidelity import FidelityHolding
return [FidelityHolding(
fund_code="KDOA", fund_name="Test", units=Decimal("100"),
unit_price=total / Decimal("100"), currency="GBP", total_value=total,
units_by_source={},
)]
def test_gains_delta_emits_deposit_when_gain_exceeds_prior_offset() -> None:
# pot £145k, real contrib £102k → current gain £43k; prior offset £35k
# → delta = +£8k
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("145000")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is not None
assert activity.activity_type == ActivityType.DEPOSIT
assert activity.amount == Decimal("8000")
assert activity.external_id == "fidelity:gains-delta:2026-05-17"
assert "unrealised-gains-offset" in (activity.notes or "")
def test_gains_delta_emits_withdrawal_on_market_drop() -> None:
# pot dropped: current gain £30k, prior offset £35k → delta = -£5k
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("132000")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is not None
assert activity.activity_type == ActivityType.WITHDRAWAL
assert activity.amount == Decimal("5000")
def test_gains_delta_suppressed_below_minimum() -> None:
# delta ~£0.20, below the £0.50 min — skip emission to avoid noise.
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("137000.20")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is None
def test_gains_delta_none_when_no_holdings() -> None:
assert gains_offset_delta_activity(
holdings=[], total_real_contribution=Decimal("0"),
prior_offset_cumulative=Decimal("0"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
) is None ) is None

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import UTC, datetime from datetime import UTC, date, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -12,6 +12,9 @@ import pytest
from broker_sync.models import Account, AccountType, Activity, ActivityType from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.sinks.wealthfolio import ( from broker_sync.sinks.wealthfolio import (
ImportValidationError, ImportValidationError,
ManualSnapshotPayload,
SnapshotPosition,
WealthfolioError,
WealthfolioSink, WealthfolioSink,
WealthfolioUnauthorizedError, WealthfolioUnauthorizedError,
) )
@ -274,3 +277,99 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None:
with pytest.raises(ImportValidationError, match="unknown symbol"): with pytest.raises(ImportValidationError, match="unknown symbol"):
await sink.import_activities([_buy()]) await sink.import_activities([_buy()])
assert calls == ["/api/v1/activities/import/check"] # real import never hit assert calls == ["/api/v1/activities/import/check"] # real import never hit
# -- Manual snapshot import (Fidelity path) --
@pytest.mark.asyncio
async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
seen: dict[str, Any] = {}
async def handler(req: httpx.Request) -> httpx.Response:
if req.url.path == "/api/v1/snapshots/import":
seen["body"] = json.loads(req.content)
return httpx.Response(
200,
json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []},
)
return httpx.Response(404)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16),
currency="GBP",
positions=[
SnapshotPosition(
symbol="KDOA",
quantity=Decimal("4200.5"),
average_cost=Decimal("24.29"),
total_cost_basis=Decimal("102004.15"),
currency="GBP",
),
],
cash_balances={"GBP": Decimal(0)},
)
result = await sink.push_manual_snapshots(
account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234",
snapshots=[snapshot],
)
assert result["snapshotsImported"] == 1
# Wire format: numeric fields are STRINGS (Decimal.__format__('f'))
body = seen["body"]
assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234"
pos = body["snapshots"][0]["positions"][0]
assert pos == {
"symbol": "KDOA",
"quantity": "4200.5",
"averageCost": "24.29",
"totalCostBasis": "102004.15",
"currency": "GBP",
}
assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"}
@pytest.mark.asyncio
async def test_push_manual_snapshots_raises_on_partial_failure(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
return httpx.Response(
200,
json={
"snapshotsImported": 0,
"snapshotsFailed": 1,
"errors": [{"row": 0, "msg": "bad symbol"}],
},
)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16), currency="GBP",
positions=[], cash_balances={},
)
with pytest.raises(WealthfolioError, match="bad symbol"):
await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot])
@pytest.mark.asyncio
async def test_push_manual_snapshots_short_circuits_on_empty(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.push_manual_snapshots(account_id="acct", snapshots=[])
assert result["snapshotsImported"] == 0