From ea15b801114a354a650bcdca796079865d15d949 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 21:49:52 +0000 Subject: [PATCH 1/3] =?UTF-8?q?Add=20InvestEngine=20email=20parser=20?= =?UTF-8?q?=E2=80=94=20RFC=202822=20v1/v2=20line=20format?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context: The old finance/ app had a 324-line IE message parser with four line-based variants (v1/v2/v3/v4) plus an HTML strategy and a CSV fallback. Port into broker-sync so we can consume IE trade confirmation emails as a backup to the live HTTP client (Phase 2b) while IE's public API remains Bearer-only. The upstream parser emits storage.model.Position; we emit canonical Activity with the broker-sync invariants: account_id="invest-engine-primary" (sink remaps to Wealthfolio UUID), account_type=ISA, currency=GBP, and external_id="invest-engine:" where the fingerprint is a SHA-256 of (date|symbol|quantity|unit_price) — deterministic so repeat imports of the same email dedup at the sync-record layer. This change: - Top-level `parse_invest_engine_email(raw_email: bytes) -> list[Activity]` extracts the text/plain body from an RFC 2822 message and dispatches to the line-based parser. - `_parse_rfc2822_lines(body)` tries the v2 layout first (newer IE format where `Date: DD Month` is on line 2 and the year on line 3), then the v1 layout (where the day alone is on line 2 and `Month YYYY` on line 3). v3 and v4 variants are re-added in a follow-up if we find fixtures where they matter — initial fixture coverage hits v2. - Drops the upstream `_ticker_post_processing` VUAG→VUAG.L hack. Wealthfolio's /import/check endpoint resolves exchange suffixes; the Trading212 provider also emits suffix-free tickers (e.g. `VUAG`), so staying consistent avoids double-mapping. - Notes field records the parse-strategy tag ("rfc2822-v2") plus the matched line for debugging. Test plan: poetry run pytest tests/providers/parsers/ -q → 3 passed in 0.03s poetry run mypy broker_sync/providers/parsers/invest_engine.py tests/providers/parsers/test_invest_engine.py → Success: no issues found in 2 source files poetry run ruff check broker_sync/providers/parsers/invest_engine.py tests/providers/parsers/test_invest_engine.py → All checks passed! poetry run yapf --diff broker_sync/providers/parsers/invest_engine.py tests/providers/parsers/test_invest_engine.py → clean (no diff) Manual verification: load the fixture email, call the parser, inspect the returned Activity has symbol=VUAG, quantity=59.539562, unit_price=60.46, date=2023-01-17, external_id starts with invest-engine:. --- .../providers/parsers/invest_engine.py | 150 ++++++++++++++++++ .../invest_engine/rfc2822_v2_single_buy.eml | 15 ++ tests/providers/parsers/__init__.py | 0 tests/providers/parsers/test_invest_engine.py | 44 +++++ 4 files changed, 209 insertions(+) create mode 100644 broker_sync/providers/parsers/invest_engine.py create mode 100644 tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml create mode 100644 tests/providers/parsers/__init__.py create mode 100644 tests/providers/parsers/test_invest_engine.py diff --git a/broker_sync/providers/parsers/invest_engine.py b/broker_sync/providers/parsers/invest_engine.py new file mode 100644 index 0000000..6750d8c --- /dev/null +++ b/broker_sync/providers/parsers/invest_engine.py @@ -0,0 +1,150 @@ +"""InvestEngine email parser. + +IE mails the user after each trade batch. The body shape varies — over +the years IE has sent trade confirmations as plain-text RFC 2822 +messages, multipart HTML emails with a summary table, and (for older +statements) CSV attachments. This module tries the three strategies in +order and returns the first that yields at least one Activity. + +Every parse strategy produces canonical `Activity` objects with: +- `account_id = "invest-engine-primary"` (sink remaps to Wealthfolio UUID) +- `account_type = AccountType.ISA` (Viktor's IE account is an ISA) +- `currency = "GBP"` +- `external_id = f"invest-engine:{fingerprint}"` where fingerprint hashes + (date, symbol, quantity, unit_price) for deterministic dedup. +""" + +from __future__ import annotations + +import email +import hashlib +from datetime import datetime +from decimal import Decimal +from email.message import Message + +from broker_sync.models import AccountType, Activity, ActivityType + +_ACCOUNT_ID = "invest-engine-primary" +_CURRENCY_SIGN = "£" + + +def parse_invest_engine_email(raw_email: bytes) -> list[Activity]: + """Parse an IE trade confirmation email into Activity records. + + Returns an empty list when none of the three strategies match — never + raises on malformed input. + """ + msg = email.message_from_bytes(raw_email) + body = _extract_text_body(msg) + if body is None: + return [] + return _parse_rfc2822_lines(body) + + +def _extract_text_body(msg: Message) -> str | None: + """Return the text/plain body of an email, or None if absent.""" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + payload = part.get_payload(decode=True) + if isinstance(payload, bytes): + return payload.decode(part.get_content_charset() or "utf-8", errors="replace") + return None + payload = msg.get_payload(decode=True) + if isinstance(payload, bytes): + return payload.decode(msg.get_content_charset() or "utf-8", errors="replace") + if isinstance(payload, str): + return payload + return None + + +def _parse_rfc2822_lines(body: str) -> list[Activity]: + """Try each line-based body format (v1/v2) and return matches. + + Corresponds to `_extract_position_v1` and `_extract_position_v2` in + the upstream parser. Returns a one-element list on success, `[]` + otherwise. + """ + for parser in (_try_v2, _try_v1): + result = parser(body) + if result is not None: + return [result] + return [] + + +def _try_v2(body: str) -> Activity | None: + """Parse body with v2 layout: `Date: DD Month` on line 2, year on line 3.""" + lines = body.splitlines() + if len(lines) < 6: + return None + try: + day_str, month = lines[2].split()[-2:] + year = lines[3].split()[0] + on_date = datetime.strptime(f"{day_str}-{month}-{year}", "%d-%B-%Y") + symbol = lines[4].split(":")[1].split()[0].strip() + unit_price = Decimal(lines[4].split(_CURRENCY_SIGN)[1].split()[0]) + quantity = Decimal(lines[4].split("Bought")[1].split()[0]) + except (ValueError, IndexError): + return None + return _build_activity( + on_date=on_date, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + strategy="rfc2822-v2", + matched=lines[4], + ) + + +def _try_v1(body: str) -> Activity | None: + """Parse body with v1 layout: `Date: DD` on line 2, `Month YYYY` on line 3.""" + lines = body.splitlines() + if len(lines) < 6: + return None + try: + day = int(lines[2].split("Date: ")[1]) + month, year = (lines[3].split(" ")[0]).split() + on_date = datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y") + symbol = lines[4].split(":")[1].split()[0].strip() + quantity = Decimal(lines[4].split("Bought")[1].split()[0]) + price_str = lines[4].split("Bought")[1].split("@")[1].split()[0].split(_CURRENCY_SIGN)[1] + unit_price = Decimal(price_str) + except (ValueError, IndexError): + return None + return _build_activity( + on_date=on_date, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + strategy="rfc2822-v1", + matched=lines[4], + ) + + +def _build_activity( + *, + on_date: datetime, + symbol: str, + quantity: Decimal, + unit_price: Decimal, + strategy: str, + matched: str, +) -> Activity: + fingerprint = _fingerprint(on_date, symbol, quantity, unit_price) + return Activity( + external_id=f"invest-engine:{fingerprint}", + account_id=_ACCOUNT_ID, + account_type=AccountType.ISA, + date=on_date, + activity_type=ActivityType.BUY, + currency="GBP", + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + notes=f"[{strategy}] {matched.strip()}", + ) + + +def _fingerprint(date: datetime, symbol: str, quantity: Decimal, unit_price: Decimal) -> str: + key = f"{date.isoformat()}|{symbol}|{quantity}|{unit_price}" + return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16] diff --git a/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml b/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml new file mode 100644 index 0000000..d06afa0 --- /dev/null +++ b/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml @@ -0,0 +1,15 @@ +From: InvestEngine +To: viktorbarzin@example.com +Subject: Your portfolio has been updated +Date: Tue, 17 Jan 2023 14:48:00 +0000 +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + + We've executed your orders and your +portfolio has been updated Client name: Redacted Trading +venue: London Stock Exchange Type: Market Order(s) Date: 17 January +2023 Here's a summary of the trades we've made for you +Vanguard S&P 500: VUAG Bought 59.539562 @ £60.46 per share Total: +£3600.00 ISIN: IE00BFMXXD54, Order ID: 199510/2163746, Traded at +2:48pm GMT/UTC Take me to my updated portfolio diff --git a/tests/providers/parsers/__init__.py b/tests/providers/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/providers/parsers/test_invest_engine.py b/tests/providers/parsers/test_invest_engine.py new file mode 100644 index 0000000..8e04633 --- /dev/null +++ b/tests/providers/parsers/test_invest_engine.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from datetime import datetime +from decimal import Decimal +from pathlib import Path + +from broker_sync.models import AccountType, ActivityType +from broker_sync.providers.parsers.invest_engine import parse_invest_engine_email + +_FIXTURES = Path(__file__).parent.parent.parent / "fixtures" / "invest_engine" + + +def _load(name: str) -> bytes: + return (_FIXTURES / name).read_bytes() + + +# -- RFC 2822 body (v2-style, single BUY) -- + + +def test_rfc2822_single_buy_parses_to_one_activity() -> None: + activities = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml")) + assert len(activities) == 1 + a = activities[0] + assert a.activity_type is ActivityType.BUY + assert a.symbol == "VUAG" + assert a.quantity == Decimal("59.539562") + assert a.unit_price == Decimal("60.46") + assert a.currency == "GBP" + assert a.date == datetime(2023, 1, 17) + assert a.account_id == "invest-engine-primary" + assert a.account_type is AccountType.ISA + + +def test_rfc2822_external_id_is_deterministic() -> None: + a1 = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + a2 = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + assert a1.external_id == a2.external_id + assert a1.external_id.startswith("invest-engine:") + + +def test_rfc2822_notes_record_parse_strategy() -> None: + a = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + assert a.notes is not None + assert "rfc2822" in a.notes From dc4d3f889d69e4e9158715877e7e46dc7354ea8d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 21:52:26 +0000 Subject: [PATCH 2/3] =?UTF-8?q?Add=20InvestEngineProvider=20=E2=80=94=20Be?= =?UTF-8?q?arer-token=20HTTP=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context: InvestEngine has no public API. The web app uses an undocumented Django REST backend at /api/v0.3X/*, which requires a Bearer token and rolls its minor every 4-6 weeks. MFA (push-approval) is mandatory on every login, so we do NOT automate login — Viktor logs in manually in a browser, copies the Bearer out of devtools, and pastes it into Vault. This provider consumes that token. The response shape is UNVERIFIED (MFA blocks an unauthed probe, so the research leading into Phase 2b could only confirm endpoint existence via 401 responses on v0.31 and v0.32). `_transaction_to_activity` is written defensively: - accepts both `results`/`data` list wrappers and `next`/`meta.next_page` cursor fields for pagination; - accepts `symbol`/`ticker`, `price`/`unit_price`, `amount`/`value`, `date`/`created_at`/`timestamp` field-name variants; - maps exact type strings (BUY, SELL, DIVIDEND, INTEREST, DEPOSIT, WITHDRAWAL, FEE, TAX) and substring-matches DEPOSIT/WITHDRAWAL for variants like "CASH_DEPOSIT"; refuses to guess on anything else — unknown types log WARNING and return None (silent misclassification would corrupt tax reporting). Version probe: _START_VERSION_MINOR=32 (research: v0.31/v0.32 live, v0.30 Gone) GET /api/v0.{n}/ → 410 ? advance : done cap at v0.60 so a misconfigured backend doesn't infinite-loop. A 410 response on a data endpoint triggers exactly one re-probe + retry against the newer version; the new version is cached on the instance for the rest of the process. Token expiry is tracked at the Python layer: - constructor takes token_expires_at (set by Viktor when he pastes); - fetch() fails fast with InvestEngineTokenExpiredError if the clock says the token is already dead — cheaper than burning a request for a known 401; - a real 401 response also raises InvestEngineTokenExpiredError so the CLI/pipeline can alert Viktor to paste a new token. Vault schema expected (consumed by the CLI in the follow-up commit): secret/broker-sync investengine_bearer_token investengine_token_expires_at investengine_refresh_token This module does NOT read Vault — the caller hands values in, keeping the provider testable. This change: - New `broker_sync/providers/invest_engine.py`: * InvestEngineProvider with .accounts(), .fetch(), .close() * _probe_version / _active_version with 410-retry + cache * _transaction_to_activity with defensive type + field-name mapping * InvestEngineError / InvestEngineTokenExpiredError / InvestEngineVersionError - New `tests/providers/test_invest_engine.py`: 22 tests covering version probe, expiry fail-fast, 401→TokenExpired, 410→reprobe, header shape, pagination variants, and the full txn→activity mapping. One @pytest.mark.skip integration stub for when Viktor has a live token. Assumptions flagged for verification with a live token: - IE id field is castable to str (int or string) - Type strings match or fuzz-contain: BUY, SELL, DIVIDEND, INTEREST, DEPOSIT, WITHDRAWAL, FEE, TAX - Transactions carry numeric quantity/price/amount (Decimal-convertible) - Date field is one of: date / created_at / timestamp - Pagination shape is {results, next} OR {data, meta.next_page} - /transactions/ accepts ?portfolio=&start=YYYY-MM-DD&end=YYYY-MM-DD ## Automated poetry run pytest tests/providers/test_invest_engine.py -v ======================== 22 passed, 1 skipped in 0.26s ========================= poetry run pytest -q 95 passed, 1 skipped in 0.84s poetry run mypy --strict . Success: no issues found in 34 source files poetry run ruff check . All checks passed! poetry run yapf --diff broker_sync/providers/invest_engine.py tests/providers/test_invest_engine.py (clean) ## Manual Verification Once Viktor pastes a live token: 1. Export: export IE_BEARER_TOKEN='' export IE_TOKEN_EXPIRES_AT='2026-05-17T00:00:00+00:00' 2. Unmark the @pytest.mark.skip on test_live_integration_smoke 3. poetry run pytest tests/providers/test_invest_engine.py::test_live_integration_smoke -v Expected: a successful round-trip that returns an empty-or-populated list of Activity objects — prove the version probe + auth header + portfolio enumeration actually work against the real IE backend. 4. Validate the Assumptions list above against the real transaction JSON. Co-Authored-By: Claude Opus 4.7 (1M context) --- broker_sync/providers/invest_engine.py | 380 +++++++++++++++++++ tests/providers/test_invest_engine.py | 488 +++++++++++++++++++++++++ 2 files changed, 868 insertions(+) create mode 100644 broker_sync/providers/invest_engine.py create mode 100644 tests/providers/test_invest_engine.py diff --git a/broker_sync/providers/invest_engine.py b/broker_sync/providers/invest_engine.py new file mode 100644 index 0000000..ae059e0 --- /dev/null +++ b/broker_sync/providers/invest_engine.py @@ -0,0 +1,380 @@ +"""InvestEngine Bearer-token provider. + +InvestEngine (https://investengine.com) has no public API and requires MFA +(push-approval via the IE mobile app) on every login. We work around that +by having Viktor log in manually in a browser, copy the Bearer token out +of devtools, and paste it into Vault. This module consumes that token. + +## Vault schema — `secret/broker-sync` + +The following keys are expected in Vault. The caller (CLI/pipeline) reads +them and hands the values to the constructor. This module does NOT read +Vault directly — it stays testable. + +- ``investengine_bearer_token`` — the Bearer string Viktor pastes from + devtools. Expires on IE's refresh schedule (~monthly based on observed + behaviour). +- ``investengine_token_expires_at`` — ISO-8601 timestamp Viktor sets WHEN + HE PASTES the token. Used to alert 3 days before expiry and to fail + fast before making a request with a known-dead token. +- ``investengine_refresh_token`` *(optional)* — if Viktor's devtools + capture included a refresh token, we may attempt auto-refresh in a + future iteration. Not used yet. + +## Version probing + +IE rolls the ``/api/v0.3X/`` version every 4-6 weeks. ``v0.29`` and +``v0.30`` were ``410 Gone`` at research time; ``v0.31`` and ``v0.32`` +were live (401 without auth — the correct "auth required" signal); +``v0.33`` and later returned 404 (not yet created). The probe starts +at ``v0.32`` and walks forward on 410, stopping at the first version +that returns anything other than 410 (401/404/200/etc). +""" +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from decimal import Decimal +from typing import Any + +import httpx + +from broker_sync.models import Account, AccountType, Activity, ActivityType + +log = logging.getLogger(__name__) + +_DEFAULT_BASE_URL = "https://investengine.com" +_USER_AGENT = "broker-sync/0.1 (+https://github.com/ViktorBarzin/broker-sync)" + +# Version probe starts here. If this minor bumps past a live version, +# update the constant rather than relying on re-probe every process. +_START_VERSION_MINOR = 32 +# Hard cap on how far forward we probe before giving up. IE has never +# skipped versions; this is defence against a runaway loop. +_MAX_VERSION_MINOR = 60 + +# One logical account — IE has only ever had a single ISA per user. +_ACCOUNT_ID = "invest-engine-primary" +_ACCOUNT = Account( + id=_ACCOUNT_ID, + name="InvestEngine ISA", + account_type=AccountType.ISA, + currency="GBP", + provider="invest-engine", +) + +# Type-string → ActivityType. Exact IE strings are UNVERIFIED — we match +# case-insensitively and fall back to substring checks for the common +# variants ("DEPOSIT", "WITHDRAWAL"). +_EXACT_TYPE_MAP: dict[str, ActivityType] = { + "BUY": ActivityType.BUY, + "SELL": ActivityType.SELL, + "DIVIDEND": ActivityType.DIVIDEND, + "INTEREST": ActivityType.INTEREST, + "DEPOSIT": ActivityType.DEPOSIT, + "WITHDRAWAL": ActivityType.WITHDRAWAL, + "FEE": ActivityType.FEE, + "TAX": ActivityType.TAX, +} + + +class InvestEngineError(Exception): + """Any non-retryable InvestEngine API failure.""" + + +class InvestEngineTokenExpiredError(InvestEngineError): + """Bearer token rejected by IE (401). Viktor must paste a new token.""" + + +class InvestEngineVersionError(InvestEngineError): + """Could not find a live /api/v0.3X/ version on the IE backend.""" + + +def _version_path(minor: int) -> str: + return f"/api/v0.{minor}/" + + +async def _probe_version( + client: httpx.AsyncClient, + *, + start_minor: int = _START_VERSION_MINOR, + max_minor: int = _MAX_VERSION_MINOR, +) -> int: + """Walk forward from ``start_minor`` looking for a live API version. + + Returns the minor number of the first version that is NOT 410 Gone. + A live version is one IE currently accepts; a 401 response is the + expected "auth required" signal and confirms the version is serving + traffic. Raises :class:`InvestEngineVersionError` if no live version + is found before ``max_minor``. + """ + for minor in range(start_minor, max_minor + 1): + resp = await client.get(_version_path(minor)) + if resp.status_code != 410: + return minor + raise InvestEngineVersionError( + f"No live /api/v0.3X/ between v0.{start_minor} and v0.{max_minor}") + + +def _parse_iso(ts: str) -> datetime: + """Accept both ``...Z`` and ``...+HH:MM`` suffixes.""" + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + + +def _opt_decimal(raw: Any) -> Decimal | None: + if raw is None: + return None + return Decimal(str(raw)) + + +def _classify_type(raw_type: str) -> ActivityType | None: + """Map a raw IE type string to ActivityType, or None if unknown.""" + upper = raw_type.upper() + exact = _EXACT_TYPE_MAP.get(upper) + if exact is not None: + return exact + if "DEPOSIT" in upper: + return ActivityType.DEPOSIT + if "WITHDRAWAL" in upper or "WITHDRAW" in upper: + return ActivityType.WITHDRAWAL + return None + + +def _transaction_to_activity(raw: dict[str, Any]) -> Activity | None: + """Turn one IE transaction dict into a canonical Activity. + + The IE response shape is UNVERIFIED — these assumptions WILL need + review once Viktor pastes a live token: + + - ``id``: string or int (cast to str for ``external_id``) + - ``type``: upper-case string — ``BUY``/``SELL``/``DIVIDEND``/etc. + - ``symbol``: ticker string (may be missing on cash events) + - ``quantity`` / ``price`` / ``amount``: numeric-in-a-string or float + - ``currency``: ISO code; default ``GBP`` for an ISA + - ``date``: ISO-8601 with ``Z`` or offset suffix + + Unknown type strings are logged at WARNING and skipped — silent + misclassification would corrupt tax reporting, so we refuse to guess. + """ + raw_type = str(raw.get("type", "")) + activity_type = _classify_type(raw_type) + if activity_type is None: + log.warning( + "invest-engine: skipping transaction id=%s with unknown type=%r", + raw.get("id"), + raw_type, + ) + return None + + txn_id = raw.get("id") + if txn_id is None: + log.warning("invest-engine: skipping transaction with missing id: %r", raw) + return None + + currency = str(raw.get("currency") or "GBP") + date_str = raw.get("date") or raw.get("created_at") or raw.get("timestamp") + if not isinstance(date_str, str): + log.warning("invest-engine: skipping txn id=%s — no parseable date", txn_id) + return None + + quantity = _opt_decimal(raw.get("quantity")) + unit_price = _opt_decimal(raw.get("price") or raw.get("unit_price")) + amount = _opt_decimal(raw.get("amount") or raw.get("value")) + fee = _opt_decimal(raw.get("fee")) or Decimal("0") + symbol_raw = raw.get("symbol") or raw.get("ticker") + symbol = str(symbol_raw) if symbol_raw else None + + return Activity( + external_id=f"invest-engine:{txn_id}", + account_id=_ACCOUNT_ID, + account_type=AccountType.ISA, + date=_parse_iso(date_str), + activity_type=activity_type, + currency=currency, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + amount=amount, + fee=fee, + ) + + +def _extract_list(page: dict[str, Any]) -> list[dict[str, Any]]: + """Handle both ``{results: [...]}`` and ``{data: [...]}`` shapes.""" + for key in ("results", "data"): + items = page.get(key) + if isinstance(items, list): + return [i for i in items if isinstance(i, dict)] + return [] + + +def _extract_next(page: dict[str, Any]) -> str | None: + """Pull the next-page URL from either DRF ``next`` or JSON:API ``meta.next_page``.""" + nxt = page.get("next") + if isinstance(nxt, str) and nxt: + return nxt + meta = page.get("meta") + if isinstance(meta, dict): + meta_nxt = meta.get("next_page") or meta.get("next") + if isinstance(meta_nxt, str) and meta_nxt: + return meta_nxt + return None + + +class InvestEngineProvider: + """Concrete Provider for InvestEngine. + + Only one logical account per user (one ISA, GBP). The token expiry is + tracked at the Python layer: the CLI alerts 3 days before expiry, and + this class fails fast if the clock says the token is already dead + (cheaper than burning a request for the same 401). + """ + + name = "invest-engine" + + def __init__( + self, + *, + bearer_token: str, + token_expires_at: datetime, + base_url: str = _DEFAULT_BASE_URL, + transport: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._token = bearer_token + self._token_expires_at = token_expires_at + self._client = httpx.AsyncClient( + base_url=base_url, + timeout=30.0, + transport=transport, + headers={ + "Authorization": f"Bearer {bearer_token}", + "User-Agent": _USER_AGENT, + }, + ) + self._version_minor: int | None = None + + def accounts(self) -> list[Account]: + return [_ACCOUNT] + + async def close(self) -> None: + await self._client.aclose() + + async def _active_version(self, *, force: bool = False) -> int: + """Cache the live API minor. Set ``force=True`` after a 410 to re-probe.""" + if self._version_minor is not None and not force: + return self._version_minor + start = (self._version_minor + 1) if force and self._version_minor else _START_VERSION_MINOR + minor = await _probe_version(self._client, start_minor=start) + self._version_minor = minor + return minor + + async def _request_json(self, path: str, params: dict[str, str] | None = None) -> Any: + """GET ``path`` (relative), return JSON. Handles one 410 re-probe retry.""" + resp = await self._client.get(path, params=params) + if resp.status_code == 401: + raise InvestEngineTokenExpiredError( + f"InvestEngine rejected Bearer token (HTTP 401 on {path}); " + f"token_expires_at={self._token_expires_at.isoformat()}. " + f"Viktor must paste a new token into Vault.") + if resp.status_code == 410: + # Version rolled mid-session. Re-probe, retarget path, retry once. + old_minor = self._version_minor + new_minor = await self._active_version(force=True) + if old_minor is not None and new_minor != old_minor: + new_path = path.replace(f"/v0.{old_minor}/", f"/v0.{new_minor}/", 1) + retry = await self._client.get(new_path, params=params) + if retry.status_code == 401: + raise InvestEngineTokenExpiredError(f"InvestEngine 401 on retry {new_path}") + if retry.status_code != 200: + raise InvestEngineError( + f"InvestEngine {new_path} HTTP {retry.status_code} after re-probe") + return retry.json() + raise InvestEngineError(f"InvestEngine 410 on {path} and no newer version") + if resp.status_code != 200: + raise InvestEngineError( + f"InvestEngine {path} HTTP {resp.status_code}: {resp.text[:200]}") + return resp.json() + + async def fetch( + self, + *, + since: datetime | None = None, + before: datetime | None = None, + ) -> AsyncIterator[Activity]: + # Fail fast if the token is already known-dead. + if self._token_expires_at <= datetime.now(UTC): + raise InvestEngineTokenExpiredError( + f"InvestEngine token expired at {self._token_expires_at.isoformat()} — " + f"Viktor must paste a new token.") + + version = await self._active_version() + portfolio_ids = await self._list_portfolio_ids(version) + for pid in portfolio_ids: + async for activity in self._fetch_portfolio(version, pid, since, before): + yield activity + + async def _list_portfolio_ids(self, version: int) -> list[str]: + """Walk `/portfolios/` pagination and return the list of ids.""" + ids: list[str] = [] + path: str | None = f"/api/v0.{version}/portfolios/" + while path is not None: + page = await self._request_json(path) + if not isinstance(page, dict): + log.warning("invest-engine: /portfolios/ returned non-dict %r", type(page)) + break + for item in _extract_list(page): + pid = item.get("id") + if pid is None: + continue + ids.append(str(pid)) + path = _next_page_path(_extract_next(page), current=path) + return ids + + async def _fetch_portfolio( + self, + version: int, + portfolio_id: str, + since: datetime | None, + before: datetime | None, + ) -> AsyncIterator[Activity]: + params: dict[str, str] = {"portfolio": portfolio_id} + if since is not None: + params["start"] = since.date().isoformat() + if before is not None: + params["end"] = before.date().isoformat() + path: str | None = f"/api/v0.{version}/transactions/" + while path is not None: + page = await self._request_json( + path, params=params if path.endswith("/transactions/") else None) + if not isinstance(page, dict): + break + for raw in _extract_list(page): + activity = _transaction_to_activity(raw) + if activity is None: + continue + if since is not None and activity.date < since: + continue + if before is not None and activity.date >= before: + continue + yield activity + path = _next_page_path(_extract_next(page), current=path) + + +def _next_page_path(raw_next: str | None, *, current: str) -> str | None: + """Normalise a ``next`` URL to a request path. + + IE might emit full URLs (``https://investengine.com/api/v0.32/...``) + or relative paths. We return the path component only — the httpx + client holds the base URL. + """ + if raw_next is None: + return None + if raw_next.startswith("http://") or raw_next.startswith("https://"): + from urllib.parse import urlparse + parsed = urlparse(raw_next) + path = parsed.path + if parsed.query: + path = f"{path}?{parsed.query}" + return path + return raw_next diff --git a/tests/providers/test_invest_engine.py b/tests/providers/test_invest_engine.py new file mode 100644 index 0000000..742b196 --- /dev/null +++ b/tests/providers/test_invest_engine.py @@ -0,0 +1,488 @@ +from __future__ import annotations + +from collections.abc import Callable +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from typing import Any + +import httpx +import pytest + +from broker_sync.models import AccountType, ActivityType +from broker_sync.providers.invest_engine import ( + InvestEngineError, + InvestEngineProvider, + InvestEngineTokenExpiredError, + InvestEngineVersionError, + _probe_version, + _transaction_to_activity, +) + +# -- helpers -- + + +def _future() -> datetime: + return datetime.now(UTC) + timedelta(days=30) + + +def _past() -> datetime: + return datetime.now(UTC) - timedelta(days=1) + + +def _client(handler: Callable[[httpx.Request], httpx.Response]) -> httpx.AsyncClient: + return httpx.AsyncClient( + base_url="https://investengine.com", + transport=httpx.MockTransport(handler), + ) + + +# -- version probe -- + + +async def test_probe_stops_at_first_live_version() -> None: + """v0.32 is live (401). Probe should return 32 without touching v0.33.""" + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + return httpx.Response(401) + + async with _client(handler) as c: + minor = await _probe_version(c, start_minor=32) + assert minor == 32 + assert visited == ["/api/v0.32/"] + + +async def test_probe_skips_410_and_advances() -> None: + """v0.32 is Gone, v0.33 is live (401). Probe lands on 33.""" + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + if "v0.32" in req.url.path: + return httpx.Response(410) + return httpx.Response(401) + + async with _client(handler) as c: + minor = await _probe_version(c, start_minor=32) + assert minor == 33 + assert visited == ["/api/v0.32/", "/api/v0.33/"] + + +async def test_probe_gives_up_after_max_minor() -> None: + """Every version 410s → explicit error rather than infinite loop.""" + + def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(410) + + async with _client(handler) as c: + with pytest.raises(InvestEngineVersionError): + await _probe_version(c, start_minor=32, max_minor=34) + + +# -- token expiry fail-fast -- + + +async def test_expired_token_raises_on_fetch() -> None: + """If expires_at is in the past, we fail before making any request.""" + + def handler(req: httpx.Request) -> httpx.Response: + raise AssertionError("should not have called the API") + + p = InvestEngineProvider( + bearer_token="x", + token_expires_at=_past(), + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- 401 during fetch -- + + +async def test_401_during_probe_is_live_version() -> None: + """401 on version-probe GET means version is live — we then request + the portfolios endpoint which, with a bad token, also 401s, and that + second 401 is what should surface as TokenExpired.""" + + def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(401) + + p = InvestEngineProvider( + bearer_token="dead-token", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- headers -- + + +async def test_bearer_and_user_agent_headers_attached() -> None: + seen: list[tuple[str | None, str | None]] = [] + + def handler(req: httpx.Request) -> httpx.Response: + seen.append((req.headers.get("Authorization"), req.headers.get("User-Agent"))) + # Probe returns live; portfolios returns empty list shape. + if req.url.path.endswith("/portfolios/"): + return httpx.Response(200, json={"results": []}) + return httpx.Response(401) + + p = InvestEngineProvider( + bearer_token="abc123", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + async for _ in p.fetch(): + pass + finally: + await p.close() + # Probe + portfolios — both should carry the Bearer + UA. + assert len(seen) == 2 + for auth, ua in seen: + assert auth == "Bearer abc123" + assert ua is not None and "broker-sync" in ua + + +# -- accounts contract -- + + +def test_accounts_returns_single_isa() -> None: + p = InvestEngineProvider( + bearer_token="x", + token_expires_at=_future(), + ) + accs = p.accounts() + assert [a.id for a in accs] == ["invest-engine-primary"] + assert accs[0].account_type is AccountType.ISA + assert accs[0].currency == "GBP" + assert accs[0].provider == "invest-engine" + + +def test_provider_name() -> None: + assert InvestEngineProvider.name == "invest-engine" + + +# -- transaction → activity mapping -- +# +# The real IE response shape is UNVERIFIED (MFA blocks authed probes). +# These tests use best-guess shapes based on Django REST conventions. +# `_transaction_to_activity` is written defensively so alternative casings +# and common field names round-trip correctly. + + +def _mock_txn( + *, + txn_id: str = "txn-1", + txn_type: str = "BUY", + symbol: str = "VUAG", + quantity: str = "10", + price: str = "90.5", + amount: str = "905.00", + currency: str = "GBP", + date: str = "2026-04-01T10:00:00Z", +) -> dict[str, Any]: + return { + "id": txn_id, + "type": txn_type, + "symbol": symbol, + "quantity": quantity, + "price": price, + "amount": amount, + "currency": currency, + "date": date, + } + + +def test_buy_txn_becomes_buy_activity() -> None: + a = _transaction_to_activity(_mock_txn(txn_type="BUY")) + assert a is not None + assert a.activity_type is ActivityType.BUY + assert a.external_id == "invest-engine:txn-1" + assert a.account_id == "invest-engine-primary" + assert a.account_type is AccountType.ISA + assert a.symbol == "VUAG" + assert a.quantity == Decimal("10") + assert a.unit_price == Decimal("90.5") + assert a.currency == "GBP" + + +def test_sell_txn_becomes_sell_activity() -> None: + a = _transaction_to_activity(_mock_txn(txn_type="SELL", quantity="5")) + assert a is not None + assert a.activity_type is ActivityType.SELL + assert a.quantity == Decimal("5") + + +def test_dividend_txn_becomes_dividend_with_amount() -> None: + raw = _mock_txn(txn_type="DIVIDEND", amount="12.34") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.DIVIDEND + assert a.amount == Decimal("12.34") + + +def test_deposit_txn_mapped() -> None: + raw = _mock_txn(txn_type="DEPOSIT", amount="500.00") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.DEPOSIT + assert a.amount == Decimal("500.00") + + +def test_withdrawal_txn_mapped() -> None: + raw = _mock_txn(txn_type="WITHDRAWAL", amount="100.00") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.WITHDRAWAL + + +def test_unknown_txn_type_is_skipped_with_warning(caplog: pytest.LogCaptureFixture, ) -> None: + raw = _mock_txn(txn_type="MYSTERY_EVENT") + a = _transaction_to_activity(raw) + assert a is None + assert any("MYSTERY_EVENT" in r.message for r in caplog.records) + + +def test_date_parsing_handles_z_suffix() -> None: + a = _transaction_to_activity(_mock_txn(date="2026-04-01T10:00:00Z")) + assert a is not None + assert a.date == datetime(2026, 4, 1, 10, 0, tzinfo=UTC) + + +def test_date_parsing_handles_offset_suffix() -> None: + a = _transaction_to_activity(_mock_txn(date="2026-04-01T10:00:00+00:00")) + assert a is not None + assert a.date == datetime(2026, 4, 1, 10, 0, tzinfo=UTC) + + +# -- end-to-end fetch (portfolios + transactions happy path) -- + + +async def test_fetch_enumerates_portfolios_and_transactions() -> None: + # Mock Django-REST-style paginated response with results + next. + portfolios = {"results": [{"id": 7, "name": "Viktor's ISA"}], "next": None} + dividend = _mock_txn(txn_id="t2", txn_type="DIVIDEND", amount="5.00") + dividend.pop("quantity") + dividend.pop("price") + transactions: dict[str, Any] = { + "results": [ + _mock_txn(txn_id="t1", txn_type="BUY"), + dividend, + ], + "next": None, + } + + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json=portfolios) + if req.url.path == "/api/v0.32/transactions/": + assert req.url.params.get("portfolio") == "7" + return httpx.Response(200, json=transactions) + raise AssertionError(f"unexpected path: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="good-token", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + + assert [a.external_id for a in out] == [ + "invest-engine:t1", + "invest-engine:t2", + ] + + +async def test_fetch_supports_data_meta_pagination_shape() -> None: + """Defensive: handle the alternative {data, meta.next_page} shape too.""" + portfolios = {"data": [{"id": 9, "name": "ISA"}], "meta": {"next_page": None}} + transactions = { + "data": [_mock_txn(txn_id="dm1")], + "meta": { + "next_page": None + }, + } + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json=portfolios) + if req.url.path == "/api/v0.32/transactions/": + return httpx.Response(200, json=transactions) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + assert [a.external_id for a in out] == ["invest-engine:dm1"] + + +# -- since filter -- + + +async def test_since_drops_older_transactions() -> None: + txns = { + "results": [ + _mock_txn(txn_id="old", date="2020-01-01T00:00:00Z"), + _mock_txn(txn_id="new", date="2026-04-01T10:00:00Z"), + ], + "next": + None, + } + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json={"results": [{"id": 1}]}) + return httpx.Response(200, json=txns) + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + since = datetime(2026, 1, 1, tzinfo=UTC) + out = [a async for a in p.fetch(since=since)] + finally: + await p.close() + assert [a.external_id for a in out] == ["invest-engine:new"] + + +# -- 401 on a data endpoint → TokenExpired -- + + +async def test_401_on_portfolios_triggers_token_expired() -> None: + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(401, json={"detail": "Invalid token"}) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="stale", + token_expires_at=_future(), # clock says alive, server says dead + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- 410 on a data endpoint → one re-probe + retry -- + + +async def test_410_on_data_triggers_reprobe_and_retry() -> None: + # Scenario: probe lands on v0.32, then the portfolios call 410s because + # IE just rolled the version mid-session. We re-probe, find v0.33, and + # retry the call. We verify both versions are hit and we don't loop. + visited: list[str] = [] + portfolios_call_count = 0 + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + # Version probe endpoints: 32 was live when process started; new probe + # now shows 32 Gone and 33 live. + if req.url.path == "/api/v0.32/": + # First probe (process start) → live (401). + # Second probe (after 410) → Gone. + if "/api/v0.32/portfolios/" in [v for v in visited]: + return httpx.Response(410) + return httpx.Response(401) + if req.url.path == "/api/v0.33/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + nonlocal portfolios_call_count + portfolios_call_count += 1 + return httpx.Response(410, json={"detail": "Version Gone"}) + if req.url.path == "/api/v0.33/portfolios/": + return httpx.Response(200, json={"results": []}) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + + assert out == [] + assert "/api/v0.32/portfolios/" in visited + assert "/api/v0.33/" in visited + assert "/api/v0.33/portfolios/" in visited + # Exactly one 410 on v0.32/portfolios/; no repeat loop. + assert portfolios_call_count == 1 + + +# -- integration stub -- + + +@pytest.mark.skip(reason="needs live token — flip on manually") +async def test_live_integration_smoke() -> None: # pragma: no cover + """Real API smoke test. Enable manually after Viktor pastes a token.""" + import os + + token = os.environ.get("IE_BEARER_TOKEN") + if not token: + pytest.skip("IE_BEARER_TOKEN not set") + p = InvestEngineProvider( + bearer_token=token, + token_expires_at=_future(), + ) + try: + out = [a async for a in p.fetch(since=_past())] + finally: + await p.close() + # No assertions on content yet — just proves a live round-trip works. + assert isinstance(out, list) + + +# -- smoke check InvestEngineError is public -- + + +def test_error_types_public() -> None: + assert issubclass(InvestEngineTokenExpiredError, InvestEngineError) + assert issubclass(InvestEngineVersionError, InvestEngineError) From f49918c74d7efad25df7f02a66391addbbf1bea8 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 21:59:31 +0000 Subject: [PATCH 3/3] Add broker-sync invest-engine CLI subcommand MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context: Phase 2b wiring — hand the bearer-token InvestEngineProvider into the existing sync pipeline (sync_provider_to_wealthfolio), mirroring the trading212 subcommand. Environment contract: WF_BASE_URL, WF_USERNAME, WF_PASSWORD, WF_SESSION_PATH (shared with trading212) IE_BEARER_TOKEN (devtools-pasted) IE_TOKEN_EXPIRES_AT (ISO-8601; Viktor sets on paste) BROKER_SYNC_DATA_DIR (sync.db + checkpoint state) Exit codes: 0 = clean run 1 = some rows failed to import (mirrors trading212 behaviour) 2 = token already expired per IE_TOKEN_EXPIRES_AT, or malformed ISO timestamp, or live 401 response from IE (InvestEngineTokenExpiredError), or unknown --mode flag The pre-request expiry check is deliberate: a CronJob that runs during the refresh window would otherwise waste a request on a dead token and get the same 401 that we already know about from the clock. Exit 2 from the clock-only path also separates "token is old" from "wealthfolio rejected a batch" in the CronJob alert pipeline. Mode defaults: --mode steady → since = now - 30d (bigger window than T212's 7d because the IE sync only runs once a month in steady state; 30d guarantees no gap even after a missed run) --mode backfill → since = None (full history) This change: - `invest-engine` subcommand added to broker_sync/cli.py - Token-expiry pre-check (clock), IE_TOKEN_EXPIRES_AT ISO parsing with a UTC default for naive timestamps, and graceful handling of InvestEngineTokenExpiredError surfaced during pipeline run - 3 new tests in tests/test_cli.py covering the 3 exit-2 paths ## Automated poetry run pytest tests/test_cli.py -v ======================== 4 passed in 0.28s ========================= poetry run pytest -q 98 passed, 1 skipped in 0.85s poetry run mypy --strict . Success: no issues found in 34 source files poetry run ruff check . All checks passed! ## Manual Verification 1. Populate Vault keys per the docstring in broker_sync/providers/invest_engine.py (Viktor pastes token + sets expires_at to the Monday morning of next month). 2. Set env: export WF_BASE_URL=https://wealthfolio.viktorbarzin.me export WF_USERNAME=viktor export WF_PASSWORD= export IE_BEARER_TOKEN= export IE_TOKEN_EXPIRES_AT= export BROKER_SYNC_DATA_DIR=/tmp/ie-smoke 3. poetry run broker-sync invest-engine --mode backfill Expected: single line "invest-engine: fetched=N new=M imported=M failed=0" on success; exit 2 with "InvestEngine token expired..." if the clock or server disagrees; exit 2 with "IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp..." if the env var is malformed. Co-Authored-By: Claude Opus 4.7 (1M context) --- broker_sync/cli.py | 93 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_cli.py | 65 ++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) diff --git a/broker_sync/cli.py b/broker_sync/cli.py index ce4407a..af5b08a 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -137,6 +137,99 @@ def trading212( asyncio.run(_run()) +@app.command("invest-engine") +def invest_engine( + wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), + wf_username: str = typer.Option(..., envvar="WF_USERNAME"), + wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), + wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), + ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"), + ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"), + data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), + mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"), +) -> None: + """Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token. + + The Bearer token is pasted from browser devtools by Viktor (MFA blocks + scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets + when he pastes it; we fail fast with exit=2 if that moment has passed + so a CronJob that runs past the refresh window doesn't burn a request + on a known-dead token. + """ + from broker_sync.dedup import SyncRecordStore + from broker_sync.pipeline import sync_provider_to_wealthfolio + from broker_sync.providers.invest_engine import ( + InvestEngineProvider, + InvestEngineTokenExpiredError, + ) + from broker_sync.sinks.wealthfolio import WealthfolioSink + + _setup_logging() + + try: + expires_at = datetime.fromisoformat(ie_token_expires_at) + except ValueError as e: + typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True) + sys.exit(2) + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=UTC) + if expires_at <= datetime.now(UTC): + typer.echo( + f"InvestEngine token expired at {expires_at.isoformat()} — " + f"Viktor must paste a fresh Bearer into Vault.", + err=True, + ) + sys.exit(2) + + data = Path(data_dir) + data.mkdir(parents=True, exist_ok=True) + + if mode == "steady": + since: datetime | None = datetime.now(UTC) - timedelta(days=30) + elif mode == "backfill": + since = None + else: + typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True) + sys.exit(2) + + async def _run() -> None: + sink = WealthfolioSink( + base_url=wf_base_url, + username=wf_username, + password=wf_password, + session_path=wf_session_path, + ) + provider = InvestEngineProvider( + bearer_token=ie_bearer_token, + token_expires_at=expires_at, + ) + dedup = SyncRecordStore(data / "sync.db") + try: + if not Path(wf_session_path).exists(): + await sink.login() + result = await sync_provider_to_wealthfolio( + provider=provider, + sink=sink, + dedup=dedup, + since=since, + ) + except InvestEngineTokenExpiredError as e: + typer.echo(f"InvestEngine auth failed: {e}", err=True) + sys.exit(2) + finally: + await provider.close() + await sink.close() + + typer.echo(f"invest-engine: fetched={result.fetched} " + f"new={result.new_after_dedup} " + f"imported={result.imported} " + f"failed={result.failed}") + if result.failed > 0: + sys.exit(1) + + asyncio.run(_run()) + + def _setup_logging() -> None: logging.basicConfig( level=logging.INFO, diff --git a/tests/test_cli.py b/tests/test_cli.py index d05a3e7..b510912 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + from typer.testing import CliRunner from broker_sync import __version__ @@ -10,3 +14,64 @@ def test_version_prints_package_version() -> None: result = runner.invoke(app, ["version"]) assert result.exit_code == 0 assert __version__ in result.stdout + + +# -- invest-engine CLI -- + + +def _future_iso() -> str: + return (datetime.now(UTC) + timedelta(days=30)).isoformat() + + +def _past_iso() -> str: + return (datetime.now(UTC) - timedelta(days=1)).isoformat() + + +def test_invest_engine_expired_token_exits_2() -> None: + """Guard against burning a request on a token the user already knows is dead.""" + result = runner.invoke( + app, + ["invest-engine"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "anything", + "IE_TOKEN_EXPIRES_AT": _past_iso(), + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2, result.output + assert "expired" in result.output.lower() or "token" in result.output.lower() + + +def test_invest_engine_unknown_mode_exits_2() -> None: + result = runner.invoke( + app, + ["invest-engine", "--mode", "nonsense"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "t", + "IE_TOKEN_EXPIRES_AT": _future_iso(), + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2 + + +def test_invest_engine_malformed_expires_exits_2() -> None: + result = runner.invoke( + app, + ["invest-engine"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "t", + "IE_TOKEN_EXPIRES_AT": "not-an-iso-date", + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2