diff --git a/docs/plans/2026-05-26-ibkr-flex-ingestion.md b/docs/plans/2026-05-26-ibkr-flex-ingestion.md new file mode 100644 index 0000000..7a22578 --- /dev/null +++ b/docs/plans/2026-05-26-ibkr-flex-ingestion.md @@ -0,0 +1,1578 @@ +# IBKR Flex Ingestion Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a daily IBKR Flex Web Service → Wealthfolio ingestion path +to `broker-sync`, with mandatory broker-vs-WF position reconciliation. + +**Architecture:** New `IBKRProvider` in `broker_sync/providers/ibkr.py` uses +the `ibflex` library to download + parse Flex XML reports. Mapped activities +flow through the existing pipeline (cash-flow-match → dedup → WF import). +After import, a new reconciliation step compares Flex `OpenPositions` +against WF-computed quantities and pushes drift to Pushgateway. A new K8s +CronJob `broker-sync-ibkr` schedules it at 02:00 UK daily. + +**Tech stack:** Python 3.12, `ibflex ^0.16` (new), `httpx` (existing), +`typer` (existing), Terraform + K8s CronJob (existing pattern), Vault +KV-v2 secret backend (existing), Prometheus Pushgateway (cluster-internal). + +**Spec:** `docs/specs/2026-05-26-ibkr-ingest-design.md` (in this repo). + +--- + +## File Structure + +| Path | Responsibility | New? | +|---|---|---| +| `broker_sync/providers/ibkr.py` | `IBKRProvider` — fetch + parse + map. Module is the entire IBKR ingestion provider. | NEW | +| `broker_sync/metrics.py` | One-function module: `push_pushgateway(job, metrics, labels)` — simple httpx POST to the cluster Pushgateway. Shared by future providers. | NEW | +| `broker_sync/sinks/wealthfolio.py` | Add `compute_position_qty(account_id) -> dict[str, Decimal]` method to `WealthfolioSink`. | MODIFY | +| `broker_sync/cli.py` | Add `@app.command("ibkr")` typer command, parallel to `trading212` and `invest-engine`. | MODIFY | +| `pyproject.toml` | Add `ibflex = "^0.16"` dependency. | MODIFY | +| `tests/providers/test_ibkr.py` | Unit tests for IBKRProvider mapping logic + account guard. | NEW | +| `tests/fixtures/ibkr/sample_flex.xml` | Canned Flex XML fixture (3 trades, 2 cash txns, 2 positions, 1 account). | NEW | +| `tests/sinks/test_wealthfolio.py` | Add tests for the new `compute_position_qty` method. | MODIFY | +| `tests/test_metrics.py` | Test the `push_pushgateway` function with a mock httpx transport. | NEW | +| `infra/stacks/broker-sync/main.tf` | Add `kubernetes_cron_job_v1.ibkr` resource + matching PrometheusRule for drift / staleness alerts. | MODIFY | + +Files are split by responsibility, not by layer. The provider is a single +file (`ibkr.py`) because its three concerns — fetch, parse-map, reconcile +— are tightly coupled by the Flex XML shape. + +--- + +## Task 1: Add the `ibflex` dependency + +**Files:** +- Modify: `pyproject.toml` + +- [ ] **Step 1: Add `ibflex` to dependencies** + +In `pyproject.toml`, under `[tool.poetry.dependencies]`, add: + +```toml +ibflex = "^0.16" +``` + +- [ ] **Step 2: Resolve + install** + +```bash +cd /home/wizard/code/broker-sync && poetry lock --no-update && poetry install +``` + +Expected output: `Installing ibflex (0.16.x)`. No error. + +- [ ] **Step 3: Verify it imports** + +```bash +poetry run python -c "from ibflex import client, parser; print(client, parser)" +``` + +Expected: prints two module objects, no exception. + +- [ ] **Step 4: Commit** + +```bash +git add pyproject.toml poetry.lock +git commit -m "deps: add ibflex for IBKR Flex Web Service ingestion" +``` + +--- + +## Task 2: Fixture — canned Flex XML + +**Files:** +- Create: `tests/fixtures/ibkr/sample_flex.xml` + +- [ ] **Step 1: Create the fixture directory** + +```bash +mkdir -p /home/wizard/code/broker-sync/tests/fixtures/ibkr +``` + +- [ ] **Step 2: Write the fixture file** + +Create `tests/fixtures/ibkr/sample_flex.xml`: + +```xml + + + + + + + + + + + + + + + + + + + + + +``` + +- [ ] **Step 3: Verify ibflex can parse it** + +```bash +cd /home/wizard/code/broker-sync && poetry run python -c " +from ibflex import parser +r = parser.parse('tests/fixtures/ibkr/sample_flex.xml') +s = r.FlexStatements[0] +assert s.accountId == 'U12345678' +assert len(s.Trades) == 3 +assert len(s.CashTransactions) == 2 +assert len(s.OpenPositions) == 2 +print('OK') +" +``` + +Expected: prints `OK`. + +- [ ] **Step 4: Commit** + +```bash +git add tests/fixtures/ibkr/sample_flex.xml +git commit -m "test: add IBKR Flex XML fixture for provider tests" +``` + +--- + +## Task 3: `metrics.py` — Pushgateway client + test + +**Files:** +- Create: `broker_sync/metrics.py` +- Create: `tests/test_metrics.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_metrics.py`: + +```python +from __future__ import annotations + +import httpx +import pytest + +from broker_sync.metrics import push_pushgateway + + +@pytest.mark.asyncio +async def test_push_pushgateway_posts_text_format() -> None: + captured: dict[str, object] = {} + + def transport_handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["method"] = request.method + captured["body"] = request.content.decode("utf-8") + return httpx.Response(200) + + transport = httpx.MockTransport(transport_handler) + await push_pushgateway( + job="broker-sync-ibkr", + metrics=[ + ("ibkr_position_drift_shares", {"symbol": "VUAG.L"}, 0.0), + ("ibkr_sync_last_success_timestamp_seconds", {}, 1779830000.0), + ], + pushgateway_url="http://pg.example/metrics", + transport=transport, + ) + assert captured["method"] == "POST" + assert captured["url"] == "http://pg.example/metrics/job/broker-sync-ibkr" + body = captured["body"] + assert 'ibkr_position_drift_shares{symbol="VUAG.L"} 0.0' in body + assert "ibkr_sync_last_success_timestamp_seconds 1779830000.0" in body + + +@pytest.mark.asyncio +async def test_push_pushgateway_raises_on_non_2xx() -> None: + transport = httpx.MockTransport(lambda r: httpx.Response(500, text="boom")) + with pytest.raises(RuntimeError, match="pushgateway.*500"): + await push_pushgateway( + job="x", + metrics=[("m", {}, 1.0)], + pushgateway_url="http://pg/metrics", + transport=transport, + ) +``` + +- [ ] **Step 2: Run the test and verify it fails** + +```bash +cd /home/wizard/code/broker-sync && poetry run pytest tests/test_metrics.py -v +``` + +Expected: FAIL with `ModuleNotFoundError: No module named 'broker_sync.metrics'`. + +- [ ] **Step 3: Write the implementation** + +Create `broker_sync/metrics.py`: + +```python +"""Pushgateway client for broker-sync providers. + +One function: push a list of (metric, labels, value) tuples to Prometheus +Pushgateway under a given job name. Used by providers to surface +per-run drift / staleness / row counts that Prometheus can alert on. + +In-cluster URL: http://prometheus-prometheus-pushgateway.monitoring:9091/metrics +Pass that via the ``pushgateway_url`` env-driven argument. +""" +from __future__ import annotations + +import logging +import os +from collections.abc import Iterable + +import httpx + +log = logging.getLogger(__name__) + + +def _format_metric(name: str, labels: dict[str, str], value: float) -> str: + if labels: + body = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items())) + return f"{name}{{{body}}} {value}\n" + return f"{name} {value}\n" + + +async def push_pushgateway( + job: str, + metrics: Iterable[tuple[str, dict[str, str], float]], + pushgateway_url: str | None = None, + transport: httpx.AsyncBaseTransport | None = None, +) -> None: + """POST text-format metrics to Pushgateway under ``job``. + + ``pushgateway_url`` defaults to the env var ``PUSHGATEWAY_URL``. + Raises ``RuntimeError`` if the URL is unset or the POST returns non-2xx. + """ + url = pushgateway_url or os.environ.get("PUSHGATEWAY_URL") + if not url: + raise RuntimeError("PUSHGATEWAY_URL not set and no override provided") + body = "".join(_format_metric(name, labels, value) for name, labels, value in metrics) + target = f"{url.rstrip('/')}/job/{job}" + async with httpx.AsyncClient(transport=transport, timeout=15.0) as c: + resp = await c.post(target, content=body, headers={"Content-Type": "text/plain"}) + if resp.status_code >= 300: + raise RuntimeError( + f"pushgateway POST {target} returned HTTP {resp.status_code}: {resp.text[:200]}" + ) + log.info("pushgateway: pushed %d metrics to job=%s", len(body.splitlines()), job) +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +```bash +poetry run pytest tests/test_metrics.py -v +``` + +Expected: both tests pass. + +- [ ] **Step 5: Type + lint check** + +```bash +poetry run mypy broker_sync/metrics.py && poetry run ruff check broker_sync/metrics.py tests/test_metrics.py +``` + +Expected: both clean. + +- [ ] **Step 6: Commit** + +```bash +git add broker_sync/metrics.py tests/test_metrics.py +git commit -m "metrics: add Pushgateway client for broker-sync providers" +``` + +--- + +## Task 4: `WealthfolioSink.compute_position_qty` — and tests + +**Files:** +- Modify: `broker_sync/sinks/wealthfolio.py` +- Modify: `tests/sinks/test_wealthfolio.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/sinks/test_wealthfolio.py`: + +```python +@pytest.mark.asyncio +async def test_compute_position_qty_sums_buys_minus_sells(monkeypatch: MonkeyPatch) -> None: + """compute_position_qty groups activities by symbol and returns + BUY/ADD_HOLDING/TRANSFER_IN minus SELL/REMOVE_HOLDING/TRANSFER_OUT + quantities as Decimal.""" + from broker_sync.sinks.wealthfolio import WealthfolioSink + + fake_activities = [ + # symbol VUAG.L: 10 buys, 2 sells, net 8 + {"symbol": "VUAG.L", "activityType": "BUY", "quantity": "10"}, + {"symbol": "VUAG.L", "activityType": "SELL", "quantity": "2"}, + # symbol AAPL: 5 buys + {"symbol": "AAPL", "activityType": "BUY", "quantity": "5"}, + # cash activities (no asset) — skipped + {"symbol": "$CASH-GBP", "activityType": "DEPOSIT", "quantity": "0", "amount": "100"}, + ] + + sink = WealthfolioSink(base_url="http://wf", username="u", password="p", session_path="/tmp/s") + + async def fake_search(account_id: str, page: int) -> dict: + return {"activities": fake_activities if page == 1 else [], "totalPages": 1} + + monkeypatch.setattr(sink, "_search_activities", fake_search) + result = await sink.compute_position_qty("acct-123") + assert result == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")} +``` + +Add the `Decimal` import at the top of the test module if missing: + +```python +from decimal import Decimal +``` + +- [ ] **Step 2: Run the test and verify it fails** + +```bash +poetry run pytest tests/sinks/test_wealthfolio.py::test_compute_position_qty_sums_buys_minus_sells -v +``` + +Expected: FAIL with `AttributeError: 'WealthfolioSink' object has no attribute 'compute_position_qty'` (or similar — `_search_activities` may also be missing). + +- [ ] **Step 3: Add the method to WealthfolioSink** + +In `broker_sync/sinks/wealthfolio.py`, inside the `WealthfolioSink` class, add (alongside the existing methods): + +```python +async def _search_activities(self, account_id: str, page: int) -> dict[str, Any]: + """Internal: one page of /activities/search results for an account.""" + resp = await self._request( + "POST", + "/api/v1/activities/search", + json={"accountIds": [account_id], "page": page, "pageSize": 500}, + ) + resp.raise_for_status() + return resp.json() # type: ignore[no-any-return] + +async def compute_position_qty(self, account_id: str) -> dict[str, Decimal]: + """Return per-symbol net position quantity (BUY/IN minus SELL/OUT) for + one account. Skips cash activities. Used by the IBKR reconciliation + step to compare against broker-reported OpenPositions.""" + qty_by_symbol: dict[str, Decimal] = {} + page = 1 + while True: + payload = await self._search_activities(account_id, page) + activities = payload.get("activities", []) + if not activities: + break + for act in activities: + symbol = act.get("symbol") + if not symbol or symbol.startswith("$CASH"): + continue + act_type = act.get("activityType") + sign: int + if act_type in {"BUY", "ADD_HOLDING", "TRANSFER_IN"}: + sign = 1 + elif act_type in {"SELL", "REMOVE_HOLDING", "TRANSFER_OUT"}: + sign = -1 + else: + continue + qty = Decimal(str(act.get("quantity") or 0)) + qty_by_symbol[symbol] = qty_by_symbol.get(symbol, Decimal(0)) + sign * qty + if page >= int(payload.get("totalPages") or 1): + break + page += 1 + return qty_by_symbol +``` + +Add the `Decimal` import at the top of `wealthfolio.py` if missing: + +```python +from decimal import Decimal +``` + +- [ ] **Step 4: Run the test and verify it passes** + +```bash +poetry run pytest tests/sinks/test_wealthfolio.py::test_compute_position_qty_sums_buys_minus_sells -v +``` + +Expected: PASS. + +- [ ] **Step 5: Run mypy + ruff + full pytest** + +```bash +poetry run mypy broker_sync tests && poetry run ruff check . && poetry run pytest -q +``` + +Expected: all clean. + +- [ ] **Step 6: Commit** + +```bash +git add broker_sync/sinks/wealthfolio.py tests/sinks/test_wealthfolio.py +git commit -m "wealthfolio: add compute_position_qty for broker reconciliation" +``` + +--- + +## Task 5: `providers/ibkr.py` — symbol canonicalisation + +**Files:** +- Create: `broker_sync/providers/ibkr.py` +- Create: `tests/providers/test_ibkr.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/providers/test_ibkr.py`: + +```python +from __future__ import annotations + +import pytest + +from broker_sync.providers.ibkr import canonical_symbol + + +def test_canonical_symbol_lse_etf_gets_l_suffix() -> None: + assert canonical_symbol("VUAG", exchange="LSE", currency="GBP") == "VUAG.L" + + +def test_canonical_symbol_us_stock_unchanged() -> None: + assert canonical_symbol("AAPL", exchange="NASDAQ", currency="USD") == "AAPL" + + +def test_canonical_symbol_lse_gbp_inferred_when_exchange_missing() -> None: + """IBKR Flex sometimes omits exchange. Infer LSE from currency==GBP.""" + assert canonical_symbol("VUAG", exchange=None, currency="GBP") == "VUAG.L" + + +def test_canonical_symbol_already_suffixed_unchanged() -> None: + assert canonical_symbol("VUAG.L", exchange="LSE", currency="GBP") == "VUAG.L" +``` + +- [ ] **Step 2: Run the test and verify it fails** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: FAIL with `ModuleNotFoundError: No module named 'broker_sync.providers.ibkr'`. + +- [ ] **Step 3: Create the provider module with `canonical_symbol`** + +Create `broker_sync/providers/ibkr.py`: + +```python +"""Interactive Brokers Flex Web Service ingestion provider. + +Pulls daily Activity Flex Query reports via the ``ibflex`` library, maps +Trades + CashTransactions to broker-sync ``Activity`` objects, and runs a +reconciliation step against the broker-reported ``OpenPositions``. + +See ``docs/specs/2026-05-26-ibkr-ingest-design.md`` for the full design. +""" +from __future__ import annotations + +import logging +from decimal import Decimal +from typing import TYPE_CHECKING + +from broker_sync.models import Account, AccountType, Activity, ActivityType + +if TYPE_CHECKING: + from ibflex import FlexQueryResponse + +log = logging.getLogger(__name__) + +# Map IBKR currency -> default exchange suffix. +# Only set up for the GBP / LSE case today; extend when more accounts onboard. +_CURRENCY_TO_LSE_SUFFIX = {"GBP": ".L"} + + +def canonical_symbol(symbol: str, *, exchange: str | None, currency: str) -> str: + """Return the WF-canonical form of an IBKR ticker. + + LSE-listed GBP instruments get a ``.L`` suffix (Wealthfolio convention). + US instruments and anything already suffixed are returned unchanged. + """ + if "." in symbol: + return symbol + if exchange in {"LSE", "LSEETF", "LSEIOB1"} or ( + exchange is None and currency in _CURRENCY_TO_LSE_SUFFIX + ): + return symbol + _CURRENCY_TO_LSE_SUFFIX.get(currency, ".L") + return symbol +``` + +- [ ] **Step 4: Run the test and verify it passes** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: 4 tests PASS. + +- [ ] **Step 5: Type + lint** + +```bash +poetry run mypy broker_sync/providers/ibkr.py tests/providers/test_ibkr.py && poetry run ruff check broker_sync/providers/ibkr.py tests/providers/test_ibkr.py +``` + +Expected: clean. + +- [ ] **Step 6: Commit** + +```bash +git add broker_sync/providers/ibkr.py tests/providers/test_ibkr.py +git commit -m "ibkr: add canonical_symbol helper (LSE .L suffix handling)" +``` + +--- + +## Task 6: `_map_trade_to_activity` + +**Files:** +- Modify: `broker_sync/providers/ibkr.py` +- Modify: `tests/providers/test_ibkr.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/providers/test_ibkr.py`: + +```python +def test_map_trade_buy_to_activity() -> None: + """Trade with buySell=BUY maps to Activity(activity_type=BUY) with + positive quantity, fee = abs(ibCommission), external_id = ibkr:trade:.""" + from datetime import datetime + from decimal import Decimal + + from broker_sync.providers.ibkr import _map_trade_to_activity + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + trade = r.FlexStatements[0].Trades[0] # T1001: 10 VUAG BUY @ 107.50 GBP + + activity = _map_trade_to_activity(trade, account_id="wf-acct-uuid") + + assert activity.external_id == "ibkr:trade:T1001" + assert activity.account_id == "wf-acct-uuid" + assert activity.activity_type == ActivityType.BUY + assert activity.symbol == "VUAG.L" + assert activity.quantity == Decimal("10") + assert activity.unit_price == Decimal("107.50") + assert activity.fee == Decimal("1.05") + assert activity.currency == "GBP" + assert isinstance(activity.date, datetime) + assert activity.date.tzinfo is not None +``` + +- [ ] **Step 2: Run and verify it fails** + +```bash +poetry run pytest tests/providers/test_ibkr.py::test_map_trade_buy_to_activity -v +``` + +Expected: FAIL with `ImportError: cannot import name '_map_trade_to_activity'`. + +- [ ] **Step 3: Add the mapper** + +Append to `broker_sync/providers/ibkr.py`: + +```python +from datetime import UTC, datetime # noqa: E402 (grouped here for the mapper section) + +if TYPE_CHECKING: + from ibflex.Types import OpenPosition, Trade + from ibflex.Types import CashTransaction as IBFlexCashTransaction + + +def _trade_to_datetime(trade_date: object, trade_time: str | None) -> datetime: + """Combine Flex tradeDate (a date) + tradeTime (HH:MM:SS TZ) into UTC datetime.""" + if isinstance(trade_date, datetime): + # ibflex sometimes already returns datetime + dt = trade_date + else: + # date object + time_part = (trade_time or "00:00:00 UTC").split()[0] + dt = datetime.fromisoformat(f"{trade_date.isoformat()}T{time_part}") + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) + + +def _map_trade_to_activity(trade: Trade, *, account_id: str) -> Activity: + """Map one ibflex Trade dataclass to a broker-sync Activity.""" + buy_sell = str(trade.buySell.name) if hasattr(trade.buySell, "name") else str(trade.buySell) + if buy_sell == "BUY": + activity_type = ActivityType.BUY + elif buy_sell == "SELL": + activity_type = ActivityType.SELL + else: + raise ValueError(f"unsupported Trade.buySell={buy_sell!r} on tradeID={trade.tradeID}") + + symbol = canonical_symbol( + str(trade.symbol), + exchange=getattr(trade, "exchange", None), + currency=str(trade.currency), + ) + quantity = abs(Decimal(str(trade.quantity))) + unit_price = Decimal(str(trade.tradePrice)) + fee = abs(Decimal(str(trade.ibCommission or 0))) + return Activity( + external_id=f"ibkr:trade:{trade.tradeID}", + account_id=account_id, + account_type=AccountType.GIA, + date=_trade_to_datetime(trade.tradeDate, getattr(trade, "tradeTime", None)), + activity_type=activity_type, + currency=str(trade.currency), + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + fee=fee, + ) +``` + +Move the `from datetime import UTC, datetime` import to the top-level imports +section if your repo's lint rules forbid late imports — ruff's E402 is suppressed +here via `# noqa: E402` because grouping helps readability. + +- [ ] **Step 4: Run the test and verify it passes** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: all 5 tests pass. + +- [ ] **Step 5: Type + lint** + +```bash +poetry run mypy broker_sync/providers/ibkr.py && poetry run ruff check broker_sync/providers/ibkr.py +``` + +Expected: clean. + +- [ ] **Step 6: Commit** + +```bash +git add broker_sync/providers/ibkr.py tests/providers/test_ibkr.py +git commit -m "ibkr: map Flex Trades to broker-sync Activities" +``` + +--- + +## Task 7: `_map_cash_to_activity` + +**Files:** +- Modify: `broker_sync/providers/ibkr.py` +- Modify: `tests/providers/test_ibkr.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/providers/test_ibkr.py`: + +```python +def test_map_cash_dividend_to_activity() -> None: + from decimal import Decimal + + from broker_sync.providers.ibkr import _map_cash_to_activity + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + cash = r.FlexStatements[0].CashTransactions[0] # C5001: Dividends 3.50 GBP + + activity = _map_cash_to_activity(cash, account_id="wf-acct-uuid") + assert activity is not None + assert activity.external_id == "ibkr:cash:C5001" + assert activity.activity_type == ActivityType.DIVIDEND + assert activity.amount == Decimal("3.50") + assert activity.currency == "GBP" + + +def test_map_cash_withholding_tax_to_fee_activity() -> None: + from decimal import Decimal + + from broker_sync.providers.ibkr import _map_cash_to_activity + from ibflex import parser + + r = parser.parse("tests/fixtures/ibkr/sample_flex.xml") + cash = r.FlexStatements[0].CashTransactions[1] # C5002: Withholding Tax -0.35 GBP + + activity = _map_cash_to_activity(cash, account_id="wf-acct-uuid") + assert activity is not None + assert activity.activity_type == ActivityType.FEE + assert activity.amount == Decimal("0.35") # always positive on Activity, sign carried by activity_type + + +def test_map_cash_unknown_type_returns_none_and_logs(caplog) -> None: # noqa: ANN001 + """Unknown CashTransaction.type produces None + a WARNING log line. + Same refusal-to-guess convention as the InvestEngine provider.""" + from broker_sync.providers.ibkr import _map_cash_to_activity + + class FakeCash: + transactionID = "C9999" + dateTime = None + type = type("T", (), {"name": "FrobnicatedThing"})() + amount = 0 + currency = "GBP" + + with caplog.at_level("WARNING"): + result = _map_cash_to_activity(FakeCash, account_id="wf-acct-uuid") + assert result is None + assert any("FrobnicatedThing" in r.message for r in caplog.records) +``` + +- [ ] **Step 2: Run and verify the new tests fail** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: 3 FAILs (the new tests), 5 existing PASS. + +- [ ] **Step 3: Add the cash mapper** + +Append to `broker_sync/providers/ibkr.py`: + +```python +# Maps the IBKR Flex CashTransaction.type values we expect to see for a +# stocks/ETFs-only GIA. Unknown values yield None + a WARNING — we refuse +# to guess (per IE/Schwab convention) to avoid silent misclassification. +_CASH_TYPE_MAP: dict[str, ActivityType] = { + "Dividends": ActivityType.DIVIDEND, + "Withholding Tax": ActivityType.FEE, + "Broker Interest Received": ActivityType.DIVIDEND, + "Broker Interest Paid": ActivityType.FEE, + "Commission Adjustments": ActivityType.FEE, + "Other Fees": ActivityType.FEE, +} + + +def _map_cash_to_activity( + cash: IBFlexCashTransaction, *, account_id: str +) -> Activity | None: + """Map one ibflex CashTransaction to a broker-sync Activity. + + Returns None for unsupported types (logged at WARNING). Deposit/Withdrawal + handled separately by sign of amount. + """ + type_obj = cash.type + type_name = type_obj.name if hasattr(type_obj, "name") else str(type_obj) + amount = Decimal(str(cash.amount)) + + # Deposit / Withdrawal split by sign — the Flex "Deposits & Withdrawals" type + if type_name in {"DepositsWithdrawals", "Deposits & Withdrawals", "Deposit Withdrawals"}: + activity_type = ActivityType.DEPOSIT if amount > 0 else ActivityType.WITHDRAWAL + else: + activity_type = _CASH_TYPE_MAP.get(type_name) # type: ignore[assignment] + if activity_type is None: + log.warning( + "ibkr: skipping cash transaction id=%s with unsupported type=%r", + getattr(cash, "transactionID", "?"), + type_name, + ) + return None + + dt = cash.dateTime + if isinstance(dt, datetime) and dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + elif not isinstance(dt, datetime): + dt = datetime.now(UTC) # graceful fallback — log path also fine + + return Activity( + external_id=f"ibkr:cash:{cash.transactionID}", + account_id=account_id, + account_type=AccountType.GIA, + date=dt, + activity_type=activity_type, + currency=str(cash.currency), + amount=abs(amount), + ) +``` + +- [ ] **Step 4: Run and verify all tests pass** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: 8 tests pass. + +- [ ] **Step 5: Type + lint + commit** + +```bash +poetry run mypy broker_sync && poetry run ruff check broker_sync tests +git add broker_sync/providers/ibkr.py tests/providers/test_ibkr.py +git commit -m "ibkr: map Flex CashTransactions (dividends, fees, deposits)" +``` + +--- + +## Task 8: `IBKRProvider` class + account guard + +**Files:** +- Modify: `broker_sync/providers/ibkr.py` +- Modify: `tests/providers/test_ibkr.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/providers/test_ibkr.py`: + +```python +@pytest.mark.asyncio +async def test_ibkr_provider_fetch_returns_mapped_activities(monkeypatch) -> None: # noqa: ANN001 + """IBKRProvider.fetch() yields all mapped activities (trades + cash).""" + from broker_sync.providers.ibkr import IBKRProvider + from ibflex import client as ib_client + + with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f: + xml_bytes = f.read() + monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes) + + provider = IBKRProvider( + token="t", + query_id="q", + wf_account_id="wf-acct", + upstream_account_id="U12345678", + ) + activities = [a async for a in provider.fetch()] + # 3 trades + 2 cash = 5 + assert len(activities) == 5 + types = sorted(a.activity_type.name for a in activities) + assert types == ["BUY", "BUY", "DIVIDEND", "FEE", "SELL"] + + +@pytest.mark.asyncio +async def test_ibkr_provider_account_mismatch_raises(monkeypatch) -> None: # noqa: ANN001 + """If Flex statement.accountId differs from the configured upstream id, + refuse to ingest. Prevents wrong-account writes from a misconfigured query.""" + from broker_sync.providers.ibkr import IBKRAccountMismatchError, IBKRProvider + from ibflex import client as ib_client + + with open("tests/fixtures/ibkr/sample_flex.xml", "rb") as f: + xml_bytes = f.read() + monkeypatch.setattr(ib_client, "download", lambda *a, **kw: xml_bytes) + + provider = IBKRProvider( + token="t", + query_id="q", + wf_account_id="wf-acct", + upstream_account_id="U99999999", # WRONG + ) + with pytest.raises(IBKRAccountMismatchError, match="U12345678"): + [a async for a in provider.fetch()] +``` + +- [ ] **Step 2: Run and verify the new tests fail** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: 2 FAILs (the new tests). Existing tests still pass. + +- [ ] **Step 3: Add the IBKRProvider class** + +Append to `broker_sync/providers/ibkr.py`: + +```python +from collections.abc import AsyncIterator # noqa: E402 + + +class IBKRError(Exception): + """Base class for ibkr-provider errors.""" + + +class IBKRAccountMismatchError(IBKRError): + """Flex statement accountId did not match configured upstream id.""" + + +class IBKRProvider: + """Fetches IBKR Flex Activity reports and yields broker-sync Activities. + + The reconciliation step (OpenPositions vs WF-computed qty) is NOT part + of fetch() — it runs at the CLI layer after import, since it needs the + WealthfolioSink to query WF. + """ + + def __init__( + self, + *, + token: str, + query_id: str, + wf_account_id: str, + upstream_account_id: str, + ) -> None: + self._token = token + self._query_id = query_id + self._wf_account_id = wf_account_id + self._upstream_account_id = upstream_account_id + # Stash the parsed response for the reconciliation step. + self._last_response: FlexQueryResponse | None = None + + def accounts(self) -> list[Account]: + return [ + Account( + id=self._wf_account_id, + provider="ibkr", + provider_account_id=self._upstream_account_id, + account_type=AccountType.GIA, + currency="GBP", # FX-aware at trade level; account currency is GBP + ) + ] + + async def close(self) -> None: + # No persistent HTTP client today — ibflex uses requests internally. + return + + async def fetch( + self, + *, + since: datetime | None = None, # noqa: ARG002 (Flex query owns the date range) + before: datetime | None = None, # noqa: ARG002 + ) -> AsyncIterator[Activity]: + from ibflex import client as ib_client + from ibflex import parser as ib_parser + + xml_bytes = ib_client.download(self._token, self._query_id) + response = ib_parser.parse(xml_bytes) + self._last_response = response + + if not response.FlexStatements: + log.warning("ibkr: Flex response had no FlexStatements") + return + + stmt = response.FlexStatements[0] + if str(stmt.accountId) != self._upstream_account_id: + raise IBKRAccountMismatchError( + f"Flex statement.accountId={stmt.accountId!r} does not match " + f"configured IBKR_ACCOUNT_ID_UPSTREAM={self._upstream_account_id!r} " + f"— refusing to ingest" + ) + + for trade in stmt.Trades or []: + yield _map_trade_to_activity(trade, account_id=self._wf_account_id) + + for cash in stmt.CashTransactions or []: + activity = _map_cash_to_activity(cash, account_id=self._wf_account_id) + if activity is not None: + yield activity + + def open_positions(self) -> list[tuple[str, Decimal]]: + """Return ``[(canonical_symbol, position_qty), ...]`` from the most + recent fetch. Used by the reconciliation step. + + Returns ``[]`` if no fetch has been called yet.""" + if self._last_response is None: + return [] + stmt = self._last_response.FlexStatements[0] + out: list[tuple[str, Decimal]] = [] + for pos in stmt.OpenPositions or []: + symbol = canonical_symbol( + str(pos.symbol), + exchange=getattr(pos, "exchange", None), + currency=str(pos.currency), + ) + out.append((symbol, Decimal(str(pos.position)))) + return out +``` + +- [ ] **Step 4: Run and verify all tests pass** + +```bash +poetry run pytest tests/providers/test_ibkr.py -v +``` + +Expected: 10 tests pass. + +- [ ] **Step 5: Type + lint + commit** + +```bash +poetry run mypy broker_sync && poetry run ruff check broker_sync tests +git add broker_sync/providers/ibkr.py tests/providers/test_ibkr.py +git commit -m "ibkr: add IBKRProvider with Flex fetch + account-mismatch guard" +``` + +--- + +## Task 9: `broker-sync ibkr` CLI command + +**Files:** +- Modify: `broker_sync/cli.py` + +- [ ] **Step 1: Read existing `invest_engine` command for pattern** + +```bash +sed -n '140,235p' /home/wizard/code/broker-sync/broker_sync/cli.py +``` + +You're using this as the template — `ibkr` is structurally identical +(provider construction → pipeline → sink → reconciliation). + +- [ ] **Step 2: Add the `ibkr` command** + +In `broker_sync/cli.py`, after the `invest_engine` command, add: + +```python +@app.command("ibkr") +def ibkr( # noqa: PLR0913 + wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), + wf_username: str = typer.Option(..., envvar="WF_USERNAME"), + wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), + wf_session_path: str = typer.Option( + "/data/wealthfolio_session.json", envvar="WF_SESSION_PATH" + ), + ibkr_flex_token: str = typer.Option(..., envvar="IBKR_FLEX_TOKEN"), + ibkr_flex_query_id: str = typer.Option(..., envvar="IBKR_FLEX_QUERY_ID"), + ibkr_account_id: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID"), + ibkr_account_id_upstream: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID_UPSTREAM"), + pushgateway_url: str = typer.Option( + "http://prometheus-prometheus-pushgateway.monitoring:9091/metrics", + envvar="PUSHGATEWAY_URL", + ), + data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), +) -> None: + """Phase 2c — daily IBKR Flex Web Service → Wealthfolio sync.""" + import time + + from broker_sync.dedup import SyncRecordStore + from broker_sync.metrics import push_pushgateway + from broker_sync.pipeline import sync_provider_to_wealthfolio + from broker_sync.providers.ibkr import IBKRProvider + from broker_sync.sinks.wealthfolio import WealthfolioSink + + _setup_logging() + data = Path(data_dir) + data.mkdir(parents=True, exist_ok=True) + + async def _run() -> None: + sink = WealthfolioSink( + base_url=wf_base_url, + username=wf_username, + password=wf_password, + session_path=wf_session_path, + ) + provider = IBKRProvider( + token=ibkr_flex_token, + query_id=ibkr_flex_query_id, + wf_account_id=ibkr_account_id, + upstream_account_id=ibkr_account_id_upstream, + ) + dedup = SyncRecordStore(data / "sync.db") + try: + if not Path(wf_session_path).exists(): + await sink.login() + result = await sync_provider_to_wealthfolio( + provider=provider, + sink=sink, + dedup=dedup, + ) + + # Reconciliation: broker truth vs WF truth. + wf_qty = await sink.compute_position_qty(ibkr_account_id) + drift_metrics: list[tuple[str, dict[str, str], float]] = [] + for symbol, broker_qty in provider.open_positions(): + drift = broker_qty - wf_qty.get(symbol, Decimal(0)) + drift_metrics.append( + ( + "ibkr_position_drift_shares", + {"symbol": symbol, "account": "ibkr-uk"}, + float(drift), + ) + ) + drift_metrics.append( + ("ibkr_sync_last_success_timestamp_seconds", {}, float(time.time())) + ) + await push_pushgateway("broker-sync-ibkr", drift_metrics, pushgateway_url) + finally: + await sink.close() + await provider.close() + + typer.echo( + f"ibkr: fetched={result.fetched} new={result.new_after_dedup} " + f"imported={result.imported} failed={result.failed}" + ) + if result.failed > 0: + sys.exit(1) + + asyncio.run(_run()) +``` + +Add the `Decimal` import at the top of `cli.py` if missing. + +- [ ] **Step 3: Sanity-check the CLI compiles** + +```bash +poetry run broker-sync --help | grep -i ibkr +``` + +Expected: `ibkr Phase 2c — daily IBKR Flex Web Service → Wealthfolio sync.` + +- [ ] **Step 4: Run mypy + ruff + full pytest** + +```bash +poetry run mypy broker_sync tests && poetry run ruff check . && poetry run pytest -q +``` + +Expected: all clean. + +- [ ] **Step 5: Commit** + +```bash +git add broker_sync/cli.py +git commit -m "cli: add ibkr command (Flex pull + pipeline + reconcile + metrics)" +``` + +--- + +## Task 10: Push, wait for CI, verify image + +**Files:** (none — operational step) + +- [ ] **Step 1: Push to GitHub + Forgejo** + +```bash +git push origin main && git push forgejo main +``` + +- [ ] **Step 2: Wait for GHA CI to complete** + +```bash +until [ "$(gh api 'repos/ViktorBarzin/broker-sync/actions/runs?per_page=1' --jq '.workflow_runs[0].status')" = "completed" ]; do sleep 15; done +gh api 'repos/ViktorBarzin/broker-sync/actions/runs?per_page=1' --jq '.workflow_runs[0] | "\(.head_sha[:8]) \(.conclusion)"' +``` + +Expected: ` success`. + +- [ ] **Step 3: Pull the new image and confirm** + +```bash +docker pull viktorbarzin/broker-sync:latest +docker images viktorbarzin/broker-sync --format '{{.Tag}} {{.CreatedSince}}' +``` + +Expected: `latest` was created within the last few minutes. + +--- + +## Task 11: Vault secrets + WF account creation + +**Files:** (operational — no code changes) + +- [ ] **Step 1: User completes the IBKR Client Portal steps** + +Follow the design's setup checklist Step 1: +- Enable Flex Web Service → copy Token. +- Create Activity Flex Query → copy Query ID. +- Note the account number (e.g., `U12345678`). + +- [ ] **Step 2: Create the Wealthfolio account** + +```bash +WF_BASE="https://wealthfolio.viktorbarzin.me" # adjust if internal-only +WF_PASS=$(vault kv get -field=wf_password secret/broker-sync) +curl -sS -c /tmp/wf-jar -X POST "$WF_BASE/api/v1/auth/login" \ + -H 'Content-Type: application/json' \ + -d "{\"password\":\"$WF_PASS\"}" -o /dev/null +WF_UUID=$(curl -sS -b /tmp/wf-jar -X POST "$WF_BASE/api/v1/accounts" \ + -H 'Content-Type: application/json' \ + -d '{"name":"Interactive Brokers (UK)","accountType":"GIA","currency":"GBP","isActive":true}' \ + | jq -r '.id') +echo "WF account UUID = $WF_UUID" +``` + +Expected: prints a UUID. Note it down for the next step. + +- [ ] **Step 3: Put the 4 IBKR secrets into Vault** + +```bash +vault kv patch secret/broker-sync \ + ibkr_flex_token='' \ + ibkr_flex_query_id='' \ + ibkr_account_id='' \ + ibkr_account_id_upstream='' +``` + +- [ ] **Step 4: Verify the secrets are readable** + +```bash +vault kv get -format=json secret/broker-sync | jq '.data.data | {token: (.ibkr_flex_token[0:6]+"..."), query_id, account_id, account_id_upstream}' +``` + +Expected: all four fields present, token truncated. + +--- + +## Task 12: Terraform CronJob + alerts + +**Files:** +- Modify: `infra/stacks/broker-sync/main.tf` + +- [ ] **Step 1: Open `infra/stacks/broker-sync/main.tf` and find the `trading212` CronJob** + +```bash +grep -n 'kubernetes_cron_job_v1.*trading212\|broker-sync-trading212' /home/wizard/code/infra/stacks/broker-sync/main.tf +``` + +Use it as the template — copy/paste then adjust the diffs. + +- [ ] **Step 2: Add the IBKR CronJob resource** + +After the `trading212` CronJob block, add: + +```hcl +# IBKR Flex Web Service daily sync. Phase 2c deliverable. +resource "kubernetes_cron_job_v1" "ibkr" { + metadata { + name = "broker-sync-ibkr" + namespace = kubernetes_namespace.broker_sync.metadata[0].name + labels = { app = "broker-sync", component = "ibkr" } + } + spec { + schedule = "0 2 * * *" # 02:00 UK + concurrency_policy = "Forbid" + starting_deadline_seconds = 300 + successful_jobs_history_limit = 3 + failed_jobs_history_limit = 5 + job_template { + metadata {} + spec { + backoff_limit = 2 + ttl_seconds_after_finished = 86400 + template { + metadata { + labels = { app = "broker-sync", component = "ibkr" } + } + spec { + restart_policy = "OnFailure" + security_context { + fs_group = 10001 + } + container { + name = "broker-sync" + image = local.broker_sync_image + command = ["broker-sync", "ibkr"] + + env { + name = "BROKER_SYNC_DATA_DIR" + value = "/data" + } + env { + name = "WF_SESSION_PATH" + value = "/data/wealthfolio_session.json" + } + env { + name = "WF_BASE_URL" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "wf_base_url" } } + } + env { + name = "WF_USERNAME" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "wf_username" } } + } + env { + name = "WF_PASSWORD" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "wf_password" } } + } + env { + name = "IBKR_FLEX_TOKEN" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "ibkr_flex_token" } } + } + env { + name = "IBKR_FLEX_QUERY_ID" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "ibkr_flex_query_id" } } + } + env { + name = "IBKR_ACCOUNT_ID" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "ibkr_account_id" } } + } + env { + name = "IBKR_ACCOUNT_ID_UPSTREAM" + value_from { secret_key_ref { name = "broker-sync-secrets"; key = "ibkr_account_id_upstream" } } + } + + volume_mount { + name = "data" + mount_path = "/data" + } + resources { + requests = { cpu = "20m", memory = "128Mi" } + limits = { memory = "256Mi" } + } + } + volume { + name = "data" + persistent_volume_claim { + claim_name = kubernetes_persistent_volume_claim.data_encrypted.metadata[0].name + } + } + } + } + } + } + } + lifecycle { + # KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2 + ignore_changes = [spec[0].job_template[0].spec[0].template[0].spec[0].dns_config] + } +} +``` + +- [ ] **Step 3: Format the terraform** + +```bash +cd /home/wizard/code/infra/stacks/broker-sync && terraform fmt main.tf +``` + +- [ ] **Step 4: Plan** + +```bash +/home/wizard/code/infra/scripts/tg plan 2>&1 | tail -20 +``` + +Expected: `Plan: 1 to add, 0 to change, 0 to destroy.` (the new ibkr CronJob). + +- [ ] **Step 5: Apply** + +```bash +/home/wizard/code/infra/scripts/tg apply --non-interactive 2>&1 | tail -5 +``` + +Expected: `Apply complete! Resources: 1 added, ...`. + +- [ ] **Step 6: Verify the CronJob exists** + +```bash +kubectl -n broker-sync get cronjob broker-sync-ibkr +``` + +Expected: row appears with `SCHEDULE = 0 2 * * *`. + +- [ ] **Step 7: Commit** + +```bash +cd /home/wizard/code/infra +git add stacks/broker-sync/main.tf +git commit -m "broker-sync: add IBKR Flex daily CronJob" +git push origin master +``` + +--- + +## Task 13: Manual smoke run + verification + +**Files:** (none — operational) + +- [ ] **Step 1: Trigger the CronJob manually** + +```bash +kubectl -n broker-sync create job --from=cronjob/broker-sync-ibkr broker-sync-ibkr-smoke-$(date +%s) +``` + +- [ ] **Step 2: Wait for completion + check status** + +```bash +JOB=$(kubectl -n broker-sync get jobs --sort-by=.metadata.creationTimestamp -o name | grep broker-sync-ibkr-smoke | tail -1) +until [ "$(kubectl -n broker-sync get $JOB -o jsonpath='{.status.succeeded}{.status.failed}' 2>/dev/null)" != "" ]; do sleep 5; done +kubectl -n broker-sync get $JOB +``` + +Expected: `STATUS = Complete`. (If `Failed`, check logs in step 3 and debug.) + +- [ ] **Step 3: Inspect the logs** + +```bash +kubectl -n broker-sync logs -l job-name=$(basename $JOB) --tail=200 +``` + +Look for: +- `ibkr: fetched=0 new=0 imported=0 failed=0` (account is empty, so zero + rows is correct). +- A `pushgateway: pushed N metrics` line. +- No tracebacks. + +- [ ] **Step 4: Verify the WF account exists with no activities** + +```bash +WF_PASS=$(vault kv get -field=wf_password secret/broker-sync) +curl -sS -c /tmp/wf-jar -X POST https://wealthfolio.viktorbarzin.me/api/v1/auth/login \ + -H 'Content-Type: application/json' -d "{\"password\":\"$WF_PASS\"}" -o /dev/null +curl -sS -b /tmp/wf-jar https://wealthfolio.viktorbarzin.me/api/v1/accounts | jq '.[] | select(.name=="Interactive Brokers (UK)")' +``` + +Expected: prints the account JSON with the UUID from Task 11 Step 2. + +- [ ] **Step 5: Verify Pushgateway received the metrics** + +```bash +kubectl -n monitoring port-forward svc/prometheus-prometheus-pushgateway 9091:9091 & +sleep 2 +curl -sS http://localhost:9091/metrics | grep -E 'ibkr_(position_drift_shares|sync_last_success)' +kill %1 +``` + +Expected: `ibkr_sync_last_success_timestamp_seconds` shows a recent +unix timestamp. `ibkr_position_drift_shares` may be absent if there +were no open positions today, which is correct for an empty account. + +--- + +## Task 14: Provider docs (for future-you) + +**Files:** +- Create: `docs/providers/ibkr.md` + +- [ ] **Step 1: Write the production-facing provider doc** + +Create `docs/providers/ibkr.md`: + +```markdown +# Provider: Interactive Brokers (IBKR Flex Web Service) + +Pulls a daily Activity Flex Query via the `ibflex` library, maps Trades + +CashTransactions to broker-sync Activities, and reconciles broker-side +OpenPositions against WF-computed quantities. + +## When this runs +- K8s CronJob `broker-sync-ibkr` in the `broker-sync` namespace, daily 02:00 UK. +- Manual: `kubectl -n broker-sync create job --from=cronjob/broker-sync-ibkr broker-sync-ibkr-manual-1`. + +## Secrets (Vault `secret/broker-sync`) + +| Key | Description | +|---|---| +| `ibkr_flex_token` | Flex Web Service token (1-year validity, rotate via IBKR Client Portal) | +| `ibkr_flex_query_id` | Activity Flex Query ID (5-7 digit number) | +| `ibkr_account_id` | Wealthfolio account UUID for "Interactive Brokers (UK)" | +| `ibkr_account_id_upstream` | IBKR-side account number (e.g., `U12345678`) — guards against wrong-account ingestion | + +## Flex Query design + +| Section | Fields used | +|---|---| +| Account Information | accountId | +| Trades | tradeID, tradeDate, tradeTime, symbol, buySell, quantity, tradePrice, currency, ibCommission, assetCategory | +| Cash Transactions | transactionID, dateTime, type, amount, currency, description | +| Open Positions | symbol, position, markPrice, currency, assetCategory | +| Securities Information | symbol, description, conid | + +Date range: `Last Business Day` for daily incremental. Switch to +`Year to Date` for one-time backfills only. + +## Cash type mapping + +| IBKR Flex type | broker-sync ActivityType | +|---|---| +| Dividends | DIVIDEND | +| Withholding Tax | FEE | +| Broker Interest Received | DIVIDEND | +| Broker Interest Paid | FEE | +| Commission Adjustments | FEE | +| Other Fees | FEE | +| Deposits & Withdrawals | DEPOSIT (amount > 0) or WITHDRAWAL (amount < 0) | +| anything else | skipped + WARNING logged (refusal-to-guess) | + +## External IDs (dedup keys) +- Trades: `ibkr:trade:` +- Cash: `ibkr:cash:` + +Both are stable across re-runs — the `dedup.SyncRecordStore` rejects +already-seen IDs. + +## Symbol canonicalisation +LSE-listed GBP instruments get a `.L` suffix (Wealthfolio convention). +US instruments and anything already suffixed pass through unchanged. + +## Position reconciliation +Each run pushes to Pushgateway: +- `ibkr_position_drift_shares{symbol, account}` — broker_qty − wf_qty per asset +- `ibkr_sync_last_success_timestamp_seconds` — unix timestamp + +Alerts (defined in monitoring stack — TBD until first non-zero drift): +- `IBKRPositionDrift{symbol}` — `|drift| > 0.01` for >24h, Slack `#security`. +- `IBKRSyncStale` — timestamp > 36h old. +- `IBKRFlexTokenExpired` — Loki rule on the "code 1003" log line. + +## Token rotation +Flex tokens expire after 1 year. When the cron starts failing with +`ResponseCodeError(code=1003)`: +1. Sign in to IBKR Client Portal → Reports → Settings → Flex Web Service → regenerate token. +2. `vault kv patch secret/broker-sync ibkr_flex_token=''`. +3. ExternalSecrets controller picks it up within 15 min; no manual restart needed. + +## Spec / plan +Design: `docs/specs/2026-05-26-ibkr-ingest-design.md` +Implementation plan: `docs/plans/2026-05-26-ibkr-flex-ingestion.md` +``` + +- [ ] **Step 2: Commit** + +```bash +cd /home/wizard/code/broker-sync +git add docs/providers/ibkr.md +git commit -m "docs: add IBKR provider runbook" +git push origin main && git push forgejo main +``` + +--- + +## Task 15: Acceptance — 7-day soak + +**Files:** (none — observational) + +- [ ] **Step 1: Set a 7-day calendar reminder to re-check** + +Set a reminder for `2026-06-02` (today + 7 days). + +- [ ] **Step 2: On 2026-06-02, run the acceptance check** + +```bash +# Last 7 days of CronJob outcomes +kubectl -n broker-sync get jobs --sort-by=.metadata.creationTimestamp -o wide \ + | grep broker-sync-ibkr-2 + +# Pushgateway should have a recent success timestamp +kubectl -n monitoring port-forward svc/prometheus-prometheus-pushgateway 9091:9091 & +sleep 2 +curl -sS http://localhost:9091/metrics | grep ibkr_sync_last_success +kill %1 + +# Pushgateway drift should be zero on all symbols (account still empty, or +# else broker matches WF) +curl -sS http://localhost:9091/metrics | grep ibkr_position_drift_shares +``` + +Expected: +- ≥6 of the 7 nightly runs `Complete`. +- `ibkr_sync_last_success_timestamp_seconds` within the last 36 hours. +- `ibkr_position_drift_shares` all zero. + +- [ ] **Step 3: If all green, close the implementation plan** + +Mark this plan file as `Status: Done` at the top and commit. + +If not green, file beads tasks for the specific issues and revisit. + +--- + +## Self-review notes + +- **Spec coverage**: every section of `docs/specs/2026-05-26-ibkr-ingest-design.md` + maps to one or more tasks (deps→1, fixtures→2, metrics→3, sink helper→4, + symbol canon→5, trade map→6, cash map→7, provider→8, CLI→9, image→10, + setup→11, CronJob→12, smoke→13, docs→14, soak→15). +- **Placeholder scan**: no `TBD` in the plan body. The doc file + `docs/providers/ibkr.md` includes one explicit TBD about + PrometheusRule definitions — that's intentional, deferred to the + monitoring stack work (out-of-scope here; first non-zero drift event + will prompt the alert PR). +- **Type consistency**: `IBKRProvider.fetch` is `AsyncIterator[Activity]` + throughout. `compute_position_qty` returns `dict[str, Decimal]` in + both the sink and the CLI consumer. External_id schemes + (`ibkr:trade:` and `ibkr:cash:`) match between the mapper, the + provider, and the documentation.