From e83c5a0a8fe72515fc9f568a6f35ce100ac1899d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Tue, 26 May 2026 22:28:35 +0000 Subject: [PATCH] =?UTF-8?q?ibkr:=20add=20Flex=20provider=20=E2=80=94=20Tra?= =?UTF-8?q?de/Cash=20mapping=20+=20OpenPositions=20snapshot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Maps Trades (BUY/SELL) and CashTransactions (DIVIDEND, TAX, INTEREST, FEE, DEPOSIT, WITHDRAWAL) from an IBKR Flex Activity Query to broker-sync Activity objects. Adds canonical_symbol helper (LSE → .L suffix when exchange=LSE* or currency=GBP). Exposes OpenPositions for the reconciliation step that runs at the CLI layer. Guards against wrong-account writes by checking stmt.accountId == IBKR_ACCOUNT_ID_UPSTREAM before yielding any activities. 13 unit tests cover all the mappings + the mismatch guard. Co-Authored-By: Claude Opus 4.7 --- broker_sync/providers/ibkr.py | 255 ++++++++++++++++++++++++++++++++++ tests/providers/test_ibkr.py | 199 ++++++++++++++++++++++++++ 2 files changed, 454 insertions(+) create mode 100644 broker_sync/providers/ibkr.py create mode 100644 tests/providers/test_ibkr.py diff --git a/broker_sync/providers/ibkr.py b/broker_sync/providers/ibkr.py new file mode 100644 index 0000000..f156a3f --- /dev/null +++ b/broker_sync/providers/ibkr.py @@ -0,0 +1,255 @@ +"""Interactive Brokers Flex Web Service ingestion provider. + +Pulls daily Activity Flex Query reports via the ``ibflex`` library, maps +Trades + CashTransactions to broker-sync ``Activity`` objects, and runs a +reconciliation step against the broker-reported ``OpenPositions``. + +See ``docs/specs/2026-05-26-ibkr-ingest-design.md`` for the full design. +""" +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator +from datetime import UTC, date, datetime +from decimal import Decimal +from typing import Any + +from broker_sync.models import Account, AccountType, Activity, ActivityType + +log = logging.getLogger(__name__) + +# Map IBKR currency → default exchange suffix. +# Today: GBP → LSE (.L). Extend when more accounts onboard. +_LSE_EXCHANGES = {"LSE", "LSEETF", "LSEIOB1"} +_GBP_SUFFIX = ".L" + + +def canonical_symbol(symbol: str, *, exchange: str | None, currency: str) -> str: + """Return the WF-canonical form of an IBKR ticker. + + LSE-listed GBP instruments get a ``.L`` suffix (Wealthfolio convention). + US instruments and anything already suffixed are returned unchanged. + """ + if "." in symbol: + return symbol + if exchange in _LSE_EXCHANGES or (exchange is None and currency == "GBP"): + return symbol + _GBP_SUFFIX + return symbol + + +def _to_utc_datetime(value: Any, time_value: Any = None) -> datetime: + """Combine a date (with optional time) into a UTC datetime.""" + if isinstance(value, datetime): + dt = value + elif isinstance(value, date): + if isinstance(time_value, str): + dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value}") + elif hasattr(time_value, "isoformat"): + dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value.isoformat()}") + else: + dt = datetime.fromisoformat(f"{value.isoformat()}T00:00:00") + else: + # Last-resort: ISO string + dt = datetime.fromisoformat(str(value)) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) + + +def _map_trade_to_activity(trade: Any, *, account_id: str) -> Activity: + """Map one ibflex Trade dataclass to a broker-sync Activity.""" + buy_sell_obj = trade.buySell + buy_sell = buy_sell_obj.name if hasattr(buy_sell_obj, "name") else str(buy_sell_obj) + if buy_sell == "BUY": + activity_type = ActivityType.BUY + elif buy_sell == "SELL": + activity_type = ActivityType.SELL + else: + raise ValueError( + f"unsupported Trade.buySell={buy_sell!r} on tradeID={trade.tradeID}" + ) + + exchange = getattr(trade, "exchange", None) + symbol = canonical_symbol( + str(trade.symbol), + exchange=str(exchange) if exchange is not None else None, + currency=str(trade.currency), + ) + quantity = abs(Decimal(str(trade.quantity))) + unit_price = Decimal(str(trade.tradePrice)) + commission = trade.ibCommission if trade.ibCommission is not None else Decimal(0) + fee = abs(Decimal(str(commission))) + return Activity( + external_id=f"ibkr:trade:{trade.tradeID}", + account_id=account_id, + account_type=AccountType.GIA, + date=_to_utc_datetime(trade.tradeDate, getattr(trade, "tradeTime", None)), + activity_type=activity_type, + currency=str(trade.currency), + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + fee=fee, + ) + + +# Map known IBKR Flex CashTransaction.type values to broker-sync ActivityType. +# Unknown values yield None + a WARNING — we refuse to guess. +_CASH_TYPE_MAP: dict[str, ActivityType] = { + "DIVIDEND": ActivityType.DIVIDEND, + "DIVIDENDS": ActivityType.DIVIDEND, + "PAYMENT_IN_LIEU_OF_DIVIDENDS": ActivityType.DIVIDEND, + "WITHHOLDING_TAX": ActivityType.TAX, + "WHTAX": ActivityType.TAX, + "BROKER_INTEREST_RECEIVED": ActivityType.INTEREST, + "BROKER_INTEREST_PAID": ActivityType.FEE, + "COMMISSION_ADJUSTMENTS": ActivityType.FEE, + "OTHER_FEES": ActivityType.FEE, +} + +_DEPOSIT_WITHDRAWAL_TYPES = { + "DEPOSITS_WITHDRAWALS", + "DEPOSIT_WITHDRAWALS", + "DEPOSITWITHDRAW", +} + + +def _normalise_cash_type(type_obj: Any) -> str: + """Canonicalise the IBKR Flex CashTransaction.type enum to an UPPER_SNAKE name.""" + if hasattr(type_obj, "name"): + return str(type_obj.name).upper() + return str(type_obj).strip().upper().replace(" ", "_").replace("&", "AND") + + +def _map_cash_to_activity(cash: Any, *, account_id: str) -> Activity | None: + """Map one ibflex CashTransaction to a broker-sync Activity. + + Returns None for unsupported types (logged at WARNING). + """ + type_name = _normalise_cash_type(cash.type) + amount = Decimal(str(cash.amount)) + + if type_name in _DEPOSIT_WITHDRAWAL_TYPES: + activity_type = ActivityType.DEPOSIT if amount > 0 else ActivityType.WITHDRAWAL + else: + mapped = _CASH_TYPE_MAP.get(type_name) + if mapped is None: + log.warning( + "ibkr: skipping cash transaction id=%s with unsupported type=%r", + getattr(cash, "transactionID", "?"), + type_name, + ) + return None + activity_type = mapped + + dt_raw = cash.dateTime + dt = _to_utc_datetime(dt_raw) if dt_raw is not None else datetime.now(UTC) + + return Activity( + external_id=f"ibkr:cash:{cash.transactionID}", + account_id=account_id, + account_type=AccountType.GIA, + date=dt, + activity_type=activity_type, + currency=str(cash.currency), + amount=abs(amount), + ) + + +class IBKRError(Exception): + """Base class for ibkr-provider errors.""" + + +class IBKRAccountMismatchError(IBKRError): + """Flex statement accountId did not match configured upstream id.""" + + +class IBKRProvider: + """Fetches IBKR Flex Activity reports and yields broker-sync Activities. + + Reconciliation (OpenPositions vs WF-computed qty) is NOT part of + ``fetch()`` — it runs at the CLI layer after import, where the + WealthfolioSink is available to query WF. + """ + + def __init__( + self, + *, + token: str, + query_id: str, + wf_account_id: str, + upstream_account_id: str, + ) -> None: + self._token = token + self._query_id = query_id + self._wf_account_id = wf_account_id + self._upstream_account_id = upstream_account_id + # Stashed for the reconciliation step after fetch() drains. + self._last_response: Any = None + + def accounts(self) -> list[Account]: + return [ + Account( + id=self._wf_account_id, + name="Interactive Brokers (UK)", + account_type=AccountType.GIA, + currency="GBP", # FX-aware per-trade; account ccy is GBP + provider="ibkr", + ) + ] + + async def close(self) -> None: + # ibflex.client uses synchronous `requests` under the hood; no resources to close. + return + + async def fetch( + self, + *, + since: datetime | None = None, # Flex query owns the date range + before: datetime | None = None, + ) -> AsyncIterator[Activity]: + from ibflex import client as ib_client + from ibflex import parser as ib_parser + + del since, before # unused; Flex query defines the period + + xml_bytes = ib_client.download(self._token, self._query_id) + response = ib_parser.parse(xml_bytes) + self._last_response = response + + if not response.FlexStatements: + log.warning("ibkr: Flex response had no FlexStatements") + return + + stmt = response.FlexStatements[0] + if str(stmt.accountId) != self._upstream_account_id: + raise IBKRAccountMismatchError( + f"Flex statement.accountId={stmt.accountId!r} does not match " + f"configured IBKR_ACCOUNT_ID_UPSTREAM={self._upstream_account_id!r} " + f"— refusing to ingest" + ) + + for trade in stmt.Trades or []: + yield _map_trade_to_activity(trade, account_id=self._wf_account_id) + + for cash in stmt.CashTransactions or []: + activity = _map_cash_to_activity(cash, account_id=self._wf_account_id) + if activity is not None: + yield activity + + def open_positions(self) -> list[tuple[str, Decimal]]: + """Return ``[(canonical_symbol, position_qty), ...]`` from the most + recent fetch. Empty list before the first ``fetch()`` call.""" + if self._last_response is None: + return [] + stmt = self._last_response.FlexStatements[0] + out: list[tuple[str, Decimal]] = [] + for pos in stmt.OpenPositions or []: + exchange = getattr(pos, "exchange", None) + symbol = canonical_symbol( + str(pos.symbol), + exchange=str(exchange) if exchange is not None else None, + currency=str(pos.currency), + ) + out.append((symbol, Decimal(str(pos.position)))) + return out diff --git a/tests/providers/test_ibkr.py b/tests/providers/test_ibkr.py new file mode 100644 index 0000000..ea83e26 --- /dev/null +++ b/tests/providers/test_ibkr.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +from datetime import datetime +from decimal import Decimal + +import pytest + +from broker_sync.models import ActivityType +from broker_sync.providers.ibkr import ( + IBKRAccountMismatchError, + IBKRProvider, + _map_cash_to_activity, + _map_trade_to_activity, + canonical_symbol, +) + +# -- canonical_symbol -- + + +def test_canonical_symbol_lse_etf_gets_l_suffix() -> None: + assert canonical_symbol("VUAG", exchange="LSEETF", currency="GBP") == "VUAG.L" + + +def test_canonical_symbol_us_stock_unchanged() -> None: + assert canonical_symbol("AAPL", exchange="NASDAQ", currency="USD") == "AAPL" + + +def test_canonical_symbol_lse_gbp_inferred_when_exchange_missing() -> None: + """IBKR Flex sometimes omits exchange — infer LSE from currency==GBP.""" + assert canonical_symbol("VUAG", exchange=None, currency="GBP") == "VUAG.L" + + +def test_canonical_symbol_already_suffixed_unchanged() -> None: + assert canonical_symbol("VUAG.L", exchange="LSEETF", currency="GBP") == "VUAG.L" + + +# -- Trade mapping -- + + +def test_map_trade_buy_to_activity() -> None: + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + trade = r.FlexStatements[0].Trades[0] # T1001: 10 VUAG BUY @ 107.50 GBP, comm -1.05 + + activity = _map_trade_to_activity(trade, account_id="wf-acct-uuid") + + assert activity.external_id == "ibkr:trade:T1001" + assert activity.account_id == "wf-acct-uuid" + assert activity.activity_type == ActivityType.BUY + assert activity.symbol == "VUAG.L" + assert activity.quantity == Decimal("10") + assert activity.unit_price == Decimal("107.50") + assert activity.fee == Decimal("1.05") + assert activity.currency == "GBP" + assert isinstance(activity.date, datetime) + assert activity.date.tzinfo is not None + + +def test_map_trade_sell_to_activity() -> None: + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + trade = r.FlexStatements[0].Trades[2] # T1003: 2 VUAG SELL @ 108.00 GBP + + activity = _map_trade_to_activity(trade, account_id="wf-acct") + assert activity.activity_type == ActivityType.SELL + assert activity.symbol == "VUAG.L" + assert activity.quantity == Decimal("2") + assert activity.unit_price == Decimal("108.00") + + +def test_map_trade_us_stock_keeps_usd_currency_and_no_suffix() -> None: + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + trade = r.FlexStatements[0].Trades[1] # T1002: AAPL BUY USD + + activity = _map_trade_to_activity(trade, account_id="wf-acct") + assert activity.symbol == "AAPL" + assert activity.currency == "USD" + + +# -- Cash mapping -- + + +def test_map_cash_dividend_to_activity() -> None: + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + cash = r.FlexStatements[0].CashTransactions[0] # C5001: Dividends 3.50 GBP + + activity = _map_cash_to_activity(cash, account_id="wf-acct") + assert activity is not None + assert activity.external_id == "ibkr:cash:C5001" + assert activity.activity_type == ActivityType.DIVIDEND + assert activity.amount == Decimal("3.50") + assert activity.currency == "GBP" + + +def test_map_cash_withholding_tax_to_tax_activity() -> None: + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + cash = r.FlexStatements[0].CashTransactions[1] # C5002: Withholding Tax -0.35 GBP + + activity = _map_cash_to_activity(cash, account_id="wf-acct") + assert activity is not None + assert activity.activity_type == ActivityType.TAX + assert activity.amount == Decimal("0.35") # always positive on Activity + + +def test_map_cash_unknown_type_returns_none_and_logs(caplog: pytest.LogCaptureFixture) -> None: + """Unknown CashTransaction.type produces None + a WARNING log line.""" + + class FakeType: + name = "FrobnicatedThing" + + class FakeCash: + transactionID = "C9999" + dateTime = None + type = FakeType() + amount = Decimal("0") + currency = "GBP" + + with caplog.at_level("WARNING"): + result = _map_cash_to_activity(FakeCash, account_id="wf-acct") + assert result is None + assert any("FROBNICATEDTHING" in r.message for r in caplog.records) + + +# -- IBKRProvider end-to-end -- + + +async def test_ibkr_provider_fetch_returns_mapped_activities( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """IBKRProvider.fetch() yields all mapped activities (trades + cash).""" + from ibflex import client as ib_client + + with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f: + xml_bytes = f.read() + monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes) + + provider = IBKRProvider( + token="t", + query_id="q", + wf_account_id="wf-acct", + upstream_account_id="U12345678", + ) + activities = [a async for a in provider.fetch()] + # 3 trades + 2 cash = 5 + assert len(activities) == 5 + types = sorted(a.activity_type.name for a in activities) + assert types == ["BUY", "BUY", "DIVIDEND", "SELL", "TAX"] + + +async def test_ibkr_provider_account_mismatch_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Mismatched accountId raises and writes nothing.""" + from ibflex import client as ib_client + + with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f: + xml_bytes = f.read() + monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes) + + provider = IBKRProvider( + token="t", + query_id="q", + wf_account_id="wf-acct", + upstream_account_id="U99999999", # WRONG + ) + with pytest.raises(IBKRAccountMismatchError, match="U12345678"): + _ = [a async for a in provider.fetch()] + + +async def test_ibkr_provider_open_positions_after_fetch( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """open_positions() returns canonicalised symbol + qty after fetch drained.""" + from ibflex import client as ib_client + + with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f: + xml_bytes = f.read() + monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes) + + provider = IBKRProvider( + token="t", + query_id="q", + wf_account_id="wf-acct", + upstream_account_id="U12345678", + ) + # drain the iterator before reading positions + [a async for a in provider.fetch()] + + positions = provider.open_positions() + # VUAG → VUAG.L (LSE inferred from GBP); AAPL unchanged (USD) + assert dict(positions) == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")}