Compare commits

..

No commits in common. "main" and "phase-0-scaffold" have entirely different histories.

20 changed files with 307 additions and 3875 deletions

View file

@ -1,9 +1,5 @@
when: when:
# Manual-only — fired with IMAGE_TAG by the build pipeline (or - event: [manual, push]
# by a human kicking off a deploy from the Woodpecker UI).
# The earlier `[manual, push]` would fire on every push and fail
# at check-vars because IMAGE_TAG is unset on push events.
- event: manual
steps: steps:
- name: check-vars - name: check-vars

View file

@ -230,117 +230,6 @@ def invest_engine(
asyncio.run(_run()) asyncio.run(_run())
@app.command("ibkr")
def ibkr(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
wf_session_path: str = typer.Option(
"/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"
),
ibkr_flex_token: str = typer.Option(..., envvar="IBKR_FLEX_TOKEN"),
ibkr_flex_query_id: str = typer.Option(..., envvar="IBKR_FLEX_QUERY_ID"),
ibkr_account_id_upstream: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID_UPSTREAM"),
pushgateway_url: str = typer.Option(
"http://prometheus-prometheus-pushgateway.monitoring:9091/metrics",
envvar="PUSHGATEWAY_URL",
),
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
"""Phase 2c — daily IBKR Flex Web Service → Wealthfolio sync.
Pulls an Activity Flex Query (Trades + Cash + OpenPositions), maps to
broker-sync Activities, pushes through the shared pipeline, then
reconciles broker-reported OpenPositions against WF-computed quantities
and publishes a Pushgateway drift metric.
The Wealthfolio account UUID is resolved via the pipeline's
ensure_account(provider="ibkr", providerAccountId=IBKR_ACCOUNT_ID_UPSTREAM)
lookup no need to wire the UUID in as a separate env var.
"""
import time
from decimal import Decimal
from broker_sync.dedup import SyncRecordStore
from broker_sync.metrics import push_pushgateway
from broker_sync.pipeline import sync_provider_to_wealthfolio
from broker_sync.providers.ibkr import IBKRAccountMismatchError, IBKRProvider
from broker_sync.sinks.wealthfolio import WealthfolioSink
_setup_logging()
data = Path(data_dir)
data.mkdir(parents=True, exist_ok=True)
async def _run() -> None:
sink = WealthfolioSink(
base_url=wf_base_url,
username=wf_username,
password=wf_password,
session_path=wf_session_path,
)
provider = IBKRProvider(
token=ibkr_flex_token,
query_id=ibkr_flex_query_id,
upstream_account_id=ibkr_account_id_upstream,
)
dedup = SyncRecordStore(data / "sync.db")
try:
if not Path(wf_session_path).exists():
await sink.login()
result = await sync_provider_to_wealthfolio(
provider=provider,
sink=sink,
dedup=dedup,
)
# Resolve WF UUID for reconciliation. ensure_account is idempotent
# and already ran inside sync_provider_to_wealthfolio; this is a
# cheap re-lookup that returns the same UUID.
wf_uuid = await sink.ensure_account(provider.accounts()[0])
# Reconciliation: broker truth vs WF truth.
wf_qty = await sink.compute_position_qty(wf_uuid)
drift_metrics: list[tuple[str, dict[str, str], float]] = []
for symbol, broker_qty in provider.open_positions():
drift = broker_qty - wf_qty.get(symbol, Decimal(0))
drift_metrics.append(
(
"ibkr_position_drift_shares",
{"symbol": symbol, "account": "ibkr-uk"},
float(drift),
)
)
# Cash balances (one row per currency from CashReport, plus a
# BASE_SUMMARY row consolidated in account base currency).
for currency, ending_cash in provider.cash_balances():
drift_metrics.append(
(
"ibkr_cash_balance",
{"currency": currency, "account": "ibkr-uk"},
float(ending_cash),
)
)
drift_metrics.append(
("ibkr_sync_last_success_timestamp_seconds", {}, float(time.time()))
)
await push_pushgateway("broker-sync-ibkr", drift_metrics, pushgateway_url)
except IBKRAccountMismatchError as e:
typer.echo(f"IBKR: {e}", err=True)
sys.exit(2)
finally:
await provider.close()
await sink.close()
typer.echo(
f"ibkr: fetched={result.fetched} new={result.new_after_dedup} "
f"imported={result.imported} failed={result.failed}"
)
if result.failed > 0:
sys.exit(1)
asyncio.run(_run())
@app.command("finance-mysql-import") @app.command("finance-mysql-import")
def finance_mysql_import( def finance_mysql_import(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
@ -549,10 +438,6 @@ def fidelity_ingest(
sys.exit(2) sys.exit(2)
async def _run() -> None: async def _run() -> None:
from broker_sync.providers.fidelity_planviewer import (
gains_offset_delta_activity,
)
sink = WealthfolioSink( sink = WealthfolioSink(
base_url=wf_base_url, base_url=wf_base_url,
username=wf_username, username=wf_username,
@ -570,36 +455,12 @@ def fidelity_ingest(
result = await sync_provider_to_wealthfolio( result = await sync_provider_to_wealthfolio(
provider=provider, sink=sink, dedup=dedup, since=since, provider=provider, sink=sink, dedup=dedup, since=since,
) )
# PlanViewer doesn't expose per-fund unit prices in any feed
# WF can consume, so the only way to keep WF's pension total in
# line with the live PlanViewer pot value is to emit a small
# DEPOSIT (or WITHDRAWAL on a market drop) each run sized to
# the growth since the last scrape. The dav_corrected PG view
# subtracts these offsets from net_contribution so the
# dashboard's Growth/ROI panels stay accurate.
gains_delta_emitted = 0
if provider.last_holdings:
wf_account_id = await sink.ensure_account(provider.accounts()[0])
prior_offset = await sink.cumulative_amount_with_notes_prefix(
account_id=wf_account_id,
notes_prefix="fidelity-planviewer:unrealised-gains-offset",
)
delta = gains_offset_delta_activity(
holdings=provider.last_holdings,
total_real_contribution=provider.last_total_contribution,
prior_offset_cumulative=prior_offset,
as_of=datetime.now(UTC),
)
if delta is not None:
await sink.import_activities([delta])
gains_delta_emitted = 1
finally: finally:
await sink.close() await sink.close()
typer.echo(f"fidelity-ingest: fetched={result.fetched} " typer.echo(f"fidelity-ingest: fetched={result.fetched} "
f"new={result.new_after_dedup} " f"new={result.new_after_dedup} "
f"imported={result.imported} " f"imported={result.imported} "
f"failed={result.failed} " f"failed={result.failed}")
f"gains_delta={gains_delta_emitted}")
if result.failed > 0: if result.failed > 0:
sys.exit(1) sys.exit(1)

View file

@ -1,51 +0,0 @@
"""Pushgateway client for broker-sync providers.
One function: push a list of (metric, labels, value) tuples to Prometheus
Pushgateway under a given job name. Used by providers to surface per-run
drift / staleness / row counts that Prometheus can alert on.
In-cluster URL: http://prometheus-prometheus-pushgateway.monitoring:9091/metrics
Pass via the ``pushgateway_url`` argument or the ``PUSHGATEWAY_URL`` env var.
"""
from __future__ import annotations
import logging
import os
from collections.abc import Iterable
import httpx
log = logging.getLogger(__name__)
def _format_metric(name: str, labels: dict[str, str], value: float) -> str:
if labels:
body = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
return f"{name}{{{body}}} {value}\n"
return f"{name} {value}\n"
async def push_pushgateway(
job: str,
metrics: Iterable[tuple[str, dict[str, str], float]],
pushgateway_url: str | None = None,
transport: httpx.AsyncBaseTransport | None = None,
) -> None:
"""POST text-format metrics to Pushgateway under ``job``.
``pushgateway_url`` falls back to the env var ``PUSHGATEWAY_URL``.
Raises ``RuntimeError`` if the URL is unset or POST returns non-2xx.
"""
url = pushgateway_url or os.environ.get("PUSHGATEWAY_URL")
if not url:
raise RuntimeError("PUSHGATEWAY_URL not set and no override provided")
body = "".join(_format_metric(n, lbls, v) for n, lbls, v in metrics)
target = f"{url.rstrip('/')}/job/{job}"
async with httpx.AsyncClient(transport=transport, timeout=15.0) as c:
resp = await c.post(target, content=body, headers={"Content-Type": "text/plain"})
if resp.status_code >= 300:
raise RuntimeError(
f"pushgateway POST {target} returned HTTP {resp.status_code}: "
f"{resp.text[:200]}"
)
log.info("pushgateway: pushed %d metrics to job=%s", len(body.splitlines()), job)

View file

@ -16,28 +16,21 @@ We keep a Playwright-maintained session via ``storage_state.json``:
fund holdings. On 401/idle-timeout we raise fund holdings. On 401/idle-timeout we raise
:class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed. :class:`FidelitySessionError` so Prometheus alerts Viktor to re-seed.
## Emitted Activity / snapshot shape ## Emitted Activity shape
- One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single - One ``DEPOSIT`` per cash-impacting transaction (Regular Premium, Single
Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``. Premium, rebate, etc.). ``external_id = fidelity:tx:<sha256[:16]>``.
- One synthetic ``DEPOSIT`` for unrealised gains so WF's Net Worth matches
the Fidelity dashboard. ``external_id =
fidelity:gains:<YYYY-MM-DD>``.
- Bulk Switches / Fund Switches are skipped (no cash movement). - Bulk Switches / Fund Switches are skipped (no cash movement).
- After the activity stream drains, the ``fidelity-ingest`` CLI calls
``WealthfolioSink.push_manual_snapshots`` with one ``ManualSnapshotPayload``
per fund holding (today's date, units + cost basis allocated
proportionally to fund value share). This sets per-fund quantity and
cost basis in WF so the dashboard Positions table shows the pension
funds alongside the brokerage assets.
- The old synthetic ``fidelity:gains:<date>`` DEPOSIT is no longer
emitted the snapshot supersedes it. Old offset rows that landed
before this change are corrected at the dashboard layer by the
``dav_corrected`` PG view (``infra/stacks/wealthfolio/main.tf``).
""" """
from __future__ import annotations from __future__ import annotations
import contextlib import contextlib
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from datetime import date, datetime from datetime import UTC, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any, NamedTuple from typing import Any, NamedTuple
@ -49,7 +42,6 @@ from broker_sync.providers.parsers.fidelity import (
parse_transactions_html, parse_transactions_html,
parse_valuation_json, parse_valuation_json,
) )
from broker_sync.sinks.wealthfolio import ManualSnapshotPayload, SnapshotPosition
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -94,6 +86,37 @@ def _tx_to_activity(tx: FidelityCashTx) -> Activity:
) )
def _gains_offset_activity(
holdings: list[FidelityHolding],
transactions: list[FidelityCashTx],
as_of: datetime,
) -> Activity | None:
"""Create a synthetic DEPOSIT/WITHDRAWAL so WF Net Worth matches the
Fidelity dashboard's reported pot value.
The offset carries a date-derived external_id so monthly runs refresh
the same synthetic entry rather than stacking duplicates.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
total_contrib = sum((t.amount for t in transactions), Decimal(0))
gains = total_value - total_contrib
if gains == 0:
return None
return Activity(
external_id=f"fidelity:gains:{as_of.date().isoformat()}",
account_id=ACCOUNT_ID,
account_type=AccountType.WORKPLACE_PENSION,
date=as_of,
activity_type=ActivityType.DEPOSIT if gains > 0 else ActivityType.WITHDRAWAL,
currency=_CCY,
amount=abs(gains),
notes=(f"fidelity-planviewer:unrealised-gains-offset "
f"(pot=£{total_value}, contrib=£{total_contrib})"),
)
class FidelityPlanViewerProvider: class FidelityPlanViewerProvider:
"""Read-only provider against Fidelity UK PlanViewer. """Read-only provider against Fidelity UK PlanViewer.
@ -102,18 +125,11 @@ class FidelityPlanViewerProvider:
- ``fetch(since, before)`` opens a Playwright session with the saved - ``fetch(since, before)`` opens a Playwright session with the saved
storage_state, navigates to the transaction-history page with a wide storage_state, navigates to the transaction-history page with a wide
date range, scrapes the table, and intercepts the valuation XHR. date range, scrapes the table, and intercepts the valuation XHR.
- After ``fetch()`` completes, ``last_holdings`` holds the per-fund
unit positions and ``last_total_contribution`` the cumulative cash
contribution used by the ``fidelity-ingest`` CLI to emit a
delta-shaped DEPOSIT that nudges WF's net worth to match the
PlanViewer reported pot value (see ``gains_offset_delta_activity``).
""" """
name = "fidelity-planviewer" name = "fidelity-planviewer"
def __init__(self, creds: FidelityCreds) -> None: def __init__(self, creds: FidelityCreds) -> None:
self._creds = creds self._creds = creds
self.last_holdings: list[FidelityHolding] = []
self.last_total_contribution: Decimal = Decimal(0)
def accounts(self) -> list[Account]: def accounts(self) -> list[Account]:
return [ return [
@ -146,113 +162,19 @@ class FidelityPlanViewerProvider:
log.info("fidelity: parsed %d transactions, %d holdings", log.info("fidelity: parsed %d transactions, %d holdings",
len(transactions), len(holdings)) len(transactions), len(holdings))
# Snapshot the per-fund holdings for the CLI to push as a manual
# holdings_snapshot after this generator drains. Wealthfolio's
# activity model can't represent pension fund unit purchases (no
# per-purchase price feed from PlanViewer), so we record current
# state via /api/v1/snapshots/import instead.
self.last_holdings = holdings
self.last_total_contribution = sum(
(t.amount for t in transactions), Decimal(0)
)
for tx in transactions: for tx in transactions:
if since is not None and tx.date < since: if since is not None and tx.date < since:
continue continue
if before is not None and tx.date >= before: if before is not None and tx.date >= before:
continue continue
yield _tx_to_activity(tx) yield _tx_to_activity(tx)
# Gains-offset DEPOSITs are emitted by the CLI (which has the
# prior cumulative offset from WF). See `gains_offset_delta_activity`.
# The gains offset is always "as of now" so it reflects today's pot.
def gains_offset_delta_activity( # Only emit when the caller isn't windowing (full state).
holdings: list[FidelityHolding], if since is None and before is None:
total_real_contribution: Decimal, offset = _gains_offset_activity(holdings, transactions, datetime.now(UTC))
prior_offset_cumulative: Decimal, if offset is not None:
as_of: datetime, yield offset
min_delta: Decimal = Decimal("0.5"),
) -> Activity | None:
"""Compute the gains-offset DELTA since the last scrape and shape it
as a DEPOSIT (or WITHDRAWAL on a market drop).
The pension's per-fund prices aren't trackable in WF directly (no
public quote feed for these institutional life-fund share classes).
Instead, each monthly scrape emits a single small DEPOSIT/WITHDRAWAL
sized to ``(current_pot - real_contributions) - prior_cumulative_offset``
i.e., the growth (or loss) accrued since the last run.
Wealthfolio's net_contribution then incorrectly includes all these
offsets; the ``dav_corrected`` PG view subtracts them back out so the
dashboard's Growth/ROI panels remain accurate. The deterministic
external_id (per scrape date) lets re-runs of the same day overwrite
rather than stack duplicates.
"""
if not holdings:
return None
current_pot = sum((h.total_value for h in holdings), Decimal(0))
current_gain = current_pot - total_real_contribution
delta = current_gain - prior_offset_cumulative
if abs(delta) < min_delta:
return None
return Activity(
external_id=f"fidelity:gains-delta:{as_of.date().isoformat()}",
account_id=ACCOUNT_ID,
account_type=AccountType.WORKPLACE_PENSION,
date=as_of,
activity_type=ActivityType.DEPOSIT if delta > 0 else ActivityType.WITHDRAWAL,
currency=_CCY,
amount=abs(delta),
notes=(
f"fidelity-planviewer:unrealised-gains-offset delta=£{delta} "
f"(pot=£{current_pot}, contrib=£{total_real_contribution}, "
f"prior_offset=£{prior_offset_cumulative})"
),
)
def fidelity_holdings_to_snapshot(
holdings: list[FidelityHolding],
total_real_contribution: Decimal,
as_of: date,
) -> ManualSnapshotPayload | None:
"""Convert scraped holdings into a Wealthfolio manual snapshot payload.
Cost-basis allocation: PlanViewer doesn't expose historical purchase
prices for individual fund unit buys, so we approximate per-fund
cost basis by allocating the cumulative cash contribution
proportionally to each fund's share of the current pot value. For
the typical single-fund Meta scheme this is exact; if Viktor's plan
later splits into multiple funds the proportional split is the
least-wrong allocation we can compute from monthly snapshots.
cashBalances is set to zero pension contributions flow straight
into funds, the synthetic Wealthfolio "cash balance" only existed
because of the old gains-offset DEPOSIT hack.
"""
if not holdings:
return None
total_value = sum((h.total_value for h in holdings), Decimal(0))
if total_value <= 0:
return None
positions: list[SnapshotPosition] = []
for h in holdings:
share = h.total_value / total_value
cost = (total_real_contribution * share).quantize(Decimal("0.01"))
avg_cost = (cost / h.units).quantize(Decimal("0.0001")) if h.units > 0 else Decimal(0)
positions.append(SnapshotPosition(
symbol=h.fund_code,
quantity=h.units,
average_cost=avg_cost,
total_cost_basis=cost,
currency=h.currency,
))
return ManualSnapshotPayload(
date=as_of,
currency=_CCY,
positions=positions,
cash_balances={_CCY: Decimal(0)},
)
async def _scrape_live_session( async def _scrape_live_session(

View file

@ -1,276 +0,0 @@
"""Interactive Brokers Flex Web Service ingestion provider.
Pulls daily Activity Flex Query reports via the ``ibflex`` library, maps
Trades + CashTransactions to broker-sync ``Activity`` objects, and runs a
reconciliation step against the broker-reported ``OpenPositions``.
See ``docs/specs/2026-05-26-ibkr-ingest-design.md`` for the full design.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
from broker_sync.models import Account, AccountType, Activity, ActivityType
log = logging.getLogger(__name__)
# Map IBKR currency → default exchange suffix.
# Today: GBP → LSE (.L). Extend when more accounts onboard.
_LSE_EXCHANGES = {"LSE", "LSEETF", "LSEIOB1"}
_GBP_SUFFIX = ".L"
def canonical_symbol(symbol: str, *, exchange: str | None, currency: str) -> str:
"""Return the WF-canonical form of an IBKR ticker.
LSE-listed GBP instruments get a ``.L`` suffix (Wealthfolio convention).
US instruments and anything already suffixed are returned unchanged.
"""
if "." in symbol:
return symbol
if exchange in _LSE_EXCHANGES or (exchange is None and currency == "GBP"):
return symbol + _GBP_SUFFIX
return symbol
def _to_utc_datetime(value: Any, time_value: Any = None) -> datetime:
"""Combine a date (with optional time) into a UTC datetime."""
if isinstance(value, datetime):
dt = value
elif isinstance(value, date):
if isinstance(time_value, str):
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value}")
elif hasattr(time_value, "isoformat"):
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value.isoformat()}")
else:
dt = datetime.fromisoformat(f"{value.isoformat()}T00:00:00")
else:
# Last-resort: ISO string
dt = datetime.fromisoformat(str(value))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt.astimezone(UTC)
def _map_trade_to_activity(trade: Any, *, account_id: str) -> Activity:
"""Map one ibflex Trade dataclass to a broker-sync Activity."""
buy_sell_obj = trade.buySell
buy_sell = buy_sell_obj.name if hasattr(buy_sell_obj, "name") else str(buy_sell_obj)
if buy_sell == "BUY":
activity_type = ActivityType.BUY
elif buy_sell == "SELL":
activity_type = ActivityType.SELL
else:
raise ValueError(
f"unsupported Trade.buySell={buy_sell!r} on tradeID={trade.tradeID}"
)
exchange = getattr(trade, "exchange", None)
symbol = canonical_symbol(
str(trade.symbol),
exchange=str(exchange) if exchange is not None else None,
currency=str(trade.currency),
)
quantity = abs(Decimal(str(trade.quantity)))
unit_price = Decimal(str(trade.tradePrice))
commission = trade.ibCommission if trade.ibCommission is not None else Decimal(0)
fee = abs(Decimal(str(commission)))
return Activity(
external_id=f"ibkr:trade:{trade.tradeID}",
account_id=account_id,
account_type=AccountType.GIA,
date=_to_utc_datetime(trade.tradeDate, getattr(trade, "tradeTime", None)),
activity_type=activity_type,
currency=str(trade.currency),
symbol=symbol,
quantity=quantity,
unit_price=unit_price,
fee=fee,
)
# Map known IBKR Flex CashTransaction.type values to broker-sync ActivityType.
# Unknown values yield None + a WARNING — we refuse to guess.
_CASH_TYPE_MAP: dict[str, ActivityType] = {
"DIVIDEND": ActivityType.DIVIDEND,
"DIVIDENDS": ActivityType.DIVIDEND,
"PAYMENT_IN_LIEU_OF_DIVIDENDS": ActivityType.DIVIDEND,
"WITHHOLDING_TAX": ActivityType.TAX,
"WHTAX": ActivityType.TAX,
"BROKER_INTEREST_RECEIVED": ActivityType.INTEREST,
"BROKER_INTEREST_PAID": ActivityType.FEE,
"COMMISSION_ADJUSTMENTS": ActivityType.FEE,
"OTHER_FEES": ActivityType.FEE,
}
_DEPOSIT_WITHDRAWAL_TYPES = {
"DEPOSITS_WITHDRAWALS",
"DEPOSIT_WITHDRAWALS",
"DEPOSITWITHDRAW",
}
def _normalise_cash_type(type_obj: Any) -> str:
"""Canonicalise the IBKR Flex CashTransaction.type enum to an UPPER_SNAKE name."""
if hasattr(type_obj, "name"):
return str(type_obj.name).upper()
return str(type_obj).strip().upper().replace(" ", "_").replace("&", "AND")
def _map_cash_to_activity(cash: Any, *, account_id: str) -> Activity | None:
"""Map one ibflex CashTransaction to a broker-sync Activity.
Returns None for unsupported types (logged at WARNING).
"""
type_name = _normalise_cash_type(cash.type)
amount = Decimal(str(cash.amount))
if type_name in _DEPOSIT_WITHDRAWAL_TYPES:
activity_type = ActivityType.DEPOSIT if amount > 0 else ActivityType.WITHDRAWAL
else:
mapped = _CASH_TYPE_MAP.get(type_name)
if mapped is None:
log.warning(
"ibkr: skipping cash transaction id=%s with unsupported type=%r",
getattr(cash, "transactionID", "?"),
type_name,
)
return None
activity_type = mapped
dt_raw = cash.dateTime
dt = _to_utc_datetime(dt_raw) if dt_raw is not None else datetime.now(UTC)
return Activity(
external_id=f"ibkr:cash:{cash.transactionID}",
account_id=account_id,
account_type=AccountType.GIA,
date=dt,
activity_type=activity_type,
currency=str(cash.currency),
amount=abs(amount),
)
class IBKRError(Exception):
"""Base class for ibkr-provider errors."""
class IBKRAccountMismatchError(IBKRError):
"""Flex statement accountId did not match configured upstream id."""
class IBKRProvider:
"""Fetches IBKR Flex Activity reports and yields broker-sync Activities.
Reconciliation (OpenPositions vs WF-computed qty) is NOT part of
``fetch()`` it runs at the CLI layer after import, where the
WealthfolioSink is available to query WF.
"""
name = "ibkr"
def __init__(
self,
*,
token: str,
query_id: str,
upstream_account_id: str,
) -> None:
self._token = token
self._query_id = query_id
# Single source of truth — the IBKR account number (e.g. U13279690).
# The pipeline's _ensure_accounts() resolves this to a Wealthfolio
# UUID via (provider="ibkr", providerAccountId=upstream_account_id);
# activities are remapped to the WF UUID before import.
self._upstream_account_id = upstream_account_id
# Stashed for the reconciliation step after fetch() drains.
self._last_response: Any = None
def accounts(self) -> list[Account]:
return [
Account(
id=self._upstream_account_id,
name="Interactive Brokers (UK)",
account_type=AccountType.GIA,
currency="GBP", # FX-aware per-trade; account ccy is GBP
provider="ibkr",
)
]
async def close(self) -> None:
# ibflex.client uses synchronous `requests` under the hood; no resources to close.
return
async def fetch(
self,
*,
since: datetime | None = None, # Flex query owns the date range
before: datetime | None = None,
) -> AsyncIterator[Activity]:
from ibflex import client as ib_client
from ibflex import parser as ib_parser
del since, before # unused; Flex query defines the period
xml_bytes = ib_client.download(self._token, self._query_id)
response = ib_parser.parse(xml_bytes)
self._last_response = response
if not response.FlexStatements:
log.warning("ibkr: Flex response had no FlexStatements")
return
stmt = response.FlexStatements[0]
if str(stmt.accountId) != self._upstream_account_id:
raise IBKRAccountMismatchError(
f"Flex statement.accountId={stmt.accountId!r} does not match "
f"configured IBKR_ACCOUNT_ID_UPSTREAM={self._upstream_account_id!r} "
f"— refusing to ingest"
)
for trade in stmt.Trades or []:
yield _map_trade_to_activity(trade, account_id=self._upstream_account_id)
for cash in stmt.CashTransactions or []:
activity = _map_cash_to_activity(cash, account_id=self._upstream_account_id)
if activity is not None:
yield activity
def open_positions(self) -> list[tuple[str, Decimal]]:
"""Return ``[(canonical_symbol, position_qty), ...]`` from the most
recent fetch. Empty list before the first ``fetch()`` call."""
if self._last_response is None:
return []
stmt = self._last_response.FlexStatements[0]
out: list[tuple[str, Decimal]] = []
for pos in stmt.OpenPositions or []:
exchange = getattr(pos, "exchange", None)
symbol = canonical_symbol(
str(pos.symbol),
exchange=str(exchange) if exchange is not None else None,
currency=str(pos.currency),
)
out.append((symbol, Decimal(str(pos.position))))
return out
def cash_balances(self) -> list[tuple[str, Decimal]]:
"""Return ``[(currency, ending_cash), ...]`` from the CashReport.
Includes the ``BASE_SUMMARY`` aggregate row (account base currency
consolidated) plus any per-currency rows. Empty list if no
CashReport section in the Flex query or before first ``fetch()``.
"""
if self._last_response is None:
return []
stmt = self._last_response.FlexStatements[0]
out: list[tuple[str, Decimal]] = []
for row in stmt.CashReport or []:
if row.endingCash is None or row.currency is None:
continue
out.append((str(row.currency), Decimal(str(row.endingCash))))
return out

View file

@ -16,7 +16,6 @@ from __future__ import annotations
import email import email
import imaplib import imaplib
import logging import logging
import os
import re import re
import ssl import ssl
from collections.abc import AsyncIterator, Iterator from collections.abc import AsyncIterator, Iterator
@ -151,41 +150,9 @@ def _fetch_all(creds: ImapCreds) -> Iterator[bytes]:
yield raw yield raw
def _resolve_excluded_providers() -> set[str]:
"""Return the set of providers the IMAP fetcher must skip.
Default-exclude list is structural `invest-engine` is ALWAYS skipped
unless explicitly opted back in via `BROKER_SYNC_IMAP_INCLUDE_PROVIDERS`.
This protects against accidental re-ingestion via any code path that
doesn't set the cron's env (e.g. `kubectl run --rm`, devvm `poetry run`,
a sibling agent session). See post-mortem 2026-05-27 the IMAP path
re-inserted 39 IE BUYs that had been deduped the previous day, because
the safety lived only on the cronjob spec.
Additional providers can be excluded via
`BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS`. `INCLUDE` always wins over
`EXCLUDE` and the default skip-list.
"""
_DEFAULT_EXCLUDED = {"invest-engine", "invest_engine"}
extra = {
p.strip().lower().replace("_", "-")
for p in os.environ.get("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "").split(",")
if p.strip()
}
include = {
p.strip().lower().replace("_", "-")
for p in os.environ.get("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS", "").split(",")
if p.strip()
}
# Canonicalise the default set under the same key normalisation.
canonical = {p.replace("_", "-") for p in _DEFAULT_EXCLUDED}
return (canonical | extra) - include
def fetch_activities(creds: ImapCreds) -> list[Activity]: def fetch_activities(creds: ImapCreds) -> list[Activity]:
out: list[Activity] = [] out: list[Activity] = []
ie_parsed = schwab_parsed = ie_skipped = skipped = 0 ie_parsed = schwab_parsed = skipped = 0
exclude = _resolve_excluded_providers()
for raw in _fetch_all(creds): for raw in _fetch_all(creds):
try: try:
msg = email.message_from_bytes(raw) msg = email.message_from_bytes(raw)
@ -194,28 +161,17 @@ def fetch_activities(creds: ImapCreds) -> list[Activity]:
continue continue
sender = _extract_sender(msg) sender = _extract_sender(msg)
if sender in _IE_SENDERS or sender.endswith("@investengine.com"): if sender in _IE_SENDERS or sender.endswith("@investengine.com"):
if "invest-engine" in exclude:
ie_skipped += 1
continue
out.extend(ie_parser.parse_invest_engine_email(raw)) out.extend(ie_parser.parse_invest_engine_email(raw))
ie_parsed += 1 ie_parsed += 1
elif ( elif sender in _SCHWAB_SENDERS or sender.endswith("@schwab.com"):
sender in _SCHWAB_SENDERS
or sender.endswith("@schwab.com")
or sender.endswith(".schwab.com") # e.g. donotreply@mail.schwab.com
):
if "schwab" in exclude:
skipped += 1
continue
html = _html_or_text(msg) html = _html_or_text(msg)
out.extend(parse_schwab_email(html)) out.extend(parse_schwab_email(html))
schwab_parsed += 1 schwab_parsed += 1
else: else:
skipped += 1 skipped += 1
log.info( log.info(
"imap: ie_parsed=%d ie_skipped=%d schwab_parsed=%d skipped=%d%d activities", "imap: ie_parsed=%d schwab_parsed=%d skipped=%d%d activities",
ie_parsed, ie_parsed,
ie_skipped,
schwab_parsed, schwab_parsed,
skipped, skipped,
len(out), len(out),

View file

@ -1,71 +1,79 @@
"""Schwab workplace-RSU email parser. """Schwab workplace-RSU email parser.
Schwab Stock Plan Services sends a "Your trade was executed" email for Two email shapes are handled:
each sell-to-cover trade (and any user-initiated trade) on the workplace
account. The body has five `<td class="dark-background-body" align="right">`
cells holding date / direction / quantity / ticker / price.
It does NOT email vest-release / Release Confirmation messages to the 1. Trade confirmations (sell-to-cover or user-initiated trades): HTML
employee address for this account (verified against 4 years of inbox with five `<td class="dark-background-body" align="right">` cells
history, 2022-2026). The vest itself is invisible to IMAP. holding date / direction / quantity / ticker / price. one Activity.
Same-day-sell synthesis: Meta RSUs vest and are sold the same day at 2. Release Confirmations (RSU vest events): subject/body mentions
the same FMV (verified across 14 historical vests). When a SELL email "Release Confirmation" or "Award Vesting"; body lists vest date,
is parsed AND its trade date is on or after `VEST_INFER_FROM_DATE`, shares released, FMV, shares sold to cover, and USD tax withheld.
we ALSO emit a paired BUY representing the underlying vest event (Activity, Activity, VestEvent) tuple: the gross vest (BUY at FMV),
same date, same quantity, same price. The date boundary stops this the sell-to-cover (SELL at FMV), and a standalone VestEvent for the
back-filling historical vests that already have csv-sourced BUY rows payslip-ingest reconciliation pipeline.
in Wealthfolio (which would duplicate at chart-level despite distinct
external_ids).
On any parse failure we return an empty list an unparseable email On any parse failure we return the neutral empty result (no Activities,
shouldn't crash the IMAP batch. no VestEvent) an unparseable email shouldn't crash the IMAP batch.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import os import re
from datetime import date, datetime from dataclasses import dataclass
from decimal import Decimal, InvalidOperation from decimal import Decimal, InvalidOperation
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from dateutil import parser as dateparser from dateutil import parser as dateparser
from broker_sync.models import AccountType, Activity, ActivityType from broker_sync.models import AccountType, Activity, ActivityType, VestEvent
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
_ACCOUNT_ID = "schwab-workplace" _ACCOUNT_ID = "schwab-workplace"
_DEFAULT_CURRENCY = "USD" _DEFAULT_CURRENCY = "USD"
# Inferred-BUY synthesis boundary. SELL emails on or after this date # Vest-confirmation emails reliably include one of these phrases. Matching
# emit a paired BUY for the underlying vest; earlier ones do not (they # is case-insensitive and on the raw HTML (cheap — no DOM parse needed).
# already have csv-sourced BUYs in Wealthfolio from the one-shot _VEST_SUBJECT_RE = re.compile(r"Release Confirmation|Award Vesting|RSU Release",
# historical backfill, last vest 2026-02-18). Override at runtime with re.IGNORECASE)
# the env var if a different cutover is needed. ISO-8601 yyyy-mm-dd.
_DEFAULT_VEST_INFER_FROM = "2026-04-01"
def _vest_infer_from() -> date: @dataclass
raw = os.environ.get("SCHWAB_VEST_INFER_FROM_DATE", _DEFAULT_VEST_INFER_FROM).strip() class VestParseResult:
try: activities: list[Activity]
return datetime.strptime(raw, "%Y-%m-%d").date() vest_event: VestEvent | None
except ValueError:
log.warning(
"SCHWAB_VEST_INFER_FROM_DATE=%r is not yyyy-mm-dd; using default %s",
raw, _DEFAULT_VEST_INFER_FROM,
)
return datetime.strptime(_DEFAULT_VEST_INFER_FROM, "%Y-%m-%d").date()
def parse_schwab_email(raw_html: str) -> list[Activity]: def parse_schwab_email(raw_html: str) -> list[Activity]:
"""Return Activities for a Schwab trade-executed email. """Return a single-item list of Activity on success, empty on failure.
Returns: empty list on parse failure; one Activity for a BUY-direction For vest-confirmation emails, returns the two Activity rows (gross
email (rare the workplace account is essentially sell-only); for a vest + sell-to-cover). Use `parse_schwab_email_full` when the caller
SELL email, returns [SELL] plus an inferred paired BUY (=vest event) also needs the VestEvent.
when the trade date is on or after the synthesis-boundary date.
""" """
return parse_schwab_email_full(raw_html).activities
def parse_schwab_email_full(raw_html: str) -> VestParseResult:
"""Full parse — returns activities + optional VestEvent.
Dispatches: vest-confirmation emails `_parse_vest_release`;
everything else the legacy single-row confirmation parser.
"""
if _VEST_SUBJECT_RE.search(raw_html):
result = _parse_vest_release(raw_html)
if result is not None:
return result
log.warning("schwab: detected vest email but could not extract fields; "
"add a real fixture to broker-sync/tests/fixtures/")
return VestParseResult(activities=[], vest_event=None)
return VestParseResult(activities=_parse_trade_confirmation(raw_html), vest_event=None)
def _parse_trade_confirmation(raw_html: str) -> list[Activity]:
"""Legacy 5-cell trade confirmation parser."""
try: try:
soup = BeautifulSoup(raw_html, "html.parser") soup = BeautifulSoup(raw_html, "html.parser")
cells = [ cells = [
@ -82,44 +90,151 @@ def parse_schwab_email(raw_html: str) -> list[Activity]:
direction = (ActivityType.SELL direction = (ActivityType.SELL
if direction_txt.strip().lower() == "sold" else ActivityType.BUY) if direction_txt.strip().lower() == "sold" else ActivityType.BUY)
quantity = Decimal(qty_txt.replace(",", "").strip()) quantity = Decimal(qty_txt.replace(",", "").strip())
# Price like "$123.45" — strip the currency sign and parse the numeric tail.
# Handle "£", "€", "USD", etc. by taking the last numeric span.
price_clean = price_txt price_clean = price_txt
for sign in ("$", "£", "", "USD", "GBP", "EUR"): for sign in ("$", "£", "", "USD", "GBP", "EUR"):
price_clean = price_clean.replace(sign, "") price_clean = price_clean.replace(sign, "")
unit_price = Decimal(price_clean.replace(",", "").strip()) unit_price = Decimal(price_clean.replace(",", "").strip())
ticker_clean = ticker.strip()
external_id = (f"schwab:{trade_date.date().isoformat()}:{ticker_clean}:" external_id = (f"schwab:{trade_date.date().isoformat()}:{ticker}:"
f"{direction.value}:{quantity}") f"{direction.value}:{quantity}")
primary = Activity( return [
Activity(
external_id=external_id, external_id=external_id,
account_id=_ACCOUNT_ID, account_id=_ACCOUNT_ID,
account_type=AccountType.GIA, account_type=AccountType.GIA,
date=trade_date, date=trade_date,
activity_type=direction, activity_type=direction,
symbol=ticker_clean, symbol=ticker.strip(),
quantity=quantity, quantity=quantity,
unit_price=unit_price, unit_price=unit_price,
currency=_DEFAULT_CURRENCY, currency=_DEFAULT_CURRENCY,
notes=f"schwab-email:{direction_txt}", notes=f"schwab-email:{direction_txt}",
) )
]
if direction is not ActivityType.SELL or trade_date.date() < _vest_infer_from():
return [primary]
inferred_buy = Activity(
external_id=(f"schwab:vest:{trade_date.date().isoformat()}:"
f"{ticker_clean}:BUY:{quantity}"),
account_id=_ACCOUNT_ID,
account_type=AccountType.GIA,
date=trade_date,
activity_type=ActivityType.BUY,
symbol=ticker_clean,
quantity=quantity,
unit_price=unit_price,
currency=_DEFAULT_CURRENCY,
notes=(f"schwab-vest-inferred-from-same-day-sell | "
f"paired_sell_external_id={external_id}"),
)
return [inferred_buy, primary]
except (ValueError, InvalidOperation, IndexError, AttributeError): except (ValueError, InvalidOperation, IndexError, AttributeError):
return [] return []
# Heuristic extractors for vest-release emails. Labels observed in public
# Schwab RSU release samples; real fixture needed to tighten these.
_VEST_DATE_RE = re.compile(
r"(?:Release Date|Vest Date|Vesting Date)\s*[:<][^0-9]*"
r"(\d{1,2}[\s/\-][A-Za-z]{3}[\s/\-]\d{2,4}|\d{2}/\d{2}/\d{4}|\d{4}-\d{2}-\d{2})",
re.IGNORECASE)
_VEST_TICKER_RE = re.compile(r"(?:Ticker|Symbol)\s*[:<]\s*([A-Z]{2,5})",
re.IGNORECASE)
_VEST_SHARES_RELEASED_RE = re.compile(
r"(?:Shares Released|Total Shares (?:Released|Vested))\s*[:<]\s*"
r"([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
_VEST_SHARES_WITHHELD_RE = re.compile(
r"(?:Shares (?:Withheld|Sold)(?: for Taxes)?)\s*[:<]\s*"
r"([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
_VEST_FMV_RE = re.compile(
r"(?:Market Price|FMV|Fair Market Value)\s*[:<]\s*"
r"\$?\s*([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
_VEST_TAX_USD_RE = re.compile(
r"(?:Tax Withholding Amount|Total Tax Withholding|Tax Withheld)\s*[:<]\s*"
r"\$?\s*([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
def _parse_vest_release(raw_html: str) -> VestParseResult | None:
"""Best-effort extraction from a Schwab Release Confirmation email.
Runs label regexes on the plain-text view of the HTML. Returns None
(signalling fall-through) if the core four fields (date, ticker,
shares released, FMV) don't all resolve — that's a strong signal the
heuristics need a real fixture before they can be trusted on a live
email.
"""
try:
soup = BeautifulSoup(raw_html, "html.parser")
text = soup.get_text(" ", strip=True)
except Exception:
return None
date_str = _search_group(_VEST_DATE_RE, text)
ticker = _search_group(_VEST_TICKER_RE, text)
shares_released_str = _search_group(_VEST_SHARES_RELEASED_RE, text)
fmv_str = _search_group(_VEST_FMV_RE, text)
if not (date_str and ticker and shares_released_str and fmv_str):
return None
try:
vest_date = dateparser.parse(date_str)
shares_vested = Decimal(shares_released_str.replace(",", ""))
fmv = Decimal(fmv_str.replace(",", ""))
except (ValueError, InvalidOperation):
return None
shares_sold_str = _search_group(_VEST_SHARES_WITHHELD_RE, text)
shares_sold_to_cover = (Decimal(shares_sold_str.replace(",", ""))
if shares_sold_str else None)
tax_usd_str = _search_group(_VEST_TAX_USD_RE, text)
tax_withheld_usd = (Decimal(tax_usd_str.replace(",", ""))
if tax_usd_str else None)
external_id = (f"schwab:{vest_date.date().isoformat()}:{ticker}:VEST:"
f"{shares_vested}")
vest_event = VestEvent(
external_id=external_id,
vest_date=vest_date,
ticker=ticker,
shares_vested=shares_vested,
shares_sold_to_cover=shares_sold_to_cover,
fmv_at_vest_usd=fmv,
tax_withheld_usd=tax_withheld_usd,
source="schwab_email",
raw={
"date": date_str,
"ticker": ticker,
"shares_released": shares_released_str,
"fmv": fmv_str,
"shares_withheld": shares_sold_str or "",
"tax_withheld": tax_usd_str or "",
},
)
# Sibling Activities for Wealthfolio: full vest as BUY, sell-to-cover
# slice as SELL, both at the same FMV so net cash = 0 on that day.
activities: list[Activity] = [
Activity(
external_id=f"{external_id}:BUY",
account_id=_ACCOUNT_ID,
account_type=AccountType.GIA,
date=vest_date,
activity_type=ActivityType.BUY,
symbol=ticker,
quantity=shares_vested,
unit_price=fmv,
currency=_DEFAULT_CURRENCY,
notes="schwab-vest-release",
)
]
if shares_sold_to_cover is not None and shares_sold_to_cover > 0:
activities.append(
Activity(
external_id=f"{external_id}:SELL_TO_COVER",
account_id=_ACCOUNT_ID,
account_type=AccountType.GIA,
date=vest_date,
activity_type=ActivityType.SELL,
symbol=ticker,
quantity=shares_sold_to_cover,
unit_price=fmv,
currency=_DEFAULT_CURRENCY,
notes="schwab-sell-to-cover",
))
return VestParseResult(activities=activities, vest_event=vest_event)
def _search_group(pattern: re.Pattern[str], text: str) -> str | None:
m = pattern.search(text)
return m.group(1).strip() if m else None

View file

@ -2,9 +2,7 @@ from __future__ import annotations
import json import json
from collections.abc import Iterable from collections.abc import Iterable
from dataclasses import dataclass from datetime import UTC
from datetime import UTC, date
from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -16,8 +14,6 @@ _LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts" _ACCOUNTS_PATH = "/api/v1/accounts"
_IMPORT_CHECK = "/api/v1/activities/import/check" _IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import" _IMPORT_REAL = "/api/v1/activities/import"
_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import"
_ACTIVITIES_SEARCH = "/api/v1/activities/search"
class WealthfolioError(Exception): class WealthfolioError(Exception):
@ -247,14 +243,10 @@ class WealthfolioSink:
if summary is not None: if summary is not None:
imported_n = int(summary.get("imported", 0)) imported_n = int(summary.get("imported", 0))
total_n = int(summary.get("total", len(valid_rows))) total_n = int(summary.get("total", len(valid_rows)))
dupes = int(summary.get("duplicates", 0)) if imported_n < total_n:
skipped = int(summary.get("skipped", 0))
# Duplicates are expected on every re-run (the cron re-processes the
# full IMAP window each night) — treat (imported + duplicates) as
# accounted-for. Only fail if something was genuinely lost.
accounted = imported_n + dupes
if accounted < total_n:
err_msg = summary.get("errorMessage") or "no errorMessage" err_msg = summary.get("errorMessage") or "no errorMessage"
skipped = int(summary.get("skipped", 0))
dupes = int(summary.get("duplicates", 0))
raise ImportValidationError(f"Wealthfolio /import persisted {imported_n}/{total_n} " raise ImportValidationError(f"Wealthfolio /import persisted {imported_n}/{total_n} "
f"(skipped={skipped} duplicates={dupes}). " f"(skipped={skipped} duplicates={dupes}). "
f"errorMessage: {err_msg}") f"errorMessage: {err_msg}")
@ -270,175 +262,3 @@ class WealthfolioSink:
f"First warning: {first_warn}") f"First warning: {first_warn}")
assert isinstance(got, list) assert isinstance(got, list)
return [r for r in got if isinstance(r, dict)] return [r for r in got if isinstance(r, dict)]
# -- activity lookups --
async def cumulative_amount_with_notes_prefix(
self,
account_id: str,
notes_prefix: str,
) -> Decimal:
"""Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start
with ``notes_prefix``, signed (deposits positive, withdrawals negative).
Used by the Fidelity provider to compute the delta gains-offset:
``current_gain - cumulative_existing_offset`` becomes the new
DEPOSIT to emit on each monthly run.
"""
try:
resp = await self._request(
"POST", _ACTIVITIES_SEARCH,
json={"accountIds": [account_id], "page": 1, "pageSize": 500},
)
except Exception:
return Decimal(0)
if resp.status_code >= 400:
return Decimal(0)
payload = resp.json()
rows = payload.get("data", payload) if isinstance(payload, dict) else payload
if not isinstance(rows, list):
return Decimal(0)
total = Decimal(0)
for r in rows:
if not isinstance(r, dict):
continue
notes = r.get("comment") or r.get("notes") or ""
if not isinstance(notes, str) or not notes.startswith(notes_prefix):
continue
amt_raw = r.get("amount")
if amt_raw is None:
continue
try:
amt = Decimal(str(amt_raw))
except Exception:
continue
atype = (r.get("activityType") or r.get("activity_type") or "").upper()
if atype == "WITHDRAWAL":
total -= amt
else:
total += amt
return total
async def compute_position_qty(self, account_id: str) -> dict[str, Decimal]:
"""Return per-symbol net position quantity (BUY/IN minus SELL/OUT) for
one account. Skips cash activities and unknown activity types.
Used by the IBKR reconciliation step to compare against broker-reported
OpenPositions.
"""
qty_by_symbol: dict[str, Decimal] = {}
page = 1
while True:
resp = await self._request(
"POST", _ACTIVITIES_SEARCH,
json={"accountIds": [account_id], "page": page, "pageSize": 500},
)
resp.raise_for_status()
payload = resp.json()
activities = payload.get("activities", []) if isinstance(payload, dict) else []
if not activities:
break
for act in activities:
if not isinstance(act, dict):
continue
symbol = act.get("symbol") or ""
if not symbol or symbol.startswith("$CASH"):
continue
act_type = act.get("activityType") or ""
sign: int
if act_type in {"BUY", "ADD_HOLDING", "TRANSFER_IN"}:
sign = 1
elif act_type in {"SELL", "REMOVE_HOLDING", "TRANSFER_OUT"}:
sign = -1
else:
continue
try:
qty = Decimal(str(act.get("quantity") or 0))
except Exception:
continue
qty_by_symbol[symbol] = qty_by_symbol.get(symbol, Decimal(0)) + sign * qty
total_pages = int(payload.get("totalPages") or 1) if isinstance(payload, dict) else 1
if page >= total_pages:
break
page += 1
return qty_by_symbol
# -- manual holdings snapshots --
async def push_manual_snapshots(
self,
account_id: str,
snapshots: list[ManualSnapshotPayload],
) -> dict[str, Any]:
"""Push manual holdings snapshots to /api/v1/snapshots/import.
Each snapshot carries a date + per-fund positions + cash balances.
Wealthfolio auto-creates any unknown asset symbol with
``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=<currency>`` and uses
the snapshot to derive holdings + valuation for that date bypassing
the activity-ledger derivation entirely for the targeted day.
Used by the Fidelity provider since PlanViewer exposes current
fund units + price but no per-trade history. Re-imports for the
same (account, date) overwrite in place.
"""
if not snapshots:
return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []}
body = {
"accountId": account_id,
"snapshots": [_snapshot_to_payload(s) for s in snapshots],
}
resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body)
if resp.status_code >= 400:
try:
payload = resp.json()
except Exception:
payload = {"raw": resp.text}
raise WealthfolioError(
f"Wealthfolio /snapshots/import rejected: {payload}")
result = resp.json()
assert isinstance(result, dict)
failed = int(result.get("snapshotsFailed", 0))
if failed > 0:
raise WealthfolioError(
f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; "
f"errors={result.get('errors')}")
return result
@dataclass(frozen=True)
class SnapshotPosition:
"""A per-fund position row in a Wealthfolio manual snapshot."""
symbol: str
quantity: Decimal
average_cost: Decimal
total_cost_basis: Decimal
currency: str
@dataclass(frozen=True)
class ManualSnapshotPayload:
"""Sink-facing snapshot row. Mirrors the JSON shape WF expects."""
date: date
currency: str
positions: list[SnapshotPosition]
cash_balances: dict[str, Decimal]
def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]:
"""Serialise a ManualSnapshotPayload into WF's import wire format."""
return {
"date": s.date.isoformat(),
"currency": s.currency,
"positions": [
{
"symbol": p.symbol,
"quantity": format(p.quantity, "f"),
"averageCost": format(p.average_cost, "f"),
"totalCostBasis": format(p.total_cost_basis, "f"),
"currency": p.currency,
}
for p in s.positions
],
"cashBalances": {k: format(v, "f") for k, v in s.cash_balances.items()},
}

File diff suppressed because it is too large Load diff

View file

@ -1,127 +0,0 @@
# Provider: Interactive Brokers (IBKR Flex Web Service)
Pulls a daily Activity Flex Query via the [`ibflex`](https://github.com/csingley/ibflex)
library, maps Trades + CashTransactions to broker-sync Activities, and
reconciles broker-side OpenPositions against WF-computed quantities.
## When this runs
- K8s CronJob `broker-sync-ibkr` in the `broker-sync` namespace, daily 02:00 UK.
- Manual trigger:
```bash
kubectl -n broker-sync create job --from=cronjob/broker-sync-ibkr broker-sync-ibkr-manual-$(date +%s)
```
## Vault secrets — `secret/broker-sync`
| Key | Description |
|---|---|
| `ibkr_flex_token` | Flex Web Service token (1-year validity, rotate via IBKR Client Portal). |
| `ibkr_flex_query_id` | Activity Flex Query ID (57 digit number). |
| `ibkr_account_id` | Wealthfolio account UUID for "Interactive Brokers (UK)". |
| `ibkr_account_id_upstream` | IBKR-side account number (e.g. `U12345678`) — guards against wrong-account ingestion. |
ExternalSecret `broker-sync-secrets` syncs all keys from `secret/broker-sync`
to a K8s secret of the same name. New keys take ~15 min to propagate.
## IBKR Flex Query design
In IBKR Client Portal → Reports → Flex Queries → Activity Flex Query, create
a new query named `broker-sync-activity` with:
| Section | Required fields |
|---|---|
| Account Information | accountId |
| Trades | tradeID, tradeDate, tradeTime, symbol, buySell, quantity, tradePrice, currency, ibCommission, assetCategory, exchange |
| Cash Transactions | transactionID, dateTime, type, amount, currency, description |
| Open Positions | symbol, position, markPrice, currency, assetCategory, exchange |
| Securities Information | symbol, description, conid |
**Date Format:** `yyyy-MM-dd`. **Time Format:** `HH:mm:ss` (no timezone
suffix — ibflex 1.1 rejects timezone abbreviations in the time field).
**Date Range:** `Last 365 Days` — trailing window so a missed cron run
doesn't lose data. SyncRecordStore (keyed by `external_id`) makes
overlapping pulls idempotent. For a one-off historical backfill, widen
temporarily to `Year to Date` or `Custom Date Range`, run once, then
switch back.
## Cash type mapping
| IBKR Flex `CashTransaction.type` | broker-sync `ActivityType` |
|---|---|
| Dividends | DIVIDEND |
| Withholding Tax | TAX |
| Broker Interest Received | INTEREST |
| Broker Interest Paid | FEE |
| Commission Adjustments | FEE |
| Other Fees | FEE |
| Deposits & Withdrawals | DEPOSIT (amount > 0) / WITHDRAWAL (amount < 0) |
| anything else | skipped + WARNING logged (refuse to guess) |
## Dedup keys
- Trades: `external_id = "ibkr:trade:" + tradeID`
- Cash: `external_id = "ibkr:cash:" + transactionID`
Both are stable across re-runs; `dedup.SyncRecordStore` rejects already-
synced IDs.
## Symbol canonicalisation
LSE-listed GBP instruments get a `.L` suffix (Wealthfolio convention).
US instruments and anything already suffixed pass through unchanged.
The heuristic: `exchange in {LSE, LSEETF, LSEIOB1}` OR
`(exchange is None AND currency == GBP)` → suffix with `.L`. Edge cases
not yet covered (Euronext, XETRA) — extend `canonical_symbol` when those
holdings exist.
## Position reconciliation
Each run pushes to Pushgateway under job `broker-sync-ibkr`:
- `ibkr_position_drift_shares{symbol, account="ibkr-uk"}`
broker_qty wf_qty per asset.
- `ibkr_sync_last_success_timestamp_seconds` — unix timestamp.
Alerts (TODO, will be added to the monitoring stack on first
non-zero drift):
- `IBKRPositionDrift{symbol}``|drift| > 0.01` for >24h, Slack `#security`.
- `IBKRSyncStale` — timestamp > 36h old.
- `IBKRFlexTokenExpired` — Loki rule on the "code 1003" log line.
## Account guard
Before yielding any activities, the provider checks
`flex.accountId == IBKR_ACCOUNT_ID_UPSTREAM`. Mismatch → raises
`IBKRAccountMismatchError` and writes nothing. Prevents wrong-account
ingestion from a misconfigured query (e.g., someone replaced the token
with another user's by mistake).
## Token rotation
Flex tokens expire after 1 year. When the cron starts failing with
`ResponseCodeError(code=1003)`:
1. Sign in to IBKR Client Portal → Reports → Settings → Flex Web Service
→ regenerate token.
2. `vault kv patch secret/broker-sync ibkr_flex_token='<new-token>'`
3. ExternalSecrets controller picks up the new value within ~15 min; no
manual pod restart needed.
## Troubleshooting
| Symptom | Likely cause | Fix |
|---|---|---|
| `IBKR_FLEX_TOKEN not provided` exit 2 | Vault has placeholder value or key missing | `vault kv patch secret/broker-sync ibkr_flex_token='<real-token>'` |
| `IBKRAccountMismatchError` | `ibkr_account_id_upstream` doesn't match the account in the Flex query | Re-check IBKR account number; fix the Vault value |
| `ResponseCodeError(code=1003)` | Flex token expired | See "Token rotation" above |
| `StatementGenerationTimeout` | IBKR side slow | Single retry built in; if it persists, try a smaller date range |
| `Can't convert '... TZ' to time` parser error | Flex query has Time Format with timezone suffix | Switch to `HH:mm:ss` (no TZ) in Flex query settings |
| `'ETF' is not a valid AssetClass` | ETF set in fixture not in ibflex enum | Use `STK` in fixtures (IBKR Flex categorises ETFs under STK) |
## References
- Spec: [`docs/specs/2026-05-26-ibkr-ingest-design.md`](../specs/2026-05-26-ibkr-ingest-design.md)
- Plan: [`docs/plans/2026-05-26-ibkr-flex-ingestion.md`](../plans/2026-05-26-ibkr-flex-ingestion.md)
- Library: <https://github.com/csingley/ibflex>
- IBKR Flex Web Service docs: <https://www.interactivebrokers.com/en/software/am/am/reports/flex_web_service.htm>

View file

@ -1,328 +0,0 @@
# IBKR Flex Ingestion — Design
**Date:** 2026-05-26
**Status:** Approved (brainstorming session 2026-05-26)
**Author:** Viktor + Claude (Opus 4.7)
**Implementation plan:** TBD (will be written next session via writing-plans skill)
## Context
Adds Interactive Brokers (IBKR UK / IE — stocks/ETFs only) as a new
broker-sync provider, pushing activities to Wealthfolio on a daily
schedule alongside the existing Trading 212 / InvestEngine / Fidelity
pipelines.
The user's IBKR account is **currently empty** (no positions, no trades).
This design covers the integration as it will run once the account is
funded and active. The initial backfill step in the setup checklist is a
no-op until the first IBKR trade.
This work is the structural follow-on from the 2026-05-26 Wealthfolio
dedup session, in which £252k of duplicated InvestEngine positions
accumulated silently in WF because the IMAP and API ingestion paths
emitted different `external_id` schemes and never reconciled against
broker-reported truth. The IBKR design bakes in **broker-vs-WF position
reconciliation from day one** — the missing capability that allowed the
IE drift to grow undetected.
## Decisions
### D1 — Use IBKR Flex Web Service (not Client Portal API / TWS)
Flex Web Service is a token-authenticated REST endpoint returning XML
statements. Suits unattended cron because:
- One-year token validity (no daily re-auth, unlike Client Portal Gateway).
- No sidecar / GUI / Java runtime needed.
- Designed for periodic batch reporting — the exact shape of our pipeline.
Client Portal Web API + `ibind` was considered and rejected: its Gateway
sidecar requires browser-based re-auth roughly every 24 hours, which is
incompatible with unattended scheduling.
### D2 — Library: `ibflex` (`csingley/ibflex` on PyPI)
Adds `ibflex = "^0.16"` to `pyproject.toml`. The library provides:
- `client.download(token, query_id) -> bytes` — handles Flex's 2-step
async API (`SendRequest``GetStatement` polling).
- `parser.parse(xml) -> FlexQueryResponse` — typed dataclasses for
`Trades`, `CashTransactions`, `OpenPositions`, `SecuritiesInfo`.
Fallback (Approach B): if `ibflex` proves to lag IBKR schema changes, drop
in raw `httpx` + `xml.etree`. Same provider shape; only the parsing
internals change.
### D3 — One CronJob, daily 02:00 UK, in `broker-sync` namespace
Matches the existing `broker-sync-trading212` cadence and placement. No
new namespace, no new image.
### D4 — Reconciliation is mandatory, not optional
Every run computes a per-asset quantity from the Flex
`OpenPositions` section and compares against WF's computed quantity from
activities. Drift is published as a Pushgateway metric. Cross-checking
broker truth is the line of defense against the IE-style silent
divergence we saw on 2026-05-26.
### D5 — One account, one query
Single Flex Activity Query covering Trades + Cash + Open Positions +
Securities. Single `Interactive Brokers (UK)` account in Wealthfolio.
Multiple accounts can be added later by parameterising the CLI command;
not in scope now.
## Architecture
```
broker-sync K8s namespace
├── CronJob broker-sync-ibkr (schedule: 0 2 * * *)
│ ├── env from broker-sync-secrets:
│ │ IBKR_FLEX_TOKEN, IBKR_FLEX_QUERY_ID, IBKR_ACCOUNT_ID,
│ │ WF_BASE_URL, WF_USERNAME, WF_PASSWORD
│ ├── PVC broker-sync-data-encrypted (shared with other broker-sync jobs)
│ └── image viktorbarzin/broker-sync:<tag> command = ["broker-sync", "ibkr"]
│ External calls
│ ├── HTTPS → ndcdyn.interactivebrokers.com (Flex Web Service)
│ ├── HTTP → wealthfolio.wealthfolio.svc (activities import + position read)
│ └── HTTP → pushgateway.monitoring.svc (drift + last-success metrics)
```
The provider is structurally identical to `broker-sync-trading212` and
the IE bearer-token path — same Vault → CronJob → provider → pipeline →
WF flow. Existing alerting (CronJob-failed, ExternalSecret-stale,
WF-sync-stale) applies transitively; we only add IBKR-specific alerts on
top.
## Components
| Path | Action | Description |
|---|---|---|
| `broker_sync/providers/ibkr.py` | NEW | `IBKRProvider` class implementing the `Provider` protocol. Maps Flex XML to `Activity[]`. ~200 LOC. |
| `broker_sync/cli.py` | MODIFY | New `@app.command("ibkr")` typer command, parallel to `trading212` and `invest-engine`. ~60 LOC. |
| `pyproject.toml` | MODIFY | Add `ibflex = "^0.16"` dependency. |
| `tests/providers/test_ibkr.py` | NEW | Fixture-based parsing tests, sign-conventions, position-drift math, account-id guard. |
| `infra/stacks/broker-sync/main.tf` | MODIFY | New `kubernetes_cron_job_v1.ibkr` resource. |
| Vault `secret/broker-sync` | MODIFY | Add `ibkr_flex_token`, `ibkr_flex_query_id`, `ibkr_account_id`. |
| Wealthfolio (one-time, manual) | NEW data | Create `Interactive Brokers (UK)` account; record its UUID in Vault. |
| `docs/providers/ibkr.md` | NEW | Production-facing provider docs (setup, query design, troubleshooting). Written after first successful run. |
## Data flow (per CronJob run)
1. **02:00 UK** — CronJob fires, pod starts with env from `broker-sync-secrets`.
2. **Download**`ibflex.client.download(token, query_id)` calls Flex
Web Service `SendRequest` + `GetStatement`. Typical 520 s. Library
handles retry/polling.
3. **Parse**`ibflex.parser.parse(xml)` produces a
`FlexQueryResponse`.
4. **Account guard** — two distinct identifiers exist:
- **IBKR_ACCOUNT_ID_UPSTREAM**: the IBKR-side account number
(e.g. `U12345678`), used to validate that the Flex report belongs to
the right account.
- **IBKR_ACCOUNT_ID** (alias: `ibkr_account_id` in Vault): the
Wealthfolio account UUID (e.g. `8a3f...`), used when posting
activities to WF.
Validate `stmt.accountId == os.environ["IBKR_ACCOUNT_ID_UPSTREAM"]`.
Refuse to ingest on mismatch — prevents wrong-account writes from a
misconfigured query.
5. **Map Trades → Activities**:
| Flex | Activity | Notes |
|---|---|---|
| `Trade.tradeID` | `external_id = "ibkr:trade:" + tradeID` | dedup key |
| `Trade.tradeDate + tradeTime` | `date` (UTC) | timezone normalised |
| `Trade.symbol` | `symbol` | canonicalised — LSE tickers get `.L` suffix |
| `Trade.buySell` (BUY / SELL) | `activity_type` | direct |
| `Trade.quantity` | `quantity` | always positive (broker-sync convention) |
| `Trade.tradePrice` | `unit_price` | |
| `Trade.currency` | `currency` | per-trade, multi-ccy supported |
| `Trade.ibCommission` | `fee = abs(ibCommission)` | always positive |
| `Trade.assetCategory` | (sanity check; skip if not in {STK, ETF}) |
6. **Map CashTransactions → Activities**:
| Flex `CashTransaction.type` | Activity `activity_type` | Notes |
|---|---|---|
| `Dividends` | `DIVIDEND` | |
| `Withholding Tax` | `FEE` | tag with `notes="wht:..."` |
| `Broker Interest Paid` | `FEE` | negative direction |
| `Broker Interest Received` | `DIVIDEND` | interest treated as income |
| `Deposits & Withdrawals` | `DEPOSIT` (amount > 0) or `WITHDRAWAL` (amount < 0) | |
| `Commission Adjustments` | `FEE` | |
| anything else | skip + log WARNING with the unknown type | refuse to guess, same convention as IE provider |
external_id = `"ibkr:cash:" + transactionID`.
7. **Cash-flow match**`_with_cash_flow_match(a)` from the shared
pipeline emits a matching DEPOSIT for every BUY (and WITHDRAWAL for
every SELL) so WF cash balance stays consistent. This is the existing
pattern used by T212 + IE; IBKR slots in identically.
8. **Dedup**`SyncRecordStore(/data/sync.db)` skips any `external_id`
already synced. Idempotent re-runs are safe.
9. **Import**`WealthfolioSink.import_activities(...)` POSTs to
`/api/v1/activities/import`. Existing 401 retry logic applies.
10. **Reconciliation** — for each `OpenPositions` row:
```python
# compute_wf_position_qty: NEW helper in WealthfolioSink.
# Queries POST /api/v1/activities/search filtered by accountId, sums
# BUY/SELL/ADD_HOLDING/REMOVE_HOLDING quantities per asset.
wf_qty_by_asset = wf_sink.compute_position_qty(IBKR_ACCOUNT_ID)
for pos in flex_response.OpenPositions:
symbol = canonical_symbol(pos.symbol)
drift = float(pos.position) - wf_qty_by_asset.get(symbol, Decimal(0))
push_metric(
"ibkr_position_drift_shares",
labels={"symbol": symbol, "account": "ibkr-uk"},
value=float(drift),
)
push_metric("ibkr_sync_last_success_timestamp_seconds", time.time())
```
11. **Exit 0** on success, non-zero on any unrecoverable error.
## Error handling
| Failure | Detection | Response | Alert |
|---|---|---|---|
| Token expired (Flex code 1003) | `ibflex.client.ResponseCodeError` | Exit non-zero with explicit log | `IBKRFlexTokenExpired` Loki rule + stale-success Prom alert |
| Statement generation timeout | `ibflex.client.StatementGenerationTimeout` | Retry once after 60 s, then exit non-zero | Stale-success alert catches it after 24 h |
| Empty report (quiet day) | Zero Trades + zero CashTxns | Log "no new activity", still update success timestamp, still reconcile | (none — happy path) |
| WF API 401 | HTTP status | Re-login via `WealthfolioSink` (existing logic) | (existing) |
| WF rejects an activity row | `summary.skipped > 0` | Log per-row + exit non-zero | `IBKRImportRejected` Loki rule |
| Network / DNS fail | httpx exception | Retry once with 30 s backoff | `KubeJobFailed` (existing) |
| **Position drift > 0.01 share for >24h** | Pushgateway non-zero across runs | Prom alert `IBKRPositionDrift{symbol}` warning → Slack `#security` | **NEW capability** |
| Account ID mismatch | Flex `accountId` != env var | Exit 2 immediately, write nothing | `IBKRAccountMismatch` urgent Loki rule |
## Setup checklist (one-time)
### Step 1 — IBKR Client Portal (manual, ~5 min)
1. Sign in at `https://www.interactivebrokers.co.uk/` → **Account
Settings**.
2. **Reports → Settings → Flex Web Service** → Enable → copy the
one-time-displayed **Token** (1 year validity).
3. **Reports → Flex Queries → Activity Flex Query → Create New**:
- Name: `broker-sync-activity`
- Sections: `Account Information`, `Trades`, `Cash Transactions`,
`Open Positions`, `Securities Information`
- Date Format: `yyyy-MM-dd` · Time Format: `HH:mm:ss TimeZone`
- Date Range: `Last 365 Days` — trailing window so a missed cron run
(failed pod, outage, vacation) doesn't lose data. SyncRecordStore
keys on `ibkr:trade:<tradeID>` / `ibkr:cash:<transactionID>`, so
overlapping pulls are no-ops. `Last Business Day` was the original
choice but creates a "single missed run = permanent data loss"
failure mode — rejected in favour of dedup-backed resync window.
- Format: XML
- Trade fields: ensure `tradeID`, `tradeDate`, `tradeTime`, `symbol`,
`buySell`, `quantity`, `tradePrice`, `currency`, `ibCommission`,
`assetCategory` selected.
- CashTransaction fields: `transactionID`, `dateTime`, `type`,
`amount`, `currency`, `description`.
- OpenPositions fields: `symbol`, `position`, `markPrice`, `currency`,
`assetCategory`.
- Save → copy the **Query ID** (57 digit number).
### Step 2 — Vault
```bash
vault kv patch secret/broker-sync \
ibkr_flex_token='YOUR_TOKEN' \
ibkr_flex_query_id='YOUR_QUERY_ID' \
ibkr_account_id='WF_UUID_FROM_STEP_3' \
ibkr_account_id_upstream='YOUR_IBKR_ACCOUNT_NUMBER'
```
### Step 3 — Create WF account (script + paste UUID back)
```bash
# Login → POST /accounts → capture id
curl -sS -c /tmp/wf-jar -X POST "$WF_BASE_URL/api/v1/auth/login" \
-H 'Content-Type: application/json' -d "{\"password\":\"$WF_PASSWORD\"}"
curl -sS -b /tmp/wf-jar -X POST "$WF_BASE_URL/api/v1/accounts" \
-H 'Content-Type: application/json' \
-d '{"name":"Interactive Brokers (UK)","accountType":"GIA","currency":"GBP","isActive":true}' \
| jq -r '.id'
# Paste the UUID back into Vault under ibkr_account_id
```
### Step 4 — Initial backfill (skip while account is empty)
When the IBKR account first holds positions, the daily CronJob will
backfill automatically up to the 365-day trailing window. For older
history, temporarily switch the Flex query Date Range to
`Year to Date` (or `Custom Date Range` with a 1-year window), run the
CronJob manually once, verify WF totals match the broker app, then
switch the Flex query back to `Last 365 Days` for daily incremental.
Dedup makes the temporary widening safe — already-synced rows are no-ops.
### Step 5 — Deploy
1. Push to broker-sync `main` (direct push — personal repo convention,
no PR) → GHA builds `viktorbarzin/broker-sync:latest`.
2. `cd infra/stacks/broker-sync && scripts/tg apply` creates the new
CronJob.
3. Wait for the 02:00 UK run, or trigger manually:
`kubectl -n broker-sync create job --from=cronjob/broker-sync-ibkr broker-sync-ibkr-test-1`.
4. Verify in WF UI: account exists, activities present (if any),
reconciliation drift metric showing zero.
## Testing
**Unit tests** in `tests/providers/test_ibkr.py`:
- `test_parse_trades_maps_to_activities` — canned 3-trade XML, verify
external_id, symbol mapping, quantity sign, fee sign.
- `test_parse_dividend_maps_to_dividend_activity`.
- `test_parse_unknown_cash_type_logs_warning_and_skips`.
- `test_account_id_mismatch_raises` — Flex returns a different
`accountId` than env, refuse to ingest.
- `test_position_drift_computed_correctly` — three-asset scenario, two
match, one drifts.
- `test_canonical_symbol_lse_suffix``VUAG``VUAG.L`,
`AAPL``AAPL` (US, no suffix), etc.
All tests mock `ibflex.client.download` to avoid network.
**Integration test** (manual, post-deploy):
- Trigger CronJob manually.
- Inspect logs.
- Verify in WF UI and Pushgateway.
## Acceptance criteria
- [ ] `broker-sync ibkr` command runs end-to-end against the real Flex Web
Service with the user's token.
- [ ] WF accepts the resulting activity imports (no `summary.skipped`).
- [ ] `ibkr_position_drift_shares` is published for every asset; drift = 0
on a steady-state run.
- [ ] Re-running the command is idempotent — no duplicate activities
written to WF.
- [ ] CronJob completes successfully on its schedule for 7 consecutive days
before the design is marked Done.
## Out of scope
- Multi-account support (only one IBKR account designed in).
- Real-time data / order placement (Flex is batch-only).
- Stock split / corporate action handling — IBKR reports these in the
Flex `CorporateActions` section but we're not enabling that section
yet; revisit if it becomes needed.
- Multi-currency FX conversion math — we record per-trade currency
faithfully and let Wealthfolio do FX. If WF's FX handling proves
inadequate, a separate spec covers that.
## Open questions
(None at design-approval time. Captured here for future amendments.)
## References
- `ibflex` library docs (csingley/ibflex)
- Existing patterns in `broker_sync/providers/trading212.py` and
`broker_sync/providers/invest_engine.py`
- `~/code/infra/stacks/broker-sync/main.tf` (CronJob structure to mirror)
- 2026-05-26 Wealthfolio dedup session (motivates the reconciliation step)

199
poetry.lock generated
View file

@ -73,145 +73,6 @@ files = [
{file = "certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7"}, {file = "certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7"},
] ]
[[package]]
name = "charset-normalizer"
version = "3.4.7"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "charset_normalizer-3.4.7-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cdd68a1fb318e290a2077696b7eb7a21a49163c455979c639bf5a5dcdc46617d"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e17b8d5d6a8c47c85e68ca8379def1303fd360c3e22093a807cd34a71cd082b8"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:511ef87c8aec0783e08ac18565a16d435372bc1ac25a91e6ac7f5ef2b0bff790"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:007d05ec7321d12a40227aae9e2bc6dca73f3cb21058999a1df9e193555a9dcc"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cf29836da5119f3c8a8a70667b0ef5fdca3bb12f80fd06487cfa575b3909b393"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux_2_31_armv7l.whl", hash = "sha256:12d8baf840cc7889b37c7c770f478adea7adce3dcb3944d02ec87508e2dcf153"},
{file = "charset_normalizer-3.4.7-cp310-cp310-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:d560742f3c0d62afaccf9f41fe485ed69bd7661a241f86a3ef0f0fb8b1a397af"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:b14b2d9dac08e28bb8046a1a0434b1750eb221c8f5b87a68f4fa11a6f97b5e34"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:bc17a677b21b3502a21f66a8cc64f5bfad4df8a0b8434d661666f8ce90ac3af1"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:750e02e074872a3fad7f233b47734166440af3cdea0add3e95163110816d6752"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_riscv64.whl", hash = "sha256:4e5163c14bffd570ef2affbfdd77bba66383890797df43dc8b4cc7d6f500bf53"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:6ed74185b2db44f41ef35fd1617c5888e59792da9bbc9190d6c7300617182616"},
{file = "charset_normalizer-3.4.7-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:94e1885b270625a9a828c9793b4d52a64445299baa1fea5a173bf1d3dd9a1a5a"},
{file = "charset_normalizer-3.4.7-cp310-cp310-win32.whl", hash = "sha256:6785f414ae0f3c733c437e0f3929197934f526d19dfaa75e18fdb4f94c6fb374"},
{file = "charset_normalizer-3.4.7-cp310-cp310-win_amd64.whl", hash = "sha256:6696b7688f54f5af4462118f0bfa7c1621eeb87154f77fa04b9295ce7a8f2943"},
{file = "charset_normalizer-3.4.7-cp310-cp310-win_arm64.whl", hash = "sha256:66671f93accb62ed07da56613636f3641f1a12c13046ce91ffc923721f23c008"},
{file = "charset_normalizer-3.4.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:7641bb8895e77f921102f72833904dcd9901df5d6d72a2ab8f31d04b7e51e4e7"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:202389074300232baeb53ae2569a60901f7efadd4245cf3a3bf0617d60b439d7"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:30b8d1d8c52a48c2c5690e152c169b673487a2a58de1ec7393196753063fcd5e"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:532bc9bf33a68613fd7d65e4b1c71a6a38d7d42604ecf239c77392e9b4e8998c"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2fe249cb4651fd12605b7288b24751d8bfd46d35f12a20b1ba33dea122e690df"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux_2_31_armv7l.whl", hash = "sha256:65bcd23054beab4d166035cabbc868a09c1a49d1efe458fe8e4361215df40265"},
{file = "charset_normalizer-3.4.7-cp311-cp311-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:08e721811161356f97b4059a9ba7bafb23ea5ee2255402c42881c214e173c6b4"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:e060d01aec0a910bdccb8be71faf34e7799ce36950f8294c8bf612cba65a2c9e"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:38c0109396c4cfc574d502df99742a45c72c08eff0a36158b6f04000043dbf38"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:1c2a768fdd44ee4a9339a9b0b130049139b8ce3c01d2ce09f67f5a68048d477c"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_riscv64.whl", hash = "sha256:1a87ca9d5df6fe460483d9a5bbf2b18f620cbed41b432e2bddb686228282d10b"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:d635aab80466bc95771bb78d5370e74d36d1fe31467b6b29b8b57b2a3cd7d22c"},
{file = "charset_normalizer-3.4.7-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ae196f021b5e7c78e918242d217db021ed2a6ace2bc6ae94c0fc596221c7f58d"},
{file = "charset_normalizer-3.4.7-cp311-cp311-win32.whl", hash = "sha256:adb2597b428735679446b46c8badf467b4ca5f5056aae4d51a19f9570301b1ad"},
{file = "charset_normalizer-3.4.7-cp311-cp311-win_amd64.whl", hash = "sha256:8e385e4267ab76874ae30db04c627faaaf0b509e1ccc11a95b3fc3e83f855c00"},
{file = "charset_normalizer-3.4.7-cp311-cp311-win_arm64.whl", hash = "sha256:d4a48e5b3c2a489fae013b7589308a40146ee081f6f509e047e0e096084ceca1"},
{file = "charset_normalizer-3.4.7-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:eca9705049ad3c7345d574e3510665cb2cf844c2f2dcfe675332677f081cbd46"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6178f72c5508bfc5fd446a5905e698c6212932f25bcdd4b47a757a50605a90e2"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e1421b502d83040e6d7fb2fb18dff63957f720da3d77b2fbd3187ceb63755d7b"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:edac0f1ab77644605be2cbba52e6b7f630731fc42b34cb0f634be1a6eface56a"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5649fd1c7bade02f320a462fdefd0b4bd3ce036065836d4f42e0de958038e116"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_armv7l.whl", hash = "sha256:203104ed3e428044fd943bc4bf45fa73c0730391f9621e37fe39ecf477b128cb"},
{file = "charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:298930cec56029e05497a76988377cbd7457ba864beeea92ad7e844fe74cd1f1"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:708838739abf24b2ceb208d0e22403dd018faeef86ddac04319a62ae884c4f15"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:0f7eb884681e3938906ed0434f20c63046eacd0111c4ba96f27b76084cd679f5"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:4dc1e73c36828f982bfe79fadf5919923f8a6f4df2860804db9a98c48824ce8d"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:aed52fea0513bac0ccde438c188c8a471c4e0f457c2dd20cdbf6ea7a450046c7"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:fea24543955a6a729c45a73fe90e08c743f0b3334bbf3201e6c4bc1b0c7fa464"},
{file = "charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:bb6d88045545b26da47aa879dd4a89a71d1dce0f0e549b1abcb31dfe4a8eac49"},
{file = "charset_normalizer-3.4.7-cp312-cp312-win32.whl", hash = "sha256:2257141f39fe65a3fdf38aeccae4b953e5f3b3324f4ff0daf9f15b8518666a2c"},
{file = "charset_normalizer-3.4.7-cp312-cp312-win_amd64.whl", hash = "sha256:5ed6ab538499c8644b8a3e18debabcd7ce684f3fa91cf867521a7a0279cab2d6"},
{file = "charset_normalizer-3.4.7-cp312-cp312-win_arm64.whl", hash = "sha256:56be790f86bfb2c98fb742ce566dfb4816e5a83384616ab59c49e0604d49c51d"},
{file = "charset_normalizer-3.4.7-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f496c9c3cc02230093d8330875c4c3cdfc3b73612a5fd921c65d39cbcef08063"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ea948db76d31190bf08bd371623927ee1339d5f2a0b4b1b4a4439a65298703c"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a277ab8928b9f299723bc1a2dabb1265911b1a76341f90a510368ca44ad9ab66"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:3bec022aec2c514d9cf199522a802bd007cd588ab17ab2525f20f9c34d067c18"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e044c39e41b92c845bc815e5ae4230804e8e7bc29e399b0437d64222d92809dd"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_armv7l.whl", hash = "sha256:f495a1652cf3fbab2eb0639776dad966c2fb874d79d87ca07f9d5f059b8bd215"},
{file = "charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e712b419df8ba5e42b226c510472b37bd57b38e897d3eca5e8cfd410a29fa859"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7804338df6fcc08105c7745f1502ba68d900f45fd770d5bdd5288ddccb8a42d8"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:481551899c856c704d58119b5025793fa6730adda3571971af568f66d2424bb5"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f59099f9b66f0d7145115e6f80dd8b1d847176df89b234a5a6b3f00437aa0832"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:f59ad4c0e8f6bba240a9bb85504faa1ab438237199d4cce5f622761507b8f6a6"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:3dedcc22d73ec993f42055eff4fcfed9318d1eeb9a6606c55892a26964964e48"},
{file = "charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:64f02c6841d7d83f832cd97ccf8eb8a906d06eb95d5276069175c696b024b60a"},
{file = "charset_normalizer-3.4.7-cp313-cp313-win32.whl", hash = "sha256:4042d5c8f957e15221d423ba781e85d553722fc4113f523f2feb7b188cc34c5e"},
{file = "charset_normalizer-3.4.7-cp313-cp313-win_amd64.whl", hash = "sha256:3946fa46a0cf3e4c8cb1cc52f56bb536310d34f25f01ca9b6c16afa767dab110"},
{file = "charset_normalizer-3.4.7-cp313-cp313-win_arm64.whl", hash = "sha256:80d04837f55fc81da168b98de4f4b797ef007fc8a79ab71c6ec9bc4dd662b15b"},
{file = "charset_normalizer-3.4.7-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:c36c333c39be2dbca264d7803333c896ab8fa7d4d6f0ab7edb7dfd7aea6e98c0"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1c2aed2e5e41f24ea8ef1590b8e848a79b56f3a5564a65ceec43c9d692dc7d8a"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:54523e136b8948060c0fa0bc7b1b50c32c186f2fceee897a495406bb6e311d2b"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:715479b9a2802ecac752a3b0efa2b0b60285cf962ee38414211abdfccc233b41"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bd6c2a1c7573c64738d716488d2cdd3c00e340e4835707d8fdb8dc1a66ef164e"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_armv7l.whl", hash = "sha256:c45e9440fb78f8ddabcf714b68f936737a121355bf59f3907f4e17721b9d1aae"},
{file = "charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3534e7dcbdcf757da6b85a0bbf5b6868786d5982dd959b065e65481644817a18"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:e8ac484bf18ce6975760921bb6148041faa8fef0547200386ea0b52b5d27bf7b"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:a5fe03b42827c13cdccd08e6c0247b6a6d4b5e3cdc53fd1749f5896adcdc2356"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:2d6eb928e13016cea4f1f21d1e10c1cebd5a421bc57ddf5b1142ae3f86824fab"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:e74327fb75de8986940def6e8dee4f127cc9752bee7355bb323cc5b2659b6d46"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_s390x.whl", hash = "sha256:d6038d37043bced98a66e68d3aa2b6a35505dc01328cd65217cefe82f25def44"},
{file = "charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:7579e913a5339fb8fa133f6bbcfd8e6749696206cf05acdbdca71a1b436d8e72"},
{file = "charset_normalizer-3.4.7-cp314-cp314-win32.whl", hash = "sha256:5b77459df20e08151cd6f8b9ef8ef1f961ef73d85c21a555c7eed5b79410ec10"},
{file = "charset_normalizer-3.4.7-cp314-cp314-win_amd64.whl", hash = "sha256:92a0a01ead5e668468e952e4238cccd7c537364eb7d851ab144ab6627dbbe12f"},
{file = "charset_normalizer-3.4.7-cp314-cp314-win_arm64.whl", hash = "sha256:67f6279d125ca0046a7fd386d01b311c6363844deac3e5b069b514ba3e63c246"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:effc3f449787117233702311a1b7d8f59cba9ced946ba727bdc329ec69028e24"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:fbccdc05410c9ee21bbf16a35f4c1d16123dcdeb8a1d38f33654fa21d0234f79"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:733784b6d6def852c814bce5f318d25da2ee65dd4839a0718641c696e09a2960"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a89c23ef8d2c6b27fd200a42aa4ac72786e7c60d40efdc76e6011260b6e949c4"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6c114670c45346afedc0d947faf3c7f701051d2518b943679c8ff88befe14f8e"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:a180c5e59792af262bf263b21a3c49353f25945d8d9f70628e73de370d55e1e1"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3c9a494bc5ec77d43cea229c4f6db1e4d8fe7e1bbffa8b6f0f0032430ff8ab44"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:8d828b6667a32a728a1ad1d93957cdf37489c57b97ae6c4de2860fa749b8fc1e"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:cf1493cd8607bec4d8a7b9b004e699fcf8f9103a9284cc94962cb73d20f9d4a3"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_ppc64le.whl", hash = "sha256:0c96c3b819b5c3e9e165495db84d41914d6894d55181d2d108cc1a69bfc9cce0"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:752a45dc4a6934060b3b0dab47e04edc3326575f82be64bc4fc293914566503e"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_s390x.whl", hash = "sha256:8778f0c7a52e56f75d12dae53ae320fae900a8b9b4164b981b9c5ce059cd1fcb"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:ce3412fbe1e31eb81ea42f4169ed94861c56e643189e1e75f0041f3fe7020abe"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-win32.whl", hash = "sha256:c03a41a8784091e67a39648f70c5f97b5b6a37f216896d44d2cdcb82615339a0"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-win_amd64.whl", hash = "sha256:03853ed82eeebbce3c2abfdbc98c96dc205f32a79627688ac9a27370ea61a49c"},
{file = "charset_normalizer-3.4.7-cp314-cp314t-win_arm64.whl", hash = "sha256:c35abb8bfff0185efac5878da64c45dafd2b37fb0383add1be155a763c1f083d"},
{file = "charset_normalizer-3.4.7-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e5f4d355f0a2b1a31bc3edec6795b46324349c9cb25eed068049e4f472fb4259"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:16d971e29578a5e97d7117866d15889a4a07befe0e87e703ed63cd90cb348c01"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:dca4bbc466a95ba9c0234ef56d7dd9509f63da22274589ebd4ed7f1f4d4c54e3"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:e80c8378d8f3d83cd3164da1ad2df9e37a666cdde7b1cb2298ed0b558064be30"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:36836d6ff945a00b88ba1e4572d721e60b5b8c98c155d465f56ad19d68f23734"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux_2_31_armv7l.whl", hash = "sha256:bd9b23791fe793e4968dba0c447e12f78e425c59fc0e3b97f6450f4781f3ee60"},
{file = "charset_normalizer-3.4.7-cp38-cp38-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:aef65cd602a6d0e0ff6f9930fcb1c8fec60dd2cfcb6facaf4bdb0e5873042db0"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:82b271f5137d07749f7bf32f70b17ab6eaabedd297e75dce75081a24f76eb545"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_armv7l.whl", hash = "sha256:1efde3cae86c8c273f1eb3b287be7d8499420cf2fe7585c41d370d3e790054a5"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:c593052c465475e64bbfe5dbd81680f64a67fdc752c56d7a0ae205dc8aeefe0f"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_riscv64.whl", hash = "sha256:af21eb4409a119e365397b2adbaca4c9ccab56543a65d5dbd9f920d6ac29f686"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_s390x.whl", hash = "sha256:84c018e49c3bf790f9c2771c45e9313a08c2c2a6342b162cd650258b57817706"},
{file = "charset_normalizer-3.4.7-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:dd915403e231e6b1809fe9b6d9fc55cf8fb5e02765ac625d9cd623342a7905d7"},
{file = "charset_normalizer-3.4.7-cp38-cp38-win32.whl", hash = "sha256:320ade88cfb846b8cd6b4ddf5ee9e80ee0c1f52401f2456b84ae1ae6a1a5f207"},
{file = "charset_normalizer-3.4.7-cp38-cp38-win_amd64.whl", hash = "sha256:1dc8b0ea451d6e69735094606991f32867807881400f808a106ee1d963c46a83"},
{file = "charset_normalizer-3.4.7-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:177a0ba5f0211d488e295aaf82707237e331c24788d8d76c96c5a41594723217"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6e0d51f618228538a3e8f46bd246f87a6cd030565e015803691603f55e12afb5"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:14265bfe1f09498b9d8ec91e9ec9fa52775edf90fcbde092b25f4a33d444fea9"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:87fad7d9ba98c86bcb41b2dc8dbb326619be2562af1f8ff50776a39e55721c5a"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f22dec1690b584cea26fade98b2435c132c1b5f68e39f5a0b7627cd7ae31f1dc"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux_2_31_armv7l.whl", hash = "sha256:d61f00a0869d77422d9b2aba989e2d24afa6ffd552af442e0e58de4f35ea6d00"},
{file = "charset_normalizer-3.4.7-cp39-cp39-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:6370e8686f662e6a3941ee48ed4742317cafbe5707e36406e9df792cdb535776"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:a6c5863edfbe888d9eff9c8b8087354e27618d9da76425c119293f11712a6319"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:ed065083d0898c9d5b4bbec7b026fd755ff7454e6e8b73a67f8c744b13986e24"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:2cd4a60d0e2fb04537162c62bbbb4182f53541fe0ede35cdf270a1c1e723cc42"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_riscv64.whl", hash = "sha256:813c0e0132266c08eb87469a642cb30aaff57c5f426255419572aaeceeaa7bf4"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:07d9e39b01743c3717745f4c530a6349eadbfa043c7577eef86c502c15df2c67"},
{file = "charset_normalizer-3.4.7-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c0f081d69a6e58272819b70288d3221a6ee64b98df852631c80f293514d3b274"},
{file = "charset_normalizer-3.4.7-cp39-cp39-win32.whl", hash = "sha256:8751d2787c9131302398b11e6c8068053dcb55d5a8964e114b6e196cf16cb366"},
{file = "charset_normalizer-3.4.7-cp39-cp39-win_amd64.whl", hash = "sha256:12a6fff75f6bc66711b73a2f0addfc4c8c15a20e805146a02d147a318962c444"},
{file = "charset_normalizer-3.4.7-cp39-cp39-win_arm64.whl", hash = "sha256:bb8cc7534f51d9a017b93e3e85b260924f909601c3df002bcdb58ddb4dc41a5c"},
{file = "charset_normalizer-3.4.7-py3-none-any.whl", hash = "sha256:3dce51d0f5e7951f8bb4900c257dad282f49190fdbebecd4ba99bcc41fef404d"},
{file = "charset_normalizer-3.4.7.tar.gz", hash = "sha256:ae89db9e5f98a11a4bf50407d4363e7b09b31e55bc117b4f7d80aab97ba009e5"},
]
[[package]] [[package]]
name = "click" name = "click"
version = "8.1.8" version = "8.1.8"
@ -373,24 +234,6 @@ http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"] socks = ["socksio (==1.*)"]
zstd = ["zstandard (>=0.18.0)"] zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "ibflex"
version = "1.1"
description = "Parse Interactive Brokers Flex XML reports and convert to Python types"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "ibflex-1.1-py3-none-any.whl", hash = "sha256:c84e02dafcd17f70587777c2e2f00e3cc1e949e045790bf4fe562fb03dbef434"},
{file = "ibflex-1.1.tar.gz", hash = "sha256:3e5cac02cadcbd22ea46ae4ca306d67c274b7166f40119f5d7d7103a130d032a"},
]
[package.dependencies]
requests = {version = "*", optional = true, markers = "extra == \"web\""}
[package.extras]
web = ["requests"]
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.11" version = "3.11"
@ -820,28 +663,6 @@ files = [
[package.dependencies] [package.dependencies]
six = ">=1.5" six = ">=1.5"
[[package]]
name = "requests"
version = "2.34.2"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "requests-2.34.2-py3-none-any.whl", hash = "sha256:2a0d60c172f83ac6ab31e4554906c0f3b3588d37b5cb939b1c061f4907e278e0"},
{file = "requests-2.34.2.tar.gz", hash = "sha256:f288924cae4e29463698d6d60bc6a4da69c89185ad1e0bcc4104f584e960b9ed"},
]
[package.dependencies]
certifi = ">=2023.5.7"
charset_normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.26,<3"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<8)"]
[[package]] [[package]]
name = "rich" name = "rich"
version = "15.0.0" version = "15.0.0"
@ -979,24 +800,6 @@ files = [
{file = "typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"}, {file = "typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"},
] ]
[[package]]
name = "urllib3"
version = "2.7.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897"},
{file = "urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c"},
]
[package.extras]
brotli = ["brotli (>=1.2.0) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=1.2.0.0) ; platform_python_implementation != \"CPython\""]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""]
[[package]] [[package]]
name = "yapf" name = "yapf"
version = "0.43.0" version = "0.43.0"
@ -1015,4 +818,4 @@ platformdirs = ">=3.5.1"
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.11,<3.13" python-versions = ">=3.11,<3.13"
content-hash = "8a704e79729d5bd3cbe78a7e35c51e9da724880915c0152788273b94bd00610d" content-hash = "b3896b2258a425cce9498be9ada5bd48a06d5f2bd7c53ead044ad27c53086bd7"

View file

@ -18,9 +18,6 @@ aiomysql = "^0.3.2"
# long-lived session alive (storage_state + device-trust cookie); actual data # long-lived session alive (storage_state + device-trust cookie); actual data
# is fetched via httpx against the SPA's private JSON backend. # is fetched via httpx against the SPA's private JSON backend.
playwright = "^1.47" playwright = "^1.47"
# IBKR Flex Web Service: pulls Activity Flex Query XML reports (token-auth)
# and parses to typed dataclasses. No Gateway / daily re-auth needed.
ibflex = { version = "^1.1", extras = ["web"] }
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pytest = "^8.3" pytest = "^8.3"

View file

@ -1,25 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<FlexQueryResponse queryName="broker-sync-activity" type="AF">
<FlexStatements count="1">
<FlexStatement accountId="U12345678" fromDate="2026-05-20" toDate="2026-05-26" period="LastBusinessDay" whenGenerated="2026-05-26T02:00:00">
<AccountInformation accountId="U12345678" acctAlias="" currency="GBP" name="Viktor Test" accountType="Individual"/>
<Trades>
<Trade tradeID="T1001" tradeDate="2026-05-21" tradeTime="14:30:00" symbol="VUAG" buySell="BUY" quantity="10" tradePrice="107.50" currency="GBP" ibCommission="-1.05" assetCategory="STK" exchange="LSEETF"/>
<Trade tradeID="T1002" tradeDate="2026-05-22" tradeTime="09:15:00" symbol="AAPL" buySell="BUY" quantity="5" tradePrice="180.25" currency="USD" ibCommission="-0.50" assetCategory="STK" exchange="NASDAQ"/>
<Trade tradeID="T1003" tradeDate="2026-05-23" tradeTime="11:00:00" symbol="VUAG" buySell="SELL" quantity="2" tradePrice="108.00" currency="GBP" ibCommission="-0.30" assetCategory="STK" exchange="LSEETF"/>
</Trades>
<CashTransactions>
<CashTransaction transactionID="C5001" dateTime="2026-05-22 12:00:00" type="Dividends" amount="3.50" currency="GBP" description="VUAG DIV"/>
<CashTransaction transactionID="C5002" dateTime="2026-05-22 12:00:00" type="Withholding Tax" amount="-0.35" currency="GBP" description="VUAG WHT"/>
</CashTransactions>
<OpenPositions>
<OpenPosition symbol="VUAG" position="8" markPrice="108.20" currency="GBP" assetCategory="STK"/>
<OpenPosition symbol="AAPL" position="5" markPrice="181.00" currency="USD" assetCategory="STK"/>
</OpenPositions>
<CashReport>
<CashReportCurrency accountId="U12345678" currency="BASE_SUMMARY" levelOfDetail="BaseCurrency" startingCash="1.23" endingCash="1.23" endingSettledCash="1.23"/>
<CashReportCurrency accountId="U12345678" currency="USD" levelOfDetail="Currency" startingCash="1.23" endingCash="1.23" endingSettledCash="1.23"/>
</CashReport>
</FlexStatement>
</FlexStatements>
</FlexQueryResponse>

View file

@ -80,66 +80,61 @@ def test_external_id_is_stable_across_reruns() -> None:
def test_price_with_commas_parses() -> None: def test_price_with_commas_parses() -> None:
html = _SELL.replace("$612.34", "$1,612.34") html = _SELL.replace("$612.34", "$1,612.34")
# The first activity is the inferred BUY (date 2025-01-23 ≥ 2026-04-01? no → a = parse_schwab_email(html)[0]
# only one activity for this old-dated email), so index 0 is the SELL. assert a.unit_price == Decimal("1612.34")
acts = parse_schwab_email(html)
sell = next(a for a in acts if a.activity_type is ActivityType.SELL)
assert sell.unit_price == Decimal("1612.34")
# --- Inferred vest BUY --------------------------------------------------- # --- Vest-release parsing -------------------------------------------------
_VEST_RELEASE = """<html><body>
<h2>Release Confirmation</h2>
<p>
Release Date: 15 Mar 2026
Ticker: META
Total Shares Released: 100.0
Market Price: $612.34
Shares Withheld for Taxes: 45
Tax Withholding Amount: $27,555.30
</p>
</body></html>"""
def _recent_sell(date_iso: str = "2026-05-19", qty: str = "55", price: str = "609.35") -> str: def test_vest_release_returns_two_activities_and_vest_event() -> None:
return f""" """Release Confirmation yields a BUY (full vest) + SELL (sell-to-cover) + VestEvent."""
<html><body><table> from broker_sync.providers.parsers.schwab import parse_schwab_email_full
<tr><td class="dark-background-body" align="right">{date_iso}</td></tr>
<tr><td class="dark-background-body" align="right">Sold</td></tr> result = parse_schwab_email_full(_VEST_RELEASE)
<tr><td class="dark-background-body" align="right">{qty}</td></tr> assert result.vest_event is not None
<tr><td class="dark-background-body" align="right">META</td></tr> assert result.vest_event.ticker == "META"
<tr><td class="dark-background-body" align="right">${price}</td></tr> assert result.vest_event.shares_vested == Decimal("100.0")
</table></body></html> assert result.vest_event.shares_sold_to_cover == Decimal("45")
""" assert result.vest_event.fmv_at_vest_usd == Decimal("612.34")
assert result.vest_event.tax_withheld_usd == Decimal("27555.30")
assert result.vest_event.vest_date.date().isoformat() == "2026-03-15"
assert result.vest_event.external_id.startswith("schwab:2026-03-15:META:VEST:")
assert len(result.activities) == 2
buy = result.activities[0]
assert buy.activity_type is ActivityType.BUY
assert buy.quantity == Decimal("100.0")
sell = result.activities[1]
assert sell.activity_type is ActivityType.SELL
assert sell.quantity == Decimal("45")
assert sell.unit_price == Decimal("612.34")
def test_recent_sell_emits_paired_buy() -> None: def test_vest_email_with_unparseable_body_returns_empty() -> None:
"""SELL dated on/after the synthesis boundary triggers a paired BUY.""" """Subject says Release Confirmation but fields missing → empty result, no crash."""
acts = parse_schwab_email(_recent_sell()) from broker_sync.providers.parsers.schwab import parse_schwab_email_full
html = "<html><body>Release Confirmation — please contact support</body></html>"
result = parse_schwab_email_full(html)
assert result.vest_event is None
assert result.activities == []
def test_back_compat_parse_schwab_email_drops_vest_event() -> None:
"""The legacy list[Activity] shape remains stable for existing callers."""
acts = parse_schwab_email(_VEST_RELEASE)
assert len(acts) == 2 assert len(acts) == 2
assert all(isinstance(a.activity_type, ActivityType) for a in acts)
buy = next(a for a in acts if a.activity_type is ActivityType.BUY)
sell = next(a for a in acts if a.activity_type is ActivityType.SELL)
assert buy.quantity == sell.quantity == Decimal("55")
assert buy.unit_price == sell.unit_price == Decimal("609.35")
assert buy.date == sell.date
assert buy.symbol == sell.symbol == "META"
assert "schwab-vest-inferred-from-same-day-sell" in (buy.notes or "")
assert buy.external_id == "schwab:vest:2026-05-19:META:BUY:55"
assert sell.external_id == "schwab:2026-05-19:META:SELL:55"
def test_old_sell_emits_only_sell() -> None:
"""SELL dated before 2026-04-01 (default boundary) skips the paired BUY —
those vests already have csv-sourced BUY rows in Wealthfolio."""
acts = parse_schwab_email(_recent_sell(date_iso="2025-08-19"))
assert len(acts) == 1
assert acts[0].activity_type is ActivityType.SELL
def test_boundary_env_var_overrides(monkeypatch: object) -> None:
"""The synthesis boundary is configurable via env var."""
import os
os.environ["SCHWAB_VEST_INFER_FROM_DATE"] = "2025-01-01"
try:
acts = parse_schwab_email(_recent_sell(date_iso="2025-08-19"))
assert len(acts) == 2 # now in scope
finally:
del os.environ["SCHWAB_VEST_INFER_FROM_DATE"]
def test_buy_email_does_not_emit_inferred_buy() -> None:
"""BUY-direction emails (rare for workplace account) don't get paired."""
acts = parse_schwab_email(_BUY.replace("2024-11-15", "2026-05-15"))
assert len(acts) == 1
assert acts[0].activity_type is ActivityType.BUY

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import UTC, date, datetime from datetime import UTC, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
@ -13,11 +13,9 @@ from broker_sync.providers.fidelity_planviewer import (
FidelityCreds, FidelityCreds,
FidelityPlanViewerProvider, FidelityPlanViewerProvider,
FidelityProviderConfigError, FidelityProviderConfigError,
fidelity_holdings_to_snapshot, _gains_offset_activity,
gains_offset_delta_activity,
) )
from broker_sync.providers.parsers.fidelity import ( from broker_sync.providers.parsers.fidelity import (
FidelityHolding,
parse_transactions_html, parse_transactions_html,
parse_valuation_json, parse_valuation_json,
) )
@ -98,112 +96,21 @@ def test_parse_valuation_fixture() -> None:
assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"} assert set(h.units_by_source.keys()) >= {"SASC", "ERXS"}
def test_holdings_to_snapshot_real_fixture() -> None: def test_gains_offset_emits_deposit_when_pot_exceeds_contributions() -> None:
html = (_FIXTURES / "transactions-full.html").read_text() html = (_FIXTURES / "transactions-full.html").read_text()
valuation = json.loads((_FIXTURES / "valuation.json").read_text()) valuation = json.loads((_FIXTURES / "valuation.json").read_text())
txs = parse_transactions_html(html)
holdings = parse_valuation_json(valuation) holdings = parse_valuation_json(valuation)
total_contrib = sum((tx.amount for tx in parse_transactions_html(html)), as_of = datetime(2026, 4, 18, tzinfo=UTC)
Decimal(0)) offset = _gains_offset_activity(holdings, txs, as_of)
assert offset is not None
snapshot = fidelity_holdings_to_snapshot( assert offset.activity_type in (ActivityType.DEPOSIT, ActivityType.WITHDRAWAL)
holdings=holdings, assert offset.amount is not None and offset.amount > 0
total_real_contribution=total_contrib, assert offset.external_id == "fidelity:gains:2026-04-18"
as_of=date(2026, 4, 18),
)
assert snapshot is not None
assert snapshot.date == date(2026, 4, 18)
assert snapshot.currency == "GBP"
# Cost basis sums to the cash contributions (allocated by fund value share)
sum_cost = sum((p.total_cost_basis for p in snapshot.positions), Decimal(0))
assert abs(sum_cost - total_contrib) < Decimal("1")
# Meta scheme had KDOA + LAFC + one other at fixture time; the
# dominant fund must be KDOA.
symbols = [p.symbol for p in snapshot.positions]
assert "KDOA" in symbols
kdoa = next(p for p in snapshot.positions if p.symbol == "KDOA")
assert kdoa.quantity > 0
# Proportional cost-basis allocation: KDOA holds nearly the whole pot
# so it should get the lion's share of cost
kdoa_share = kdoa.total_cost_basis / sum_cost
assert kdoa_share > Decimal("0.9")
# cashBalances zero — pension contributions flow straight into funds
assert snapshot.cash_balances == {"GBP": Decimal(0)}
def test_holdings_to_snapshot_none_when_no_holdings() -> None: def test_gains_offset_none_when_no_holdings() -> None:
assert fidelity_holdings_to_snapshot( assert _gains_offset_activity(
holdings=[], total_real_contribution=Decimal("100"), holdings=[], transactions=[],
as_of=date(2026, 4, 18), as_of=datetime(2026, 4, 18, tzinfo=UTC),
) is None
def test_provider_caches_holdings_for_cli_snapshot_push() -> None:
"""The CLI reads `last_holdings` after fetch() drains to push the
manual snapshot. This guards the contract that fetch() populates the
attribute even when no Activity is yielded (e.g., backfill window
cut-off)."""
prov = FidelityPlanViewerProvider(FidelityCreds(
storage_state_path="/tmp/x", plan_id="META",
))
# Pre-fetch state: empty
assert prov.last_holdings == []
assert prov.last_total_contribution == Decimal(0)
# -- delta-shaped gains offset (the monthly accumulation mechanism) --
def _holdings_summing_to(total: Decimal) -> list[FidelityHolding]:
return [FidelityHolding(
fund_code="KDOA", fund_name="Test", units=Decimal("100"),
unit_price=total / Decimal("100"), currency="GBP", total_value=total,
units_by_source={},
)]
def test_gains_delta_emits_deposit_when_gain_exceeds_prior_offset() -> None:
# pot £145k, real contrib £102k → current gain £43k; prior offset £35k
# → delta = +£8k
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("145000")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is not None
assert activity.activity_type == ActivityType.DEPOSIT
assert activity.amount == Decimal("8000")
assert activity.external_id == "fidelity:gains-delta:2026-05-17"
assert "unrealised-gains-offset" in (activity.notes or "")
def test_gains_delta_emits_withdrawal_on_market_drop() -> None:
# pot dropped: current gain £30k, prior offset £35k → delta = -£5k
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("132000")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is not None
assert activity.activity_type == ActivityType.WITHDRAWAL
assert activity.amount == Decimal("5000")
def test_gains_delta_suppressed_below_minimum() -> None:
# delta ~£0.20, below the £0.50 min — skip emission to avoid noise.
activity = gains_offset_delta_activity(
holdings=_holdings_summing_to(Decimal("137000.20")),
total_real_contribution=Decimal("102000"),
prior_offset_cumulative=Decimal("35000"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
)
assert activity is None
def test_gains_delta_none_when_no_holdings() -> None:
assert gains_offset_delta_activity(
holdings=[], total_real_contribution=Decimal("0"),
prior_offset_cumulative=Decimal("0"),
as_of=datetime(2026, 5, 17, tzinfo=UTC),
) is None ) is None

View file

@ -1,224 +0,0 @@
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
import pytest
from broker_sync.models import ActivityType
from broker_sync.providers.ibkr import (
IBKRAccountMismatchError,
IBKRProvider,
_map_cash_to_activity,
_map_trade_to_activity,
canonical_symbol,
)
# -- canonical_symbol --
def test_canonical_symbol_lse_etf_gets_l_suffix() -> None:
assert canonical_symbol("VUAG", exchange="LSEETF", currency="GBP") == "VUAG.L"
def test_canonical_symbol_us_stock_unchanged() -> None:
assert canonical_symbol("AAPL", exchange="NASDAQ", currency="USD") == "AAPL"
def test_canonical_symbol_lse_gbp_inferred_when_exchange_missing() -> None:
"""IBKR Flex sometimes omits exchange — infer LSE from currency==GBP."""
assert canonical_symbol("VUAG", exchange=None, currency="GBP") == "VUAG.L"
def test_canonical_symbol_already_suffixed_unchanged() -> None:
assert canonical_symbol("VUAG.L", exchange="LSEETF", currency="GBP") == "VUAG.L"
# -- Trade mapping --
def test_map_trade_buy_to_activity() -> None:
from ibflex import parser
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
trade = r.FlexStatements[0].Trades[0] # T1001: 10 VUAG BUY @ 107.50 GBP, comm -1.05
activity = _map_trade_to_activity(trade, account_id="wf-acct-uuid")
assert activity.external_id == "ibkr:trade:T1001"
assert activity.account_id == "wf-acct-uuid"
assert activity.activity_type == ActivityType.BUY
assert activity.symbol == "VUAG.L"
assert activity.quantity == Decimal("10")
assert activity.unit_price == Decimal("107.50")
assert activity.fee == Decimal("1.05")
assert activity.currency == "GBP"
assert isinstance(activity.date, datetime)
assert activity.date.tzinfo is not None
def test_map_trade_sell_to_activity() -> None:
from ibflex import parser
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
trade = r.FlexStatements[0].Trades[2] # T1003: 2 VUAG SELL @ 108.00 GBP
activity = _map_trade_to_activity(trade, account_id="wf-acct")
assert activity.activity_type == ActivityType.SELL
assert activity.symbol == "VUAG.L"
assert activity.quantity == Decimal("2")
assert activity.unit_price == Decimal("108.00")
def test_map_trade_us_stock_keeps_usd_currency_and_no_suffix() -> None:
from ibflex import parser
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
trade = r.FlexStatements[0].Trades[1] # T1002: AAPL BUY USD
activity = _map_trade_to_activity(trade, account_id="wf-acct")
assert activity.symbol == "AAPL"
assert activity.currency == "USD"
# -- Cash mapping --
def test_map_cash_dividend_to_activity() -> None:
from ibflex import parser
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
cash = r.FlexStatements[0].CashTransactions[0] # C5001: Dividends 3.50 GBP
activity = _map_cash_to_activity(cash, account_id="wf-acct")
assert activity is not None
assert activity.external_id == "ibkr:cash:C5001"
assert activity.activity_type == ActivityType.DIVIDEND
assert activity.amount == Decimal("3.50")
assert activity.currency == "GBP"
def test_map_cash_withholding_tax_to_tax_activity() -> None:
from ibflex import parser
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
cash = r.FlexStatements[0].CashTransactions[1] # C5002: Withholding Tax -0.35 GBP
activity = _map_cash_to_activity(cash, account_id="wf-acct")
assert activity is not None
assert activity.activity_type == ActivityType.TAX
assert activity.amount == Decimal("0.35") # always positive on Activity
def test_map_cash_unknown_type_returns_none_and_logs(caplog: pytest.LogCaptureFixture) -> None:
"""Unknown CashTransaction.type produces None + a WARNING log line."""
class FakeType:
name = "FrobnicatedThing"
class FakeCash:
transactionID = "C9999"
dateTime = None
type = FakeType()
amount = Decimal("0")
currency = "GBP"
with caplog.at_level("WARNING"):
result = _map_cash_to_activity(FakeCash, account_id="wf-acct")
assert result is None
assert any("FROBNICATEDTHING" in r.message for r in caplog.records)
# -- IBKRProvider end-to-end --
async def test_ibkr_provider_fetch_returns_mapped_activities(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""IBKRProvider.fetch() yields all mapped activities (trades + cash)."""
from ibflex import client as ib_client
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
xml_bytes = f.read()
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
provider = IBKRProvider(
token="t",
query_id="q",
upstream_account_id="U12345678",
)
activities = [a async for a in provider.fetch()]
# 3 trades + 2 cash = 5
assert len(activities) == 5
types = sorted(a.activity_type.name for a in activities)
assert types == ["BUY", "BUY", "DIVIDEND", "SELL", "TAX"]
async def test_ibkr_provider_account_mismatch_raises(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Mismatched accountId raises and writes nothing."""
from ibflex import client as ib_client
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
xml_bytes = f.read()
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
provider = IBKRProvider(
token="t",
query_id="q",
upstream_account_id="U99999999", # WRONG
)
with pytest.raises(IBKRAccountMismatchError, match="U12345678"):
_ = [a async for a in provider.fetch()]
async def test_ibkr_provider_open_positions_after_fetch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""open_positions() returns canonicalised symbol + qty after fetch drained."""
from ibflex import client as ib_client
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
xml_bytes = f.read()
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
provider = IBKRProvider(
token="t",
query_id="q",
upstream_account_id="U12345678",
)
# drain the iterator before reading positions
[a async for a in provider.fetch()]
positions = provider.open_positions()
# VUAG → VUAG.L (LSE inferred from GBP); AAPL unchanged (USD)
assert dict(positions) == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")}
async def test_ibkr_provider_cash_balances_after_fetch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""cash_balances() returns (currency, ending_cash) tuples from CashReport."""
from ibflex import client as ib_client
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
xml_bytes = f.read()
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
provider = IBKRProvider(
token="t",
query_id="q",
upstream_account_id="U12345678",
)
[a async for a in provider.fetch()]
balances = provider.cash_balances()
# Fixture has BASE_SUMMARY + USD rows, both 1.23
assert dict(balances) == {"BASE_SUMMARY": Decimal("1.23"), "USD": Decimal("1.23")}
def test_ibkr_provider_cash_balances_before_fetch_returns_empty() -> None:
"""No CashReport data before fetch()."""
provider = IBKRProvider(token="t", query_id="q", upstream_account_id="U12345678")
assert provider.cash_balances() == []

View file

@ -2,12 +2,8 @@ from __future__ import annotations
from datetime import UTC, date, datetime from datetime import UTC, date, datetime
from decimal import Decimal from decimal import Decimal
from typing import TYPE_CHECKING
from broker_sync.models import AccountType, Activity, ActivityType from broker_sync.models import AccountType, Activity, ActivityType
if TYPE_CHECKING:
from pytest import MonkeyPatch
from broker_sync.providers.imap import ( from broker_sync.providers.imap import (
_IE_GIA_ACCOUNT_ID, _IE_GIA_ACCOUNT_ID,
_IE_ISA_ACCOUNT_ID, _IE_ISA_ACCOUNT_ID,
@ -103,104 +99,3 @@ def test_non_ie_activities_passed_through_unchanged() -> None:
routed = _split_ie_by_isa_cap([schwab_act]) routed = _split_ie_by_isa_cap([schwab_act])
assert routed[0].account_id == "schwab-workplace" assert routed[0].account_id == "schwab-workplace"
assert routed[0].account_type is AccountType.GIA assert routed[0].account_type is AccountType.GIA
def test_invest_engine_skipped_by_default(monkeypatch: MonkeyPatch) -> None:
"""InvestEngine messages MUST be skipped by default, even with no env set.
Post-mortem 2026-05-27: any code path that doesn't set the cron's env
(e.g. `kubectl run --rm` or devvm `poetry run`) was re-importing IE
BUYs through this IMAP path. The opt-out env var was a foot-gun.
Invariant now: structural default skip; opt back in only with
BROKER_SYNC_IMAP_INCLUDE_PROVIDERS.
"""
from broker_sync.providers import imap as imap_mod
from broker_sync.providers.parsers import invest_engine as ie_parser
ie_email = (
b"From: noreply@investengine.com\r\n"
b"Subject: VUAG Bought\r\n"
b"Content-Type: text/plain\r\n\r\n"
b"Vanguard S&P 500: VUAG Bought 10.0 @ 100.0 per share Total: 1000.00\r\n"
)
schwab_email = (
b"From: donotreply@schwab.com\r\n"
b"Subject: Order Confirmed\r\n"
b"Content-Type: text/html\r\n\r\n"
b"<html><body>no-op</body></html>\r\n"
)
monkeypatch.setattr(imap_mod, "_fetch_all", lambda _: [ie_email, schwab_email])
monkeypatch.setattr(ie_parser, "parse_invest_engine_email", lambda raw: [object()])
monkeypatch.setattr(imap_mod, "parse_schwab_email", lambda html: [object()])
creds = imap_mod.ImapCreds(host="h", user="u", password="p", directory="d")
# Default (no env): IE skipped, Schwab parsed.
monkeypatch.delenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", raising=False)
monkeypatch.delenv("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS", raising=False)
out_default = imap_mod.fetch_activities(creds)
assert len(out_default) == 1, "IE must be skipped by default; only Schwab emitted"
def test_invest_engine_opt_in_via_include_env(monkeypatch: MonkeyPatch) -> None:
"""Setting BROKER_SYNC_IMAP_INCLUDE_PROVIDERS=invest-engine re-enables
IE parsing (escape hatch for the legacy IMAP path)."""
from broker_sync.providers import imap as imap_mod
from broker_sync.providers.parsers import invest_engine as ie_parser
ie_email = b"From: noreply@investengine.com\r\n\r\nirrelevant\r\n"
schwab_email = b"From: donotreply@schwab.com\r\n\r\n<html></html>\r\n"
monkeypatch.setattr(imap_mod, "_fetch_all", lambda _: [ie_email, schwab_email])
monkeypatch.setattr(ie_parser, "parse_invest_engine_email", lambda raw: [object()])
monkeypatch.setattr(imap_mod, "parse_schwab_email", lambda html: [object()])
creds = imap_mod.ImapCreds(host="h", user="u", password="p", directory="d")
monkeypatch.setenv("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS", "invest-engine")
monkeypatch.delenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", raising=False)
out = imap_mod.fetch_activities(creds)
assert len(out) == 2, "INCLUDE=invest-engine must re-enable IE parsing"
def test_exclude_schwab_still_works(monkeypatch: MonkeyPatch) -> None:
"""EXCLUDE env still works for other providers (forward-compat)."""
from broker_sync.providers import imap as imap_mod
from broker_sync.providers.parsers import invest_engine as ie_parser
schwab_email = b"From: donotreply@schwab.com\r\n\r\n<html></html>\r\n"
monkeypatch.setattr(imap_mod, "_fetch_all", lambda _: [schwab_email])
monkeypatch.setattr(ie_parser, "parse_invest_engine_email", lambda raw: [object()])
monkeypatch.setattr(imap_mod, "parse_schwab_email", lambda html: [object()])
creds = imap_mod.ImapCreds(host="h", user="u", password="p", directory="d")
monkeypatch.setenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "schwab")
monkeypatch.delenv("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS", raising=False)
out = imap_mod.fetch_activities(creds)
assert len(out) == 0, "Schwab must be skipped when in EXCLUDE list"
def test_include_overrides_default_and_exclude(monkeypatch: MonkeyPatch) -> None:
"""INCLUDE wins over both the structural default and EXCLUDE env var."""
from broker_sync.providers import imap as imap_mod
monkeypatch.setenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "invest-engine,schwab")
monkeypatch.setenv("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS", "invest-engine")
resolved = imap_mod._resolve_excluded_providers()
assert "invest-engine" not in resolved
assert "schwab" in resolved
def test_schwab_subdomain_sender_matches() -> None:
"""Real Schwab trade emails come from `donotreply@mail.schwab.com`
(subdomain), not just `donotreply@schwab.com`. The matcher must
accept either form."""
from broker_sync.providers.imap import _SCHWAB_SENDERS
# Verify the static set works
assert "donotreply@schwab.com" in _SCHWAB_SENDERS
# Verify the subdomain suffix check
for addr in (
"donotreply@mail.schwab.com",
"wealthnotify@equityawards.schwab.com",
):
assert addr.endswith(".schwab.com"), addr

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import UTC, date, datetime from datetime import UTC, datetime
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -12,9 +12,6 @@ import pytest
from broker_sync.models import Account, AccountType, Activity, ActivityType from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.sinks.wealthfolio import ( from broker_sync.sinks.wealthfolio import (
ImportValidationError, ImportValidationError,
ManualSnapshotPayload,
SnapshotPosition,
WealthfolioError,
WealthfolioSink, WealthfolioSink,
WealthfolioUnauthorizedError, WealthfolioUnauthorizedError,
) )
@ -277,158 +274,3 @@ async def test_import_halts_on_validation_failure(tmp_path: Path) -> None:
with pytest.raises(ImportValidationError, match="unknown symbol"): with pytest.raises(ImportValidationError, match="unknown symbol"):
await sink.import_activities([_buy()]) await sink.import_activities([_buy()])
assert calls == ["/api/v1/activities/import/check"] # real import never hit assert calls == ["/api/v1/activities/import/check"] # real import never hit
# -- Manual snapshot import (Fidelity path) --
@pytest.mark.asyncio
async def test_push_manual_snapshots_serialises_decimals_and_calls_endpoint(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
seen: dict[str, Any] = {}
async def handler(req: httpx.Request) -> httpx.Response:
if req.url.path == "/api/v1/snapshots/import":
seen["body"] = json.loads(req.content)
return httpx.Response(
200,
json={"snapshotsImported": 1, "snapshotsFailed": 0, "errors": []},
)
return httpx.Response(404)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16),
currency="GBP",
positions=[
SnapshotPosition(
symbol="KDOA",
quantity=Decimal("4200.5"),
average_cost=Decimal("24.29"),
total_cost_basis=Decimal("102004.15"),
currency="GBP",
),
],
cash_balances={"GBP": Decimal(0)},
)
result = await sink.push_manual_snapshots(
account_id="a7d6208d-2bd6-4f85-bf54-b77984c78234",
snapshots=[snapshot],
)
assert result["snapshotsImported"] == 1
# Wire format: numeric fields are STRINGS (Decimal.__format__('f'))
body = seen["body"]
assert body["accountId"] == "a7d6208d-2bd6-4f85-bf54-b77984c78234"
pos = body["snapshots"][0]["positions"][0]
assert pos == {
"symbol": "KDOA",
"quantity": "4200.5",
"averageCost": "24.29",
"totalCostBasis": "102004.15",
"currency": "GBP",
}
assert body["snapshots"][0]["cashBalances"] == {"GBP": "0"}
@pytest.mark.asyncio
async def test_push_manual_snapshots_raises_on_partial_failure(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
return httpx.Response(
200,
json={
"snapshotsImported": 0,
"snapshotsFailed": 1,
"errors": [{"row": 0, "msg": "bad symbol"}],
},
)
sink = _client(httpx.MockTransport(handler), sp)
snapshot = ManualSnapshotPayload(
date=date(2026, 5, 16), currency="GBP",
positions=[], cash_balances={},
)
with pytest.raises(WealthfolioError, match="bad symbol"):
await sink.push_manual_snapshots(account_id="acct", snapshots=[snapshot])
@pytest.mark.asyncio
async def test_push_manual_snapshots_short_circuits_on_empty(
tmp_path: Path,
) -> None:
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
async def handler(req: httpx.Request) -> httpx.Response:
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.push_manual_snapshots(account_id="acct", snapshots=[])
assert result["snapshotsImported"] == 0
# -- compute_position_qty (used by IBKR reconciliation) --
@pytest.mark.asyncio
async def test_compute_position_qty_sums_buys_minus_sells(tmp_path: Path) -> None:
"""Sums BUY/ADD_HOLDING/TRANSFER_IN minus SELL/REMOVE_HOLDING/TRANSFER_OUT
quantities per symbol, skipping cash activities."""
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
page_1: dict[str, Any] = {
"activities": [
{"symbol": "VUAG.L", "activityType": "BUY", "quantity": "10"},
{"symbol": "VUAG.L", "activityType": "SELL", "quantity": "2"},
{"symbol": "AAPL", "activityType": "BUY", "quantity": "5"},
{"symbol": "$CASH-GBP", "activityType": "DEPOSIT", "quantity": "0",
"amount": "100"},
# Unknown activity type — must be skipped, not crash.
{"symbol": "VUAG.L", "activityType": "DIVIDEND", "quantity": "0",
"amount": "0.5"},
],
"totalPages": 1,
}
async def handler(req: httpx.Request) -> httpx.Response:
if req.url.path == "/api/v1/activities/search":
return httpx.Response(200, json=page_1)
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.compute_position_qty("acct-123")
assert result == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")}
@pytest.mark.asyncio
async def test_compute_position_qty_paginates(tmp_path: Path) -> None:
"""Walks all pages until totalPages reached."""
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
pages: dict[int, dict[str, Any]] = {
1: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY",
"quantity": "3"}], "totalPages": 2},
2: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY",
"quantity": "4"}], "totalPages": 2},
}
seen_pages: list[int] = []
async def handler(req: httpx.Request) -> httpx.Response:
body = json.loads(req.content)
seen_pages.append(body["page"])
return httpx.Response(200, json=pages[body["page"]])
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.compute_position_qty("acct-x")
assert sorted(seen_pages) == [1, 2]
assert result == {"VUAG.L": Decimal("7")}

View file

@ -1,66 +0,0 @@
from __future__ import annotations
import httpx
import pytest
from broker_sync.metrics import push_pushgateway
async def test_push_pushgateway_posts_text_format() -> None:
captured: dict[str, str] = {}
def transport_handler(request: httpx.Request) -> httpx.Response:
captured["url"] = str(request.url)
captured["method"] = request.method
captured["body"] = request.content.decode("utf-8")
return httpx.Response(200)
transport = httpx.MockTransport(transport_handler)
await push_pushgateway(
job="broker-sync-ibkr",
metrics=[
("ibkr_position_drift_shares", {"symbol": "VUAG.L"}, 0.0),
("ibkr_sync_last_success_timestamp_seconds", {}, 1779830000.0),
],
pushgateway_url="http://pg.example/metrics",
transport=transport,
)
assert captured["method"] == "POST"
assert captured["url"] == "http://pg.example/metrics/job/broker-sync-ibkr"
body = captured["body"]
assert 'ibkr_position_drift_shares{symbol="VUAG.L"} 0.0' in body
assert "ibkr_sync_last_success_timestamp_seconds 1779830000.0" in body
async def test_push_pushgateway_raises_on_non_2xx() -> None:
transport = httpx.MockTransport(lambda r: httpx.Response(500, text="boom"))
with pytest.raises(RuntimeError, match="pushgateway.*500"):
await push_pushgateway(
job="x",
metrics=[("m", {}, 1.0)],
pushgateway_url="http://pg/metrics",
transport=transport,
)
async def test_push_pushgateway_uses_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
captured: dict[str, str] = {}
def handler(request: httpx.Request) -> httpx.Response:
captured["url"] = str(request.url)
return httpx.Response(200)
transport = httpx.MockTransport(handler)
monkeypatch.setenv("PUSHGATEWAY_URL", "http://from-env/metrics")
await push_pushgateway(
job="j",
metrics=[("m", {}, 1.0)],
transport=transport,
)
assert captured["url"] == "http://from-env/metrics/job/j"
async def test_push_pushgateway_raises_when_url_missing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("PUSHGATEWAY_URL", raising=False)
with pytest.raises(RuntimeError, match="PUSHGATEWAY_URL not set"):
await push_pushgateway(job="j", metrics=[("m", {}, 1.0)])