broker-sync/broker_sync/providers/imap.py
Viktor Barzin 0d23487608
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
ci/woodpecker/push/build Pipeline was successful
imap: skip InvestEngine by default; opt back in via INCLUDE env
Post-mortem 2026-05-27: 39 IMAP-source IE BUYs + their cash-flow
DEPOSITs were re-inserted into Wealthfolio at 09:22:18 UTC, exactly
the rows the £252k dedup removed the previous day. The cron's
BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS=invest-engine env var did its job
(cron logged ie_skipped=53), but some other entry point — kubectl run,
poetry run on the devvm, or a sibling agent session — ran the IMAP
ingest WITHOUT that env. The opt-out was a foot-gun.

This change makes the IE-via-IMAP safety STRUCTURAL: `invest-engine`
is in the default exclude set inside _resolve_excluded_providers().
Any code path now skips IE unless the caller explicitly sets
`BROKER_SYNC_IMAP_INCLUDE_PROVIDERS=invest-engine`. The
`BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS` env still works (additive) for
forward-compat in case Schwab etc. ever need similar treatment.

INCLUDE wins over both the default exclude set and EXCLUDE env.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 17:24:54 +00:00

297 lines
10 KiB
Python

"""IMAP email ingestor: dispatches messages to the matching parser by sender.
Used by the `imap-ingest` CLI command for InvestEngine + Schwab confirmation
emails. Each message passes through:
1. Pull ALL messages from the configured mailbox directory.
2. Route each by `From:` to a parser:
- noreply@investengine.com (+ equivalents) → invest_engine parser (skipped by default; opt back in with BROKER_SYNC_IMAP_INCLUDE_PROVIDERS=invest-engine)
- Schwab confirmations (equityawards@schwab.com, etc.) → schwab parser
3. Merge parser output into one list[Activity] with source attribution.
Not imap-idle; runs once per invocation. Designed for a daily CronJob.
"""
from __future__ import annotations
import email
import imaplib
import logging
import os
import re
import ssl
from collections.abc import AsyncIterator, Iterator
from datetime import date, datetime
from decimal import Decimal
from email.message import Message
from typing import NamedTuple
from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.providers.parsers import invest_engine as ie_parser
from broker_sync.providers.parsers.schwab import parse_schwab_email
# Account ids of the two InvestEngine wrappers. Parsed IE BUYs are matched on
# the ISA id and may be re-routed to the GIA id by _split_ie_by_isa_cap().
_IE_ISA_ACCOUNT_ID = "invest-engine-primary"
_IE_GIA_ACCOUNT_ID = "invest-engine-gia"
# Default per-tax-year ISA allowance used by _split_ie_by_isa_cap().
_ISA_ANNUAL_CAP = Decimal("20000")
_UK_TAX_YEAR_START = (4, 6)  # (month, day) — UK tax year starts 6 April


def _uk_tax_year_start(d: datetime) -> date:
    """Return the first day of the UK tax year that contains `d`.

    The tax year begins on the (month, day) held in `_UK_TAX_YEAR_START`.
    A moment on or after that day in `d`'s calendar year belongs to the tax
    year starting in that same calendar year; anything earlier belongs to
    the tax year that started in the previous calendar year.
    """
    start_month, start_day = _UK_TAX_YEAR_START
    this_years_start = date(d.year, start_month, start_day)
    if d.date() < this_years_start:
        # Still before this year's start: we are in last year's tax year.
        return date(d.year - 1, start_month, start_day)
    return this_years_start
def _split_ie_by_isa_cap(
    activities: list[Activity],
    *,
    isa_cap: Decimal = _ISA_ANNUAL_CAP,
) -> list[Activity]:
    """Assign InvestEngine BUYs to the ISA or GIA wrapper by tax-year spend.

    The IE trade-confirmation emails do not say which wrapper a buy landed
    in. The working assumption (from observed behaviour) is that the ISA
    allowance is filled first in each UK tax year and anything beyond it goes
    to the GIA, so this pass replays the buys chronologically and applies
    that rule.

    Boundary rule: a BUY goes to the ISA iff the tax-year running total
    *before* it is strictly below `isa_cap`; otherwise it goes to the GIA.
    Activities are assigned whole — a buy straddling the cap is never split.

    Only activities whose account id is the IE ISA id and whose type is BUY
    are considered; everything else is left untouched. Matching activities
    are mutated in place and the same `activities` list object is returned
    (its order is not changed).
    """
    zero = Decimal(0)
    # Chronological replay; sorted() is stable, so same-timestamp buys keep
    # their incoming relative order.
    chronological_buys = sorted(
        (
            act
            for act in activities
            if act.account_id == _IE_ISA_ACCOUNT_ID
            and act.activity_type is ActivityType.BUY
        ),
        key=lambda act: act.date,
    )

    # Tax-year start date -> value of buys already seen in that tax year.
    spent_by_tax_year: dict[date, Decimal] = {}
    for buy in chronological_buys:
        tax_year = _uk_tax_year_start(buy.date)
        spent_before = spent_by_tax_year.get(tax_year, zero)
        # Missing quantity/price contributes nothing to the running total.
        cost = (buy.quantity or zero) * (buy.unit_price or zero)

        fits_in_isa = spent_before < isa_cap
        buy.account_id = _IE_ISA_ACCOUNT_ID if fits_in_isa else _IE_GIA_ACCOUNT_ID
        buy.account_type = AccountType.ISA if fits_in_isa else AccountType.GIA

        spent_by_tax_year[tax_year] = spent_before + cost
    return activities
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Exact From: addresses treated as InvestEngine mail by fetch_activities().
_IE_SENDERS = {"noreply@investengine.com", "hello@investengine.com"}
# Exact From: addresses treated as Schwab mail by fetch_activities().
_SCHWAB_SENDERS = {
    "equityawards@schwab.com",
    "donotreply@schwab.com",
    "wealthnotify@schwab.com",
}
# Finds an email-address-shaped token; used by _extract_sender() to pull the
# bare address out of a From: header value.
_ADDR_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
class ImapCreds(NamedTuple):
    """Connection settings for the mailbox read by `_fetch_all`."""

    # IMAP server host name; connected to via imaplib.IMAP4_SSL.
    host: str
    # Login user name.
    user: str
    # Login password.
    password: str
    # Mailbox (folder) to select, e.g. "INBOX".
    directory: str
def _extract_sender(msg: Message) -> str:
    """Return the lower-cased email address found in the From: header.

    The first address-shaped token matched by `_ADDR_RE` is used, so a value
    such as ``"Broker <NoReply@Example.com>"`` yields ``"noreply@example.com"``.
    Returns an empty string when the header is missing or contains nothing
    that looks like an address.
    """
    # Messages parsed with the default (compat32) policy hand back an
    # email.header.Header object instead of a str when the raw header holds
    # non-ASCII bytes. Passing that to re.search() raises TypeError, which
    # would abort the whole ingest for one odd message, so coerce to str.
    raw = str(msg.get("From", ""))
    match = _ADDR_RE.search(raw)
    if match is None:
        return ""
    return match.group(0).lower()
def _html_or_text(msg: Message) -> str:
    """Return the richest body available (prefer HTML), decoded to text.

    For a multipart message the first ``text/html`` part is used; if there
    is none (or its payload is empty) the first ``text/plain`` part is used
    instead. A non-multipart message contributes its own payload.

    The bytes are decoded with the charset declared on the part they came
    from, falling back to the charset of `msg` itself and finally to UTF-8.
    Unknown charset names also fall back to UTF-8, and undecodable bytes are
    replaced rather than raising.

    Returns an empty string when no body could be extracted.
    """
    if msg.is_multipart():
        html = plain = None
        html_charset = plain_charset = None
        for part in msg.walk():
            ct = part.get_content_type()
            if ct == "text/html" and html is None:
                html = part.get_payload(decode=True)
                html_charset = part.get_content_charset()
            elif ct == "text/plain" and plain is None:
                plain = part.get_payload(decode=True)
                plain_charset = part.get_content_charset()
        # Remember which part won so its own charset is used below; the
        # multipart container normally declares no charset at all.
        if html:
            body, part_charset = html, html_charset
        else:
            body, part_charset = plain, plain_charset
    else:
        body = msg.get_payload(decode=True)
        part_charset = None
    if body is None:
        return ""
    if isinstance(body, bytes):
        charset = part_charset or msg.get_content_charset() or "utf-8"
        try:
            return body.decode(charset, errors="replace")
        except LookupError:
            # Charset name not known to Python's codec registry.
            return body.decode("utf-8", errors="replace")
    return str(body)
def _fetch_all(creds: ImapCreds) -> Iterator[bytes]:
    """Yield the raw RFC 822 bytes of every message in `creds.directory`.

    Opens a TLS connection to `creds.host`, logs in, selects the mailbox
    read-only and fetches each message returned by ``SEARCH ALL`` one at a
    time. The connection stays open while the generator is being consumed
    and is closed when the ``with`` block exits.

    Messages whose FETCH fails or returns an unexpected shape are skipped
    with a warning instead of being dropped silently.

    Raises:
        RuntimeError: if selecting the mailbox or the search itself fails.
    """
    tls_context = ssl.create_default_context()
    with imaplib.IMAP4_SSL(creds.host, ssl_context=tls_context) as conn:
        conn.login(creds.user, creds.password)
        status, _ = conn.select(creds.directory, readonly=True)
        if status != "OK":
            raise RuntimeError(f"IMAP select {creds.directory} failed: {status}")
        status, data = conn.search(None, "ALL")
        if status != "OK":
            raise RuntimeError(f"IMAP search failed: {status}")
        # imaplib reports [None] when the server sent no untagged SEARCH
        # reply (seen with empty mailboxes); treat that as "no messages".
        id_blob = data[0] if data and data[0] else b""
        message_nums = id_blob.split()
        log.info("imap: fetching %d messages from %s", len(message_nums), creds.directory)
        for num in message_nums:
            status, rsp = conn.fetch(num, "(RFC822)")
            first = rsp[0] if status == "OK" and rsp else None
            # A successful fetch item is a (envelope, body-bytes) pair;
            # anything else (bare bytes, short tuple, None) is unusable.
            usable = (
                isinstance(first, (tuple, list))
                and len(first) >= 2
                and isinstance(first[1], bytes)
            )
            if not usable:
                log.warning(
                    "imap: could not fetch message %s (status=%s); skipping",
                    num.decode("ascii", errors="replace"),
                    status,
                )
                continue
            yield first[1]
def _resolve_excluded_providers() -> set[str]:
    """Work out which providers the IMAP ingest must skip.

    `invest-engine` is skipped by default, independent of the environment,
    so that any entry point (cron, `kubectl run --rm`, devvm `poetry run`,
    another agent session) is safe without extra configuration. See the
    2026-05-27 post-mortem: 39 already-deduped IE BUYs were re-inserted
    because the safety previously lived only on the cronjob spec.

    Environment variables (comma-separated, case-insensitive, `_` and `-`
    interchangeable, blank entries ignored):

    - `BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS` adds providers to the skip set.
    - `BROKER_SYNC_IMAP_INCLUDE_PROVIDERS` removes providers from it and
      always wins over both `EXCLUDE` and the default skip-list.

    Returns the skip set with names in canonical hyphenated lower-case form.
    """

    def _providers_from_env(var_name: str) -> set[str]:
        # Split the comma list, drop blanks, and normalise each name.
        names: set[str] = set()
        for token in os.environ.get(var_name, "").split(","):
            cleaned = token.strip()
            if cleaned:
                names.add(cleaned.lower().replace("_", "-"))
        return names

    # Both spellings collapse to the same canonical key.
    always_skipped = {
        name.replace("_", "-") for name in ("invest-engine", "invest_engine")
    }
    requested_skips = _providers_from_env("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS")
    opted_back_in = _providers_from_env("BROKER_SYNC_IMAP_INCLUDE_PROVIDERS")
    return always_skipped.union(requested_skips).difference(opted_back_in)
