2026-04-17 19:23:54 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
Add sync pipeline + trading212 CLI subcommand
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).
This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
ensure accounts exist in Wealthfolio, fetch, dedup against the local
SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
record successful imports in the dedup store with the returned
Wealthfolio activity id. Failed batches don't touch the dedup store
so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
`steady` fetches last 7 days; `backfill` pulls all history. Exits 0
on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
(verifies imported CSV contains only the unseen rows and all three
are recorded after the run); import-rejected path (verifies the
failed row is NOT recorded so the next run retries).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag
## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.
Once those land:
kubectl -n broker-sync create job t212-backfill \
--from=cronjob/broker-sync-trading212 -- \
broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
|
|
|
import asyncio
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
2026-04-17 19:23:54 +00:00
|
|
|
import os
|
|
|
|
|
import sys
|
Add sync pipeline + trading212 CLI subcommand
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).
This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
ensure accounts exist in Wealthfolio, fetch, dedup against the local
SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
record successful imports in the dedup store with the returned
Wealthfolio activity id. Failed batches don't touch the dedup store
so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
`steady` fetches last 7 days; `backfill` pulls all history. Exits 0
on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
(verifies imported CSV contains only the unseen rows and all three
are recorded after the run); import-rejected path (verifies the
failed row is NOT recorded so the next run retries).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag
## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.
Once those land:
kubectl -n broker-sync create job t212-backfill \
--from=cronjob/broker-sync-trading212 -- \
broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
|
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import TYPE_CHECKING
|
2026-04-17 19:23:54 +00:00
|
|
|
|
|
|
|
|
import typer
|
|
|
|
|
|
Add sync pipeline + trading212 CLI subcommand
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).
This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
ensure accounts exist in Wealthfolio, fetch, dedup against the local
SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
record successful imports in the dedup store with the returned
Wealthfolio activity id. Failed batches don't touch the dedup store
so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
`steady` fetches last 7 days; `backfill` pulls all history. Exits 0
on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
(verifies imported CSV contains only the unseen rows and all three
are recorded after the run); import-rejected path (verifies the
failed row is NOT recorded so the next run retries).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag
## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.
Once those land:
kubectl -n broker-sync create job t212-backfill \
--from=cronjob/broker-sync-trading212 -- \
broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from broker_sync.models import Account
|
|
|
|
|
|
2026-04-17 20:22:30 +00:00
|
|
|
# Single Typer application hosting every broker-sync subcommand.
app = typer.Typer(
    help="broker-sync: pull brokerage activity into Wealthfolio",
    # CRITICAL: rich tracebacks print all local variables on crash, which
    # includes env-sourced credentials (WF_PASSWORD, T212_API_KEYS_JSON).
    # Kubernetes pod logs are world-readable — leaking creds there is a
    # security incident. Plain tracebacks only.
    pretty_exceptions_enable=False,
)
|
2026-04-17 19:23:54 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.command("version")
def version() -> None:
    """Print version and exit — used by the no-op Phase 0 CronJob as a liveness check."""
    # Lazy import keeps the CLI module importable even while the package
    # metadata is only partially available.
    from broker_sync import __version__

    banner = f"broker-sync {__version__}"
    typer.echo(banner)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.command("auth-spike")
def auth_spike(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL", help="Wealthfolio base URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
) -> None:
    """Phase 0.5 — prove end-to-end auth against live Wealthfolio."""
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    async def _probe() -> None:
        # Log in, count the visible accounts, and report. The sink's HTTP
        # client is released in the finally no matter where this fails.
        sink = WealthfolioSink(
            username=wf_username,
            password=wf_password,
            base_url=wf_base_url,
            session_path=session_path,
        )
        try:
            await sink.login()
            accounts = await sink.list_accounts()
            typer.echo(f"Logged in. {len(accounts)} account(s) visible.")
        finally:
            await sink.close()

    # Any failure (network, bad creds, malformed response) is reported on
    # stderr and turned into exit code 1 for the caller.
    try:
        asyncio.run(_probe())
    except Exception as exc:
        typer.echo(f"auth-spike failed: {exc}", err=True)
        sys.exit(1)
