broker-sync/broker_sync/cli.py
Viktor Barzin a190875f63 Add finance_mysql provider + CLI for historical backfill
finance.position (171 rows, 2020-06-07 to 2025-12-19) is the only source
of InvestEngine + Schwab trade history pre-dating the broker-sync project.
This provider reads it once and pushes every row into the correct WF
account (.L tickers → IE ISA, others → Schwab).

Dedup: external_id = 'finance-mysql:position:<PK>' — idempotent on re-run.
Auth: aiomysql as MySQL root (user-authorized) against the standalone
mysql:8.4 in-cluster service.

New CLI: broker-sync finance-mysql-import
New tests: 5 unit tests covering route, symbol normalise, BUY/SELL
detection.

poetry run pytest -q   →  114 passed, 1 skipped
poetry run mypy        →  clean (aiomysql shielded with type: ignore)
poetry run ruff check  →  clean
2026-04-17 22:38:21 +00:00

401 lines
14 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING
import typer
if TYPE_CHECKING:
from broker_sync.models import Account
# Root Typer application object; commands in this module attach to it with
# the @app.command(...) decorator.
app = typer.Typer(
    help="broker-sync: pull brokerage activity into Wealthfolio",
    # CRITICAL: rich tracebacks print all local variables on crash, which
    # includes env-sourced credentials (WF_PASSWORD, T212_API_KEYS_JSON).
    # Kubernetes pod logs are world-readable — leaking creds there is a
    # security incident. Plain tracebacks only.
    pretty_exceptions_enable=False,
)
@app.command("version")
def version() -> None:
    """Print version and exit — used by the no-op Phase 0 CronJob as a liveness check."""
    # Function-scope import: the package metadata is only loaded when this
    # command actually runs.
    from broker_sync import __version__ as package_version

    banner = f"broker-sync {package_version}"
    typer.echo(banner)
@app.command("auth-spike")
def auth_spike(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL", help="Wealthfolio base URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
) -> None:
    """Phase 0.5 — prove end-to-end auth against live Wealthfolio."""
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    async def _probe() -> None:
        # Log in, list the accounts, report how many are visible; the sink is
        # closed whether or not any of that succeeds.
        wealthfolio = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=session_path,
        )
        try:
            await wealthfolio.login()
            visible = await wealthfolio.list_accounts()
            typer.echo(f"Logged in. {len(visible)} account(s) visible.")
        finally:
            await wealthfolio.close()

    # Top-level boundary: any failure becomes a one-line stderr message and
    # exit status 1 instead of a traceback.
    try:
        asyncio.run(_probe())
    except Exception as exc:
        typer.echo(f"auth-spike failed: {exc}", err=True)
        sys.exit(1)