def fetch_activities(creds: ImapCreds) -> list[Activity]:
    """Fetch every message in the mailbox and parse those from known brokers.

    Each message is routed by its From: address:

    - InvestEngine senders go to the invest_engine parser (given the raw
      message bytes), unless `invest-engine` is in the excluded set.
    - Schwab senders go to the Schwab parser (given the decoded HTML/text
      body), unless `schwab` is in the excluded set.
    - Anything else, and anything that cannot be parsed as an email, is
      counted as skipped.

    The excluded set comes from `_resolve_excluded_providers()` and is read
    once per call. A one-line summary of the counters is logged at INFO.

    Returns the parsed activities from all messages, in mailbox order.
    """
    out: list[Activity] = []
    ie_parsed = schwab_parsed = ie_skipped = skipped = 0
    exclude = _resolve_excluded_providers()
    for raw in _fetch_all(creds):
        try:
            msg = email.message_from_bytes(raw)
        except Exception:
            # Best effort: one unparseable message must not stop the run.
            skipped += 1
            continue
        sender = _extract_sender(msg)
        if sender in _IE_SENDERS or sender.endswith("@investengine.com"):
            if "invest-engine" in exclude:
                ie_skipped += 1
                continue
            out.extend(ie_parser.parse_invest_engine_email(raw))
            ie_parsed += 1
        elif (
            sender in _SCHWAB_SENDERS
            or sender.endswith("@schwab.com")
            or sender.endswith(".schwab.com")  # e.g. donotreply@mail.schwab.com
        ):
            if "schwab" in exclude:
                skipped += 1
                continue
            html = _html_or_text(msg)
            out.extend(parse_schwab_email(html))
            schwab_parsed += 1
        else:
            skipped += 1
    # The separator before the final count matters: without it the skipped
    # count and the activity total render as one number (3 and 12 -> "312").
    log.info(
        "imap: ie_parsed=%d ie_skipped=%d schwab_parsed=%d skipped=%d -> %d activities",
        ie_parsed,
        ie_skipped,
        schwab_parsed,
        skipped,
        len(out),
    )
    return out
class ImapProvider:
    """Provider-protocol adapter around the IMAP fetch + per-sender parsing.

    Emits Schwab activities, and InvestEngine activities when that provider
    has not been excluded (see `_resolve_excluded_providers`). The original
    author notes that downstream dedup keys on (provider, account,
    external_id), so activities from different brokers stay isolated by
    their account_id; that pipeline is not visible from this module.
    """

    name = "imap"

    def __init__(self, creds: ImapCreds) -> None:
        self._creds = creds

    def accounts(self) -> list[Account]:
        """Return the accounts this provider can produce activities for."""
        # (id, display name, account type, currency, provider)
        account_rows = (
            (_IE_ISA_ACCOUNT_ID, "InvestEngine ISA", AccountType.ISA, "GBP", "invest-engine"),
            (_IE_GIA_ACCOUNT_ID, "InvestEngine GIA", AccountType.GIA, "GBP", "invest-engine"),
            ("schwab-workplace", "Schwab (US workplace)", AccountType.GIA, "USD", "schwab"),
        )
        return [
            Account(
                id=account_id,
                name=label,
                account_type=kind,
                currency=currency,
                provider=provider,
            )
            for account_id, label, kind, currency, provider in account_rows
        ]

    async def fetch(
        self,
        *,
        since: datetime | None = None,
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        """Yield activities dated within the half-open window [since, before).

        Either bound may be None, meaning unbounded on that side. The whole
        mailbox is fetched and parsed first and the window is applied
        client-side, because no IMAP SEARCH date criteria are built here.
        """
        # Route ISA/GIA over the complete batch *before* filtering, so each
        # UK tax year's running total is the same regardless of the window
        # requested or the order of emails on the server.
        for activity in _split_ie_by_isa_cap(fetch_activities(self._creds)):
            when = activity.date
            too_early = since is not None and when < since
            if too_early or (before is not None and when >= before):
                continue
            yield activity
if __name__ == "__main__":
    # Manual smoke test for local debugging; the CronJob never runs this.
    # Reads the connection settings from IMAP_HOST / IMAP_USER /
    # IMAP_PASSWORD (required) and IMAP_DIRECTORY (defaults to "INBOX"),
    # then prints the total and the first five parsed activities.
    import os

    logging.basicConfig(level=logging.INFO)
    env = os.environ
    smoke_creds = ImapCreds(
        host=env["IMAP_HOST"],
        user=env["IMAP_USER"],
        password=env["IMAP_PASSWORD"],
        directory=env.get("IMAP_DIRECTORY", "INBOX"),
    )
    parsed = fetch_activities(smoke_creds)
    print(f"total={len(parsed)}")
    for activity in parsed[:5]:
        print(f" {activity.activity_type} {activity.symbol} {activity.date.isoformat()}")