|
|
|
|
|
|
|
|
|
|
|
Add sync pipeline + trading212 CLI subcommand
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).
This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
ensure accounts exist in Wealthfolio, fetch, dedup against the local
SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
record successful imports in the dedup store with the returned
Wealthfolio activity id. Failed batches don't touch the dedup store
so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
`steady` fetches last 7 days; `backfill` pulls all history. Exits 0
on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
(verifies imported CSV contains only the unseen rows and all three
are recorded after the run); import-rejected path (verifies the
failed row is NOT recorded so the next run retries).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag
## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.
Once those land:
kubectl -n broker-sync create job t212-backfill \
--from=cronjob/broker-sync-trading212 -- \
broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
|
|
|
@app.command("trading212")
def trading212(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
    t212_api_keys_json: str = typer.Option(..., envvar="T212_API_KEYS_JSON"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
    mode: str = typer.Option("steady", help="steady = last-7-days; backfill = full history"),
) -> None:
    """Phase 1 — sync Trading212 accounts into Wealthfolio.

    T212_API_KEYS_JSON is a JSON array of
    {id, name, account_type, currency, api_key}
    objects — one entry per T212 account (ISA, Invest).
    """
    # Local imports keep `broker-sync --help` fast: provider/sink modules
    # (and their dependencies) only load when this subcommand actually runs.
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.trading212 import Trading212Provider
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()
    accounts = _parse_t212_accounts(t212_api_keys_json)
    if not accounts:
        # Exit 2 = config error, distinct from exit 1 (import failures), so
        # alerting can tell "misconfigured" apart from "Wealthfolio rejected rows".
        typer.echo("No accounts configured in T212_API_KEYS_JSON — nothing to do.", err=True)
        sys.exit(2)

    data = Path(data_dir)
    checkpoint_dir = data / "watermarks"
    # parents=True also creates data_dir itself, where sync.db lives.
    checkpoint_dir.mkdir(parents=True, exist_ok=True)

    if mode == "steady":
        # 7-day lookback; overlap with previous runs is harmless because the
        # dedup store skips already-recorded activities.
        since: datetime | None = datetime.now(UTC) - timedelta(days=7)
    elif mode == "backfill":
        # None = no lower bound: pull full history.
        since = None
    else:
        typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
        sys.exit(2)

    async def _run() -> None:
        # Wire provider → pipeline → sink; `result` is local here, so the
        # summary/exit-code handling below must stay inside _run.
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        provider = Trading212Provider(
            accounts=accounts,
            checkpoint_dir=checkpoint_dir,
        )
        dedup = SyncRecordStore(data / "sync.db")
        try:
            # Ensure cookie upfront so a first-run with no session file still works.
            if not Path(wf_session_path).exists():
                await sink.login()
            result = await sync_provider_to_wealthfolio(
                provider=provider,
                sink=sink,
                dedup=dedup,
                since=since,
            )
        finally:
            # Always release both HTTP clients, even when the pipeline raises.
            await provider.close()
            await sink.close()

        typer.echo(f"trading212: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            # Failed batches are not recorded in the dedup store, so the next
            # run retries them; exit 1 surfaces the partial failure.
            sys.exit(1)

    asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
Add broker-sync invest-engine CLI subcommand
Context: Phase 2b wiring — hand the bearer-token InvestEngineProvider
into the existing sync pipeline (sync_provider_to_wealthfolio), mirroring
the trading212 subcommand.
Environment contract:
WF_BASE_URL, WF_USERNAME, WF_PASSWORD, WF_SESSION_PATH (shared with trading212)
IE_BEARER_TOKEN (devtools-pasted)
IE_TOKEN_EXPIRES_AT (ISO-8601; Viktor sets on paste)
BROKER_SYNC_DATA_DIR (sync.db + checkpoint state)
Exit codes:
0 = clean run
1 = some rows failed to import (mirrors trading212 behaviour)
2 = token already expired per IE_TOKEN_EXPIRES_AT, or malformed ISO
timestamp, or live 401 response from IE (InvestEngineTokenExpiredError),
or unknown --mode flag
The pre-request expiry check is deliberate: a CronJob that runs during
the refresh window would otherwise waste a request on a dead token and
get the same 401 that we already know about from the clock. Exit 2
from the clock-only path also separates "token is old" from "wealthfolio
rejected a batch" in the CronJob alert pipeline.
Mode defaults:
--mode steady → since = now - 30d (bigger window than T212's 7d
because the IE sync only runs once a month in steady
state; 30d guarantees no gap even after a missed run)
--mode backfill → since = None (full history)
This change:
- `invest-engine` subcommand added to broker_sync/cli.py
- Token-expiry pre-check (clock), IE_TOKEN_EXPIRES_AT ISO parsing with
a UTC default for naive timestamps, and graceful handling of
InvestEngineTokenExpiredError surfaced during pipeline run
- 3 new tests in tests/test_cli.py covering the 3 exit-2 paths
## Automated
poetry run pytest tests/test_cli.py -v
======================== 4 passed in 0.28s =========================
poetry run pytest -q
98 passed, 1 skipped in 0.85s
poetry run mypy --strict .
Success: no issues found in 34 source files
poetry run ruff check .
All checks passed!
## Manual Verification
1. Populate Vault keys per the docstring in
broker_sync/providers/invest_engine.py (Viktor pastes token + sets
expires_at to the Monday morning of next month).
2. Set env:
export WF_BASE_URL=https://wealthfolio.viktorbarzin.me
export WF_USERNAME=viktor
export WF_PASSWORD=<from Vault>
export IE_BEARER_TOKEN=<from Vault>
export IE_TOKEN_EXPIRES_AT=<from Vault>
export BROKER_SYNC_DATA_DIR=/tmp/ie-smoke
3. poetry run broker-sync invest-engine --mode backfill
Expected: single line "invest-engine: fetched=N new=M imported=M failed=0"
on success; exit 2 with "InvestEngine token expired..." if the clock
or server disagrees; exit 2 with "IE_TOKEN_EXPIRES_AT not a valid
ISO-8601 timestamp..." if the env var is malformed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 21:59:31 +00:00
|
|
|
@app.command("invest-engine")
def invest_engine(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
    ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"),
    ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
    mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"),
) -> None:
    """Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token.

    The Bearer token is pasted from browser devtools by Viktor (MFA blocks
    scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets
    when he pastes it; we fail fast with exit=2 if that moment has passed
    so a CronJob that runs past the refresh window doesn't burn a request
    on a known-dead token.
    """
    # Local imports keep `broker-sync --help` fast.
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.invest_engine import (
        InvestEngineProvider,
        InvestEngineTokenExpiredError,
    )
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()

    # Clock-based pre-check: malformed timestamp and already-expired token
    # both exit 2 (config error), before any network traffic is spent.
    try:
        expires_at = datetime.fromisoformat(ie_token_expires_at)
    except ValueError as e:
        typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True)
        sys.exit(2)
    if expires_at.tzinfo is None:
        # Naive timestamps are treated as UTC so the comparison below is
        # always aware-vs-aware.
        expires_at = expires_at.replace(tzinfo=UTC)
    if expires_at <= datetime.now(UTC):
        typer.echo(
            f"InvestEngine token expired at {expires_at.isoformat()} — "
            f"Viktor must paste a fresh Bearer into Vault.",
            err=True,
        )
        sys.exit(2)

    data = Path(data_dir)
    data.mkdir(parents=True, exist_ok=True)

    if mode == "steady":
        # 30-day lookback (vs trading212's 7): this sync runs monthly, so a
        # bigger window guarantees no gap even after one missed run.
        since: datetime | None = datetime.now(UTC) - timedelta(days=30)
    elif mode == "backfill":
        since = None
    else:
        typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
        sys.exit(2)

    async def _run() -> None:
        # `result` is local here, so the summary/exit-code handling below
        # must stay inside _run.
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        provider = InvestEngineProvider(
            bearer_token=ie_bearer_token,
            token_expires_at=expires_at,
        )
        dedup = SyncRecordStore(data / "sync.db")
        try:
            # Ensure cookie upfront so a first-run with no session file still works.
            if not Path(wf_session_path).exists():
                await sink.login()
            result = await sync_provider_to_wealthfolio(
                provider=provider,
                sink=sink,
                dedup=dedup,
                since=since,
            )
        except InvestEngineTokenExpiredError as e:
            # Live 401: same exit code as the clock-based pre-check so the
            # CronJob alert pipeline treats both as "token needs refresh".
            typer.echo(f"InvestEngine auth failed: {e}", err=True)
            sys.exit(2)
        finally:
            # Runs even on the sys.exit above (SystemExit propagates through).
            await provider.close()
            await sink.close()

        typer.echo(f"invest-engine: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            # Failed batches will be retried next run (not in the dedup store).
            sys.exit(1)

    asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
2026-04-17 22:38:21 +00:00
|
|
|
@app.command("finance-mysql-import")
def finance_mysql_import(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
                                        envvar="WF_SESSION_PATH"),
    db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"),
    db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"),
    db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"),
    db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"),
    db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
    """One-shot backfill: read the retired finance app's MySQL position table
    and push every row into the correct Wealthfolio account (IE for .L
    tickers, Schwab for US tickers). Idempotent via dedup."""
    # Local imports keep `broker-sync --help` fast.
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.finance_mysql import (
        FinanceMySQLCreds,
        FinanceMySQLProvider,
    )
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()
    data = Path(data_dir)
    data.mkdir(parents=True, exist_ok=True)

    async def _run() -> None:
        # No `since` window: this is a one-shot full import; reruns are safe
        # because the dedup store skips already-recorded rows.
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        provider = FinanceMySQLProvider(
            FinanceMySQLCreds(
                host=db_host,
                port=db_port,
                user=db_user,
                password=db_password,
                database=db_name,
            ))
        dedup = SyncRecordStore(data / "sync.db")
        try:
            # Ensure cookie upfront so a first-run with no session file still works.
            if not Path(wf_session_path).exists():
                await sink.login()
            result = await sync_provider_to_wealthfolio(
                provider=provider,
                sink=sink,
                dedup=dedup,
            )
        finally:
            # NOTE(review): unlike trading212/invest-engine, the provider is
            # never closed here — confirm FinanceMySQLProvider releases its
            # MySQL connection itself, or add a close() call in this finally.
            await sink.close()
        typer.echo(f"finance-mysql: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            # Partial failure → exit 1; un-recorded rows retry on rerun.
            sys.exit(1)

    asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
2026-04-17 22:12:05 +00:00
|
|
|
@app.command("imap-ingest")
def imap_ingest(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
                                        envvar="WF_SESSION_PATH"),
    imap_host: str = typer.Option(..., envvar="IMAP_HOST"),
    imap_user: str = typer.Option(..., envvar="IMAP_USER"),
    imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"),
    imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
    """Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP.

    Walks the mailbox, routes each message by `From:` sender domain to the
    matching parser, pushes any resulting activities through the shared
    pipeline (dedup → Wealthfolio CSV-free JSON import).
    """
    # Local imports keep `broker-sync --help` fast.
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.imap import ImapCreds, ImapProvider
    from broker_sync.sinks.wealthfolio import WealthfolioSink

    _setup_logging()
    data = Path(data_dir)
    data.mkdir(parents=True, exist_ok=True)

    async def _run() -> None:
        # No `since` window: the mailbox itself bounds what gets fetched, and
        # the dedup store makes re-ingesting old mail harmless.
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        provider = ImapProvider(
            ImapCreds(
                host=imap_host,
                user=imap_user,
                password=imap_password,
                directory=imap_directory,
            ))
        dedup = SyncRecordStore(data / "sync.db")
        try:
            # Ensure cookie upfront so a first-run with no session file still works.
            if not Path(wf_session_path).exists():
                await sink.login()
            result = await sync_provider_to_wealthfolio(
                provider=provider,
                sink=sink,
                dedup=dedup,
            )
        finally:
            # NOTE(review): the IMAP provider is not closed here, unlike the
            # trading212/invest-engine subcommands — confirm ImapProvider
            # logs out / closes its connection itself, or close it in this
            # finally alongside the sink.
            await sink.close()
        typer.echo(f"imap-ingest: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        if result.failed > 0:
            # Partial failure → exit 1; un-recorded rows retry on the next run.
            sys.exit(1)

    asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
fidelity-planviewer: scaffold provider + CLI (seed + stub ingest)
## Context
UK workplace pension at planviewer.fidelity.co.uk has no public API; the SPA
calls a private JSON backend at prd.wiciam.fidelity.co.uk/cvmfe/api/*. Viktor
confirmed in DevTools that an OPTIONS preflight lists auth headers
(ch, fid, rid, sid, tbid, theosreferer, ua). Full reverse-engineering of the
endpoint paths is pending Viktor's POST cURL paste for transactions +
holdings views.
Until those endpoints are captured, ship the scaffold: provider module, CLI
commands, tests, docs. This unblocks installing Playwright in the image and
lets Viktor run the one-off seed command on his laptop ahead of the data
integration.
## This change
- broker_sync/providers/fidelity_planviewer.py
- FidelityCreds namedtuple (storage_state_path, plan_id).
- FidelitySessionError (401 → re-seed), FidelityProviderConfigError.
- FidelityPlanViewerProvider: .accounts() returns a single
WORKPLACE_PENSION account, .fetch() raises until endpoints are wired.
- broker_sync/cli.py
- fidelity-seed: launches headed Chromium so Viktor can log in and tick
"Remember device", then dumps storage_state.json.
- fidelity-ingest: stub matching the invest-engine / trading212 CLI
shape; reads storage_state + plan_id, pipes through the shared pipeline.
- tests/providers/test_fidelity_planviewer.py
- Asserts the single-account shape + the loud-failure guard.
- docs/providers/fidelity-planviewer.md
- Architecture diagram, one-time seed procedure, backfill + monthly
commands, alert runbook.
- pyproject.toml
- playwright ^1.47 as a first-class dep (used only by fidelity-seed and
later by the session-refresh step in fidelity-ingest).
## What is NOT in this change
- Endpoint wiring in provider.fetch() — blocked on DevTools POST cURL.
- Infra CronJob + Vault secret + Prometheus alert — lands once the first
manual backfill succeeds and we know the Chromium image size is fine.
- Dockerfile Chromium install — same trigger.
## Verification
### Automated
$ poetry run pytest tests/providers/test_fidelity_planviewer.py -v
2 passed in 0.08s
$ poetry run pytest -q
122 passed, 1 skipped in 1.07s
$ poetry run mypy broker_sync/providers/fidelity_planviewer.py broker_sync/cli.py
Success: no issues found in 2 source files
$ poetry run ruff check broker_sync/providers/fidelity_planviewer.py broker_sync/cli.py tests/providers/test_fidelity_planviewer.py
All checks passed!
### Manual (Viktor, later)
1. poetry install && poetry run playwright install chromium
2. poetry run broker-sync fidelity-seed --out /tmp/state.json
3. Chromium opens → log in → tick "Remember device" → press Enter
4. vault kv patch secret/broker-sync fidelity_storage_state=@/tmp/state.json
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:09:04 +00:00
|
|
|
@app.command("fidelity-seed")
def fidelity_seed(
    out: str = typer.Option(
        "fidelity_storage_state.json",
        help="Where to write the storage_state JSON (stage it to Vault afterwards)",
    ),
    url: str = typer.Option(
        "https://pv.planviewer.fidelity.co.uk/",
        help="PlanViewer SPA URL — defaults to the production UK landing",
    ),
) -> None:
    """One-off: launch a headed Chromium so Viktor can log into PlanViewer and
    capture a long-lived storage_state (cookies + localStorage) for the monthly
    cron.

    Expected flow:
    1. Chromium opens on the PlanViewer login page.
    2. Viktor enters username, password, memorable word, MFA code.
    3. Viktor ticks "Remember device" / "Trust this browser" if offered.
    4. Viktor waits until the dashboard loads, then presses Enter in the terminal.
    5. Script dumps storage_state.json and exits.
    6. Viktor runs ``vault kv patch secret/broker-sync fidelity_storage_state=@...``.
    """
    _setup_logging()
    # Playwright is optional at import time; fail with a config-error exit
    # code rather than an ImportError traceback.
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as import_err:
        typer.echo(
            "Playwright is not installed — run `poetry install` first.", err=True)
        raise typer.Exit(code=2) from import_err

    typer.echo(f"Opening {url} in a headed browser — log in, tick "
               "'Remember device' if offered, then press Enter here.")
    with sync_playwright() as playwright:
        # Headed (headless=False) on purpose: the login is interactive.
        browser = playwright.chromium.launch(headless=False)
        context = browser.new_context()
        context.new_page().goto(url)
        # Block until Viktor confirms the session is fully established, then
        # snapshot cookies + localStorage to disk.
        input("Press Enter once you're fully logged in and the dashboard is visible… ")
        context.storage_state(path=out)
        browser.close()
    typer.echo(f"Wrote {out} — stage it to Vault:")
    typer.echo(f" vault kv patch secret/broker-sync fidelity_storage_state=@{out}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.command("fidelity-ingest")
|
|
|
|
|
def fidelity_ingest(
|
|
|
|
|
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
|
|
|
|
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
|
|
|
|
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
|
|
|
|
wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
|
|
|
|
storage_state_path: str = typer.Option(
|
|
|
|
|
...,
|
|
|
|
|
envvar="FIDELITY_STORAGE_STATE_PATH",
|
|
|
|
|
help="Path on disk to storage_state.json (materialised from Vault by the init container)",
|
|
|
|
|
),
|
|
|
|
|
plan_id: str = typer.Option(..., envvar="FIDELITY_PLAN_ID"),
|
|
|
|
|
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
|
|
|
|
mode: str = typer.Option("steady", help="steady = last-60-days; backfill = full history"),
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Sync Fidelity UK PlanViewer contributions + fund purchases into Wealthfolio."""
|
|
|
|
|
from broker_sync.dedup import SyncRecordStore
|
|
|
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
|
|
|
from broker_sync.providers.fidelity_planviewer import (
|
|
|
|
|
FidelityCreds,
|
|
|
|
|
FidelityPlanViewerProvider,
|
|
|
|
|
)
|
|
|
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
|
|
|
|
|
|
_setup_logging()
|
|
|
|
|
|
|
|
|
|
if mode == "steady":
|
|
|
|
|
since: datetime | None = datetime.now(UTC) - timedelta(days=60)
|
|
|
|
|
elif mode == "backfill":
|
|
|
|
|
since = None
|
|
|
|
|
else:
|
|
|
|
|
typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
|
|
|
|
|
sys.exit(2)
|
|
|
|
|
|
|
|
|
|
async def _run() -> None:
|
|
|
|
|
sink = WealthfolioSink(
|
|
|
|
|
base_url=wf_base_url,
|
|
|
|
|
username=wf_username,
|
|
|
|
|
password=wf_password,
|
|
|
|
|
session_path=wf_session_path,
|
|
|
|
|
)
|
|
|
|
|
provider = FidelityPlanViewerProvider(FidelityCreds(
|
|
|
|
|
storage_state_path=storage_state_path,
|
|
|
|
|
plan_id=plan_id,
|
|
|
|
|
))
|
|
|
|
|
dedup = SyncRecordStore(Path(data_dir) / "sync.db")
|
|
|
|
|
try:
|
|
|
|
|
if not Path(wf_session_path).exists():
|
|
|
|
|
await sink.login()
|
|
|
|
|
result = await sync_provider_to_wealthfolio(
|
|
|
|
|
provider=provider, sink=sink, dedup=dedup, since=since,
|
|
|
|
|
)
|
|
|
|
|
finally:
|
|
|
|
|
await sink.close()
|
|
|
|
|
typer.echo(f"fidelity-ingest: fetched={result.fetched} "
|
|
|
|
|
f"new={result.new_after_dedup} "
|
|
|
|
|
f"imported={result.imported} "
|
|
|
|
|
f"failed={result.failed}")
|
|
|
|
|
if result.failed > 0:
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
Add sync pipeline + trading212 CLI subcommand
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).
This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
ensure accounts exist in Wealthfolio, fetch, dedup against the local
SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
record successful imports in the dedup store with the returned
Wealthfolio activity id. Failed batches don't touch the dedup store
so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
`steady` fetches last 7 days; `backfill` pulls all history. Exits 0
on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
(verifies imported CSV contains only the unseen rows and all three
are recorded after the run); import-rejected path (verifies the
failed row is NOT recorded so the next run retries).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag
## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.
Once those land:
kubectl -n broker-sync create job t212-backfill \
--from=cronjob/broker-sync-trading212 -- \
broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
|
|
|
def _setup_logging() -> None:
|
|
|
|
|
logging.basicConfig(
|
|
|
|
|
level=logging.INFO,
|
|
|
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_t212_accounts(raw: str) -> list[tuple[Account, str]]:
    """Parse T212_API_KEYS_JSON into (Account, api_key) pairs.

    Expected shape: a JSON array of objects with "id", "name",
    "account_type", "api_key", and an optional "currency" (defaults "GBP").

    Raises:
        typer.BadParameter: on malformed JSON, a non-array payload, a
            non-object entry, a missing key, an unrecognised account_type,
            or a non-string/empty api_key — so misconfiguration surfaces as
            a CLI usage error (exit 2) instead of a raw traceback.
    """
    from broker_sync.models import Account, AccountType

    # Malformed env-var JSON should read as a config error, not a crash.
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        raise typer.BadParameter(f"T212_API_KEYS_JSON is not valid JSON: {e}") from None
    if not isinstance(parsed, list):
        raise typer.BadParameter("T212_API_KEYS_JSON must be a JSON array")

    pairs: list[tuple[Account, str]] = []
    for entry in parsed:
        if not isinstance(entry, dict):
            raise typer.BadParameter("Each T212 entry must be an object")
        try:
            account = Account(
                id=entry["id"],
                name=entry["name"],
                account_type=AccountType(entry["account_type"]),
                currency=entry.get("currency", "GBP"),
                provider="trading212",
            )
            api_key = entry["api_key"]
        except KeyError as e:
            raise typer.BadParameter(f"T212 entry missing required key: {e}") from None
        except ValueError as e:
            # AccountType(...) raises ValueError for an unknown account_type.
            raise typer.BadParameter(f"T212 entry has an invalid value: {e}") from None
        if not isinstance(api_key, str) or not api_key:
            raise typer.BadParameter("T212 entry api_key must be a non-empty string")
        pairs.append((account, api_key))
    return pairs
|
|
|
|
|
|
|
|
|
|
|
2026-04-17 19:23:54 +00:00
|
|
|
def main() -> None:
    """Console-script entry point: dispatch to the Typer application."""
    app()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
os.environ.setdefault("COLUMNS", "120")
|
|
|
|
|
main()
|