Compare commits
5 commits
phase-0-sc
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 98c4729622 | |||
| c9c0310733 | |||
| cb159e17d9 | |||
|
|
5adc4a7ba4 | ||
|
|
f4a4c8892f |
6 changed files with 494 additions and 60 deletions
|
|
@ -1,5 +1,9 @@
|
||||||
when:
|
when:
|
||||||
- event: [manual, push]
|
# Manual-only — fired with IMAGE_TAG by the build pipeline (or
|
||||||
|
# by a human kicking off a deploy from the Woodpecker UI).
|
||||||
|
# The earlier `[manual, push]` would fire on every push and fail
|
||||||
|
# at check-vars because IMAGE_TAG is unset on push events.
|
||||||
|
- event: manual
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: check-vars
|
- name: check-vars
|
||||||
|
|
|
||||||
|
|
@ -438,6 +438,10 @@ def fidelity_ingest(
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
async def _run() -> None:
|
async def _run() -> None:
|
||||||
|
from broker_sync.providers.fidelity_planviewer import (
|
||||||
|
gains_offset_delta_activity,
|
||||||
|
)
|
||||||
|
|
||||||
sink = WealthfolioSink(
|
sink = WealthfolioSink(
|
||||||
base_url=wf_base_url,
|
base_url=wf_base_url,
|
||||||
username=wf_username,
|
username=wf_username,
|
||||||
|
|
@ -455,12 +459,36 @@ def fidelity_ingest(
|
||||||
result = await sync_provider_to_wealthfolio(
|
result = await sync_provider_to_wealthfolio(
|
||||||
provider=provider, sink=sink, dedup=dedup, since=since,
|
provider=provider, sink=sink, dedup=dedup, since=since,
|
||||||
)
|
)
|
||||||
|
# PlanViewer doesn't expose per-fund unit prices in any feed
|
||||||
|
# WF can consume, so the only way to keep WF's pension total in
|
||||||
|
# line with the live PlanViewer pot value is to emit a small
|
||||||
|
# DEPOSIT (or WITHDRAWAL on a market drop) each run sized to
|
||||||
|
# the growth since the last scrape. The dav_corrected PG view
|
||||||
|
# subtracts these offsets from net_contribution so the
|
||||||
|
# dashboard's Growth/ROI panels stay accurate.
|
||||||
|
gains_delta_emitted = 0
|
||||||
|
if provider.last_holdings:
|
||||||
|
wf_account_id = await sink.ensure_account(provider.accounts()[0])
|
||||||
|
prior_offset = await sink.cumulative_amount_with_notes_prefix(
|
||||||
|
account_id=wf_account_id,
|
||||||
|
notes_prefix="fidelity-planviewer:unrealised-gains-offset",
|
||||||
|
)
|
||||||
|
delta = gains_offset_delta_activity(
|
||||||
|
holdings=provider.last_holdings,
|
||||||
|
total_real_contribution=provider.last_total_contribution,
|
||||||
|
prior_offset_cumulative=prior_offset,
|
||||||
|
as_of=datetime.now(UTC),
|
||||||
|
)
|
||||||
|
if delta is not None:
|
||||||
|
await sink.import_activities([delta])
|
||||||
|
gains_delta_emitted = 1
|
||||||
finally:
|
finally:
|
||||||
await sink.close()
|
await sink.close()
|
||||||
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
|
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
|
||||||
f"new={result.new_after_dedup} "
|
f"new={result.new_after_dedup} "
|
||||||
f"imported={result.imported} "
|
f"imported={result.imported} "
|
||||||
f"failed={result.failed}")
|
f"failed={result.failed} "
|
||||||
|
f"gains_delta={gains_delta_emitted}")
|
||||||
if result.failed > 0:
|
if result.failed > 0:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,21 +16,28 @@ We keep a Playwright-maintained session via ``storage_state.json``:
|
||||||
fund holdings. On 401/idle-timeout we raise
|
fund holdings. On 401/idle-timeout we raise
|
||||||
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
|
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
|
||||||
|
|
||||||
## Emitted Activity shape
|
## Emitted Activity / snapshot shape
|
||||||
|
|
||||||
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
|
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
|
||||||
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
|
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
|
||||||
- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches
|
|
||||||
the Fidelity dashboard. ``external_id =
|
|
||||||
fidelity:gains:<YYYY-MM-DD>``.
|
|
||||||
- Bulk Switches / Fund Switches are skipped (no cash movement).
|
- Bulk Switches / Fund Switches are skipped (no cash movement).
|
||||||
|
- After the activity stream drains, the ``fidelity-ingest`` CLI calls
|
||||||
|
``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload``
|
||||||
|
per fund holding (today's date, units + cost basis allocated
|
||||||
|
proportionally to fund value share). This sets per-fund quantity and
|
||||||
|
cost basis in WF so the dashboard Positions table shows the pension
|
||||||
|
funds alongside the brokerage assets.
|
||||||
|
- The old synthetic ``fidelity:gains:<date>`` DEPOSIT is no longer
|
||||||
|
emitted — the snapshot supersedes it. Old offset rows that landed
|
||||||
|
before this change are corrected at the dashboard layer by the
|
||||||
|
``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``).
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
from collections.abc import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
from datetime import UTC, datetime
|
from datetime import date, datetime
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, NamedTuple
|
from typing import Any, NamedTuple
|
||||||
|
|
@ -42,6 +49,7 @@ from broker_sync.providers.parsers.fidelity import (
|
||||||
parse_transactions_html,
|
parse_transactions_html,
|
||||||
parse_valuation_json,
|
parse_valuation_json,
|
||||||
)
|
)
|
||||||
|
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -86,37 +94,6 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _gains_offset_activity(
|
|
||||||
holdings: list[FidelityHolding],
|
|
||||||
transactions: list[FidelityCashTx],
|
|
||||||
as_of: datetime,
|
|
||||||
) -> Activity | None:
|
|
||||||
"""Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the
|
|
||||||
Fidelity dashboard's reported pot value.
|
|
||||||
|
|
||||||
The offset carries a date-derived external_id so monthly runs refresh
|
|
||||||
the same synthetic entry rather than stacking duplicates.
|
|
||||||
"""
|
|
||||||
if not holdings:
|
|
||||||
return None
|
|
||||||
total_value = sum((h.total_value for h in holdings), Decimal(0))
|
|
||||||
total_contrib = sum((t.amount for t in transactions), Decimal(0))
|
|
||||||
gains = total_value - total_contrib
|
|
||||||
if gains == 0:
|
|
||||||
return None
|
|
||||||
return Activity(
|
|
||||||
external_id=f"fidelity:gains:{as_of.date().isoformat()}",
|
|
||||||
account_id=ACCOUNT_ID,
|
|
||||||
account_type=AccountType.WORKPLACE_PENSION,
|
|
||||||
date=as_of,
|
|
||||||
activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL,
|
|
||||||
currency=_CCY,
|
|
||||||
amount=abs(gains),
|
|
||||||
notes=(f"fidelity-planviewer:unrealised-gains-offset "
|
|
||||||
f"(pot=£{total_value}, contrib=£{total_contrib})"),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class FidelityPlanViewerProvider:
|
class FidelityPlanViewerProvider:
|
||||||
"""Read-only provider against Fidelity UK PlanViewer.
|
"""Read-only provider against Fidelity UK PlanViewer.
|
||||||
|
|
||||||
|
|
@ -125,11 +102,18 @@ class FidelityPlanViewerProvider:
|
||||||
- ``fetch(since, before)`` opens a Playwright session with the saved
|
- ``fetch(since, before)`` opens a Playwright session with the saved
|
||||||
storage_state, navigates to the transaction-history page with a wide
|
storage_state, navigates to the transaction-history page with a wide
|
||||||
date range, scrapes the table, and intercepts the valuation XHR.
|
date range, scrapes the table, and intercepts the valuation XHR.
|
||||||
|
- After ``fetch()`` completes, ``last_holdings`` holds the per-fund
|
||||||
|
unit positions and ``last_total_contribution`` the cumulative cash
|
||||||
|
contribution — used by the ``fidelity-ingest`` CLI to emit a
|
||||||
|
delta-shaped DEPOSIT that nudges WF's net worth to match the
|
||||||
|
PlanViewer reported pot value (see ``gains_offset_delta_activity``).
|
||||||
"""
|
"""
|
||||||
name = "fidelity-planviewer"
|
name = "fidelity-planviewer"
|
||||||
|
|
||||||
def __init__(self, creds: FidelityCreds) -> None:
|
def __init__(self, creds: FidelityCreds) -> None:
|
||||||
self._creds = creds
|
self._creds = creds
|
||||||
|
self.last_holdings: list[FidelityHolding] = []
|
||||||
|
self.last_total_contribution: Decimal = Decimal(0)
|
||||||
|
|
||||||
def accounts(self) -> list[Account]:
|
def accounts(self) -> list[Account]:
|
||||||
return [
|
return [
|
||||||
|
|
@ -162,19 +146,113 @@ class FidelityPlanViewerProvider:
|
||||||
log.info("fidelity: parsed %d transactions, %d holdings",
|
log.info("fidelity: parsed %d transactions, %d holdings",
|
||||||
len(transactions), len(holdings))
|
len(transactions), len(holdings))
|
||||||
|
|
||||||
|
# Snapshot the per-fund holdings for the CLI to push as a manual
|
||||||
|
# holdings_snapshot after this generator drains. Wealthfolio's
|
||||||
|
# activity model can't represent pension fund unit purchases (no
|
||||||
|
# per-purchase price feed from PlanViewer), so we record current
|
||||||
|
# state via /api/v1/snapshots/import instead.
|
||||||
|
self.last_holdings = holdings
|
||||||
|
self.last_total_contribution = sum(
|
||||||
|
(t.amount for t in transactions), Decimal(0)
|
||||||
|
)
|
||||||
|
|
||||||
for tx in transactions:
|
for tx in transactions:
|
||||||
if since is not None and tx.date < since:
|
if since is not None and tx.date < since:
|
||||||
continue
|
continue
|
||||||
if before is not None and tx.date >= before:
|
if before is not None and tx.date >= before:
|
||||||
continue
|
continue
|
||||||
yield _tx_to_activity(tx)
|
yield _tx_to_activity(tx)
|
||||||
|
# Gains-offset DEPOSITs are emitted by the CLI (which has the
|
||||||
|
# prior cumulative offset from WF). See `gains_offset_delta_activity`.
|
||||||
|
|
||||||
# The gains offset is always "as of now" so it reflects today's pot.
|
|
||||||
# Only emit when the caller isn't windowing (full state).
|
def gains_offset_delta_activity(
|
||||||
if since is None and before is None:
|
holdings: list[FidelityHolding],
|
||||||
offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC))
|
total_real_contribution: Decimal,
|
||||||
if offset is not None:
|
prior_offset_cumulative: Decimal,
|
||||||
yield offset
|
as_of: datetime,
|
||||||
|
min_delta: Decimal = Decimal("0.5"),
|
||||||
|
) -> Activity | None:
|
||||||
|
"""Compute the gains-offset DELTA since the last scrape and shape it
|
||||||
|
as a DEPOSIT (or WITHDRAWAL on a market drop).
|
||||||
|
|
||||||
|
The pension's per-fund prices aren't trackable in WF directly (no
|
||||||
|
public quote feed for these institutional life-fund share classes).
|
||||||
|
Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL
|
||||||
|
sized to ``(current_pot - real_contributions) - prior_cumulative_offset``
|
||||||
|
— i.e., the growth (or loss) accrued since the last run.
|
||||||
|
|
||||||
|
Wealthfolio's net_contribution then incorrectly includes all these
|
||||||
|
offsets; the ``dav_corrected`` PG view subtracts them back out so the
|
||||||
|
dashboard's Growth/ROI panels remain accurate. The deterministic
|
||||||
|
external_id (per scrape date) lets re-runs of the same day overwrite
|
||||||
|
rather than stack duplicates.
|
||||||
|
"""
|
||||||
|
if not holdings:
|
||||||
|
return None
|
||||||
|
current_pot = sum((h.total_value for h in holdings), Decimal(0))
|
||||||
|
current_gain = current_pot - total_real_contribution
|
||||||
|
delta = current_gain - prior_offset_cumulative
|
||||||
|
if abs(delta) < min_delta:
|
||||||
|
return None
|
||||||
|
return Activity(
|
||||||
|
external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}",
|
||||||
|
account_id=ACCOUNT_ID,
|
||||||
|
account_type=AccountType.WORKPLACE_PENSION,
|
||||||
|
date=as_of,
|
||||||
|
activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL,
|
||||||
|
currency=_CCY,
|
||||||
|
amount=abs(delta),
|
||||||
|
notes=(
|
||||||
|
f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} "
|
||||||
|
f"(pot=£{current_pot}, contrib=£{total_real_contribution}, "
|
||||||
|
f"prior_offset=£{prior_offset_cumulative})"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def fidelity_holdings_to_snapshot(
|
||||||
|
holdings: list[FidelityHolding],
|
||||||
|
total_real_contribution: Decimal,
|
||||||
|
as_of: date,
|
||||||
|
) -> ManualSnapshotPayload | None:
|
||||||
|
"""Convert scraped holdings into a Wealthfolio manual snapshot payload.
|
||||||
|
|
||||||
|
Cost-basis allocation: PlanViewer doesn't expose historical purchase
|
||||||
|
prices for individual fund unit buys, so we approximate per-fund
|
||||||
|
cost basis by allocating the cumulative cash contribution
|
||||||
|
proportionally to each fund's share of the current pot value. For
|
||||||
|
the typical single-fund Meta scheme this is exact; if Viktor's plan
|
||||||
|
later splits into multiple funds the proportional split is the
|
||||||
|
least-wrong allocation we can compute from monthly snapshots.
|
||||||
|
|
||||||
|
cashBalances is set to zero — pension contributions flow straight
|
||||||
|
into funds, the synthetic Wealthfolio "cash balance" only existed
|
||||||
|
because of the old gains-offset DEPOSIT hack.
|
||||||
|
"""
|
||||||
|
if not holdings:
|
||||||
|
return None
|
||||||
|
total_value = sum((h.total_value for h in holdings), Decimal(0))
|
||||||
|
if total_value <= 0:
|
||||||
|
return None
|
||||||
|
positions: list[SnapshotPosition] = []
|
||||||
|
for h in holdings:
|
||||||
|
share = h.total_value / total_value
|
||||||
|
cost = (total_real_contribution * share).quantize(Decimal("0.01"))
|
||||||
|
avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0)
|
||||||
|
positions.append(SnapshotPosition(
|
||||||
|
symbol=h.fund_code,
|
||||||
|
quantity=h.units,
|
||||||
|
average_cost=avg_cost,
|
||||||
|
total_cost_basis=cost,
|
||||||
|
currency=h.currency,
|
||||||
|
))
|
||||||
|
return ManualSnapshotPayload(
|
||||||
|
date=as_of,
|
||||||
|
currency=_CCY,
|
||||||
|
positions=positions,
|
||||||
|
cash_balances={_CCY: Decimal(0)},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _scrape_live_session(
|
async def _scrape_live_session(
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,9 @@ from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
from collections.abc import Iterable
|
from collections.abc import Iterable
|
||||||
from datetime import UTC
|
from dataclasses import dataclass
|
||||||
|
from datetime import UTC, date
|
||||||
|
from decimal import Decimal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
@ -14,6 +16,8 @@ _LOGIN_PATH = "/api/v1/auth/login"
|
||||||
_ACCOUNTS_PATH = "/api/v1/accounts"
|
_ACCOUNTS_PATH = "/api/v1/accounts"
|
||||||
_IMPORT_CHECK = "/api/v1/activities/import/check"
|
_IMPORT_CHECK = "/api/v1/activities/import/check"
|
||||||
_IMPORT_REAL = "/api/v1/activities/import"
|
_IMPORT_REAL = "/api/v1/activities/import"
|
||||||
|
_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import"
|
||||||
|
_ACTIVITIES_SEARCH = "/api/v1/activities/search"
|
||||||
|
|
||||||
|
|
||||||
class WealthfolioError(Exception):
|
class WealthfolioError(Exception):
|
||||||
|
|
@ -262,3 +266,131 @@ class WealthfolioSink:
|
||||||
f"First warning: {first_warn}")
|
f"First warning: {first_warn}")
|
||||||
assert isinstance(got, list)
|
assert isinstance(got, list)
|
||||||
return [r for r in got if isinstance(r, dict)]
|
return [r for r in got if isinstance(r, dict)]
|
||||||
|
|
||||||
|
# -- activity lookups --
|
||||||
|
|
||||||
|
async def cumulative_amount_with_notes_prefix(
|
||||||
|
self,
|
||||||
|
account_id: str,
|
||||||
|
notes_prefix: str,
|
||||||
|
) -> Decimal:
|
||||||
|
"""Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start
|
||||||
|
with ``notes_prefix``, signed (deposits positive, withdrawals negative).
|
||||||
|
|
||||||
|
Used by the Fidelity provider to compute the delta gains-offset:
|
||||||
|
``current_gain - cumulative_existing_offset`` becomes the new
|
||||||
|
DEPOSIT to emit on each monthly run.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
resp = await self._request(
|
||||||
|
"POST", _ACTIVITIES_SEARCH,
|
||||||
|
json={"accountIds": [account_id], "page": 1, "pageSize": 500},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
return Decimal(0)
|
||||||
|
if resp.status_code >= 400:
|
||||||
|
return Decimal(0)
|
||||||
|
payload = resp.json()
|
||||||
|
rows = payload.get("data", payload) if isinstance(payload, dict) else payload
|
||||||
|
if not isinstance(rows, list):
|
||||||
|
return Decimal(0)
|
||||||
|
total = Decimal(0)
|
||||||
|
for r in rows:
|
||||||
|
if not isinstance(r, dict):
|
||||||
|
continue
|
||||||
|
notes = r.get("comment") or r.get("notes") or ""
|
||||||
|
if not isinstance(notes, str) or not notes.startswith(notes_prefix):
|
||||||
|
continue
|
||||||
|
amt_raw = r.get("amount")
|
||||||
|
if amt_raw is None:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
amt = Decimal(str(amt_raw))
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
atype = (r.get("activityType") or r.get("activity_type") or "").upper()
|
||||||
|
if atype == "WITHDRAWAL":
|
||||||
|
total -= amt
|
||||||
|
else:
|
||||||
|
total += amt
|
||||||
|
return total
|
||||||
|
|
||||||
|
# -- manual holdings snapshots --
|
||||||
|
|
||||||
|
async def push_manual_snapshots(
|
||||||
|
self,
|
||||||
|
account_id: str,
|
||||||
|
snapshots: list[ManualSnapshotPayload],
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Push manual holdings snapshots to /api/v1/snapshots/import.
|
||||||
|
|
||||||
|
Each snapshot carries a date + per-fund positions + cash balances.
|
||||||
|
Wealthfolio auto-creates any unknown asset symbol with
|
||||||
|
``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=<currency>`` and uses
|
||||||
|
the snapshot to derive holdings + valuation for that date — bypassing
|
||||||
|
the activity-ledger derivation entirely for the targeted day.
|
||||||
|
|
||||||
|
Used by the Fidelity provider since PlanViewer exposes current
|
||||||
|
fund units + price but no per-trade history. Re-imports for the
|
||||||
|
same (account, date) overwrite in place.
|
||||||
|
"""
|
||||||
|
if not snapshots:
|
||||||
|
return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []}
|
||||||
|
body = {
|
||||||
|
"accountId": account_id,
|
||||||
|
"snapshots": [_snapshot_to_payload(s) for s in snapshots],
|
||||||
|
}
|
||||||
|
resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body)
|
||||||
|
if resp.status_code >= 400:
|
||||||
|
try:
|
||||||
|
payload = resp.json()
|
||||||
|
except Exception:
|
||||||
|
payload = {"raw": resp.text}
|
||||||
|
raise WealthfolioError(
|
||||||
|
f"Wealthfolio /snapshots/import rejected: {payload}")
|
||||||
|
result = resp.json()
|
||||||
|
assert isinstance(result, dict)
|
||||||
|
failed = int(result.get("snapshotsFailed", 0))
|
||||||
|
if failed > 0:
|
||||||
|
raise WealthfolioError(
|
||||||
|
f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; "
|
||||||
|
f"errors={result.get('errors')}")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SnapshotPosition:
|
||||||
|
"""A per-fund position row in a Wealthfolio manual snapshot."""
|
||||||
|
symbol: str
|
||||||
|
quantity: Decimal
|
||||||
|
average_cost: Decimal
|
||||||
|
total_cost_basis: Decimal
|
||||||
|
currency: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ManualSnapshotPayload:
|
||||||
|
"""Sink-facing snapshot row. Mirrors the JSON shape WF expects."""
|
||||||
|
date: date
|
||||||
|
currency: str
|
||||||
|
positions: list[SnapshotPosition]
|
||||||
|
cash_balances: dict[str, Decimal]
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]:
|
||||||
|
"""Serialise a ManualSnapshotPayload into WF's import wire format."""
|
||||||
|
return {
|
||||||
|
"date": s.date.isoformat(),
|
||||||
|
"currency": s.currency,
|
||||||
|
"positions": [
|
||||||
|
{
|
||||||
|
"symbol": p.symbol,
|
||||||
|
"quantity": format(p.quantity, "f"),
|
||||||
|
"averageCost": format(p.average_cost, "f"),
|
||||||
|
"totalCostBasis": format(p.total_cost_basis, "f"),
|
||||||
|
"currency": p.currency,
|
||||||
|
}
|
||||||
|
for p in s.positions
|
||||||
|
],
|
||||||
|
"cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()},
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, date, datetime
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
@ -13,7 +13,8 @@ from broker_sync.providers.fidelity_planviewer import (
|
||||||
FidelityCreds,
|
FidelityCreds,
|
||||||
FidelityPlanViewerProvider,
|
FidelityPlanViewerProvider,
|
||||||
FidelityProviderConfigError,
|
FidelityProviderConfigError,
|
||||||
_gains_offset_activity,
|
fidelity_holdings_to_snapshot,
|
||||||
|
gains_offset_delta_activity,
|
||||||
)
|
)
|
||||||
from broker_sync.providers.parsers.fidelity import (
|
from broker_sync.providers.parsers.fidelity import (
|
||||||
parse_transactions_html,
|
parse_transactions_html,
|
||||||
|
|
@ -96,21 +97,113 @@ def test_parse_valuation_fixture() -> None:
|
||||||
assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"}
|
assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"}
|
||||||
|
|
||||||
|
|
||||||
def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None:
|
def test_holdings_to_snapshot_real_fixture() -> None:
|
||||||
html = (_FIXTURES / "transactions-full.html").read_text()
|
html = (_FIXTURES / "transactions-full.html").read_text()
|
||||||
valuation = json.loads((_FIXTURES / "valuation.json").read_text())
|
valuation = json.loads((_FIXTURES / "valuation.json").read_text())
|
||||||
txs = parse_transactions_html(html)
|
|
||||||
holdings = parse_valuation_json(valuation)
|
holdings = parse_valuation_json(valuation)
|
||||||
as_of = datetime(2026, 4, 18, tzinfo=UTC)
|
total_contrib = sum((tx.amount for tx in parse_transactions_html(html)),
|
||||||
offset = _gains_offset_activity(holdings, txs, as_of)
|
Decimal(0))
|
||||||
assert offset is not None
|
|
||||||
assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL)
|
snapshot = fidelity_holdings_to_snapshot(
|
||||||
assert offset.amount is not None and offset.amount > 0
|
holdings=holdings,
|
||||||
assert offset.external_id == "fidelity:gains:2026-04-18"
|
total_real_contribution=total_contrib,
|
||||||
|
as_of=date(2026, 4, 18),
|
||||||
|
)
|
||||||
|
assert snapshot is not None
|
||||||
|
assert snapshot.date == date(2026, 4, 18)
|
||||||
|
assert snapshot.currency == "GBP"
|
||||||
|
# Cost basis sums to the cash contributions (allocated by fund value share)
|
||||||
|
sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0))
|
||||||
|
assert abs(sum_cost - total_contrib) < Decimal("1")
|
||||||
|
# Meta scheme had KDOA + LAFC + one other at fixture time; the
|
||||||
|
# dominant fund must be KDOA.
|
||||||
|
symbols = [p.symbol for p in snapshot.positions]
|
||||||
|
assert "KDOA" in symbols
|
||||||
|
kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA")
|
||||||
|
assert kdoa.quantity > 0
|
||||||
|
# Proportional cost-basis allocation: KDOA holds nearly the whole pot
|
||||||
|
# so it should get the lion's share of cost
|
||||||
|
kdoa_share = kdoa.total_cost_basis / sum_cost
|
||||||
|
assert kdoa_share > Decimal("0.9")
|
||||||
|
# cashBalances zero — pension contributions flow straight into funds
|
||||||
|
assert snapshot.cash_balances == {"GBP": Decimal(0)}
|
||||||
|
|
||||||
|
|
||||||
def test_gains_offset_none_when_no_holdings() -> None:
|
def test_holdings_to_snapshot_none_when_no_holdings() -> None:
|
||||||
assert _gains_offset_activity(
|
assert fidelity_holdings_to_snapshot(
|
||||||
holdings=[], transactions=[],
|
holdings=[], total_real_contribution=Decimal("100"),
|
||||||
as_of=datetime(2026, 4, 18, tzinfo=UTC),
|
as_of=date(2026, 4, 18),
|
||||||
|
) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_provider_caches_holdings_for_cli_snapshot_push() -> None:
|
||||||
|
"""The CLI reads `last_holdings` after fetch() drains to push the
|
||||||
|
manual snapshot. This guards the contract that fetch() populates the
|
||||||
|
attribute even when no Activity is yielded (e.g., backfill window
|
||||||
|
cut-off)."""
|
||||||
|
prov = FidelityPlanViewerProvider(FidelityCreds(
|
||||||
|
storage_state_path="/tmp/x", plan_id="META",
|
||||||
|
))
|
||||||
|
# Pre-fetch state: empty
|
||||||
|
assert prov.last_holdings == []
|
||||||
|
assert prov.last_total_contribution == Decimal(0)
|
||||||
|
|
||||||
|
|
||||||
|
# -- delta-shaped gains offset (the monthly accumulation mechanism) --
|
||||||
|
|
||||||
|
|
||||||
|
def _holdings_summing_to(total: Decimal) -> list:
|
||||||
|
from broker_sync.providers.parsers.fidelity import FidelityHolding
|
||||||
|
return [FidelityHolding(
|
||||||
|
fund_code="KDOA", fund_name="Test", units=Decimal("100"),
|
||||||
|
unit_price=total / Decimal("100"), currency="GBP", total_value=total,
|
||||||
|
units_by_source={},
|
||||||
|
)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_gains_delta_emits_deposit_when_gain_exceeds_prior_offset() -> None:
|
||||||
|
# pot £145k, real contrib £102k → current gain £43k; prior offset £35k
|
||||||
|
# → delta = +£8k
|
||||||
|
activity = gains_offset_delta_activity(
|
||||||
|
holdings=_holdings_summing_to(Decimal("145000")),
|
||||||
|
total_real_contribution=Decimal("102000"),
|
||||||
|
prior_offset_cumulative=Decimal("35000"),
|
||||||
|
as_of=datetime(2026, 5, 17, tzinfo=UTC),
|
||||||
|
)
|
||||||
|
assert activity is not None
|
||||||
|
assert activity.activity_type == ActivityType.DEPOSIT
|
||||||
|
assert activity.amount == Decimal("8000")
|
||||||
|
assert activity.external_id == "fidelity:gains-delta:2026-05-17"
|
||||||
|
assert "unrealised-gains-offset" in (activity.notes or "")
|
||||||
|
|
||||||
|
|
||||||
|
def test_gains_delta_emits_withdrawal_on_market_drop() -> None:
|
||||||
|
# pot dropped: current gain £30k, prior offset £35k → delta = -£5k
|
||||||
|
activity = gains_offset_delta_activity(
|
||||||
|
holdings=_holdings_summing_to(Decimal("132000")),
|
||||||
|
total_real_contribution=Decimal("102000"),
|
||||||
|
prior_offset_cumulative=Decimal("35000"),
|
||||||
|
as_of=datetime(2026, 5, 17, tzinfo=UTC),
|
||||||
|
)
|
||||||
|
assert activity is not None
|
||||||
|
assert activity.activity_type == ActivityType.WITHDRAWAL
|
||||||
|
assert activity.amount == Decimal("5000")
|
||||||
|
|
||||||
|
|
||||||
|
def test_gains_delta_suppressed_below_minimum() -> None:
|
||||||
|
# delta ~£0.20, below the £0.50 min — skip emission to avoid noise.
|
||||||
|
activity = gains_offset_delta_activity(
|
||||||
|
holdings=_holdings_summing_to(Decimal("137000.20")),
|
||||||
|
total_real_contribution=Decimal("102000"),
|
||||||
|
prior_offset_cumulative=Decimal("35000"),
|
||||||
|
as_of=datetime(2026, 5, 17, tzinfo=UTC),
|
||||||
|
)
|
||||||
|
assert activity is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_gains_delta_none_when_no_holdings() -> None:
|
||||||
|
assert gains_offset_delta_activity(
|
||||||
|
holdings=[], total_real_contribution=Decimal("0"),
|
||||||
|
prior_offset_cumulative=Decimal("0"),
|
||||||
|
as_of=datetime(2026, 5, 17, tzinfo=UTC),
|
||||||
) is None
|
) is None
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, date, datetime
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
@ -12,6 +12,9 @@ import pytest
|
||||||
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
||||||
from broker_sync.sinks.wealthfolio import (
|
from broker_sync.sinks.wealthfolio import (
|
||||||
ImportValidationError,
|
ImportValidationError,
|
||||||
|
ManualSnapshotPayload,
|
||||||
|
SnapshotPosition,
|
||||||
|
WealthfolioError,
|
||||||
WealthfolioSink,
|
WealthfolioSink,
|
||||||
WealthfolioUnauthorizedError,
|
WealthfolioUnauthorizedError,
|
||||||
)
|
)
|
||||||
|
|
@ -274,3 +277,99 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None:
|
||||||
with pytest.raises(ImportValidationError, match="unknown symbol"):
|
with pytest.raises(ImportValidationError, match="unknown symbol"):
|
||||||
await sink.import_activities([_buy()])
|
await sink.import_activities([_buy()])
|
||||||
assert calls == ["/api/v1/activities/import/check"] # real import never hit
|
assert calls == ["/api/v1/activities/import/check"] # real import never hit
|
||||||
|
|
||||||
|
|
||||||
|
# -- Manual snapshot import (Fidelity path) --
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint(
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
sp = tmp_path / "s.json"
|
||||||
|
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||||
|
|
||||||
|
seen: dict[str, Any] = {}
|
||||||
|
|
||||||
|
async def handler(req: httpx.Request) -> httpx.Response:
|
||||||
|
if req.url.path == "/api/v1/snapshots/import":
|
||||||
|
seen["body"] = json.loads(req.content)
|
||||||
|
return httpx.Response(
|
||||||
|
200,
|
||||||
|
json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []},
|
||||||
|
)
|
||||||
|
return httpx.Response(404)
|
||||||
|
|
||||||
|
sink = _client(httpx.MockTransport(handler), sp)
|
||||||
|
snapshot = ManualSnapshotPayload(
|
||||||
|
date=date(2026, 5, 16),
|
||||||
|
currency="GBP",
|
||||||
|
positions=[
|
||||||
|
SnapshotPosition(
|
||||||
|
symbol="KDOA",
|
||||||
|
quantity=Decimal("4200.5"),
|
||||||
|
average_cost=Decimal("24.29"),
|
||||||
|
total_cost_basis=Decimal("102004.15"),
|
||||||
|
currency="GBP",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
cash_balances={"GBP": Decimal(0)},
|
||||||
|
)
|
||||||
|
result = await sink.push_manual_snapshots(
|
||||||
|
account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234",
|
||||||
|
snapshots=[snapshot],
|
||||||
|
)
|
||||||
|
assert result["snapshotsImported"] == 1
|
||||||
|
# Wire format: numeric fields are STRINGS (Decimal.__format__('f'))
|
||||||
|
body = seen["body"]
|
||||||
|
assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234"
|
||||||
|
pos = body["snapshots"][0]["positions"][0]
|
||||||
|
assert pos == {
|
||||||
|
"symbol": "KDOA",
|
||||||
|
"quantity": "4200.5",
|
||||||
|
"averageCost": "24.29",
|
||||||
|
"totalCostBasis": "102004.15",
|
||||||
|
"currency": "GBP",
|
||||||
|
}
|
||||||
|
assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_push_manual_snapshots_raises_on_partial_failure(
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
sp = tmp_path / "s.json"
|
||||||
|
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||||
|
|
||||||
|
async def handler(req: httpx.Request) -> httpx.Response:
|
||||||
|
return httpx.Response(
|
||||||
|
200,
|
||||||
|
json={
|
||||||
|
"snapshotsImported": 0,
|
||||||
|
"snapshotsFailed": 1,
|
||||||
|
"errors": [{"row": 0, "msg": "bad symbol"}],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
sink = _client(httpx.MockTransport(handler), sp)
|
||||||
|
snapshot = ManualSnapshotPayload(
|
||||||
|
date=date(2026, 5, 16), currency="GBP",
|
||||||
|
positions=[], cash_balances={},
|
||||||
|
)
|
||||||
|
with pytest.raises(WealthfolioError, match="bad symbol"):
|
||||||
|
await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_push_manual_snapshots_short_circuits_on_empty(
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
sp = tmp_path / "s.json"
|
||||||
|
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||||
|
|
||||||
|
async def handler(req: httpx.Request) -> httpx.Response:
|
||||||
|
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
|
||||||
|
|
||||||
|
sink = _client(httpx.MockTransport(handler), sp)
|
||||||
|
result = await sink.push_manual_snapshots(account_id="acct", snapshots=[])
|
||||||
|
assert result["snapshotsImported"] == 0
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue