Add broker-sync invest-engine CLI subcommand
Context: Phase 2b wiring — hand the bearer-token InvestEngineProvider
into the existing sync pipeline (sync_provider_to_wealthfolio), mirroring
the trading212 subcommand.
Environment contract:
WF_BASE_URL, WF_USERNAME, WF_PASSWORD, WF_SESSION_PATH (shared with trading212)
IE_BEARER_TOKEN (devtools-pasted)
IE_TOKEN_EXPIRES_AT (ISO-8601; Viktor sets on paste)
BROKER_SYNC_DATA_DIR (sync.db + checkpoint state)
Exit codes:
0 = clean run
1 = some rows failed to import (mirrors trading212 behaviour)
2 = token already expired per IE_TOKEN_EXPIRES_AT, or malformed ISO
timestamp, or live 401 response from IE (InvestEngineTokenExpiredError),
or unknown --mode flag
The pre-request expiry check is deliberate: a CronJob that runs during
the refresh window would otherwise waste a request on a dead token and
get the same 401 that we already know about from the clock. Exit 2
from the clock-only path also separates "token is old" from "wealthfolio
rejected a batch" in the CronJob alert pipeline.
Mode defaults:
--mode steady → since = now - 30d (bigger window than T212's 7d
because the IE sync only runs once a month in steady
state; 30d guarantees no gap even after a missed run)
--mode backfill → since = None (full history)
This change:
- `invest-engine` subcommand added to broker_sync/cli.py
- Token-expiry pre-check (clock), IE_TOKEN_EXPIRES_AT ISO parsing with
a UTC default for naive timestamps, and graceful handling of
InvestEngineTokenExpiredError surfaced during pipeline run
- 3 new tests in tests/test_cli.py covering the 3 exit-2 paths
## Automated
poetry run pytest tests/test_cli.py -v
======================== 4 passed in 0.28s =========================
poetry run pytest -q
98 passed, 1 skipped in 0.85s
poetry run mypy --strict .
Success: no issues found in 34 source files
poetry run ruff check .
All checks passed!
## Manual Verification
1. Populate Vault keys per the docstring in
broker_sync/providers/invest_engine.py (Viktor pastes token + sets
expires_at to the Monday morning of next month).
2. Set env:
export WF_BASE_URL=https://wealthfolio.viktorbarzin.me
export WF_USERNAME=viktor
export WF_PASSWORD=<from Vault>
export IE_BEARER_TOKEN=<from Vault>
export IE_TOKEN_EXPIRES_AT=<from Vault>
export BROKER_SYNC_DATA_DIR=/tmp/ie-smoke
3. poetry run broker-sync invest-engine --mode backfill
Expected: single line "invest-engine: fetched=N new=M imported=M failed=0"
on success; exit 2 with "InvestEngine token expired..." if the clock
or server disagrees; exit 2 with "IE_TOKEN_EXPIRES_AT not a valid
ISO-8601 timestamp..." if the env var is malformed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
dc4d3f889d
commit
f49918c74d
2 changed files with 158 additions and 0 deletions
|
|
@ -137,6 +137,99 @@ def trading212(
|
|||
asyncio.run(_run())
|
||||
|
||||
|
||||
@app.command("invest-engine")
|
||||
def invest_engine(
|
||||
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
|
||||
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
|
||||
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
|
||||
wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
|
||||
ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"),
|
||||
ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"),
|
||||
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
|
||||
mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"),
|
||||
) -> None:
|
||||
"""Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token.
|
||||
|
||||
The Bearer token is pasted from browser devtools by Viktor (MFA blocks
|
||||
scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets
|
||||
when he pastes it; we fail fast with exit=2 if that moment has passed
|
||||
so a CronJob that runs past the refresh window doesn't burn a request
|
||||
on a known-dead token.
|
||||
"""
|
||||
from broker_sync.dedup import SyncRecordStore
|
||||
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
||||
from broker_sync.providers.invest_engine import (
|
||||
InvestEngineProvider,
|
||||
InvestEngineTokenExpiredError,
|
||||
)
|
||||
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
||||
|
||||
_setup_logging()
|
||||
|
||||
try:
|
||||
expires_at = datetime.fromisoformat(ie_token_expires_at)
|
||||
except ValueError as e:
|
||||
typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True)
|
||||
sys.exit(2)
|
||||
if expires_at.tzinfo is None:
|
||||
expires_at = expires_at.replace(tzinfo=UTC)
|
||||
if expires_at <= datetime.now(UTC):
|
||||
typer.echo(
|
||||
f"InvestEngine token expired at {expires_at.isoformat()} — "
|
||||
f"Viktor must paste a fresh Bearer into Vault.",
|
||||
err=True,
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
data = Path(data_dir)
|
||||
data.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if mode == "steady":
|
||||
since: datetime | None = datetime.now(UTC) - timedelta(days=30)
|
||||
elif mode == "backfill":
|
||||
since = None
|
||||
else:
|
||||
typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
|
||||
sys.exit(2)
|
||||
|
||||
async def _run() -> None:
|
||||
sink = WealthfolioSink(
|
||||
base_url=wf_base_url,
|
||||
username=wf_username,
|
||||
password=wf_password,
|
||||
session_path=wf_session_path,
|
||||
)
|
||||
provider = InvestEngineProvider(
|
||||
bearer_token=ie_bearer_token,
|
||||
token_expires_at=expires_at,
|
||||
)
|
||||
dedup = SyncRecordStore(data / "sync.db")
|
||||
try:
|
||||
if not Path(wf_session_path).exists():
|
||||
await sink.login()
|
||||
result = await sync_provider_to_wealthfolio(
|
||||
provider=provider,
|
||||
sink=sink,
|
||||
dedup=dedup,
|
||||
since=since,
|
||||
)
|
||||
except InvestEngineTokenExpiredError as e:
|
||||
typer.echo(f"InvestEngine auth failed: {e}", err=True)
|
||||
sys.exit(2)
|
||||
finally:
|
||||
await provider.close()
|
||||
await sink.close()
|
||||
|
||||
typer.echo(f"invest-engine: fetched={result.fetched} "
|
||||
f"new={result.new_after_dedup} "
|
||||
f"imported={result.imported} "
|
||||
f"failed={result.failed}")
|
||||
if result.failed > 0:
|
||||
sys.exit(1)
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def _setup_logging() -> None:
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from broker_sync import __version__
|
||||
|
|
@ -10,3 +14,64 @@ def test_version_prints_package_version() -> None:
|
|||
result = runner.invoke(app, ["version"])
|
||||
assert result.exit_code == 0
|
||||
assert __version__ in result.stdout
|
||||
|
||||
|
||||
# -- invest-engine CLI --
|
||||
|
||||
|
||||
def _future_iso() -> str:
|
||||
return (datetime.now(UTC) + timedelta(days=30)).isoformat()
|
||||
|
||||
|
||||
def _past_iso() -> str:
|
||||
return (datetime.now(UTC) - timedelta(days=1)).isoformat()
|
||||
|
||||
|
||||
def test_invest_engine_expired_token_exits_2() -> None:
|
||||
"""Guard against burning a request on a token the user already knows is dead."""
|
||||
result = runner.invoke(
|
||||
app,
|
||||
["invest-engine"],
|
||||
env={
|
||||
"WF_BASE_URL": "https://wf.example.com",
|
||||
"WF_USERNAME": "u",
|
||||
"WF_PASSWORD": "p",
|
||||
"IE_BEARER_TOKEN": "anything",
|
||||
"IE_TOKEN_EXPIRES_AT": _past_iso(),
|
||||
"BROKER_SYNC_DATA_DIR": "/tmp",
|
||||
},
|
||||
)
|
||||
assert result.exit_code == 2, result.output
|
||||
assert "expired" in result.output.lower() or "token" in result.output.lower()
|
||||
|
||||
|
||||
def test_invest_engine_unknown_mode_exits_2() -> None:
|
||||
result = runner.invoke(
|
||||
app,
|
||||
["invest-engine", "--mode", "nonsense"],
|
||||
env={
|
||||
"WF_BASE_URL": "https://wf.example.com",
|
||||
"WF_USERNAME": "u",
|
||||
"WF_PASSWORD": "p",
|
||||
"IE_BEARER_TOKEN": "t",
|
||||
"IE_TOKEN_EXPIRES_AT": _future_iso(),
|
||||
"BROKER_SYNC_DATA_DIR": "/tmp",
|
||||
},
|
||||
)
|
||||
assert result.exit_code == 2
|
||||
|
||||
|
||||
def test_invest_engine_malformed_expires_exits_2() -> None:
|
||||
result = runner.invoke(
|
||||
app,
|
||||
["invest-engine"],
|
||||
env={
|
||||
"WF_BASE_URL": "https://wf.example.com",
|
||||
"WF_USERNAME": "u",
|
||||
"WF_PASSWORD": "p",
|
||||
"IE_BEARER_TOKEN": "t",
|
||||
"IE_TOKEN_EXPIRES_AT": "not-an-iso-date",
|
||||
"BROKER_SYNC_DATA_DIR": "/tmp",
|
||||
},
|
||||
)
|
||||
assert result.exit_code == 2
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue