## Context
UK workplace pension at planviewer.fidelity.co.uk has no public API; the SPA
calls a private JSON backend at prd.wiciam.fidelity.co.uk/cvmfe/api/*. Viktor
confirmed in DevTools that an OPTIONS preflight lists auth headers
(ch, fid, rid, sid, tbid, theosreferer, ua). Full reverse-engineering of the
endpoint paths is pending Viktor's POST cURL paste for transactions +
holdings views.
Until those endpoints are captured, ship the scaffold: provider module, CLI
commands, tests, docs. This unblocks installing Playwright in the image and
lets Viktor run the one-off seed command on his laptop ahead of the data
integration.
## This change
- broker_sync/providers/fidelity_planviewer.py
- FidelityCreds namedtuple (storage_state_path, plan_id).
- FidelitySessionError (401 → re-seed), FidelityProviderConfigError.
- FidelityPlanViewerProvider: .accounts() returns a single
WORKPLACE_PENSION account, .fetch() raises until endpoints are wired.
- broker_sync/cli.py
- fidelity-seed: launches headed Chromium so Viktor can log in and tick
"Remember device", then dumps storage_state.json.
- fidelity-ingest: stub matching the invest-engine / trading212 CLI
shape; reads storage_state + plan_id, pipes through the shared pipeline.
- tests/providers/test_fidelity_planviewer.py
- Asserts the single-account shape + the loud-failure guard.
- docs/providers/fidelity-planviewer.md
- Architecture diagram, one-time seed procedure, backfill + monthly
commands, alert runbook.
- pyproject.toml
- playwright ^1.47 as a first-class dep (used only by fidelity-seed and
later by the session-refresh step in fidelity-ingest).
## What is NOT in this change
- Endpoint wiring in provider.fetch() — blocked on DevTools POST cURL.
- Infra CronJob + Vault secret + Prometheus alert — lands once the first
manual backfill succeeds and we know the Chromium image size is fine.
- Dockerfile Chromium install — same trigger.
## Verification
### Automated
$ poetry run pytest tests/providers/test_fidelity_planviewer.py -v
2 passed in 0.08s
$ poetry run pytest -q
122 passed, 1 skipped in 1.07s
$ poetry run mypy broker_sync/providers/fidelity_planviewer.py broker_sync/cli.py
Success: no issues found in 2 source files
$ poetry run ruff check broker_sync/providers/fidelity_planviewer.py broker_sync/cli.py tests/providers/test_fidelity_planviewer.py
All checks passed!
### Manual (Viktor, later)
1. poetry install && poetry run playwright install chromium
2. poetry run broker-sync fidelity-seed --out /tmp/state.json
3. Chromium opens → log in → tick "Remember device" → press Enter
4. vault kv patch secret/broker-sync fidelity_storage_state=@/tmp/state.json
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
510 lines
18 KiB
Python
510 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import typer
|
|
|
|
if TYPE_CHECKING:
|
|
from broker_sync.models import Account
|
|
|
|
app = typer.Typer(
|
|
help="broker-sync: pull brokerage activity into Wealthfolio",
|
|
# CRITICAL: rich tracebacks print all local variables on crash, which
|
|
# includes env-sourced credentials (WF_PASSWORD, T212_API_KEYS_JSON).
|
|
# Kubernetes pod logs are world-readable — leaking creds there is a
|
|
# security incident. Plain tracebacks only.
|
|
pretty_exceptions_enable=False,
|
|
)
|
|
|
|
|
|
@app.command("version")
|
|
def version() -> None:
|
|
"""Print version and exit — used by the no-op Phase 0 CronJob as a liveness check."""
|
|
from broker_sync import __version__
|
|
typer.echo(f"broker-sync {__version__}")
|
|
|
|
|
|
@app.command("auth-spike")
|
|
def auth_spike(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL", help="Wealthfolio base URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
|
) -> None:
|
|
"""Phase 0.5 — prove end-to-end auth against live Wealthfolio."""
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=session_path,
|
|
)
|
|
try:
|
|
await sink.login()
|
|
accounts = await sink.list_accounts()
|
|
typer.echo(f"Logged in. {len(accounts)} account(s) visible.")
|
|
finally:
|
|
await sink.close()
|
|
|
|
try:
|
|
asyncio.run(_run())
|
|
except Exception as e:
|
|
typer.echo(f"auth-spike failed: {e}", err=True)
|
|
sys.exit(1)
|
|
|
|
|
|
@app.command("trading212")
|
|
def trading212(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
|
t212_api_keys_json: str = typer.Option(..., envvar="T212_API_KEYS_JSON"),
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
mode: str = typer.Option("steady", help="steady = last-7-days; backfill = full history"),
|
|
) -> None:
|
|
"""Phase 1 — sync Trading212 accounts into Wealthfolio.
|
|
|
|
T212_API_KEYS_JSON is a JSON array of
|
|
{id, name, account_type, currency, api_key}
|
|
objects — one entry per T212 account (ISA, Invest).
|
|
"""
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.providers.trading212 import Trading212Provider
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
_setup_logging()
|
|
accounts = _parse_t212_accounts(t212_api_keys_json)
|
|
if not accounts:
|
|
typer.echo("No accounts configured in T212_API_KEYS_JSON — nothing to do.", err=True)
|
|
sys.exit(2)
|
|
|
|
data = Path(data_dir)
|
|
checkpoint_dir = data / "watermarks"
|
|
checkpoint_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if mode == "steady":
|
|
since: datetime | None = datetime.now(UTC) - timedelta(days=7)
|
|
elif mode == "backfill":
|
|
since = None
|
|
else:
|
|
typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
|
|
sys.exit(2)
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=wf_session_path,
|
|
)
|
|
provider = Trading212Provider(
|
|
accounts=accounts,
|
|
checkpoint_dir=checkpoint_dir,
|
|
)
|
|
dedup = SyncRecordStore(data / "sync.db")
|
|
try:
|
|
# Ensure cookie upfront so a first-run with no session file still works.
|
|
if not Path(wf_session_path).exists():
|
|
await sink.login()
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
since=since,
|
|
)
|
|
finally:
|
|
await provider.close()
|
|
await sink.close()
|
|
|
|
typer.echo(f"trading212: fetched={result.fetched} "
|
|
f"new={result.new_after_dedup} "
|
|
f"imported={result.imported} "
|
|
f"failed={result.failed}")
|
|
if result.failed > 0:
|
|
sys.exit(1)
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
@app.command("invest-engine")
|
|
def invest_engine(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
|
ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"),
|
|
ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"),
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"),
|
|
) -> None:
|
|
"""Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token.
|
|
|
|
The Bearer token is pasted from browser devtools by Viktor (MFA blocks
|
|
scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets
|
|
when he pastes it; we fail fast with exit=2 if that moment has passed
|
|
so a CronJob that runs past the refresh window doesn't burn a request
|
|
on a known-dead token.
|
|
"""
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.providers.invest_engine import (
|
|
InvestEngineProvider,
|
|
InvestEngineTokenExpiredError,
|
|
)
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
_setup_logging()
|
|
|
|
try:
|
|
expires_at = datetime.fromisoformat(ie_token_expires_at)
|
|
except ValueError as e:
|
|
typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True)
|
|
sys.exit(2)
|
|
if expires_at.tzinfo is None:
|
|
expires_at = expires_at.replace(tzinfo=UTC)
|
|
if expires_at <= datetime.now(UTC):
|
|
typer.echo(
|
|
f"InvestEngine token expired at {expires_at.isoformat()} — "
|
|
f"Viktor must paste a fresh Bearer into Vault.",
|
|
err=True,
|
|
)
|
|
sys.exit(2)
|
|
|
|
data = Path(data_dir)
|
|
data.mkdir(parents=True, exist_ok=True)
|
|
|
|
if mode == "steady":
|
|
since: datetime | None = datetime.now(UTC) - timedelta(days=30)
|
|
elif mode == "backfill":
|
|
since = None
|
|
else:
|
|
typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
|
|
sys.exit(2)
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=wf_session_path,
|
|
)
|
|
provider = InvestEngineProvider(
|
|
bearer_token=ie_bearer_token,
|
|
token_expires_at=expires_at,
|
|
)
|
|
dedup = SyncRecordStore(data / "sync.db")
|
|
try:
|
|
if not Path(wf_session_path).exists():
|
|
await sink.login()
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
since=since,
|
|
)
|
|
except InvestEngineTokenExpiredError as e:
|
|
typer.echo(f"InvestEngine auth failed: {e}", err=True)
|
|
sys.exit(2)
|
|
finally:
|
|
await provider.close()
|
|
await sink.close()
|
|
|
|
typer.echo(f"invest-engine: fetched={result.fetched} "
|
|
f"new={result.new_after_dedup} "
|
|
f"imported={result.imported} "
|
|
f"failed={result.failed}")
|
|
if result.failed > 0:
|
|
sys.exit(1)
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
@app.command("finance-mysql-import")
|
|
def finance_mysql_import(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
|
|
envvar="WF_SESSION_PATH"),
|
|
db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"),
|
|
db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"),
|
|
db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"),
|
|
db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"),
|
|
db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"),
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
) -> None:
|
|
"""One-shot backfill: read the retired finance app's MySQL position table
|
|
and push every row into the correct Wealthfolio account (IE for .L
|
|
tickers, Schwab for US tickers). Idempotent via dedup."""
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.providers.finance_mysql import (
|
|
FinanceMySQLCreds,
|
|
FinanceMySQLProvider,
|
|
)
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
_setup_logging()
|
|
data = Path(data_dir)
|
|
data.mkdir(parents=True, exist_ok=True)
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=wf_session_path,
|
|
)
|
|
provider = FinanceMySQLProvider(
|
|
FinanceMySQLCreds(
|
|
host=db_host,
|
|
port=db_port,
|
|
user=db_user,
|
|
password=db_password,
|
|
database=db_name,
|
|
))
|
|
dedup = SyncRecordStore(data / "sync.db")
|
|
try:
|
|
if not Path(wf_session_path).exists():
|
|
await sink.login()
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
)
|
|
finally:
|
|
await sink.close()
|
|
typer.echo(f"finance-mysql: fetched={result.fetched} "
|
|
f"new={result.new_after_dedup} "
|
|
f"imported={result.imported} "
|
|
f"failed={result.failed}")
|
|
if result.failed > 0:
|
|
sys.exit(1)
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
@app.command("imap-ingest")
|
|
def imap_ingest(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
|
|
envvar="WF_SESSION_PATH"),
|
|
imap_host: str = typer.Option(..., envvar="IMAP_HOST"),
|
|
imap_user: str = typer.Option(..., envvar="IMAP_USER"),
|
|
imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"),
|
|
imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"),
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
) -> None:
|
|
"""Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP.
|
|
|
|
Walks the mailbox, routes each message by `From:` sender domain to the
|
|
matching parser, pushes any resulting activities through the shared
|
|
pipeline (dedup → Wealthfolio CSV-free JSON import).
|
|
"""
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.providers.imap import ImapCreds, ImapProvider
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
_setup_logging()
|
|
data = Path(data_dir)
|
|
data.mkdir(parents=True, exist_ok=True)
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=wf_session_path,
|
|
)
|
|
provider = ImapProvider(
|
|
ImapCreds(
|
|
host=imap_host,
|
|
user=imap_user,
|
|
password=imap_password,
|
|
directory=imap_directory,
|
|
))
|
|
dedup = SyncRecordStore(data / "sync.db")
|
|
try:
|
|
if not Path(wf_session_path).exists():
|
|
await sink.login()
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
)
|
|
finally:
|
|
await sink.close()
|
|
typer.echo(f"imap-ingest: fetched={result.fetched} "
|
|
f"new={result.new_after_dedup} "
|
|
f"imported={result.imported} "
|
|
f"failed={result.failed}")
|
|
if result.failed > 0:
|
|
sys.exit(1)
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
@app.command("fidelity-seed")
|
|
def fidelity_seed(
|
|
out: str = typer.Option(
|
|
"fidelity_storage_state.json",
|
|
help="Where to write the storage_state JSON (stage it to Vault afterwards)",
|
|
),
|
|
url: str = typer.Option(
|
|
"https://pv.planviewer.fidelity.co.uk/",
|
|
help="PlanViewer SPA URL — defaults to the production UK landing",
|
|
),
|
|
) -> None:
|
|
"""One-off: launch a headed Chromium so Viktor can log into PlanViewer and
|
|
capture a long-lived storage_state (cookies + localStorage) for the monthly
|
|
cron.
|
|
|
|
Expected flow:
|
|
1. Chromium opens on the PlanViewer login page.
|
|
2. Viktor enters username, password, memorable word, MFA code.
|
|
3. Viktor ticks "Remember device" / "Trust this browser" if offered.
|
|
4. Viktor waits until the dashboard loads, then presses Enter in the terminal.
|
|
5. Script dumps storage_state.json and exits.
|
|
6. Viktor runs ``vault kv patch secret/broker-sync fidelity_storage_state=@...``.
|
|
"""
|
|
_setup_logging()
|
|
try:
|
|
from playwright.sync_api import sync_playwright
|
|
except ImportError as e:
|
|
typer.echo(
|
|
"Playwright is not installed — run `poetry install` first.", err=True)
|
|
raise typer.Exit(code=2) from e
|
|
|
|
typer.echo(f"Opening {url} in a headed browser — log in, tick "
|
|
"'Remember device' if offered, then press Enter here.")
|
|
with sync_playwright() as pw:
|
|
browser = pw.chromium.launch(headless=False)
|
|
context = browser.new_context()
|
|
page = context.new_page()
|
|
page.goto(url)
|
|
input("Press Enter once you're fully logged in and the dashboard is visible… ")
|
|
context.storage_state(path=out)
|
|
browser.close()
|
|
typer.echo(f"Wrote {out} — stage it to Vault:")
|
|
typer.echo(f" vault kv patch secret/broker-sync fidelity_storage_state=@{out}")
|
|
|
|
|
|
@app.command("fidelity-ingest")
|
|
def fidelity_ingest(
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
|
storage_state_path: str = typer.Option(
|
|
...,
|
|
envvar="FIDELITY_STORAGE_STATE_PATH",
|
|
help="Path on disk to storage_state.json (materialised from Vault by the init container)",
|
|
),
|
|
plan_id: str = typer.Option(..., envvar="FIDELITY_PLAN_ID"),
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
mode: str = typer.Option("steady", help="steady = last-60-days; backfill = full history"),
|
|
) -> None:
|
|
"""Sync Fidelity UK PlanViewer contributions + fund purchases into Wealthfolio."""
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.providers.fidelity_planviewer import (
|
|
FidelityCreds,
|
|
FidelityPlanViewerProvider,
|
|
)
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
_setup_logging()
|
|
|
|
if mode == "steady":
|
|
since: datetime | None = datetime.now(UTC) - timedelta(days=60)
|
|
elif mode == "backfill":
|
|
since = None
|
|
else:
|
|
typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
|
|
sys.exit(2)
|
|
|
|
async def _run() -> None:
|
|
sink = WealthfolioSink(
|
|
base_url=wf_base_url,
|
|
username=wf_username,
|
|
password=wf_password,
|
|
session_path=wf_session_path,
|
|
)
|
|
provider = FidelityPlanViewerProvider(FidelityCreds(
|
|
storage_state_path=storage_state_path,
|
|
plan_id=plan_id,
|
|
))
|
|
dedup = SyncRecordStore(Path(data_dir) / "sync.db")
|
|
try:
|
|
if not Path(wf_session_path).exists():
|
|
await sink.login()
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider, sink=sink, dedup=dedup, since=since,
|
|
)
|
|
finally:
|
|
await sink.close()
|
|
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
|
|
f"new={result.new_after_dedup} "
|
|
f"imported={result.imported} "
|
|
f"failed={result.failed}")
|
|
if result.failed > 0:
|
|
sys.exit(1)
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
def _setup_logging() -> None:
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
)
|
|
|
|
|
|
def _parse_t212_accounts(raw: str) -> list[tuple[Account, str]]:
|
|
"""Parse T212_API_KEYS_JSON into (Account, api_key) pairs."""
|
|
from broker_sync.models import Account, AccountType
|
|
|
|
parsed = json.loads(raw)
|
|
if not isinstance(parsed, list):
|
|
raise typer.BadParameter("T212_API_KEYS_JSON must be a JSON array")
|
|
|
|
pairs: list[tuple[Account, str]] = []
|
|
for entry in parsed:
|
|
if not isinstance(entry, dict):
|
|
raise typer.BadParameter("Each T212 entry must be an object")
|
|
try:
|
|
account = Account(
|
|
id=entry["id"],
|
|
name=entry["name"],
|
|
account_type=AccountType(entry["account_type"]),
|
|
currency=entry.get("currency", "GBP"),
|
|
provider="trading212",
|
|
)
|
|
api_key = entry["api_key"]
|
|
except KeyError as e:
|
|
raise typer.BadParameter(f"T212 entry missing required key: {e}") from None
|
|
pairs.append((account, api_key))
|
|
return pairs
|
|
|
|
|
|
def main() -> None:
|
|
app()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
os.environ.setdefault("COLUMNS", "120")
|
|
main()
|