Add imap-ingest CLI + ImapProvider: route emails to IE/Schwab parsers

Wires the IE + Schwab email parsers into an actual runnable sync. Walks
the IMAP mailbox, routes each message by sender domain:
  - *@investengine.com → invest_engine.parse_invest_engine_email
  - *@schwab.com       → schwab.parse_schwab_email
then pushes the resulting Activities through the shared pipeline.

broker-sync imap-ingest — new CLI command taking IMAP_HOST/USER/PASSWORD/
DIRECTORY (mirrors the old wealthfolio-sync image's env shape so the
Terraform CronJob's existing env wiring works unchanged).

Verified: poetry run pytest -q → 109 passed + 1 skipped; mypy strict
clean (37 files); ruff + yapf clean.
This commit is contained in:
Viktor Barzin 2026-04-17 22:12:05 +00:00
parent f089b8b93a
commit 6efd03570a
6 changed files with 290 additions and 35 deletions

View file

@ -230,6 +230,69 @@ def invest_engine(
asyncio.run(_run())
@app.command("imap-ingest")
def imap_ingest(
    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
                                        envvar="WF_SESSION_PATH"),
    imap_host: str = typer.Option(..., envvar="IMAP_HOST"),
    imap_user: str = typer.Option(..., envvar="IMAP_USER"),
    imap_password: str = typer.Option(..., envvar="IMAP_PASSWORD"),
    imap_directory: str = typer.Option("INBOX", envvar="IMAP_DIRECTORY"),
    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
    """Phase 2/3 — ingest InvestEngine + Schwab confirmation emails via IMAP.

    Walks the mailbox, routes each message by `From:` sender domain to the
    matching parser, pushes any resulting activities through the shared
    pipeline (dedup, then Wealthfolio CSV-free JSON import).
    """
    # Imports are deferred to call time, so they only load when this command
    # actually runs. NOTE(review): presumably to keep CLI start-up light —
    # confirm against the other commands in this module.
    from broker_sync.dedup import SyncRecordStore
    from broker_sync.pipeline import sync_provider_to_wealthfolio
    from broker_sync.providers.imap import ImapCreds, ImapProvider
    from broker_sync.sinks.wealthfolio import WealthfolioSink
    _setup_logging()
    data = Path(data_dir)
    # The dedup database lives inside data_dir, so make sure it exists first.
    data.mkdir(parents=True, exist_ok=True)

    async def _run() -> None:
        sink = WealthfolioSink(
            base_url=wf_base_url,
            username=wf_username,
            password=wf_password,
            session_path=wf_session_path,
        )
        provider = ImapProvider(
            ImapCreds(
                host=imap_host,
                user=imap_user,
                password=imap_password,
                directory=imap_directory,
            ))
        dedup = SyncRecordStore(data / "sync.db")
        try:
            # Log in only when no session file is present at wf_session_path.
            # NOTE(review): assumes the sink reuses a persisted session
            # otherwise — verify in WealthfolioSink.
            if not Path(wf_session_path).exists():
                await sink.login()
            result = await sync_provider_to_wealthfolio(
                provider=provider,
                sink=sink,
                dedup=dedup,
            )
        finally:
            # Always release the sink, whether or not the sync succeeded.
            await sink.close()
        typer.echo(f"imap-ingest: fetched={result.fetched} "
                   f"new={result.new_after_dedup} "
                   f"imported={result.imported} "
                   f"failed={result.failed}")
        # Exit non-zero when any activity failed so the caller sees the run
        # as unsuccessful.
        if result.failed > 0:
            sys.exit(1)

    asyncio.run(_run())
def _setup_logging() -> None:
logging.basicConfig(
level=logging.INFO,

View file

@ -89,9 +89,7 @@ async def sync_provider_to_wealthfolio(
)
async def _ensure_accounts(
sink: WealthfolioSink, accounts: list[Account]
) -> dict[str, str]:
async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> dict[str, str]:
"""Return {our_account_id: wealthfolio_uuid}."""
out: dict[str, str] = {}
for account in accounts:
@ -134,7 +132,9 @@ async def _flush_batch(
for original_account_id, a in batch:
wf_id = by_external.get(a.external_id)
dedup.record(
provider_name, original_account_id, a.external_id,
provider_name,
original_account_id,
a.external_id,
wealthfolio_activity_id=wf_id,
)
ok += 1

View file

@ -0,0 +1,189 @@
"""IMAP email ingestor: dispatches messages to the matching parser by sender.
Used by the `imap-ingest` CLI command for InvestEngine + Schwab confirmation
emails. Each message passes through:
1. Pull ALL messages from the configured mailbox directory.
2. Route each by `From:` to a parser:
   - noreply@investengine.com (+ equivalents) -> invest_engine parser
   - Schwab confirmations (equityawards@schwab.com, etc.) -> schwab parser
3. Merge parser output into one list[Activity] with source attribution.
Not imap-idle; runs once per invocation. Designed for a daily CronJob.
"""
from __future__ import annotations
import email
import imaplib
import logging
import re
import ssl
from collections.abc import AsyncIterator, Iterator
from datetime import datetime
from email.message import Message
from typing import NamedTuple
from broker_sync.models import Account, AccountType, Activity
from broker_sync.providers.parsers import invest_engine as ie_parser
from broker_sync.providers.parsers.schwab import parse_schwab_email
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Known InvestEngine sender addresses (lower-case).
_IE_SENDERS = {"noreply@investengine.com", "hello@investengine.com"}
# Known Schwab sender addresses (lower-case).
_SCHWAB_SENDERS = {
    "equityawards@schwab.com",
    "donotreply@schwab.com",
    "wealthnotify@schwab.com",
}
# Loose pattern for an e-mail-address-shaped token: local part, "@", then a
# dotted domain with at least one dot. Not a full RFC 5322 validator.
_ADDR_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
class ImapCreds(NamedTuple):
    """Connection settings for one IMAP mailbox directory."""

    # Hostname of the IMAP server.
    host: str
    # Login name used to authenticate.
    user: str
    # Login password. NOTE(review): a NamedTuple's default repr includes
    # every field, so avoid logging or printing instances.
    password: str
    # Mailbox directory to select.
    directory: str
def _extract_sender(msg: Message) -> str:
    """Return the lower-cased address from the message's ``From:`` header.

    The header is parsed with ``email.utils.parseaddr`` so that only the real
    addr-spec is returned. An address-looking display name such as
    ``"noreply@investengine.com" <someone@else.example>`` therefore cannot
    impersonate a trusted sender.

    Args:
        msg: Parsed e-mail message.

    Returns:
        The sender address in lower case, or ``""`` when the header is
        missing or holds no usable address.
    """
    from email.utils import parseaddr

    # Under the legacy compat32 policy a header containing raw non-ASCII
    # bytes can come back as a Header object rather than str; coerce it so
    # parsing never raises on such mail.
    raw = str(msg.get("From", ""))
    _display_name, address = parseaddr(raw)
    if "@" not in address:
        return ""
    return address.strip().lower()
def _html_or_text(msg: Message) -> str:
    """Return the richest body available (prefer HTML) as decoded text.

    For multipart messages the first ``text/html`` part wins, falling back to
    the first ``text/plain`` part when there is no (non-empty) HTML. The body
    is decoded with the charset declared on the part it came from, not on the
    multipart container, which normally declares none.

    Args:
        msg: Parsed e-mail message.

    Returns:
        The decoded body, or ``""`` when no text body is present. Bytes that
        are invalid for the charset are replaced rather than raising; an
        unknown charset falls back to UTF-8.
    """
    source = msg  # the part whose charset describes `body`
    if msg.is_multipart():
        html = None
        plain = None
        for part in msg.walk():
            ct = part.get_content_type()
            if ct == "text/html" and html is None:
                html = (part.get_payload(decode=True), part)
            elif ct == "text/plain" and plain is None:
                plain = (part.get_payload(decode=True), part)
        # An HTML part with an empty payload still falls back to plain text.
        chosen = html if html is not None and html[0] else plain
        if chosen is None:
            return ""
        body, source = chosen
    else:
        body = msg.get_payload(decode=True)
    if body is None:
        return ""
    if isinstance(body, bytes):
        charset = source.get_content_charset() or "utf-8"
        try:
            return body.decode(charset, errors="replace")
        except LookupError:
            # Charset label unknown to Python's codec registry.
            return body.decode("utf-8", errors="replace")
    return str(body)
def _fetch_all(creds: ImapCreds) -> Iterator[bytes]:
    """Yield the raw RFC 822 bytes of every message in ``creds.directory``.

    Opens a TLS connection to ``creds.host``, logs in, selects the directory
    with ``readonly=True``, searches for ``ALL`` messages and fetches them one
    at a time. Messages whose fetch does not return ``OK``, or whose response
    carries no bytes payload, are skipped.

    Raises:
        RuntimeError: If selecting the directory or the search fails.
    """
    tls = ssl.create_default_context()
    with imaplib.IMAP4_SSL(creds.host, ssl_context=tls) as conn:
        conn.login(creds.user, creds.password)

        status, _ = conn.select(creds.directory, readonly=True)
        if status != "OK":
            raise RuntimeError(f"IMAP select {creds.directory} failed: {status}")

        status, found = conn.search(None, "ALL")
        if status != "OK":
            raise RuntimeError(f"IMAP search failed: {status}")

        # The search reply is one space-separated list of message numbers.
        message_nums = found[0].split()
        log.info("imap: fetching %d messages from %s", len(message_nums), creds.directory)

        for num in message_nums:
            status, response = conn.fetch(num, "(RFC822)")
            if status == "OK" and response and response[0]:
                payload = response[0][1]
                if isinstance(payload, bytes):
                    yield payload
def fetch_activities(creds: ImapCreds) -> list[Activity]:
    """Fetch every message in the mailbox and parse the broker e-mails.

    Each message is routed by its ``From:`` address: InvestEngine senders go
    to the InvestEngine parser (which receives the raw message bytes), Schwab
    senders go to the Schwab parser (which receives the decoded HTML/text
    body). Messages from any other sender are counted as skipped.

    Args:
        creds: IMAP connection settings.

    Returns:
        All activities produced by both parsers, in mailbox order.

    Raises:
        RuntimeError: Propagated from the IMAP fetch when the directory
            cannot be selected or searched.
    """
    out: list[Activity] = []
    ie_parsed = schwab_parsed = skipped = 0
    for raw in _fetch_all(creds):
        try:
            msg = email.message_from_bytes(raw)
        except Exception:
            # Best-effort: one undecodable message must not abort the run,
            # but leave a trace instead of dropping it silently.
            log.warning(
                "imap: skipping unparseable message (%d bytes)",
                len(raw),
                exc_info=True,
            )
            skipped += 1
            continue
        sender = _extract_sender(msg)
        if sender in _IE_SENDERS or sender.endswith("@investengine.com"):
            out.extend(ie_parser.parse_invest_engine_email(raw))
            ie_parsed += 1
        elif sender in _SCHWAB_SENDERS or sender.endswith("@schwab.com"):
            html = _html_or_text(msg)
            out.extend(parse_schwab_email(html))
            schwab_parsed += 1
        else:
            skipped += 1
    # Keep a separator between the skipped count and the activity total so
    # the two numbers cannot run together in the log line.
    log.info(
        "imap: ie_parsed=%d schwab_parsed=%d skipped=%d -> %d activities",
        ie_parsed,
        schwab_parsed,
        skipped,
        len(out),
    )
    return out
class ImapProvider:
    """Provider that serves activities parsed from an IMAP mailbox.

    One instance emits both InvestEngine and Schwab activities. Each broker
    has its own account in `accounts()`.
    NOTE(review): the pipeline's dedup appears to key on
    (provider, account, external_id), which would keep the two brokers'
    records apart — confirm in the pipeline module.
    """

    name = "imap"

    def __init__(self, creds: ImapCreds) -> None:
        self._creds = creds

    def accounts(self) -> list[Account]:
        """Return the two fixed accounts this provider feeds."""
        invest_engine = Account(
            id="invest-engine-primary",
            name="InvestEngine ISA",
            account_type=AccountType.ISA,
            currency="GBP",
            provider="invest-engine",
        )
        schwab = Account(
            id="schwab-workplace",
            name="Schwab (US workplace)",
            account_type=AccountType.GIA,
            currency="USD",
            provider="schwab",
        )
        return [invest_engine, schwab]

    async def fetch(
        self,
        *,
        since: datetime | None = None,
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        """Yield parsed activities whose date lies in ``[since, before)``.

        Either bound may be ``None`` to leave that side open.
        """
        # The whole mailbox is fetched and the date window is applied here,
        # client-side, rather than through IMAP SEARCH criteria.
        for activity in fetch_activities(self._creds):
            if (since is not None and activity.date < since) or (
                    before is not None and activity.date >= before):
                continue
            yield activity
if __name__ == "__main__":
    # Manual smoke test for local debugging; the CronJob never takes this path.
    import os

    logging.basicConfig(level=logging.INFO)
    env = os.environ
    creds = ImapCreds(
        host=env["IMAP_HOST"],
        user=env["IMAP_USER"],
        password=env["IMAP_PASSWORD"],
        directory=env.get("IMAP_DIRECTORY", "INBOX"),
    )
    activities = fetch_activities(creds)
    print(f"total={len(activities)}")
    # Show only the first five activities as a quick sanity check.
    for activity in activities[:5]:
        print(f" {activity.activity_type} {activity.symbol} {activity.date.isoformat()}")

View file

@ -130,10 +130,7 @@ class WealthfolioSink:
"""
existing = await self.list_accounts()
for a in existing:
if (
a.get("provider") == account.provider
and a.get("providerAccountId") == account.id
):
if (a.get("provider") == account.provider and a.get("providerAccountId") == account.id):
wf_id = a.get("id")
assert isinstance(wf_id, str)
return wf_id
@ -159,9 +156,7 @@ class WealthfolioSink:
created = resp.json()
wf_id = created.get("id")
if not isinstance(wf_id, str):
raise WealthfolioError(
f"POST /accounts returned no id: {created}"
)
raise WealthfolioError(f"POST /accounts returned no id: {created}")
return wf_id
# -- activity import --
@ -213,15 +208,12 @@ class WealthfolioSink:
checked = check.json()
if not isinstance(checked, list):
raise ImportValidationError(
f"Wealthfolio /import/check returned non-list: {type(checked).__name__}"
)
f"Wealthfolio /import/check returned non-list: {type(checked).__name__}")
invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")]
if invalid:
raise ImportValidationError(
f"Wealthfolio /import/check flagged {len(invalid)} row(s); "
f"first: {invalid[0]}"
)
raise ImportValidationError(f"Wealthfolio /import/check flagged {len(invalid)} row(s); "
f"first: {invalid[0]}")
# Drop any row the server marked is_valid=false (shouldn't happen
# without errors, but defensive).
valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")]

View file

@ -48,7 +48,10 @@ def _login_ok(req: httpx.Request) -> httpx.Response:
assert body == {"password": "hunter2"}
return httpx.Response(
200,
json={"authenticated": True, "expiresIn": 604800},
json={
"authenticated": True,
"expiresIn": 604800
},
headers={"set-cookie": "wf_token=abc123; Path=/api; HttpOnly"},
)
@ -219,21 +222,25 @@ async def test_import_dry_run_then_real(tmp_path: Path) -> None:
calls.append(req.url.path)
if req.url.path == "/api/v1/activities/import/check":
# /import/check hydrates and returns a list of ActivityImport.
return httpx.Response(200, json=[
{
"symbol": "VUAG",
"isValid": True,
"errors": None,
"assetId": "enriched-asset-uuid",
"exchangeMic": "XLON",
},
])
return httpx.Response(200,
json=[
{
"symbol": "VUAG",
"isValid": True,
"errors": None,
"assetId": "enriched-asset-uuid",
"exchangeMic": "XLON",
},
])
if req.url.path == "/api/v1/activities/import":
return httpx.Response(
200,
json={
"activities": [
{"id": "wf-1", "external_id": "t212:1"},
{
"id": "wf-1",
"external_id": "t212:1"
},
],
},
)

View file

@ -86,18 +86,22 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
body = json.loads(req.content)
# Echo each activity back marked valid (mimic Wealthfolio's
# hydrate step).
return httpx.Response(200, json=[
{**a, "isValid": True, "errors": None} for a in body["activities"]
])
return httpx.Response(200,
json=[{
**a, "isValid": True,
"errors": None
} for a in body["activities"]])
if req.url.path == "/api/v1/activities/import":
body = req.content.decode()
posted_batches.append(body)
return httpx.Response(
200,
json={"activities": [
{"id": f"wf-{i}", "external_id": ext}
for i, ext in enumerate(["a", "b", "c"])
]},
json={
"activities": [{
"id": f"wf-{i}",
"external_id": ext
} for i, ext in enumerate(["a", "b", "c"])]
},
)
return httpx.Response(500)