@app.command("trading212")
def trading212(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
    t212_api_keys_json: str = typer.Option(..., envvar="T212_API_KEYS_JSON"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
    mode: str = typer.Option("steady", help="steady = last-7-days; backfill = full history"),
) -> None:
    """Phase 1 — sync Trading212 accounts into Wealthfolio.
    T212_API_KEYS_JSON is a JSON array of
    {id, name, account_type, currency, api_key}
    objects — one entry per T212 account (ISA, Invest).
    """
    # Exit codes:
    #   2 — configuration problem (no accounts configured, unknown --mode)
    #   1 — the sync ran but at least one activity failed to import
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.trading212 import Trading212Provider
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()

    accounts = _parse_t212_accounts(t212_api_keys_json)
    if not accounts:
        typer.echo("No accounts configured in T212_API_KEYS_JSON — nothing to do.", err=True)
        sys.exit(2)

    # Per-account watermarks live under <data_dir>/watermarks; the dedup
    # database sits next to it as <data_dir>/sync.db.
    data = Path(data_dir)
    checkpoint_dir = data / "watermarks"
    checkpoint_dir.mkdir(parents=True, exist_ok=True)

    # `since=None` asks the pipeline for the full history.
    if mode == "steady":
        since: datetime | None = datetime.now(UTC) - timedelta(days=7)
    elif mode == "backfill":
        since = None
    else:
        typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
        sys.exit(2)

    async def _run() -> None:
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        # Nested try/finally: each resource is released once it exists, and a
        # failure while closing the provider can no longer skip closing the
        # sink. Close order (provider, then sink) is unchanged.
        try:
            provider = Trading212Provider(
                accounts=accounts,
                checkpoint_dir=checkpoint_dir,
            )
            try:
                dedup = SyncRecordStore(data / "sync.db")
                # Ensure cookie upfront so a first-run with no session file still works.
                if not Path(wf_session_path).exists():
                    await sink.login()
                result = await sync_provider_to_wealthfolio(
                    provider=provider,
                    sink=sink,
                    dedup=dedup,
                    since=since,
                )
            finally:
                await provider.close()
        finally:
            await sink.close()

        typer.echo(f"trading212: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            sys.exit(1)

    asyncio.run(_run())
@app.command("invest-engine")
def invest_engine(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
    ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"),
    ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
    mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"),
) -> None:
    """Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token.
    The Bearer token is pasted from browser devtools by Viktor (MFA blocks
    scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets
    when he pastes it; we fail fast with exit=2 if that moment has passed
    so a CronJob that runs past the refresh window doesn't burn a request
    on a known-dead token.
    """
    # Exit codes:
    #   2 — unparsable/expired IE_TOKEN_EXPIRES_AT, unknown --mode, or the
    #       provider reported InvestEngineTokenExpiredError mid-sync
    #   1 — the sync ran but at least one activity failed to import
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.invest_engine import (
        InvestEngineProvider,
        InvestEngineTokenExpiredError,
    )
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()

    try:
        expires_at = datetime.fromisoformat(ie_token_expires_at)
    except ValueError as e:
        typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True)
        sys.exit(2)
    # A timestamp without an offset is treated as UTC so the comparison below
    # never mixes naive and aware datetimes.
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=UTC)
    if expires_at <= datetime.now(UTC):
        # The two fragments are concatenated; the ". " keeps the timestamp
        # from running straight into the next sentence.
        typer.echo(
            f"InvestEngine token expired at {expires_at.isoformat()}. "
            f"Viktor must paste a fresh Bearer into Vault.",
            err=True,
        )
        sys.exit(2)

    data = Path(data_dir)
    data.mkdir(parents=True, exist_ok=True)

    # `since=None` asks the pipeline for the full history.
    if mode == "steady":
        since: datetime | None = datetime.now(UTC) - timedelta(days=30)
    elif mode == "backfill":
        since = None
    else:
        typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
        sys.exit(2)

    async def _run() -> None:
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        # Nested try/finally: each resource is released once it exists, and a
        # failure while closing the provider can no longer skip closing the
        # sink. Close order (provider, then sink) is unchanged.
        try:
            provider = InvestEngineProvider(
                bearer_token=ie_bearer_token,
                token_expires_at=expires_at,
            )
            try:
                dedup = SyncRecordStore(data / "sync.db")
                # Log in upfront when there is no saved session file yet.
                if not Path(wf_session_path).exists():
                    await sink.login()
                result = await sync_provider_to_wealthfolio(
                    provider=provider,
                    sink=sink,
                    dedup=dedup,
                    since=since,
                )
            except InvestEngineTokenExpiredError as e:
                typer.echo(f"InvestEngine auth failed: {e}", err=True)
                sys.exit(2)
            finally:
                await provider.close()
        finally:
            await sink.close()

        typer.echo(f"invest-engine: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            sys.exit(1)

    asyncio.run(_run())
@app.command("finance-mysql-import")
def finance_mysql_import(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
                                        envvar="WF_SESSION_PATH"),
    db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"),
    db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"),
    db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"),
    db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"),
    db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
    """One-shot backfill: read the retired finance app's MySQL position table
    and push every row into the correct Wealthfolio account (IE for .L
    tickers, Schwab for US tickers). Idempotent via dedup."""
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.finance_mysql import (
        FinanceMySQLCreds,
        FinanceMySQLProvider,
    )
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()

    # The dedup database is kept at <data_dir>/sync.db; create the directory
    # on first use.
    data_root = Path(data_dir)
    data_root.mkdir(parents=True, exist_ok=True)

    async def _backfill() -> None:
        wealthfolio = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        creds = FinanceMySQLCreds(
            host=db_host,
            port=db_port,
            user=db_user,
            password=db_password,
            database=db_name,
        )
        source = FinanceMySQLProvider(creds)
        seen = SyncRecordStore(data_root / "sync.db")
        # NOTE(review): unlike the trading212 / invest-engine commands, the
        # provider is not closed here — confirm whether FinanceMySQLProvider
        # holds a connection that needs an explicit close().
        try:
            # Log in upfront when there is no saved session file yet.
            if not Path(wf_session_path).exists():
                await wealthfolio.login()
            outcome = await sync_provider_to_wealthfolio(
                provider=source,
                sink=wealthfolio,
                dedup=seen,
            )
        finally:
            await wealthfolio.close()

        summary = (f"finance-mysql: fetched={outcome.fetched} "
                   f"new={outcome.new_after_dedup} "
                   f"imported={outcome.imported} "
                   f"failed={outcome.failed}")
        typer.echo(summary)
        # Exit status 1 when any row failed to import.
        if outcome.failed > 0:
            sys.exit(1)

    asyncio.run(_backfill())
@app.command("imap-ingest")
def imap_ingest(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
                                        envvar="WF_SESSION_PATH"),
    imap_host: str = typer.Option(..., envvar="IMAP_HOST"),
    imap_user: str = typer.Option(..., envvar="IMAP_USER"),
    imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"),
    imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
    """Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP.
    Walks the mailbox, routes each message by `From:` sender domain to the
    matching parser, pushes any resulting activities through the shared
    pipeline (dedup → Wealthfolio CSV-free JSON import).
    """
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.imap import ImapCreds, ImapProvider
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()

    # The dedup database is kept at <data_dir>/sync.db; create the directory
    # on first use.
    data_root = Path(data_dir)
    data_root.mkdir(parents=True, exist_ok=True)

    async def _ingest() -> None:
        wealthfolio = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        mailbox = ImapCreds(
            host=imap_host,
            user=imap_user,
            password=imap_password,
            directory=imap_directory,
        )
        source = ImapProvider(mailbox)
        seen = SyncRecordStore(data_root / "sync.db")
        # NOTE(review): unlike the trading212 / invest-engine commands, the
        # provider is not closed here — confirm whether ImapProvider holds a
        # connection that needs an explicit close().
        try:
            # Log in upfront when there is no saved session file yet.
            if not Path(wf_session_path).exists():
                await wealthfolio.login()
            outcome = await sync_provider_to_wealthfolio(
                provider=source,
                sink=wealthfolio,
                dedup=seen,
            )
        finally:
            await wealthfolio.close()

        summary = (f"imap-ingest: fetched={outcome.fetched} "
                   f"new={outcome.new_after_dedup} "
                   f"imported={outcome.imported} "
                   f"failed={outcome.failed}")
        typer.echo(summary)
        # Exit status 1 when any activity failed to import.
        if outcome.failed > 0:
            sys.exit(1)

    asyncio.run(_ingest())
def _setup_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
def _parse_t212_accounts(raw: str) -> list[tuple[Account, str]]:
    """Parse T212_API_KEYS_JSON into (Account, api_key) pairs.

    Args:
        raw: JSON text that must decode to an array of objects carrying the
            keys ``id``, ``name``, ``account_type`` and ``api_key``.
            ``currency`` is optional and defaults to ``"GBP"``.

    Returns:
        One ``(Account, api_key)`` tuple per entry, in input order. An empty
        array yields an empty list.

    Raises:
        typer.BadParameter: if ``raw`` is not valid JSON, is not an array,
            contains a non-object entry, lacks a required key, or building
            the account raises ``ValueError``.
    """
    from broker_sync.models import Account, AccountType

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # str(e) is only the parser message plus line/column/offset — never
        # the document text — so no api_key can end up in the error output.
        raise typer.BadParameter(f"T212_API_KEYS_JSON is not valid JSON: {e}") from None
    if not isinstance(parsed, list):
        raise typer.BadParameter("T212_API_KEYS_JSON must be a JSON array")

    pairs: list[tuple[Account, str]] = []
    for entry in parsed:
        if not isinstance(entry, dict):
            raise typer.BadParameter("Each T212 entry must be an object")
        try:
            account = Account(
                id=entry["id"],
                name=entry["name"],
                account_type=AccountType(entry["account_type"]),
                currency=entry.get("currency", "GBP"),
                provider="trading212",
            )
            api_key = entry["api_key"]
        except KeyError as e:
            raise typer.BadParameter(f"T212 entry missing required key: {e}") from None
        except ValueError as e:
            # NOTE(review): presumably AccountType is an Enum, so an unknown
            # account_type lands here — confirm. api_key is never passed to
            # Account/AccountType, so it cannot appear in this message.
            raise typer.BadParameter(f"T212 entry has an invalid value: {e}") from None
        pairs.append((account, api_key))
    return pairs
def main() -> None:
    """Entry point: invoke the Typer application, which dispatches to the selected sub-command."""
    app()
if __name__ == "__main__":
    # Script execution only: give COLUMNS a default of 120 unless the
    # environment already defines it, then start the CLI.
    # NOTE(review): presumably this widens help/usage rendering when no
    # terminal width can be detected — confirm the intent.
    os.environ.setdefault("COLUMNS", "120")
    main()