cli: add 'broker-sync ibkr' command (Flex pull + import + reconcile + metrics)
Some checks failed
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
ci/woodpecker/push/build Pipeline was canceled

This commit is contained in:
Viktor Barzin 2026-05-26 22:29:44 +00:00
parent e83c5a0a8f
commit a4dab03bc5
2 changed files with 96 additions and 0 deletions

View file

@ -230,6 +230,100 @@ def invest_engine(
asyncio.run(_run())
@app.command("ibkr")
def ibkr(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
wf_session_path: str = typer.Option(
"/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"
),
ibkr_flex_token: str = typer.Option(..., envvar="IBKR_FLEX_TOKEN"),
ibkr_flex_query_id: str = typer.Option(..., envvar="IBKR_FLEX_QUERY_ID"),
ibkr_account_id: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID"),
ibkr_account_id_upstream: str = typer.Option(..., envvar="IBKR_ACCOUNT_ID_UPSTREAM"),
pushgateway_url: str = typer.Option(
"http://prometheus-prometheus-pushgateway.monitoring:9091/metrics",
envvar="PUSHGATEWAY_URL",
),
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
"""Phase 2c — daily IBKR Flex Web Service → Wealthfolio sync.
Pulls an Activity Flex Query (Trades + Cash + OpenPositions), maps to
broker-sync Activities, pushes through the shared pipeline, then
reconciles broker-reported OpenPositions against WF-computed quantities
and publishes a Pushgateway drift metric.
"""
import time
from decimal import Decimal
from broker_sync.dedup import SyncRecordStore
from broker_sync.metrics import push_pushgateway
from broker_sync.pipeline import sync_provider_to_wealthfolio
from broker_sync.providers.ibkr import IBKRAccountMismatchError, IBKRProvider
from broker_sync.sinks.wealthfolio import WealthfolioSink
_setup_logging()
data = Path(data_dir)
data.mkdir(parents=True, exist_ok=True)
async def _run() -> None:
sink = WealthfolioSink(
base_url=wf_base_url,
username=wf_username,
password=wf_password,
session_path=wf_session_path,
)
provider = IBKRProvider(
token=ibkr_flex_token,
query_id=ibkr_flex_query_id,
wf_account_id=ibkr_account_id,
upstream_account_id=ibkr_account_id_upstream,
)
dedup = SyncRecordStore(data / "sync.db")
try:
if not Path(wf_session_path).exists():
await sink.login()
result = await sync_provider_to_wealthfolio(
provider=provider,
sink=sink,
dedup=dedup,
)
# Reconciliation: broker truth vs WF truth.
wf_qty = await sink.compute_position_qty(ibkr_account_id)
drift_metrics: list[tuple[str, dict[str, str], float]] = []
for symbol, broker_qty in provider.open_positions():
drift = broker_qty - wf_qty.get(symbol, Decimal(0))
drift_metrics.append(
(
"ibkr_position_drift_shares",
{"symbol": symbol, "account": "ibkr-uk"},
float(drift),
)
)
drift_metrics.append(
("ibkr_sync_last_success_timestamp_seconds", {}, float(time.time()))
)
await push_pushgateway("broker-sync-ibkr", drift_metrics, pushgateway_url)
except IBKRAccountMismatchError as e:
typer.echo(f"IBKR: {e}", err=True)
sys.exit(2)
finally:
await provider.close()
await sink.close()
typer.echo(
f"ibkr: fetched={result.fetched} new={result.new_after_dedup} "
f"imported={result.imported} failed={result.failed}"
)
if result.failed > 0:
sys.exit(1)
asyncio.run(_run())
@app.command("finance-mysql-import")
def finance_mysql_import(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),

View file

@ -172,6 +172,8 @@ class IBKRProvider:
WealthfolioSink is available to query WF.
"""
name = "ibkr"
def __init__(
self,
*,