from __future__ import annotations import asyncio import json import logging import os import sys from datetime import UTC, datetime, timedelta from pathlib import Path from typing import TYPE_CHECKING import typer if TYPE_CHECKING: from broker_sync.models import Account app = typer.Typer( help="broker-sync: pull brokerage activity into Wealthfolio", # CRITICAL: rich tracebacks print all local variables on crash, which # includes env-sourced credentials (WF_PASSWORD, T212_API_KEYS_JSON). # Kubernetes pod logs are world-readable — leaking creds there is a # security incident. Plain tracebacks only. pretty_exceptions_enable=False, ) @app.command("version") def version() -> None: """Print version and exit — used by the no-op Phase 0 CronJob as a liveness check.""" from broker_sync import __version__ typer.echo(f"broker-sync {__version__}") @app.command("auth-spike") def auth_spike( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL", help="Wealthfolio base URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), ) -> None: """Phase 0.5 — prove end-to-end auth against live Wealthfolio.""" from broker_sync.sinks.wealthfolio import WealthfolioSink async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=session_path, ) try: await sink.login() accounts = await sink.list_accounts() typer.echo(f"Logged in. {len(accounts)} account(s) visible.") finally: await sink.close() try: asyncio.run(_run()) except Exception as e: typer.echo(f"auth-spike failed: {e}", err=True) sys.exit(1) @app.command("trading212") def trading212( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), t212_api_keys_json: str = typer.Option(..., envvar="T212_API_KEYS_JSON"), data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), mode: str = typer.Option("steady", help="steady = last-7-days; backfill = full history"), ) -> None: """Phase 1 — sync Trading212 accounts into Wealthfolio. T212_API_KEYS_JSON is a JSON array of {id, name, account_type, currency, api_key} objects — one entry per T212 account (ISA, Invest). """ from broker_sync.dedup import SyncRecordStore from broker_sync.pipeline import sync_provider_to_wealthfolio from broker_sync.providers.trading212 import Trading212Provider from broker_sync.sinks.wealthfolio import WealthfolioSink _setup_logging() accounts = _parse_t212_accounts(t212_api_keys_json) if not accounts: typer.echo("No accounts configured in T212_API_KEYS_JSON — nothing to do.", err=True) sys.exit(2) data = Path(data_dir) checkpoint_dir = data / "watermarks" checkpoint_dir.mkdir(parents=True, exist_ok=True) if mode == "steady": since: datetime | None = datetime.now(UTC) - timedelta(days=7) elif mode == "backfill": since = None else: typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True) sys.exit(2) async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=wf_session_path, ) provider = Trading212Provider( accounts=accounts, checkpoint_dir=checkpoint_dir, ) dedup = SyncRecordStore(data / "sync.db") try: # Ensure cookie upfront so a first-run with no session file still works. if not Path(wf_session_path).exists(): await sink.login() result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) finally: await provider.close() await sink.close() typer.echo(f"trading212: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed}") if result.failed > 0: sys.exit(1) asyncio.run(_run()) @app.command("invest-engine") def invest_engine( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"), ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"), data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"), ) -> None: """Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token. The Bearer token is pasted from browser devtools by Viktor (MFA blocks scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets when he pastes it; we fail fast with exit=2 if that moment has passed so a CronJob that runs past the refresh window doesn't burn a request on a known-dead token. """ from broker_sync.dedup import SyncRecordStore from broker_sync.pipeline import sync_provider_to_wealthfolio from broker_sync.providers.invest_engine import ( InvestEngineProvider, InvestEngineTokenExpiredError, ) from broker_sync.sinks.wealthfolio import WealthfolioSink _setup_logging() try: expires_at = datetime.fromisoformat(ie_token_expires_at) except ValueError as e: typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True) sys.exit(2) if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=UTC) if expires_at <= datetime.now(UTC): typer.echo( f"InvestEngine token expired at {expires_at.isoformat()} — " f"Viktor must paste a fresh Bearer into Vault.", err=True, ) sys.exit(2) data = Path(data_dir) data.mkdir(parents=True, exist_ok=True) if mode == "steady": since: datetime | None = datetime.now(UTC) - timedelta(days=30) elif mode == "backfill": since = None else: typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True) sys.exit(2) async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=wf_session_path, ) provider = InvestEngineProvider( bearer_token=ie_bearer_token, token_expires_at=expires_at, ) dedup = SyncRecordStore(data / "sync.db") try: if not Path(wf_session_path).exists(): await sink.login() result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) except InvestEngineTokenExpiredError as e: typer.echo(f"InvestEngine auth failed: {e}", err=True) sys.exit(2) finally: await provider.close() await sink.close() typer.echo(f"invest-engine: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed}") if result.failed > 0: sys.exit(1) asyncio.run(_run()) @app.command("finance-mysql-import") def finance_mysql_import( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"), db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"), db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"), db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"), db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"), data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), ) -> None: """One-shot backfill: read the retired finance app's MySQL position table and push every row into the correct Wealthfolio account (IE for .L tickers, Schwab for US tickers). Idempotent via dedup.""" from broker_sync.dedup import SyncRecordStore from broker_sync.pipeline import sync_provider_to_wealthfolio from broker_sync.providers.finance_mysql import ( FinanceMySQLCreds, FinanceMySQLProvider, ) from broker_sync.sinks.wealthfolio import WealthfolioSink _setup_logging() data = Path(data_dir) data.mkdir(parents=True, exist_ok=True) async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=wf_session_path, ) provider = FinanceMySQLProvider( FinanceMySQLCreds( host=db_host, port=db_port, user=db_user, password=db_password, database=db_name, )) dedup = SyncRecordStore(data / "sync.db") try: if not Path(wf_session_path).exists(): await sink.login() result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, ) finally: await sink.close() typer.echo(f"finance-mysql: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed}") if result.failed > 0: sys.exit(1) asyncio.run(_run()) @app.command("imap-ingest") def imap_ingest( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), imap_host: str = typer.Option(..., envvar="IMAP_HOST"), imap_user: str = typer.Option(..., envvar="IMAP_USER"), imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"), imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"), data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), ) -> None: """Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP. Walks the mailbox, routes each message by `From:` sender domain to the matching parser, pushes any resulting activities through the shared pipeline (dedup → Wealthfolio CSV-free JSON import). """ from broker_sync.dedup import SyncRecordStore from broker_sync.pipeline import sync_provider_to_wealthfolio from broker_sync.providers.imap import ImapCreds, ImapProvider from broker_sync.sinks.wealthfolio import WealthfolioSink _setup_logging() data = Path(data_dir) data.mkdir(parents=True, exist_ok=True) async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=wf_session_path, ) provider = ImapProvider( ImapCreds( host=imap_host, user=imap_user, password=imap_password, directory=imap_directory, )) dedup = SyncRecordStore(data / "sync.db") try: if not Path(wf_session_path).exists(): await sink.login() result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, ) finally: await sink.close() typer.echo(f"imap-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed}") if result.failed > 0: sys.exit(1) asyncio.run(_run()) @app.command("fidelity-seed") def fidelity_seed( out: str = typer.Option( "fidelity_storage_state.json", help="Where to write the storage_state JSON (stage it to Vault afterwards)", ), url: str = typer.Option( "https://pv.planviewer.fidelity.co.uk/", help="PlanViewer SPA URL — defaults to the production UK landing", ), ) -> None: """One-off: launch a headed Chromium so Viktor can log into PlanViewer and capture a long-lived storage_state (cookies + localStorage) for the monthly cron. Expected flow: 1. Chromium opens on the PlanViewer login page. 2. Viktor enters username, password, memorable word, MFA code. 3. Viktor ticks "Remember device" / "Trust this browser" if offered. 4. Viktor waits until the dashboard loads, then presses Enter in the terminal. 5. Script dumps storage_state.json and exits. 6. Viktor runs ``vault kv patch secret/broker-sync fidelity_storage_state=@...``. """ _setup_logging() try: from playwright.sync_api import sync_playwright except ImportError as e: typer.echo( "Playwright is not installed — run `poetry install` first.", err=True) raise typer.Exit(code=2) from e typer.echo(f"Opening {url} in a headed browser — log in, tick " "'Remember device' if offered, then press Enter here.") with sync_playwright() as pw: browser = pw.chromium.launch(headless=False) context = browser.new_context() page = context.new_page() page.goto(url) input("Press Enter once you're fully logged in and the dashboard is visible… ") context.storage_state(path=out) browser.close() typer.echo(f"Wrote {out} — stage it to Vault:") typer.echo(f" vault kv patch secret/broker-sync fidelity_storage_state=@{out}") @app.command("fidelity-ingest") def fidelity_ingest( wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), wf_username: str = typer.Option(..., envvar="WF_USERNAME"), wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), storage_state_path: str = typer.Option( ..., envvar="FIDELITY_STORAGE_STATE_PATH", help="Path on disk to storage_state.json (materialised from Vault by the init container)", ), plan_id: str = typer.Option(..., envvar="FIDELITY_PLAN_ID"), data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), mode: str = typer.Option("steady", help="steady = last-60-days; backfill = full history"), ) -> None: """Sync Fidelity UK PlanViewer contributions + fund purchases into Wealthfolio.""" from broker_sync.dedup import SyncRecordStore from broker_sync.pipeline import sync_provider_to_wealthfolio from broker_sync.providers.fidelity_planviewer import ( FidelityCreds, FidelityPlanViewerProvider, ) from broker_sync.sinks.wealthfolio import WealthfolioSink _setup_logging() if mode == "steady": since: datetime | None = datetime.now(UTC) - timedelta(days=60) elif mode == "backfill": since = None else: typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True) sys.exit(2) async def _run() -> None: sink = WealthfolioSink( base_url=wf_base_url, username=wf_username, password=wf_password, session_path=wf_session_path, ) provider = FidelityPlanViewerProvider(FidelityCreds( storage_state_path=storage_state_path, plan_id=plan_id, )) dedup = SyncRecordStore(Path(data_dir) / "sync.db") try: if not Path(wf_session_path).exists(): await sink.login() result = await sync_provider_to_wealthfolio( provider=provider, sink=sink, dedup=dedup, since=since, ) finally: await sink.close() typer.echo(f"fidelity-ingest: fetched={result.fetched} " f"new={result.new_after_dedup} " f"imported={result.imported} " f"failed={result.failed}") if result.failed > 0: sys.exit(1) asyncio.run(_run()) def _setup_logging() -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) def _parse_t212_accounts(raw: str) -> list[tuple[Account, str]]: """Parse T212_API_KEYS_JSON into (Account, api_key) pairs.""" from broker_sync.models import Account, AccountType parsed = json.loads(raw) if not isinstance(parsed, list): raise typer.BadParameter("T212_API_KEYS_JSON must be a JSON array") pairs: list[tuple[Account, str]] = [] for entry in parsed: if not isinstance(entry, dict): raise typer.BadParameter("Each T212 entry must be an object") try: account = Account( id=entry["id"], name=entry["name"], account_type=AccountType(entry["account_type"]), currency=entry.get("currency", "GBP"), provider="trading212", ) api_key = entry["api_key"] except KeyError as e: raise typer.BadParameter(f"T212 entry missing required key: {e}") from None pairs.append((account, api_key)) return pairs def main() -> None: app() if __name__ == "__main__": os.environ.setdefault("COLUMNS", "120") main()