ibkr: add Flex provider — Trade/Cash mapping + OpenPositions snapshot
Maps Trades (BUY/SELL) and CashTransactions (DIVIDEND, TAX, INTEREST, FEE, DEPOSIT, WITHDRAWAL) from an IBKR Flex Activity Query to broker-sync Activity objects. Adds canonical_symbol helper (LSE → .L suffix when exchange=LSE* or currency=GBP). Exposes OpenPositions for the reconciliation step that runs at the CLI layer. Guards against wrong-account writes by checking stmt.accountId == IBKR_ACCOUNT_ID_UPSTREAM before yielding any activities. 13 unit tests cover all the mappings + the mismatch guard. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
882415464e
commit
e83c5a0a8f
2 changed files with 454 additions and 0 deletions
255
broker_sync/providers/ibkr.py
Normal file
255
broker_sync/providers/ibkr.py
Normal file
|
|
@ -0,0 +1,255 @@
|
|||
"""Interactive Brokers Flex Web Service ingestion provider.
|
||||
|
||||
Pulls daily Activity Flex Query reports via the ``ibflex`` library, maps
|
||||
Trades + CashTransactions to broker-sync ``Activity`` objects, and runs a
|
||||
reconciliation step against the broker-reported ``OpenPositions``.
|
||||
|
||||
See ``docs/specs/2026-05-26-ibkr-ingest-design.md`` for the full design.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from datetime import UTC, date, datetime
|
||||
from decimal import Decimal
|
||||
from typing import Any
|
||||
|
||||
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Map IBKR currency → default exchange suffix.
|
||||
# Today: GBP → LSE (.L). Extend when more accounts onboard.
|
||||
_LSE_EXCHANGES = {"LSE", "LSEETF", "LSEIOB1"}
|
||||
_GBP_SUFFIX = ".L"
|
||||
|
||||
|
||||
def canonical_symbol(symbol: str, *, exchange: str | None, currency: str) -> str:
|
||||
"""Return the WF-canonical form of an IBKR ticker.
|
||||
|
||||
LSE-listed GBP instruments get a ``.L`` suffix (Wealthfolio convention).
|
||||
US instruments and anything already suffixed are returned unchanged.
|
||||
"""
|
||||
if "." in symbol:
|
||||
return symbol
|
||||
if exchange in _LSE_EXCHANGES or (exchange is None and currency == "GBP"):
|
||||
return symbol + _GBP_SUFFIX
|
||||
return symbol
|
||||
|
||||
|
||||
def _to_utc_datetime(value: Any, time_value: Any = None) -> datetime:
|
||||
"""Combine a date (with optional time) into a UTC datetime."""
|
||||
if isinstance(value, datetime):
|
||||
dt = value
|
||||
elif isinstance(value, date):
|
||||
if isinstance(time_value, str):
|
||||
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value}")
|
||||
elif hasattr(time_value, "isoformat"):
|
||||
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value.isoformat()}")
|
||||
else:
|
||||
dt = datetime.fromisoformat(f"{value.isoformat()}T00:00:00")
|
||||
else:
|
||||
# Last-resort: ISO string
|
||||
dt = datetime.fromisoformat(str(value))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=UTC)
|
||||
return dt.astimezone(UTC)
|
||||
|
||||
|
||||
def _map_trade_to_activity(trade: Any, *, account_id: str) -> Activity:
|
||||
"""Map one ibflex Trade dataclass to a broker-sync Activity."""
|
||||
buy_sell_obj = trade.buySell
|
||||
buy_sell = buy_sell_obj.name if hasattr(buy_sell_obj, "name") else str(buy_sell_obj)
|
||||
if buy_sell == "BUY":
|
||||
activity_type = ActivityType.BUY
|
||||
elif buy_sell == "SELL":
|
||||
activity_type = ActivityType.SELL
|
||||
else:
|
||||
raise ValueError(
|
||||
f"unsupported Trade.buySell={buy_sell!r} on tradeID={trade.tradeID}"
|
||||
)
|
||||
|
||||
exchange = getattr(trade, "exchange", None)
|
||||
symbol = canonical_symbol(
|
||||
str(trade.symbol),
|
||||
exchange=str(exchange) if exchange is not None else None,
|
||||
currency=str(trade.currency),
|
||||
)
|
||||
quantity = abs(Decimal(str(trade.quantity)))
|
||||
unit_price = Decimal(str(trade.tradePrice))
|
||||
commission = trade.ibCommission if trade.ibCommission is not None else Decimal(0)
|
||||
fee = abs(Decimal(str(commission)))
|
||||
return Activity(
|
||||
external_id=f"ibkr:trade:{trade.tradeID}",
|
||||
account_id=account_id,
|
||||
account_type=AccountType.GIA,
|
||||
date=_to_utc_datetime(trade.tradeDate, getattr(trade, "tradeTime", None)),
|
||||
activity_type=activity_type,
|
||||
currency=str(trade.currency),
|
||||
symbol=symbol,
|
||||
quantity=quantity,
|
||||
unit_price=unit_price,
|
||||
fee=fee,
|
||||
)
|
||||
|
||||
|
||||
# Map known IBKR Flex CashTransaction.type values to broker-sync ActivityType.
|
||||
# Unknown values yield None + a WARNING — we refuse to guess.
|
||||
_CASH_TYPE_MAP: dict[str, ActivityType] = {
|
||||
"DIVIDEND": ActivityType.DIVIDEND,
|
||||
"DIVIDENDS": ActivityType.DIVIDEND,
|
||||
"PAYMENT_IN_LIEU_OF_DIVIDENDS": ActivityType.DIVIDEND,
|
||||
"WITHHOLDING_TAX": ActivityType.TAX,
|
||||
"WHTAX": ActivityType.TAX,
|
||||
"BROKER_INTEREST_RECEIVED": ActivityType.INTEREST,
|
||||
"BROKER_INTEREST_PAID": ActivityType.FEE,
|
||||
"COMMISSION_ADJUSTMENTS": ActivityType.FEE,
|
||||
"OTHER_FEES": ActivityType.FEE,
|
||||
}
|
||||
|
||||
_DEPOSIT_WITHDRAWAL_TYPES = {
|
||||
"DEPOSITS_WITHDRAWALS",
|
||||
"DEPOSIT_WITHDRAWALS",
|
||||
"DEPOSITWITHDRAW",
|
||||
}
|
||||
|
||||
|
||||
def _normalise_cash_type(type_obj: Any) -> str:
|
||||
"""Canonicalise the IBKR Flex CashTransaction.type enum to an UPPER_SNAKE name."""
|
||||
if hasattr(type_obj, "name"):
|
||||
return str(type_obj.name).upper()
|
||||
return str(type_obj).strip().upper().replace(" ", "_").replace("&", "AND")
|
||||
|
||||
|
||||
def _map_cash_to_activity(cash: Any, *, account_id: str) -> Activity | None:
|
||||
"""Map one ibflex CashTransaction to a broker-sync Activity.
|
||||
|
||||
Returns None for unsupported types (logged at WARNING).
|
||||
"""
|
||||
type_name = _normalise_cash_type(cash.type)
|
||||
amount = Decimal(str(cash.amount))
|
||||
|
||||
if type_name in _DEPOSIT_WITHDRAWAL_TYPES:
|
||||
activity_type = ActivityType.DEPOSIT if amount > 0 else ActivityType.WITHDRAWAL
|
||||
else:
|
||||
mapped = _CASH_TYPE_MAP.get(type_name)
|
||||
if mapped is None:
|
||||
log.warning(
|
||||
"ibkr: skipping cash transaction id=%s with unsupported type=%r",
|
||||
getattr(cash, "transactionID", "?"),
|
||||
type_name,
|
||||
)
|
||||
return None
|
||||
activity_type = mapped
|
||||
|
||||
dt_raw = cash.dateTime
|
||||
dt = _to_utc_datetime(dt_raw) if dt_raw is not None else datetime.now(UTC)
|
||||
|
||||
return Activity(
|
||||
external_id=f"ibkr:cash:{cash.transactionID}",
|
||||
account_id=account_id,
|
||||
account_type=AccountType.GIA,
|
||||
date=dt,
|
||||
activity_type=activity_type,
|
||||
currency=str(cash.currency),
|
||||
amount=abs(amount),
|
||||
)
|
||||
|
||||
|
||||
class IBKRError(Exception):
|
||||
"""Base class for ibkr-provider errors."""
|
||||
|
||||
|
||||
class IBKRAccountMismatchError(IBKRError):
|
||||
"""Flex statement accountId did not match configured upstream id."""
|
||||
|
||||
|
||||
class IBKRProvider:
|
||||
"""Fetches IBKR Flex Activity reports and yields broker-sync Activities.
|
||||
|
||||
Reconciliation (OpenPositions vs WF-computed qty) is NOT part of
|
||||
``fetch()`` — it runs at the CLI layer after import, where the
|
||||
WealthfolioSink is available to query WF.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
token: str,
|
||||
query_id: str,
|
||||
wf_account_id: str,
|
||||
upstream_account_id: str,
|
||||
) -> None:
|
||||
self._token = token
|
||||
self._query_id = query_id
|
||||
self._wf_account_id = wf_account_id
|
||||
self._upstream_account_id = upstream_account_id
|
||||
# Stashed for the reconciliation step after fetch() drains.
|
||||
self._last_response: Any = None
|
||||
|
||||
def accounts(self) -> list[Account]:
|
||||
return [
|
||||
Account(
|
||||
id=self._wf_account_id,
|
||||
name="Interactive Brokers (UK)",
|
||||
account_type=AccountType.GIA,
|
||||
currency="GBP", # FX-aware per-trade; account ccy is GBP
|
||||
provider="ibkr",
|
||||
)
|
||||
]
|
||||
|
||||
async def close(self) -> None:
|
||||
# ibflex.client uses synchronous `requests` under the hood; no resources to close.
|
||||
return
|
||||
|
||||
async def fetch(
|
||||
self,
|
||||
*,
|
||||
since: datetime | None = None, # Flex query owns the date range
|
||||
before: datetime | None = None,
|
||||
) -> AsyncIterator[Activity]:
|
||||
from ibflex import client as ib_client
|
||||
from ibflex import parser as ib_parser
|
||||
|
||||
del since, before # unused; Flex query defines the period
|
||||
|
||||
xml_bytes = ib_client.download(self._token, self._query_id)
|
||||
response = ib_parser.parse(xml_bytes)
|
||||
self._last_response = response
|
||||
|
||||
if not response.FlexStatements:
|
||||
log.warning("ibkr: Flex response had no FlexStatements")
|
||||
return
|
||||
|
||||
stmt = response.FlexStatements[0]
|
||||
if str(stmt.accountId) != self._upstream_account_id:
|
||||
raise IBKRAccountMismatchError(
|
||||
f"Flex statement.accountId={stmt.accountId!r} does not match "
|
||||
f"configured IBKR_ACCOUNT_ID_UPSTREAM={self._upstream_account_id!r} "
|
||||
f"— refusing to ingest"
|
||||
)
|
||||
|
||||
for trade in stmt.Trades or []:
|
||||
yield _map_trade_to_activity(trade, account_id=self._wf_account_id)
|
||||
|
||||
for cash in stmt.CashTransactions or []:
|
||||
activity = _map_cash_to_activity(cash, account_id=self._wf_account_id)
|
||||
if activity is not None:
|
||||
yield activity
|
||||
|
||||
def open_positions(self) -> list[tuple[str, Decimal]]:
|
||||
"""Return ``[(canonical_symbol, position_qty), ...]`` from the most
|
||||
recent fetch. Empty list before the first ``fetch()`` call."""
|
||||
if self._last_response is None:
|
||||
return []
|
||||
stmt = self._last_response.FlexStatements[0]
|
||||
out: list[tuple[str, Decimal]] = []
|
||||
for pos in stmt.OpenPositions or []:
|
||||
exchange = getattr(pos, "exchange", None)
|
||||
symbol = canonical_symbol(
|
||||
str(pos.symbol),
|
||||
exchange=str(exchange) if exchange is not None else None,
|
||||
currency=str(pos.currency),
|
||||
)
|
||||
out.append((symbol, Decimal(str(pos.position))))
|
||||
return out
|
||||
199
tests/providers/test_ibkr.py
Normal file
199
tests/providers/test_ibkr.py
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
|
||||
import pytest
|
||||
|
||||
from broker_sync.models import ActivityType
|
||||
from broker_sync.providers.ibkr import (
|
||||
IBKRAccountMismatchError,
|
||||
IBKRProvider,
|
||||
_map_cash_to_activity,
|
||||
_map_trade_to_activity,
|
||||
canonical_symbol,
|
||||
)
|
||||
|
||||
# -- canonical_symbol --
|
||||
|
||||
|
||||
def test_canonical_symbol_lse_etf_gets_l_suffix() -> None:
|
||||
assert canonical_symbol("VUAG", exchange="LSEETF", currency="GBP") == "VUAG.L"
|
||||
|
||||
|
||||
def test_canonical_symbol_us_stock_unchanged() -> None:
|
||||
assert canonical_symbol("AAPL", exchange="NASDAQ", currency="USD") == "AAPL"
|
||||
|
||||
|
||||
def test_canonical_symbol_lse_gbp_inferred_when_exchange_missing() -> None:
|
||||
"""IBKR Flex sometimes omits exchange — infer LSE from currency==GBP."""
|
||||
assert canonical_symbol("VUAG", exchange=None, currency="GBP") == "VUAG.L"
|
||||
|
||||
|
||||
def test_canonical_symbol_already_suffixed_unchanged() -> None:
|
||||
assert canonical_symbol("VUAG.L", exchange="LSEETF", currency="GBP") == "VUAG.L"
|
||||
|
||||
|
||||
# -- Trade mapping --
|
||||
|
||||
|
||||
def test_map_trade_buy_to_activity() -> None:
|
||||
from ibflex import parser
|
||||
|
||||
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
|
||||
trade = r.FlexStatements[0].Trades[0] # T1001: 10 VUAG BUY @ 107.50 GBP, comm -1.05
|
||||
|
||||
activity = _map_trade_to_activity(trade, account_id="wf-acct-uuid")
|
||||
|
||||
assert activity.external_id == "ibkr:trade:T1001"
|
||||
assert activity.account_id == "wf-acct-uuid"
|
||||
assert activity.activity_type == ActivityType.BUY
|
||||
assert activity.symbol == "VUAG.L"
|
||||
assert activity.quantity == Decimal("10")
|
||||
assert activity.unit_price == Decimal("107.50")
|
||||
assert activity.fee == Decimal("1.05")
|
||||
assert activity.currency == "GBP"
|
||||
assert isinstance(activity.date, datetime)
|
||||
assert activity.date.tzinfo is not None
|
||||
|
||||
|
||||
def test_map_trade_sell_to_activity() -> None:
|
||||
from ibflex import parser
|
||||
|
||||
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
|
||||
trade = r.FlexStatements[0].Trades[2] # T1003: 2 VUAG SELL @ 108.00 GBP
|
||||
|
||||
activity = _map_trade_to_activity(trade, account_id="wf-acct")
|
||||
assert activity.activity_type == ActivityType.SELL
|
||||
assert activity.symbol == "VUAG.L"
|
||||
assert activity.quantity == Decimal("2")
|
||||
assert activity.unit_price == Decimal("108.00")
|
||||
|
||||
|
||||
def test_map_trade_us_stock_keeps_usd_currency_and_no_suffix() -> None:
|
||||
from ibflex import parser
|
||||
|
||||
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
|
||||
trade = r.FlexStatements[0].Trades[1] # T1002: AAPL BUY USD
|
||||
|
||||
activity = _map_trade_to_activity(trade, account_id="wf-acct")
|
||||
assert activity.symbol == "AAPL"
|
||||
assert activity.currency == "USD"
|
||||
|
||||
|
||||
# -- Cash mapping --
|
||||
|
||||
|
||||
def test_map_cash_dividend_to_activity() -> None:
|
||||
from ibflex import parser
|
||||
|
||||
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
|
||||
cash = r.FlexStatements[0].CashTransactions[0] # C5001: Dividends 3.50 GBP
|
||||
|
||||
activity = _map_cash_to_activity(cash, account_id="wf-acct")
|
||||
assert activity is not None
|
||||
assert activity.external_id == "ibkr:cash:C5001"
|
||||
assert activity.activity_type == ActivityType.DIVIDEND
|
||||
assert activity.amount == Decimal("3.50")
|
||||
assert activity.currency == "GBP"
|
||||
|
||||
|
||||
def test_map_cash_withholding_tax_to_tax_activity() -> None:
|
||||
from ibflex import parser
|
||||
|
||||
r = parser.parse("tests/fixtures/ibkr/sample_flex.xml")
|
||||
cash = r.FlexStatements[0].CashTransactions[1] # C5002: Withholding Tax -0.35 GBP
|
||||
|
||||
activity = _map_cash_to_activity(cash, account_id="wf-acct")
|
||||
assert activity is not None
|
||||
assert activity.activity_type == ActivityType.TAX
|
||||
assert activity.amount == Decimal("0.35") # always positive on Activity
|
||||
|
||||
|
||||
def test_map_cash_unknown_type_returns_none_and_logs(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Unknown CashTransaction.type produces None + a WARNING log line."""
|
||||
|
||||
class FakeType:
|
||||
name = "FrobnicatedThing"
|
||||
|
||||
class FakeCash:
|
||||
transactionID = "C9999"
|
||||
dateTime = None
|
||||
type = FakeType()
|
||||
amount = Decimal("0")
|
||||
currency = "GBP"
|
||||
|
||||
with caplog.at_level("WARNING"):
|
||||
result = _map_cash_to_activity(FakeCash, account_id="wf-acct")
|
||||
assert result is None
|
||||
assert any("FROBNICATEDTHING" in r.message for r in caplog.records)
|
||||
|
||||
|
||||
# -- IBKRProvider end-to-end --
|
||||
|
||||
|
||||
async def test_ibkr_provider_fetch_returns_mapped_activities(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""IBKRProvider.fetch() yields all mapped activities (trades + cash)."""
|
||||
from ibflex import client as ib_client
|
||||
|
||||
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
|
||||
xml_bytes = f.read()
|
||||
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
|
||||
|
||||
provider = IBKRProvider(
|
||||
token="t",
|
||||
query_id="q",
|
||||
wf_account_id="wf-acct",
|
||||
upstream_account_id="U12345678",
|
||||
)
|
||||
activities = [a async for a in provider.fetch()]
|
||||
# 3 trades + 2 cash = 5
|
||||
assert len(activities) == 5
|
||||
types = sorted(a.activity_type.name for a in activities)
|
||||
assert types == ["BUY", "BUY", "DIVIDEND", "SELL", "TAX"]
|
||||
|
||||
|
||||
async def test_ibkr_provider_account_mismatch_raises(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Mismatched accountId raises and writes nothing."""
|
||||
from ibflex import client as ib_client
|
||||
|
||||
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
|
||||
xml_bytes = f.read()
|
||||
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
|
||||
|
||||
provider = IBKRProvider(
|
||||
token="t",
|
||||
query_id="q",
|
||||
wf_account_id="wf-acct",
|
||||
upstream_account_id="U99999999", # WRONG
|
||||
)
|
||||
with pytest.raises(IBKRAccountMismatchError, match="U12345678"):
|
||||
_ = [a async for a in provider.fetch()]
|
||||
|
||||
|
||||
async def test_ibkr_provider_open_positions_after_fetch(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""open_positions() returns canonicalised symbol + qty after fetch drained."""
|
||||
from ibflex import client as ib_client
|
||||
|
||||
with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f:
|
||||
xml_bytes = f.read()
|
||||
monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes)
|
||||
|
||||
provider = IBKRProvider(
|
||||
token="t",
|
||||
query_id="q",
|
||||
wf_account_id="wf-acct",
|
||||
upstream_account_id="U12345678",
|
||||
)
|
||||
# drain the iterator before reading positions
|
||||
[a async for a in provider.fetch()]
|
||||
|
||||
positions = provider.open_positions()
|
||||
# VUAG → VUAG.L (LSE inferred from GBP); AAPL unchanged (USD)
|
||||
assert dict(positions) == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")}
|
||||
Loading…
Add table
Add a link
Reference in a new issue