broker-sync/broker_sync/providers/ibkr.py
Viktor Barzin a4dab03bc5
Some checks failed
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
ci/woodpecker/push/build Pipeline was canceled
cli: add 'broker-sync ibkr' command (Flex pull + import + reconcile + metrics)
2026-05-26 22:29:44 +00:00

257 lines
8.9 KiB
Python

"""Interactive Brokers Flex Web Service ingestion provider.
Pulls daily Activity Flex Query reports via the ``ibflex`` library, maps
Trades + CashTransactions to broker-sync ``Activity`` objects, and runs a
reconciliation step against the broker-reported ``OpenPositions``.
See ``docs/specs/2026-05-26-ibkr-ingest-design.md`` for the full design.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
from broker_sync.models import Account, AccountType, Activity, ActivityType
log = logging.getLogger(__name__)
# IBKR exchange codes that canonical_symbol() treats as London Stock Exchange
# listings, plus the ticker suffix it appends for them (and for GBP
# instruments whose exchange is unknown).
# Today: only LSE / GBP → ".L". Extend when more accounts onboard.
_LSE_EXCHANGES = {"LSE", "LSEETF", "LSEIOB1"}
_GBP_SUFFIX = ".L"
def canonical_symbol(symbol: str, *, exchange: str | None, currency: str) -> str:
    """Translate an IBKR ticker into the Wealthfolio-canonical spelling.

    A ticker that already contains a dot is passed through untouched.
    Otherwise the ``.L`` suffix is appended when the instrument is listed on
    one of the known LSE exchanges, or when no exchange is given and the
    currency is GBP. Everything else is returned unchanged.

    NOTE(review): any ticker containing a dot is treated as "already
    suffixed" — confirm that is intended for IBKR symbols that carry a dot
    as part of the ticker itself.
    """
    already_suffixed = "." in symbol
    if already_suffixed:
        return symbol

    listed_on_lse = exchange in _LSE_EXCHANGES
    gbp_without_exchange = exchange is None and currency == "GBP"
    if not (listed_on_lse or gbp_without_exchange):
        return symbol
    return symbol + _GBP_SUFFIX
def _to_utc_datetime(value: Any, time_value: Any = None) -> datetime:
"""Combine a date (with optional time) into a UTC datetime."""
if isinstance(value, datetime):
dt = value
elif isinstance(value, date):
if isinstance(time_value, str):
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value}")
elif hasattr(time_value, "isoformat"):
dt = datetime.fromisoformat(f"{value.isoformat()}T{time_value.isoformat()}")
else:
dt = datetime.fromisoformat(f"{value.isoformat()}T00:00:00")
else:
# Last-resort: ISO string
dt = datetime.fromisoformat(str(value))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt.astimezone(UTC)
def _map_trade_to_activity(trade: Any, *, account_id: str) -> Activity:
    """Convert a single ibflex ``Trade`` record into a broker-sync ``Activity``.

    Args:
        trade: Trade record; ``buySell``, ``tradeID``, ``symbol``,
            ``currency``, ``quantity``, ``tradePrice``, ``ibCommission`` and
            ``tradeDate`` are read, ``exchange`` and ``tradeTime`` if present.
        account_id: Account id to stamp on the resulting activity.

    Returns:
        A BUY or SELL activity with unsigned quantity and fee.

    Raises:
        ValueError: If ``buySell`` is neither BUY nor SELL.
    """
    raw_side = trade.buySell
    # Enum-like values expose ``.name``; plain values are stringified.
    side = raw_side.name if hasattr(raw_side, "name") else str(raw_side)
    if side not in ("BUY", "SELL"):
        raise ValueError(
            f"unsupported Trade.buySell={side!r} on tradeID={trade.tradeID}"
        )
    kind = ActivityType.BUY if side == "BUY" else ActivityType.SELL

    venue = getattr(trade, "exchange", None)
    ticker = canonical_symbol(
        str(trade.symbol),
        exchange=None if venue is None else str(venue),
        currency=str(trade.currency),
    )

    # Quantity and commission are stored unsigned; direction lives in the type.
    shares = abs(Decimal(str(trade.quantity)))
    price = Decimal(str(trade.tradePrice))
    raw_commission = Decimal(0) if trade.ibCommission is None else trade.ibCommission
    charge = abs(Decimal(str(raw_commission)))

    executed_at = _to_utc_datetime(trade.tradeDate, getattr(trade, "tradeTime", None))
    return Activity(
        external_id=f"ibkr:trade:{trade.tradeID}",
        account_id=account_id,
        account_type=AccountType.GIA,
        date=executed_at,
        activity_type=kind,
        currency=str(trade.currency),
        symbol=ticker,
        quantity=shares,
        unit_price=price,
        fee=charge,
    )
# Map known IBKR Flex CashTransaction.type values to broker-sync ActivityType.
# Unknown values yield None + a WARNING — we refuse to guess.
#
# Keys are the strings produced by ``_normalise_cash_type``. That helper
# returns the enum member *name* when given an enum and an upper-snake form of
# the raw text otherwise, so every supported type needs BOTH spellings here.
# The short spellings are the ibflex ``CashAction`` member names as understood
# at the time of writing — confirm against the installed ibflex version.
_CASH_TYPE_MAP: dict[str, ActivityType] = {
    # Dividends
    "DIVIDEND": ActivityType.DIVIDEND,
    "DIVIDENDS": ActivityType.DIVIDEND,
    "PAYMENT_IN_LIEU_OF_DIVIDENDS": ActivityType.DIVIDEND,
    "PAYMENTINLIEU": ActivityType.DIVIDEND,
    # Tax
    "WITHHOLDING_TAX": ActivityType.TAX,
    "WHTAX": ActivityType.TAX,
    # Interest
    "BROKER_INTEREST_RECEIVED": ActivityType.INTEREST,
    "BROKERINTRCVD": ActivityType.INTEREST,
    # Fees (interest paid to the broker is treated as a fee)
    "BROKER_INTEREST_PAID": ActivityType.FEE,
    "BROKERINTPAID": ActivityType.FEE,
    "COMMISSION_ADJUSTMENTS": ActivityType.FEE,
    "COMMADJ": ActivityType.FEE,
    "OTHER_FEES": ActivityType.FEE,
    "FEES": ActivityType.FEE,
}
# Spellings of the combined deposit/withdrawal type; the sign of the amount
# decides which of the two it is.
_DEPOSIT_WITHDRAWAL_TYPES = {
    "DEPOSITS_WITHDRAWALS",
    "DEPOSIT_WITHDRAWALS",
    "DEPOSITWITHDRAW",
    # Raw "Deposits/Withdrawals" and "Deposits & Withdrawals" text forms.
    "DEPOSITS/WITHDRAWALS",
    "DEPOSITS_AND_WITHDRAWALS",
}
def _normalise_cash_type(type_obj: Any) -> str:
    """Canonicalise an IBKR Flex ``CashTransaction.type`` to an upper-case key.

    Args:
        type_obj: Either an enum-like object (anything with a ``name``
            attribute) or a value whose ``str()`` is the raw type text.

    Returns:
        The upper-cased ``name`` for enum-like input. For other input, the
        stripped, upper-cased text with spaces and ``/`` replaced by ``_``
        and ``&`` replaced by ``AND`` — so ``"Deposits/Withdrawals"`` becomes
        ``"DEPOSITS_WITHDRAWALS"``.
    """
    if hasattr(type_obj, "name"):
        return str(type_obj.name).upper()
    text = str(type_obj).strip().upper()
    # "/" must be folded too, otherwise "Deposits/Withdrawals" never matches
    # the UPPER_SNAKE keys used for lookup.
    return text.replace(" ", "_").replace("/", "_").replace("&", "AND")
def _map_cash_to_activity(cash: Any, *, account_id: str) -> Activity | None:
    """Map one ibflex ``CashTransaction`` to a broker-sync ``Activity``.

    Args:
        cash: Cash transaction record; ``type``, ``amount``, ``dateTime``,
            ``currency`` and ``transactionID`` are read, ``settleDate`` and
            ``reportDate`` only as date fallbacks when present.
        account_id: Account id to stamp on the resulting activity.

    Returns:
        The mapped activity with an unsigned amount, or ``None`` for
        unsupported types (logged at WARNING).
    """
    type_name = _normalise_cash_type(cash.type)
    amount = Decimal(str(cash.amount))
    if type_name in _DEPOSIT_WITHDRAWAL_TYPES:
        # One IBKR type covers both directions; the sign tells them apart.
        activity_type = ActivityType.DEPOSIT if amount > 0 else ActivityType.WITHDRAWAL
    else:
        mapped = _CASH_TYPE_MAP.get(type_name)
        if mapped is None:
            log.warning(
                "ibkr: skipping cash transaction id=%s with unsupported type=%r",
                getattr(cash, "transactionID", "?"),
                type_name,
            )
            return None
        activity_type = mapped

    dt_raw = cash.dateTime
    if dt_raw is None:
        # Prefer a date carried by the record over inventing one.
        # NOTE(review): assumes settleDate/reportDate are date-like when
        # present — confirm against the ibflex CashTransaction type.
        dt_raw = getattr(cash, "settleDate", None) or getattr(cash, "reportDate", None)
    if dt_raw is not None:
        dt = _to_utc_datetime(dt_raw)
    else:
        # Nothing usable on the record: import time is the only option left,
        # but say so because the resulting date is not reproducible.
        log.warning(
            "ibkr: cash transaction id=%s has no date; using import time",
            getattr(cash, "transactionID", "?"),
        )
        dt = datetime.now(UTC)

    return Activity(
        external_id=f"ibkr:cash:{cash.transactionID}",
        account_id=account_id,
        account_type=AccountType.GIA,
        date=dt,
        activity_type=activity_type,
        currency=str(cash.currency),
        amount=abs(amount),
    )
class IBKRError(Exception):
    """Common base for every error raised by the IBKR provider."""
class IBKRAccountMismatchError(IBKRError):
    """The Flex statement's accountId differs from the configured upstream id."""
