diff --git a/broker_sync/cli.py b/broker_sync/cli.py index af5b08a..ea7d8c9 100644 --- a/broker_sync/cli.py +++ b/broker_sync/cli.py @@ -230,6 +230,69 @@ def invest_engine( asyncio.run(_run()) +@app.command("imap-ingest") +def imap_ingest( + wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"), + wf_username: str = typer.Option(..., envvar="WF_USERNAME"), + wf_password: str = typer.Option(..., envvar="WF_PASSWORD"), + wf_session_path: str = typer.Option("/data/wealthfolio_session.json", + envvar="WF_SESSION_PATH"), + imap_host: str = typer.Option(..., envvar="IMAP_HOST"), + imap_user: str = typer.Option(..., envvar="IMAP_USER"), + imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"), + imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"), + data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"), +) -> None: + """Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP. + + Walks the mailbox, routes each message by `From:` sender domain to the + matching parser, pushes any resulting activities through the shared + pipeline (dedup → Wealthfolio CSV-free JSON import). + """ + from broker_sync.dedup import SyncRecordStore + from broker_sync.pipeline import sync_provider_to_wealthfolio + from broker_sync.providers.imap import ImapCreds, ImapProvider + from broker_sync.sinks.wealthfolio import WealthfolioSink + + _setup_logging() + data = Path(data_dir) + data.mkdir(parents=True, exist_ok=True) + + async def _run() -> None: + sink = WealthfolioSink( + base_url=wf_base_url, + username=wf_username, + password=wf_password, + session_path=wf_session_path, + ) + provider = ImapProvider( + ImapCreds( + host=imap_host, + user=imap_user, + password=imap_password, + directory=imap_directory, + )) + dedup = SyncRecordStore(data / "sync.db") + try: + if not Path(wf_session_path).exists(): + await sink.login() + result = await sync_provider_to_wealthfolio( + provider=provider, + sink=sink, + dedup=dedup, + ) + finally: + await sink.close() + typer.echo(f"imap-ingest: fetched={result.fetched} " + f"new={result.new_after_dedup} " + f"imported={result.imported} " + f"failed={result.failed}") + if result.failed > 0: + sys.exit(1) + + asyncio.run(_run()) + + def _setup_logging() -> None: logging.basicConfig( level=logging.INFO, diff --git a/broker_sync/pipeline.py b/broker_sync/pipeline.py index 12caca7..7921934 100644 --- a/broker_sync/pipeline.py +++ b/broker_sync/pipeline.py @@ -89,9 +89,7 @@ async def sync_provider_to_wealthfolio( ) -async def _ensure_accounts( - sink: WealthfolioSink, accounts: list[Account] -) -> dict[str, str]: +async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> dict[str, str]: """Return {our_account_id: wealthfolio_uuid}.""" out: dict[str, str] = {} for account in accounts: @@ -134,7 +132,9 @@ async def _flush_batch( for original_account_id, a in batch: wf_id = by_external.get(a.external_id) dedup.record( - provider_name, original_account_id, a.external_id, + provider_name, + original_account_id, + a.external_id, wealthfolio_activity_id=wf_id, ) ok += 1 diff --git a/broker_sync/providers/imap.py b/broker_sync/providers/imap.py new file mode 100644 index 0000000..de46aa9 --- /dev/null +++ b/broker_sync/providers/imap.py @@ -0,0 +1,189 @@ +"""IMAP email ingestor: dispatches messages to the matching parser by sender. + +Used by the `imap-ingest` CLI command for InvestEngine + Schwab confirmation +emails. Each message passes through: + +1. Pull ALL messages from the configured mailbox directory. +2. Route each by `From:` to a parser: + - noreply@investengine.com (+ equivalents) → invest_engine parser + - Schwab confirmations (equityawards@schwab.com, etc.) → schwab parser +3. Merge parser output into one list[Activity] with source attribution. + +Not imap-idle; runs once per invocation. Designed for a daily CronJob. +""" +from __future__ import annotations + +import email +import imaplib +import logging +import re +import ssl +from collections.abc import AsyncIterator, Iterator +from datetime import datetime +from email.message import Message +from typing import NamedTuple + +from broker_sync.models import Account, AccountType, Activity +from broker_sync.providers.parsers import invest_engine as ie_parser +from broker_sync.providers.parsers.schwab import parse_schwab_email + +log = logging.getLogger(__name__) + +_IE_SENDERS = {"noreply@investengine.com", "hello@investengine.com"} +_SCHWAB_SENDERS = { + "equityawards@schwab.com", + "donotreply@schwab.com", + "wealthnotify@schwab.com", +} + +_ADDR_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+") + + +class ImapCreds(NamedTuple): + host: str + user: str + password: str + directory: str + + +def _extract_sender(msg: Message) -> str: + raw = msg.get("From", "") + m = _ADDR_RE.search(raw) + return (m.group(0) if m else "").lower() + + +def _html_or_text(msg: Message) -> str: + """Return the richest body available (prefer HTML).""" + if msg.is_multipart(): + html = None + plain = None + for part in msg.walk(): + ct = part.get_content_type() + if ct == "text/html" and html is None: + html = part.get_payload(decode=True) + elif ct == "text/plain" and plain is None: + plain = part.get_payload(decode=True) + body = html or plain + else: + body = msg.get_payload(decode=True) + if body is None: + return "" + if isinstance(body, bytes): + charset = msg.get_content_charset() or "utf-8" + try: + return body.decode(charset, errors="replace") + except LookupError: + return body.decode("utf-8", errors="replace") + return str(body) + + +def _fetch_all(creds: ImapCreds) -> Iterator[bytes]: + ctx = ssl.create_default_context() + with imaplib.IMAP4_SSL(creds.host, ssl_context=ctx) as m: + m.login(creds.user, creds.password) + typ, _ = m.select(creds.directory, readonly=True) + if typ != "OK": + raise RuntimeError(f"IMAP select {creds.directory} failed: {typ}") + typ, data = m.search(None, "ALL") + if typ != "OK": + raise RuntimeError(f"IMAP search failed: {typ}") + ids = data[0].split() + log.info("imap: fetching %d messages from %s", len(ids), creds.directory) + for uid in ids: + typ, rsp = m.fetch(uid, "(RFC822)") + if typ != "OK" or not rsp or not rsp[0]: + continue + raw = rsp[0][1] + if isinstance(raw, bytes): + yield raw + + +def fetch_activities(creds: ImapCreds) -> list[Activity]: + out: list[Activity] = [] + ie_parsed = schwab_parsed = skipped = 0 + for raw in _fetch_all(creds): + try: + msg = email.message_from_bytes(raw) + except Exception: + skipped += 1 + continue + sender = _extract_sender(msg) + if sender in _IE_SENDERS or sender.endswith("@investengine.com"): + out.extend(ie_parser.parse_invest_engine_email(raw)) + ie_parsed += 1 + elif sender in _SCHWAB_SENDERS or sender.endswith("@schwab.com"): + html = _html_or_text(msg) + out.extend(parse_schwab_email(html)) + schwab_parsed += 1 + else: + skipped += 1 + log.info( + "imap: ie_parsed=%d schwab_parsed=%d skipped=%d → %d activities", + ie_parsed, + schwab_parsed, + skipped, + len(out), + ) + return out + + +class ImapProvider: + """Wraps the IMAP fetch + per-sender parse into the Provider protocol. + + Yields both InvestEngine AND Schwab activities — downstream the + pipeline's dedup keyed on (provider, account, external_id) already + isolates them by account_id. + """ + name = "imap" + + def __init__(self, creds: ImapCreds) -> None: + self._creds = creds + + def accounts(self) -> list[Account]: + return [ + Account( + id="invest-engine-primary", + name="InvestEngine ISA", + account_type=AccountType.ISA, + currency="GBP", + provider="invest-engine", + ), + Account( + id="schwab-workplace", + name="Schwab (US workplace)", + account_type=AccountType.GIA, + currency="USD", + provider="schwab", + ), + ] + + async def fetch( + self, + *, + since: datetime | None = None, + before: datetime | None = None, + ) -> AsyncIterator[Activity]: + # IMAP doesn't give us a server-side date range directly without + # constructing IMAP SEARCH criteria; filter client-side. + for a in fetch_activities(self._creds): + if since is not None and a.date < since: + continue + if before is not None and a.date >= before: + continue + yield a + + +if __name__ == "__main__": + # Local smoke — invoked manually for debug, never from the CronJob. + import os + logging.basicConfig(level=logging.INFO) + c = ImapCreds( + host=os.environ["IMAP_HOST"], + user=os.environ["IMAP_USER"], + password=os.environ["IMAP_PASSWORD"], + directory=os.environ.get("IMAP_DIRECTORY", "INBOX"), + ) + acts = fetch_activities(c) + print(f"total={len(acts)}") + for a in acts[:5]: + print(f" {a.activity_type} {a.symbol} {a.date.isoformat()}") diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index f82817f..47881db 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -130,10 +130,7 @@ class WealthfolioSink: """ existing = await self.list_accounts() for a in existing: - if ( - a.get("provider") == account.provider - and a.get("providerAccountId") == account.id - ): + if (a.get("provider") == account.provider and a.get("providerAccountId") == account.id): wf_id = a.get("id") assert isinstance(wf_id, str) return wf_id @@ -159,9 +156,7 @@ class WealthfolioSink: created = resp.json() wf_id = created.get("id") if not isinstance(wf_id, str): - raise WealthfolioError( - f"POST /accounts returned no id: {created}" - ) + raise WealthfolioError(f"POST /accounts returned no id: {created}") return wf_id # -- activity import -- @@ -213,15 +208,12 @@ class WealthfolioSink: checked = check.json() if not isinstance(checked, list): raise ImportValidationError( - f"Wealthfolio /import/check returned non-list: {type(checked).__name__}" - ) + f"Wealthfolio /import/check returned non-list: {type(checked).__name__}") invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")] if invalid: - raise ImportValidationError( - f"Wealthfolio /import/check flagged {len(invalid)} row(s); " - f"first: {invalid[0]}" - ) + raise ImportValidationError(f"Wealthfolio /import/check flagged {len(invalid)} row(s); " + f"first: {invalid[0]}") # Drop any row the server marked is_valid=false (shouldn't happen # without errors, but defensive). valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")] diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index f554a19..210b915 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -48,7 +48,10 @@ def _login_ok(req: httpx.Request) -> httpx.Response: assert body == {"password": "hunter2"} return httpx.Response( 200, - json={"authenticated": True, "expiresIn": 604800}, + json={ + "authenticated": True, + "expiresIn": 604800 + }, headers={"set-cookie": "wf_token=abc123; Path=/api; HttpOnly"}, ) @@ -219,21 +222,25 @@ async def test_import_dry_run_then_real(tmp_path: Path) -> None: calls.append(req.url.path) if req.url.path == "/api/v1/activities/import/check": # /import/check hydrates and returns a list of ActivityImport. - return httpx.Response(200, json=[ - { - "symbol": "VUAG", - "isValid": True, - "errors": None, - "assetId": "enriched-asset-uuid", - "exchangeMic": "XLON", - }, - ]) + return httpx.Response(200, + json=[ + { + "symbol": "VUAG", + "isValid": True, + "errors": None, + "assetId": "enriched-asset-uuid", + "exchangeMic": "XLON", + }, + ]) if req.url.path == "/api/v1/activities/import": return httpx.Response( 200, json={ "activities": [ - {"id": "wf-1", "external_id": "t212:1"}, + { + "id": "wf-1", + "external_id": "t212:1" + }, ], }, ) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 198e58b..481c4d7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -86,18 +86,22 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None: body = json.loads(req.content) # Echo each activity back marked valid (mimic Wealthfolio's # hydrate step). - return httpx.Response(200, json=[ - {**a, "isValid": True, "errors": None} for a in body["activities"] - ]) + return httpx.Response(200, + json=[{ + **a, "isValid": True, + "errors": None + } for a in body["activities"]]) if req.url.path == "/api/v1/activities/import": body = req.content.decode() posted_batches.append(body) return httpx.Response( 200, - json={"activities": [ - {"id": f"wf-{i}", "external_id": ext} - for i, ext in enumerate(["a", "b", "c"]) - ]}, + json={ + "activities": [{ + "id": f"wf-{i}", + "external_id": ext + } for i, ext in enumerate(["a", "b", "c"])] + }, ) return httpx.Response(500)