fidelity: replace snapshot-push with delta gains-offset DEPOSITs
Some checks failed
ci/woodpecker/push/build Pipeline failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled

Per-fund snapshot import landed quantities but dropped cost basis +
needed a separate quote-push path we never identified. Snapshotting
also collided with WF's own TOTAL aggregation and ZEROED the Fidelity
cash balance.

Simpler plan: each monthly scrape emits a single DEPOSIT (or
WITHDRAWAL on a market drop) sized to the delta between the live
PlanViewer pot value and Wealthfolio's running total. dav_corrected
PG view continues to subtract these offsets from net_contribution so
the dashboard Growth/ROI math stays right.

- New gains_offset_delta_activity() — current_gain - prior_offset.
- New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums
  the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs
  in WF so we know what's already been emitted.
- CLI runs sync_provider_to_wealthfolio first (cash flows), then
  computes + emits the delta via import_activities.
- 4 new provider tests for the delta logic; full suite (144 + 1
  skipped) green; mypy + ruff clean.

The old fidelity_holdings_to_snapshot helper + push_manual_snapshots
sink method stay for future use but are no longer called.
This commit is contained in:
Viktor Barzin 2026-05-17 00:35:17 +00:00
parent c9c0310733
commit 98c4729622
4 changed files with 183 additions and 35 deletions

View file

@ -438,10 +438,8 @@ def fidelity_ingest(
sys.exit(2)
async def _run() -> None:
from datetime import date as _date_t
from broker_sync.providers.fidelity_planviewer import (
fidelity_holdings_to_snapshot,
gains_offset_delta_activity,
)
sink = WealthfolioSink(
@ -461,37 +459,36 @@ def fidelity_ingest(
result = await sync_provider_to_wealthfolio(
provider=provider, sink=sink, dedup=dedup, since=since,
)
# PlanViewer has no historical per-fund unit-price feed, so
# the Activity stream above only carries cash flows. The
# current-pot fund positions captured in the same scrape get
# pushed via /api/v1/snapshots/import so per-fund quantity +
# cost basis land in WF (and propagate to the wealth
# dashboard's Positions table via pg-sync).
snapshot_imported = 0
# PlanViewer doesn't expose per-fund unit prices in any feed
# WF can consume, so the only way to keep WF's pension total in
# line with the live PlanViewer pot value is to emit a small
# DEPOSIT (or WITHDRAWAL on a market drop) each run sized to
# the growth since the last scrape. The dav_corrected PG view
# subtracts these offsets from net_contribution so the
# dashboard's Growth/ROI panels stay accurate.
gains_delta_emitted = 0
if provider.last_holdings:
snapshot = fidelity_holdings_to_snapshot(
wf_account_id = await sink.ensure_account(provider.accounts()[0])
prior_offset = await sink.cumulative_amount_with_notes_prefix(
account_id=wf_account_id,
notes_prefix="fidelity-planviewer:unrealised-gains-offset",
)
delta = gains_offset_delta_activity(
holdings=provider.last_holdings,
total_real_contribution=provider.last_total_contribution,
as_of=_date_t.today(),
prior_offset_cumulative=prior_offset,
as_of=datetime.now(UTC),
)
if snapshot is not None:
# /api/v1/snapshots/import wants WF's own account UUID,
# not our logical provider id — look it up via the same
# match the pipeline used (provider+providerAccountId).
wf_account_id = await sink.ensure_account(
provider.accounts()[0],
)
push_result = await sink.push_manual_snapshots(
account_id=wf_account_id, snapshots=[snapshot],
)
snapshot_imported = int(push_result.get("snapshotsImported", 0))
if delta is not None:
await sink.import_activities([delta])
gains_delta_emitted = 1
finally:
await sink.close()
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
f"new={result.new_after_dedup} "
f"imported={result.imported} "
f"failed={result.failed} "
f"snapshots={snapshot_imported}")
f"gains_delta={gains_delta_emitted}")
if result.failed > 0:
sys.exit(1)