class IBKRProvider:
    """Fetches IBKR Flex Activity reports and yields broker-sync Activities.

    Reconciliation (OpenPositions vs WF-computed qty) is NOT part of
    ``fetch()`` — it runs at the CLI layer after import, where the
    WealthfolioSink is available to query WF.
    """

    name = "ibkr"

    def __init__(
        self,
        *,
        token: str,
        query_id: str,
        wf_account_id: str,
        upstream_account_id: str,
    ) -> None:
        """Store the Flex credentials and account ids.

        Args:
            token: Flex Web Service token.
            query_id: Id of the Flex query to download.
            wf_account_id: Account id stamped on every emitted activity.
            upstream_account_id: IBKR account id the statement must carry.
        """
        self._token = token
        self._query_id = query_id
        self._wf_account_id = wf_account_id
        self._upstream_account_id = upstream_account_id
        # Stashed for the reconciliation step after fetch() drains.
        self._last_response: Any = None

    def accounts(self) -> list[Account]:
        """Return the single account this provider feeds."""
        return [
            Account(
                id=self._wf_account_id,
                name="Interactive Brokers (UK)",
                account_type=AccountType.GIA,
                currency="GBP",  # FX-aware per-trade; account ccy is GBP
                provider="ibkr",
            )
        ]

    async def close(self) -> None:
        """No-op: nothing is held open between calls."""
        # ibflex.client uses synchronous `requests` under the hood; no resources to close.
        return

    async def fetch(
        self,
        *,
        since: datetime | None = None,  # Flex query owns the date range
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        """Download the Flex report and yield its trades, then cash activities.

        ``since`` / ``before`` are accepted but ignored — the Flex query
        itself defines the reporting period. Only the first statement in the
        response is ingested.

        Raises:
            IBKRAccountMismatchError: If the statement belongs to a different
                account than the configured upstream id.
        """
        from ibflex import client as ib_client
        from ibflex import parser as ib_parser

        del since, before  # unused; Flex query defines the period

        # Forget the previous report up front so open_positions() can never
        # serve stale or unvalidated data if this run fails part-way.
        self._last_response = None

        xml_bytes = ib_client.download(self._token, self._query_id)
        response = ib_parser.parse(xml_bytes)

        statements = response.FlexStatements
        if not statements:
            log.warning("ibkr: Flex response had no FlexStatements")
            return

        stmt = statements[0]
        if str(stmt.accountId) != self._upstream_account_id:
            raise IBKRAccountMismatchError(
                f"Flex statement.accountId={stmt.accountId!r} does not match "
                f"configured IBKR_ACCOUNT_ID_UPSTREAM={self._upstream_account_id!r} "
                f"— refusing to ingest"
            )
        if len(statements) > 1:
            # Make the silent truncation visible instead of dropping data quietly.
            log.warning(
                "ibkr: Flex response had %d FlexStatements; only the first is ingested",
                len(statements),
            )

        # Stash only a response whose account has been verified.
        self._last_response = response

        for trade in stmt.Trades or []:
            yield _map_trade_to_activity(trade, account_id=self._wf_account_id)
        for cash in stmt.CashTransactions or []:
            activity = _map_cash_to_activity(cash, account_id=self._wf_account_id)
            if activity is not None:
                yield activity

    def open_positions(self) -> list[tuple[str, Decimal]]:
        """Return ``[(canonical_symbol, position_qty), ...]`` from the most
        recent fetch.

        Empty list before the first ``fetch()`` call, or when the stashed
        response carries no statements or no open positions.
        """
        if self._last_response is None:
            return []
        statements = self._last_response.FlexStatements
        if not statements:
            # An empty response must not blow up with IndexError.
            return []
        stmt = statements[0]
        out: list[tuple[str, Decimal]] = []
        for pos in stmt.OpenPositions or []:
            exchange = getattr(pos, "exchange", None)
            symbol = canonical_symbol(
                str(pos.symbol),
                exchange=str(exchange) if exchange is not None else None,
                currency=str(pos.currency),
            )
            out.append((symbol, Decimal(str(pos.position))))
        return out