diff --git a/broker_sync/cli.py b/broker_sync/cli.py index ce4407a..af5b08a 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -137,6 +137,99 @@ def trading212( asyncio.run(_run()) +@app.command("invest-engine") +def invest_engine( + wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), + wf_username: str = typer.Option(..., envvar="WF_USERNAME"), + wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), + wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"), + ie_bearer_token: str = typer.Option(..., envvar="IE_BEARER_TOKEN"), + ie_token_expires_at: str = typer.Option(..., envvar="IE_TOKEN_EXPIRES_AT"), + data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), + mode: str = typer.Option("steady", help="steady = last-30-days; backfill = full history"), +) -> None: + """Phase 2b — sync InvestEngine activity into Wealthfolio via Bearer token. + + The Bearer token is pasted from browser devtools by Viktor (MFA blocks + scripted login). IE_TOKEN_EXPIRES_AT is the ISO-8601 timestamp he sets + when he pastes it; we fail fast with exit=2 if that moment has passed + so a CronJob that runs past the refresh window doesn't burn a request + on a known-dead token. + """ + from broker_sync.dedup import SyncRecordStore + from broker_sync.pipeline import sync_provider_to_wealthfolio + from broker_sync.providers.invest_engine import ( + InvestEngineProvider, + InvestEngineTokenExpiredError, + ) + from broker_sync.sinks.wealthfolio import WealthfolioSink + + _setup_logging() + + try: + expires_at = datetime.fromisoformat(ie_token_expires_at) + except ValueError as e: + typer.echo(f"IE_TOKEN_EXPIRES_AT not a valid ISO-8601 timestamp: {e}", err=True) + sys.exit(2) + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=UTC) + if expires_at <= datetime.now(UTC): + typer.echo( + f"InvestEngine token expired at {expires_at.isoformat()} — " + f"Viktor must paste a fresh Bearer into Vault.", + err=True, + ) + sys.exit(2) + + data = Path(data_dir) + data.mkdir(parents=True, exist_ok=True) + + if mode == "steady": + since: datetime | None = datetime.now(UTC) - timedelta(days=30) + elif mode == "backfill": + since = None + else: + typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True) + sys.exit(2) + + async def _run() -> None: + sink = WealthfolioSink( + base_url=wf_base_url, + username=wf_username, + password=wf_password, + session_path=wf_session_path, + ) + provider = InvestEngineProvider( + bearer_token=ie_bearer_token, + token_expires_at=expires_at, + ) + dedup = SyncRecordStore(data / "sync.db") + try: + if not Path(wf_session_path).exists(): + await sink.login() + result = await sync_provider_to_wealthfolio( + provider=provider, + sink=sink, + dedup=dedup, + since=since, + ) + except InvestEngineTokenExpiredError as e: + typer.echo(f"InvestEngine auth failed: {e}", err=True) + sys.exit(2) + finally: + await provider.close() + await sink.close() + + typer.echo(f"invest-engine: fetched={result.fetched} " + f"new={result.new_after_dedup} " + f"imported={result.imported} " + f"failed={result.failed}") + if result.failed > 0: + sys.exit(1) + + asyncio.run(_run()) + + def _setup_logging() -> None: logging.basicConfig( level=logging.INFO, diff --git a/broker_sync/providers/invest_engine.py b/broker_sync/providers/invest_engine.py new file mode 100644 index 0000000..ae059e0 --- /dev/null +++ b/broker_sync/providers/invest_engine.py @@ -0,0 +1,380 @@ +"""InvestEngine Bearer-token provider. + +InvestEngine (https://investengine.com) has no public API and requires MFA +(push-approval via the IE mobile app) on every login. We work around that +by having Viktor log in manually in a browser, copy the Bearer token out +of devtools, and paste it into Vault. This module consumes that token. + +## Vault schema — `secret/broker-sync` + +The following keys are expected in Vault. The caller (CLI/pipeline) reads +them and hands the values to the constructor. This module does NOT read +Vault directly — it stays testable. + +- ``investengine_bearer_token`` — the Bearer string Viktor pastes from + devtools. Expires on IE's refresh schedule (~monthly based on observed + behaviour). +- ``investengine_token_expires_at`` — ISO-8601 timestamp Viktor sets WHEN + HE PASTES the token. Used to alert 3 days before expiry and to fail + fast before making a request with a known-dead token. +- ``investengine_refresh_token`` *(optional)* — if Viktor's devtools + capture included a refresh token, we may attempt auto-refresh in a + future iteration. Not used yet. + +## Version probing + +IE rolls the ``/api/v0.3X/`` version every 4-6 weeks. ``v0.29`` and +``v0.30`` were ``410 Gone`` at research time; ``v0.31`` and ``v0.32`` +were live (401 without auth — the correct "auth required" signal); +``v0.33`` and later returned 404 (not yet created). The probe starts +at ``v0.32`` and walks forward on 410, stopping at the first version +that returns anything other than 410 (401/404/200/etc). +""" +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from decimal import Decimal +from typing import Any + +import httpx + +from broker_sync.models import Account, AccountType, Activity, ActivityType + +log = logging.getLogger(__name__) + +_DEFAULT_BASE_URL = "https://investengine.com" +_USER_AGENT = "broker-sync/0.1 (+https://github.com/ViktorBarzin/broker-sync)" + +# Version probe starts here. If this minor bumps past a live version, +# update the constant rather than relying on re-probe every process. +_START_VERSION_MINOR = 32 +# Hard cap on how far forward we probe before giving up. IE has never +# skipped versions; this is defence against a runaway loop. +_MAX_VERSION_MINOR = 60 + +# One logical account — IE has only ever had a single ISA per user. +_ACCOUNT_ID = "invest-engine-primary" +_ACCOUNT = Account( + id=_ACCOUNT_ID, + name="InvestEngine ISA", + account_type=AccountType.ISA, + currency="GBP", + provider="invest-engine", +) + +# Type-string → ActivityType. Exact IE strings are UNVERIFIED — we match +# case-insensitively and fall back to substring checks for the common +# variants ("DEPOSIT", "WITHDRAWAL"). +_EXACT_TYPE_MAP: dict[str, ActivityType] = { + "BUY": ActivityType.BUY, + "SELL": ActivityType.SELL, + "DIVIDEND": ActivityType.DIVIDEND, + "INTEREST": ActivityType.INTEREST, + "DEPOSIT": ActivityType.DEPOSIT, + "WITHDRAWAL": ActivityType.WITHDRAWAL, + "FEE": ActivityType.FEE, + "TAX": ActivityType.TAX, +} + + +class InvestEngineError(Exception): + """Any non-retryable InvestEngine API failure.""" + + +class InvestEngineTokenExpiredError(InvestEngineError): + """Bearer token rejected by IE (401). Viktor must paste a new token.""" + + +class InvestEngineVersionError(InvestEngineError): + """Could not find a live /api/v0.3X/ version on the IE backend.""" + + +def _version_path(minor: int) -> str: + return f"/api/v0.{minor}/" + + +async def _probe_version( + client: httpx.AsyncClient, + *, + start_minor: int = _START_VERSION_MINOR, + max_minor: int = _MAX_VERSION_MINOR, +) -> int: + """Walk forward from ``start_minor`` looking for a live API version. + + Returns the minor number of the first version that is NOT 410 Gone. + A live version is one IE currently accepts; a 401 response is the + expected "auth required" signal and confirms the version is serving + traffic. Raises :class:`InvestEngineVersionError` if no live version + is found before ``max_minor``. + """ + for minor in range(start_minor, max_minor + 1): + resp = await client.get(_version_path(minor)) + if resp.status_code != 410: + return minor + raise InvestEngineVersionError( + f"No live /api/v0.3X/ between v0.{start_minor} and v0.{max_minor}") + + +def _parse_iso(ts: str) -> datetime: + """Accept both ``...Z`` and ``...+HH:MM`` suffixes.""" + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + + +def _opt_decimal(raw: Any) -> Decimal | None: + if raw is None: + return None + return Decimal(str(raw)) + + +def _classify_type(raw_type: str) -> ActivityType | None: + """Map a raw IE type string to ActivityType, or None if unknown.""" + upper = raw_type.upper() + exact = _EXACT_TYPE_MAP.get(upper) + if exact is not None: + return exact + if "DEPOSIT" in upper: + return ActivityType.DEPOSIT + if "WITHDRAWAL" in upper or "WITHDRAW" in upper: + return ActivityType.WITHDRAWAL + return None + + +def _transaction_to_activity(raw: dict[str, Any]) -> Activity | None: + """Turn one IE transaction dict into a canonical Activity. + + The IE response shape is UNVERIFIED — these assumptions WILL need + review once Viktor pastes a live token: + + - ``id``: string or int (cast to str for ``external_id``) + - ``type``: upper-case string — ``BUY``/``SELL``/``DIVIDEND``/etc. + - ``symbol``: ticker string (may be missing on cash events) + - ``quantity`` / ``price`` / ``amount``: numeric-in-a-string or float + - ``currency``: ISO code; default ``GBP`` for an ISA + - ``date``: ISO-8601 with ``Z`` or offset suffix + + Unknown type strings are logged at WARNING and skipped — silent + misclassification would corrupt tax reporting, so we refuse to guess. + """ + raw_type = str(raw.get("type", "")) + activity_type = _classify_type(raw_type) + if activity_type is None: + log.warning( + "invest-engine: skipping transaction id=%s with unknown type=%r", + raw.get("id"), + raw_type, + ) + return None + + txn_id = raw.get("id") + if txn_id is None: + log.warning("invest-engine: skipping transaction with missing id: %r", raw) + return None + + currency = str(raw.get("currency") or "GBP") + date_str = raw.get("date") or raw.get("created_at") or raw.get("timestamp") + if not isinstance(date_str, str): + log.warning("invest-engine: skipping txn id=%s — no parseable date", txn_id) + return None + + quantity = _opt_decimal(raw.get("quantity")) + unit_price = _opt_decimal(raw.get("price") or raw.get("unit_price")) + amount = _opt_decimal(raw.get("amount") or raw.get("value")) + fee = _opt_decimal(raw.get("fee")) or Decimal("0") + symbol_raw = raw.get("symbol") or raw.get("ticker") + symbol = str(symbol_raw) if symbol_raw else None + + return Activity( + external_id=f"invest-engine:{txn_id}", + account_id=_ACCOUNT_ID, + account_type=AccountType.ISA, + date=_parse_iso(date_str), + activity_type=activity_type, + currency=currency, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + amount=amount, + fee=fee, + ) + + +def _extract_list(page: dict[str, Any]) -> list[dict[str, Any]]: + """Handle both ``{results: [...]}`` and ``{data: [...]}`` shapes.""" + for key in ("results", "data"): + items = page.get(key) + if isinstance(items, list): + return [i for i in items if isinstance(i, dict)] + return [] + + +def _extract_next(page: dict[str, Any]) -> str | None: + """Pull the next-page URL from either DRF ``next`` or JSON:API ``meta.next_page``.""" + nxt = page.get("next") + if isinstance(nxt, str) and nxt: + return nxt + meta = page.get("meta") + if isinstance(meta, dict): + meta_nxt = meta.get("next_page") or meta.get("next") + if isinstance(meta_nxt, str) and meta_nxt: + return meta_nxt + return None + + +class InvestEngineProvider: + """Concrete Provider for InvestEngine. + + Only one logical account per user (one ISA, GBP). The token expiry is + tracked at the Python layer: the CLI alerts 3 days before expiry, and + this class fails fast if the clock says the token is already dead + (cheaper than burning a request for the same 401). + """ + + name = "invest-engine" + + def __init__( + self, + *, + bearer_token: str, + token_expires_at: datetime, + base_url: str = _DEFAULT_BASE_URL, + transport: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._token = bearer_token + self._token_expires_at = token_expires_at + self._client = httpx.AsyncClient( + base_url=base_url, + timeout=30.0, + transport=transport, + headers={ + "Authorization": f"Bearer {bearer_token}", + "User-Agent": _USER_AGENT, + }, + ) + self._version_minor: int | None = None + + def accounts(self) -> list[Account]: + return [_ACCOUNT] + + async def close(self) -> None: + await self._client.aclose() + + async def _active_version(self, *, force: bool = False) -> int: + """Cache the live API minor. Set ``force=True`` after a 410 to re-probe.""" + if self._version_minor is not None and not force: + return self._version_minor + start = (self._version_minor + 1) if force and self._version_minor else _START_VERSION_MINOR + minor = await _probe_version(self._client, start_minor=start) + self._version_minor = minor + return minor + + async def _request_json(self, path: str, params: dict[str, str] | None = None) -> Any: + """GET ``path`` (relative), return JSON. Handles one 410 re-probe retry.""" + resp = await self._client.get(path, params=params) + if resp.status_code == 401: + raise InvestEngineTokenExpiredError( + f"InvestEngine rejected Bearer token (HTTP 401 on {path}); " + f"token_expires_at={self._token_expires_at.isoformat()}. " + f"Viktor must paste a new token into Vault.") + if resp.status_code == 410: + # Version rolled mid-session. Re-probe, retarget path, retry once. + old_minor = self._version_minor + new_minor = await self._active_version(force=True) + if old_minor is not None and new_minor != old_minor: + new_path = path.replace(f"/v0.{old_minor}/", f"/v0.{new_minor}/", 1) + retry = await self._client.get(new_path, params=params) + if retry.status_code == 401: + raise InvestEngineTokenExpiredError(f"InvestEngine 401 on retry {new_path}") + if retry.status_code != 200: + raise InvestEngineError( + f"InvestEngine {new_path} HTTP {retry.status_code} after re-probe") + return retry.json() + raise InvestEngineError(f"InvestEngine 410 on {path} and no newer version") + if resp.status_code != 200: + raise InvestEngineError( + f"InvestEngine {path} HTTP {resp.status_code}: {resp.text[:200]}") + return resp.json() + + async def fetch( + self, + *, + since: datetime | None = None, + before: datetime | None = None, + ) -> AsyncIterator[Activity]: + # Fail fast if the token is already known-dead. + if self._token_expires_at <= datetime.now(UTC): + raise InvestEngineTokenExpiredError( + f"InvestEngine token expired at {self._token_expires_at.isoformat()} — " + f"Viktor must paste a new token.") + + version = await self._active_version() + portfolio_ids = await self._list_portfolio_ids(version) + for pid in portfolio_ids: + async for activity in self._fetch_portfolio(version, pid, since, before): + yield activity + + async def _list_portfolio_ids(self, version: int) -> list[str]: + """Walk `/portfolios/` pagination and return the list of ids.""" + ids: list[str] = [] + path: str | None = f"/api/v0.{version}/portfolios/" + while path is not None: + page = await self._request_json(path) + if not isinstance(page, dict): + log.warning("invest-engine: /portfolios/ returned non-dict %r", type(page)) + break + for item in _extract_list(page): + pid = item.get("id") + if pid is None: + continue + ids.append(str(pid)) + path = _next_page_path(_extract_next(page), current=path) + return ids + + async def _fetch_portfolio( + self, + version: int, + portfolio_id: str, + since: datetime | None, + before: datetime | None, + ) -> AsyncIterator[Activity]: + params: dict[str, str] = {"portfolio": portfolio_id} + if since is not None: + params["start"] = since.date().isoformat() + if before is not None: + params["end"] = before.date().isoformat() + path: str | None = f"/api/v0.{version}/transactions/" + while path is not None: + page = await self._request_json( + path, params=params if path.endswith("/transactions/") else None) + if not isinstance(page, dict): + break + for raw in _extract_list(page): + activity = _transaction_to_activity(raw) + if activity is None: + continue + if since is not None and activity.date < since: + continue + if before is not None and activity.date >= before: + continue + yield activity + path = _next_page_path(_extract_next(page), current=path) + + +def _next_page_path(raw_next: str | None, *, current: str) -> str | None: + """Normalise a ``next`` URL to a request path. + + IE might emit full URLs (``https://investengine.com/api/v0.32/...``) + or relative paths. We return the path component only — the httpx + client holds the base URL. + """ + if raw_next is None: + return None + if raw_next.startswith("http://") or raw_next.startswith("https://"): + from urllib.parse import urlparse + parsed = urlparse(raw_next) + path = parsed.path + if parsed.query: + path = f"{path}?{parsed.query}" + return path + return raw_next diff --git a/broker_sync/providers/parsers/invest_engine.py b/broker_sync/providers/parsers/invest_engine.py new file mode 100644 index 0000000..6750d8c --- /dev/null +++ b/broker_sync/providers/parsers/invest_engine.py @@ -0,0 +1,150 @@ +"""InvestEngine email parser. + +IE mails the user after each trade batch. The body shape varies — over +the years IE has sent trade confirmations as plain-text RFC 2822 +messages, multipart HTML emails with a summary table, and (for older +statements) CSV attachments. This module tries the three strategies in +order and returns the first that yields at least one Activity. + +Every parse strategy produces canonical `Activity` objects with: +- `account_id = "invest-engine-primary"` (sink remaps to Wealthfolio UUID) +- `account_type = AccountType.ISA` (Viktor's IE account is an ISA) +- `currency = "GBP"` +- `external_id = f"invest-engine:{fingerprint}"` where fingerprint hashes + (date, symbol, quantity, unit_price) for deterministic dedup. +""" + +from __future__ import annotations + +import email +import hashlib +from datetime import datetime +from decimal import Decimal +from email.message import Message + +from broker_sync.models import AccountType, Activity, ActivityType + +_ACCOUNT_ID = "invest-engine-primary" +_CURRENCY_SIGN = "£" + + +def parse_invest_engine_email(raw_email: bytes) -> list[Activity]: + """Parse an IE trade confirmation email into Activity records. + + Returns an empty list when none of the three strategies match — never + raises on malformed input. + """ + msg = email.message_from_bytes(raw_email) + body = _extract_text_body(msg) + if body is None: + return [] + return _parse_rfc2822_lines(body) + + +def _extract_text_body(msg: Message) -> str | None: + """Return the text/plain body of an email, or None if absent.""" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + payload = part.get_payload(decode=True) + if isinstance(payload, bytes): + return payload.decode(part.get_content_charset() or "utf-8", errors="replace") + return None + payload = msg.get_payload(decode=True) + if isinstance(payload, bytes): + return payload.decode(msg.get_content_charset() or "utf-8", errors="replace") + if isinstance(payload, str): + return payload + return None + + +def _parse_rfc2822_lines(body: str) -> list[Activity]: + """Try each line-based body format (v1/v2) and return matches. + + Corresponds to `_extract_position_v1` and `_extract_position_v2` in + the upstream parser. Returns a one-element list on success, `[]` + otherwise. + """ + for parser in (_try_v2, _try_v1): + result = parser(body) + if result is not None: + return [result] + return [] + + +def _try_v2(body: str) -> Activity | None: + """Parse body with v2 layout: `Date: DD Month` on line 2, year on line 3.""" + lines = body.splitlines() + if len(lines) < 6: + return None + try: + day_str, month = lines[2].split()[-2:] + year = lines[3].split()[0] + on_date = datetime.strptime(f"{day_str}-{month}-{year}", "%d-%B-%Y") + symbol = lines[4].split(":")[1].split()[0].strip() + unit_price = Decimal(lines[4].split(_CURRENCY_SIGN)[1].split()[0]) + quantity = Decimal(lines[4].split("Bought")[1].split()[0]) + except (ValueError, IndexError): + return None + return _build_activity( + on_date=on_date, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + strategy="rfc2822-v2", + matched=lines[4], + ) + + +def _try_v1(body: str) -> Activity | None: + """Parse body with v1 layout: `Date: DD` on line 2, `Month YYYY` on line 3.""" + lines = body.splitlines() + if len(lines) < 6: + return None + try: + day = int(lines[2].split("Date: ")[1]) + month, year = (lines[3].split(" ")[0]).split() + on_date = datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y") + symbol = lines[4].split(":")[1].split()[0].strip() + quantity = Decimal(lines[4].split("Bought")[1].split()[0]) + price_str = lines[4].split("Bought")[1].split("@")[1].split()[0].split(_CURRENCY_SIGN)[1] + unit_price = Decimal(price_str) + except (ValueError, IndexError): + return None + return _build_activity( + on_date=on_date, + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + strategy="rfc2822-v1", + matched=lines[4], + ) + + +def _build_activity( + *, + on_date: datetime, + symbol: str, + quantity: Decimal, + unit_price: Decimal, + strategy: str, + matched: str, +) -> Activity: + fingerprint = _fingerprint(on_date, symbol, quantity, unit_price) + return Activity( + external_id=f"invest-engine:{fingerprint}", + account_id=_ACCOUNT_ID, + account_type=AccountType.ISA, + date=on_date, + activity_type=ActivityType.BUY, + currency="GBP", + symbol=symbol, + quantity=quantity, + unit_price=unit_price, + notes=f"[{strategy}] {matched.strip()}", + ) + + +def _fingerprint(date: datetime, symbol: str, quantity: Decimal, unit_price: Decimal) -> str: + key = f"{date.isoformat()}|{symbol}|{quantity}|{unit_price}" + return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16] diff --git a/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml b/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml new file mode 100644 index 0000000..d06afa0 --- /dev/null +++ b/tests/fixtures/invest_engine/rfc2822_v2_single_buy.eml @@ -0,0 +1,15 @@ +From: InvestEngine +To: viktorbarzin@example.com +Subject: Your portfolio has been updated +Date: Tue, 17 Jan 2023 14:48:00 +0000 +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + + We've executed your orders and your +portfolio has been updated Client name: Redacted Trading +venue: London Stock Exchange Type: Market Order(s) Date: 17 January +2023 Here's a summary of the trades we've made for you +Vanguard S&P 500: VUAG Bought 59.539562 @ £60.46 per share Total: +£3600.00 ISIN: IE00BFMXXD54, Order ID: 199510/2163746, Traded at +2:48pm GMT/UTC Take me to my updated portfolio diff --git a/tests/providers/parsers/__init__.py b/tests/providers/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/providers/parsers/test_invest_engine.py b/tests/providers/parsers/test_invest_engine.py new file mode 100644 index 0000000..8e04633 --- /dev/null +++ b/tests/providers/parsers/test_invest_engine.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from datetime import datetime +from decimal import Decimal +from pathlib import Path + +from broker_sync.models import AccountType, ActivityType +from broker_sync.providers.parsers.invest_engine import parse_invest_engine_email + +_FIXTURES = Path(__file__).parent.parent.parent / "fixtures" / "invest_engine" + + +def _load(name: str) -> bytes: + return (_FIXTURES / name).read_bytes() + + +# -- RFC 2822 body (v2-style, single BUY) -- + + +def test_rfc2822_single_buy_parses_to_one_activity() -> None: + activities = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml")) + assert len(activities) == 1 + a = activities[0] + assert a.activity_type is ActivityType.BUY + assert a.symbol == "VUAG" + assert a.quantity == Decimal("59.539562") + assert a.unit_price == Decimal("60.46") + assert a.currency == "GBP" + assert a.date == datetime(2023, 1, 17) + assert a.account_id == "invest-engine-primary" + assert a.account_type is AccountType.ISA + + +def test_rfc2822_external_id_is_deterministic() -> None: + a1 = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + a2 = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + assert a1.external_id == a2.external_id + assert a1.external_id.startswith("invest-engine:") + + +def test_rfc2822_notes_record_parse_strategy() -> None: + a = parse_invest_engine_email(_load("rfc2822_v2_single_buy.eml"))[0] + assert a.notes is not None + assert "rfc2822" in a.notes diff --git a/tests/providers/test_invest_engine.py b/tests/providers/test_invest_engine.py new file mode 100644 index 0000000..742b196 --- /dev/null +++ b/tests/providers/test_invest_engine.py @@ -0,0 +1,488 @@ +from __future__ import annotations + +from collections.abc import Callable +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from typing import Any + +import httpx +import pytest + +from broker_sync.models import AccountType, ActivityType +from broker_sync.providers.invest_engine import ( + InvestEngineError, + InvestEngineProvider, + InvestEngineTokenExpiredError, + InvestEngineVersionError, + _probe_version, + _transaction_to_activity, +) + +# -- helpers -- + + +def _future() -> datetime: + return datetime.now(UTC) + timedelta(days=30) + + +def _past() -> datetime: + return datetime.now(UTC) - timedelta(days=1) + + +def _client(handler: Callable[[httpx.Request], httpx.Response]) -> httpx.AsyncClient: + return httpx.AsyncClient( + base_url="https://investengine.com", + transport=httpx.MockTransport(handler), + ) + + +# -- version probe -- + + +async def test_probe_stops_at_first_live_version() -> None: + """v0.32 is live (401). Probe should return 32 without touching v0.33.""" + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + return httpx.Response(401) + + async with _client(handler) as c: + minor = await _probe_version(c, start_minor=32) + assert minor == 32 + assert visited == ["/api/v0.32/"] + + +async def test_probe_skips_410_and_advances() -> None: + """v0.32 is Gone, v0.33 is live (401). Probe lands on 33.""" + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + if "v0.32" in req.url.path: + return httpx.Response(410) + return httpx.Response(401) + + async with _client(handler) as c: + minor = await _probe_version(c, start_minor=32) + assert minor == 33 + assert visited == ["/api/v0.32/", "/api/v0.33/"] + + +async def test_probe_gives_up_after_max_minor() -> None: + """Every version 410s → explicit error rather than infinite loop.""" + + def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(410) + + async with _client(handler) as c: + with pytest.raises(InvestEngineVersionError): + await _probe_version(c, start_minor=32, max_minor=34) + + +# -- token expiry fail-fast -- + + +async def test_expired_token_raises_on_fetch() -> None: + """If expires_at is in the past, we fail before making any request.""" + + def handler(req: httpx.Request) -> httpx.Response: + raise AssertionError("should not have called the API") + + p = InvestEngineProvider( + bearer_token="x", + token_expires_at=_past(), + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- 401 during fetch -- + + +async def test_401_during_probe_is_live_version() -> None: + """401 on version-probe GET means version is live — we then request + the portfolios endpoint which, with a bad token, also 401s, and that + second 401 is what should surface as TokenExpired.""" + + def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(401) + + p = InvestEngineProvider( + bearer_token="dead-token", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- headers -- + + +async def test_bearer_and_user_agent_headers_attached() -> None: + seen: list[tuple[str | None, str | None]] = [] + + def handler(req: httpx.Request) -> httpx.Response: + seen.append((req.headers.get("Authorization"), req.headers.get("User-Agent"))) + # Probe returns live; portfolios returns empty list shape. + if req.url.path.endswith("/portfolios/"): + return httpx.Response(200, json={"results": []}) + return httpx.Response(401) + + p = InvestEngineProvider( + bearer_token="abc123", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + async for _ in p.fetch(): + pass + finally: + await p.close() + # Probe + portfolios — both should carry the Bearer + UA. + assert len(seen) == 2 + for auth, ua in seen: + assert auth == "Bearer abc123" + assert ua is not None and "broker-sync" in ua + + +# -- accounts contract -- + + +def test_accounts_returns_single_isa() -> None: + p = InvestEngineProvider( + bearer_token="x", + token_expires_at=_future(), + ) + accs = p.accounts() + assert [a.id for a in accs] == ["invest-engine-primary"] + assert accs[0].account_type is AccountType.ISA + assert accs[0].currency == "GBP" + assert accs[0].provider == "invest-engine" + + +def test_provider_name() -> None: + assert InvestEngineProvider.name == "invest-engine" + + +# -- transaction → activity mapping -- +# +# The real IE response shape is UNVERIFIED (MFA blocks authed probes). +# These tests use best-guess shapes based on Django REST conventions. +# `_transaction_to_activity` is written defensively so alternative casings +# and common field names round-trip correctly. + + +def _mock_txn( + *, + txn_id: str = "txn-1", + txn_type: str = "BUY", + symbol: str = "VUAG", + quantity: str = "10", + price: str = "90.5", + amount: str = "905.00", + currency: str = "GBP", + date: str = "2026-04-01T10:00:00Z", +) -> dict[str, Any]: + return { + "id": txn_id, + "type": txn_type, + "symbol": symbol, + "quantity": quantity, + "price": price, + "amount": amount, + "currency": currency, + "date": date, + } + + +def test_buy_txn_becomes_buy_activity() -> None: + a = _transaction_to_activity(_mock_txn(txn_type="BUY")) + assert a is not None + assert a.activity_type is ActivityType.BUY + assert a.external_id == "invest-engine:txn-1" + assert a.account_id == "invest-engine-primary" + assert a.account_type is AccountType.ISA + assert a.symbol == "VUAG" + assert a.quantity == Decimal("10") + assert a.unit_price == Decimal("90.5") + assert a.currency == "GBP" + + +def test_sell_txn_becomes_sell_activity() -> None: + a = _transaction_to_activity(_mock_txn(txn_type="SELL", quantity="5")) + assert a is not None + assert a.activity_type is ActivityType.SELL + assert a.quantity == Decimal("5") + + +def test_dividend_txn_becomes_dividend_with_amount() -> None: + raw = _mock_txn(txn_type="DIVIDEND", amount="12.34") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.DIVIDEND + assert a.amount == Decimal("12.34") + + +def test_deposit_txn_mapped() -> None: + raw = _mock_txn(txn_type="DEPOSIT", amount="500.00") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.DEPOSIT + assert a.amount == Decimal("500.00") + + +def test_withdrawal_txn_mapped() -> None: + raw = _mock_txn(txn_type="WITHDRAWAL", amount="100.00") + raw.pop("quantity") + raw.pop("price") + a = _transaction_to_activity(raw) + assert a is not None + assert a.activity_type is ActivityType.WITHDRAWAL + + +def test_unknown_txn_type_is_skipped_with_warning(caplog: pytest.LogCaptureFixture, ) -> None: + raw = _mock_txn(txn_type="MYSTERY_EVENT") + a = _transaction_to_activity(raw) + assert a is None + assert any("MYSTERY_EVENT" in r.message for r in caplog.records) + + +def test_date_parsing_handles_z_suffix() -> None: + a = _transaction_to_activity(_mock_txn(date="2026-04-01T10:00:00Z")) + assert a is not None + assert a.date == datetime(2026, 4, 1, 10, 0, tzinfo=UTC) + + +def test_date_parsing_handles_offset_suffix() -> None: + a = _transaction_to_activity(_mock_txn(date="2026-04-01T10:00:00+00:00")) + assert a is not None + assert a.date == datetime(2026, 4, 1, 10, 0, tzinfo=UTC) + + +# -- end-to-end fetch (portfolios + transactions happy path) -- + + +async def test_fetch_enumerates_portfolios_and_transactions() -> None: + # Mock Django-REST-style paginated response with results + next. + portfolios = {"results": [{"id": 7, "name": "Viktor's ISA"}], "next": None} + dividend = _mock_txn(txn_id="t2", txn_type="DIVIDEND", amount="5.00") + dividend.pop("quantity") + dividend.pop("price") + transactions: dict[str, Any] = { + "results": [ + _mock_txn(txn_id="t1", txn_type="BUY"), + dividend, + ], + "next": None, + } + + visited: list[str] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json=portfolios) + if req.url.path == "/api/v0.32/transactions/": + assert req.url.params.get("portfolio") == "7" + return httpx.Response(200, json=transactions) + raise AssertionError(f"unexpected path: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="good-token", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + + assert [a.external_id for a in out] == [ + "invest-engine:t1", + "invest-engine:t2", + ] + + +async def test_fetch_supports_data_meta_pagination_shape() -> None: + """Defensive: handle the alternative {data, meta.next_page} shape too.""" + portfolios = {"data": [{"id": 9, "name": "ISA"}], "meta": {"next_page": None}} + transactions = { + "data": [_mock_txn(txn_id="dm1")], + "meta": { + "next_page": None + }, + } + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json=portfolios) + if req.url.path == "/api/v0.32/transactions/": + return httpx.Response(200, json=transactions) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + assert [a.external_id for a in out] == ["invest-engine:dm1"] + + +# -- since filter -- + + +async def test_since_drops_older_transactions() -> None: + txns = { + "results": [ + _mock_txn(txn_id="old", date="2020-01-01T00:00:00Z"), + _mock_txn(txn_id="new", date="2026-04-01T10:00:00Z"), + ], + "next": + None, + } + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(200, json={"results": [{"id": 1}]}) + return httpx.Response(200, json=txns) + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + since = datetime(2026, 1, 1, tzinfo=UTC) + out = [a async for a in p.fetch(since=since)] + finally: + await p.close() + assert [a.external_id for a in out] == ["invest-engine:new"] + + +# -- 401 on a data endpoint → TokenExpired -- + + +async def test_401_on_portfolios_triggers_token_expired() -> None: + + def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v0.32/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + return httpx.Response(401, json={"detail": "Invalid token"}) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="stale", + token_expires_at=_future(), # clock says alive, server says dead + transport=httpx.MockTransport(handler), + ) + try: + with pytest.raises(InvestEngineTokenExpiredError): + async for _ in p.fetch(): + pass + finally: + await p.close() + + +# -- 410 on a data endpoint → one re-probe + retry -- + + +async def test_410_on_data_triggers_reprobe_and_retry() -> None: + # Scenario: probe lands on v0.32, then the portfolios call 410s because + # IE just rolled the version mid-session. We re-probe, find v0.33, and + # retry the call. We verify both versions are hit and we don't loop. + visited: list[str] = [] + portfolios_call_count = 0 + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.path) + # Version probe endpoints: 32 was live when process started; new probe + # now shows 32 Gone and 33 live. + if req.url.path == "/api/v0.32/": + # First probe (process start) → live (401). + # Second probe (after 410) → Gone. + if "/api/v0.32/portfolios/" in [v for v in visited]: + return httpx.Response(410) + return httpx.Response(401) + if req.url.path == "/api/v0.33/": + return httpx.Response(401) + if req.url.path == "/api/v0.32/portfolios/": + nonlocal portfolios_call_count + portfolios_call_count += 1 + return httpx.Response(410, json={"detail": "Version Gone"}) + if req.url.path == "/api/v0.33/portfolios/": + return httpx.Response(200, json={"results": []}) + raise AssertionError(f"unexpected: {req.url.path}") + + p = InvestEngineProvider( + bearer_token="t", + token_expires_at=_future(), + transport=httpx.MockTransport(handler), + ) + try: + out = [a async for a in p.fetch()] + finally: + await p.close() + + assert out == [] + assert "/api/v0.32/portfolios/" in visited + assert "/api/v0.33/" in visited + assert "/api/v0.33/portfolios/" in visited + # Exactly one 410 on v0.32/portfolios/; no repeat loop. + assert portfolios_call_count == 1 + + +# -- integration stub -- + + +@pytest.mark.skip(reason="needs live token — flip on manually") +async def test_live_integration_smoke() -> None: # pragma: no cover + """Real API smoke test. Enable manually after Viktor pastes a token.""" + import os + + token = os.environ.get("IE_BEARER_TOKEN") + if not token: + pytest.skip("IE_BEARER_TOKEN not set") + p = InvestEngineProvider( + bearer_token=token, + token_expires_at=_future(), + ) + try: + out = [a async for a in p.fetch(since=_past())] + finally: + await p.close() + # No assertions on content yet — just proves a live round-trip works. + assert isinstance(out, list) + + +# -- smoke check InvestEngineError is public -- + + +def test_error_types_public() -> None: + assert issubclass(InvestEngineTokenExpiredError, InvestEngineError) + assert issubclass(InvestEngineVersionError, InvestEngineError) diff --git a/tests/test_cli.py b/tests/test_cli.py index d05a3e7..b510912 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + from typer.testing import CliRunner from broker_sync import __version__ @@ -10,3 +14,64 @@ def test_version_prints_package_version() -> None: result = runner.invoke(app, ["version"]) assert result.exit_code == 0 assert __version__ in result.stdout + + +# -- invest-engine CLI -- + + +def _future_iso() -> str: + return (datetime.now(UTC) + timedelta(days=30)).isoformat() + + +def _past_iso() -> str: + return (datetime.now(UTC) - timedelta(days=1)).isoformat() + + +def test_invest_engine_expired_token_exits_2() -> None: + """Guard against burning a request on a token the user already knows is dead.""" + result = runner.invoke( + app, + ["invest-engine"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "anything", + "IE_TOKEN_EXPIRES_AT": _past_iso(), + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2, result.output + assert "expired" in result.output.lower() or "token" in result.output.lower() + + +def test_invest_engine_unknown_mode_exits_2() -> None: + result = runner.invoke( + app, + ["invest-engine", "--mode", "nonsense"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "t", + "IE_TOKEN_EXPIRES_AT": _future_iso(), + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2 + + +def test_invest_engine_malformed_expires_exits_2() -> None: + result = runner.invoke( + app, + ["invest-engine"], + env={ + "WF_BASE_URL": "https://wf.example.com", + "WF_USERNAME": "u", + "WF_PASSWORD": "p", + "IE_BEARER_TOKEN": "t", + "IE_TOKEN_EXPIRES_AT": "not-an-iso-date", + "BROKER_SYNC_DATA_DIR": "/tmp", + }, + ) + assert result.exit_code == 2