From a4dab03bc5b0b8cb9500e39e2cb660858214a4c6 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Tue, 26 May 2026 22:29:44 +0000 Subject: [PATCH] cli: add 'broker-sync ibkr' command (Flex pull + import + reconcile + metrics) --- broker_sync/cli.py | 94 +++++++++++++++++++++++++++++++++++ broker_sync/providers/ibkr.py | 2 + 2 files changed, 96 insertions(+) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index 6e08eb8..cef7526 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -230,6 +230,100 @@ def invest_engine( asyncio.run(_run()) +@app.command("ibkr") +def ibkr( + wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), + wf_username: str = typer.Option(..., envvar="WF_USERNAME"), + wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), + wf_session_path: str = typer.Option( + "/data/wealthfolio_session.json", envvar="WF_SESSION_PATH" + ), + ibkr_flex_token: str = typer.Option(..., envvar="IBKR_FLEX_TOKEN"), + ibkr_flex_query_id: str = typer.Option(..., envvar="IBKR_FLEX_QUERY_ID"), + ibkr_account_id: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID"), + ibkr_account_id_upstream: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID_UPSTREAM"), + pushgateway_url: str = typer.Option( + "http://prometheus-prometheus-pushgateway.monitoring:9091/metrics", + envvar="PUSHGATEWAY_URL", + ), + data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), +) -> None: + """Phase 2c — daily IBKR Flex Web Service → Wealthfolio sync. + + Pulls an Activity Flex Query (Trades + Cash + OpenPositions), maps to + broker-sync Activities, pushes through the shared pipeline, then + reconciles broker-reported OpenPositions against WF-computed quantities + and publishes a Pushgateway drift metric. + """ + import time + from decimal import Decimal + + from broker_sync.dedup import SyncRecordStore + from broker_sync.metrics import push_pushgateway + from broker_sync.pipeline import sync_provider_to_wealthfolio + from broker_sync.providers.ibkr import IBKRAccountMismatchError, IBKRProvider + from broker_sync.sinks.wealthfolio import WealthfolioSink + + _setup_logging() + data = Path(data_dir) + data.mkdir(parents=True, exist_ok=True) + + async def _run() -> None: + sink = WealthfolioSink( + base_url=wf_base_url, + username=wf_username, + password=wf_password, + session_path=wf_session_path, + ) + provider = IBKRProvider( + token=ibkr_flex_token, + query_id=ibkr_flex_query_id, + wf_account_id=ibkr_account_id, + upstream_account_id=ibkr_account_id_upstream, + ) + dedup = SyncRecordStore(data / "sync.db") + try: + if not Path(wf_session_path).exists(): + await sink.login() + result = await sync_provider_to_wealthfolio( + provider=provider, + sink=sink, + dedup=dedup, + ) + + # Reconciliation: broker truth vs WF truth. + wf_qty = await sink.compute_position_qty(ibkr_account_id) + drift_metrics: list[tuple[str, dict[str, str], float]] = [] + for symbol, broker_qty in provider.open_positions(): + drift = broker_qty - wf_qty.get(symbol, Decimal(0)) + drift_metrics.append( + ( + "ibkr_position_drift_shares", + {"symbol": symbol, "account": "ibkr-uk"}, + float(drift), + ) + ) + drift_metrics.append( + ("ibkr_sync_last_success_timestamp_seconds", {}, float(time.time())) + ) + await push_pushgateway("broker-sync-ibkr", drift_metrics, pushgateway_url) + except IBKRAccountMismatchError as e: + typer.echo(f"IBKR: {e}", err=True) + sys.exit(2) + finally: + await provider.close() + await sink.close() + + typer.echo( + f"ibkr: fetched={result.fetched} new={result.new_after_dedup} " + f"imported={result.imported} failed={result.failed}" + ) + if result.failed > 0: + sys.exit(1) + + asyncio.run(_run()) + + @app.command("finance-mysql-import") def finance_mysql_import( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), diff --git a/broker_sync/providers/ibkr.py b/broker_sync/providers/ibkr.py index f156a3f..741c79a 100644 --- a/broker_sync/providers/ibkr.py +++ b/broker_sync/providers/ibkr.py @@ -172,6 +172,8 @@ class IBKRProvider: WealthfolioSink is available to query WF. """ + name = "ibkr" + def __init__( self, *,