broker-sync/broker_sync/providers/imap.py
Viktor Barzin 68d4832c2e
Some checks failed
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
ci/woodpecker/push/build Pipeline failed
imap: skip InvestEngine emails via BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS
The IMAP IE parser and the bearer-token IE API path generate different
external_ids for the same fill, so running both produces duplicate BUYs
in Wealthfolio. With IE now served by the API path (broker-sync invest-engine),
we keep the IMAP path live for Schwab and gate IE off via env var.

Setting BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS=invest-engine on the imap CronJob
stops new dupes; Schwab routing is unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 21:16:28 +00:00

270 lines
9 KiB
Python

"""IMAP email ingestor: dispatches messages to the matching parser by sender.
Used by the `imap-ingest` CLI command for InvestEngine + Schwab confirmation
emails. Each message passes through:
1. Pull ALL messages from the configured mailbox directory.
2. Route each by `From:` to a parser:
- noreply@investengine.com (+ equivalents) → invest_engine parser
- Schwab confirmations (equityawards@schwab.com, etc.) → schwab parser
3. Merge parser output into one list[Activity] with source attribution.
Not imap-idle; runs once per invocation. Designed for a daily CronJob.
"""
from __future__ import annotations
import email
import imaplib
import logging
import os
import re
import ssl
from collections.abc import AsyncIterator, Iterator
from datetime import date, datetime
from decimal import Decimal
from email.message import Message
from typing import NamedTuple
from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.providers.parsers import invest_engine as ie_parser
from broker_sync.providers.parsers.schwab import parse_schwab_email
_IE_ISA_ACCOUNT_ID = "invest-engine-primary"
_IE_GIA_ACCOUNT_ID = "invest-engine-gia"
_ISA_ANNUAL_CAP = Decimal("20000")
_UK_TAX_YEAR_START = (4, 6) # (month, day) — UK tax year starts 6 April
def _uk_tax_year_start(d: datetime) -> date:
"""Return the start date (6 April of year N) of the UK tax year containing `d`."""
month, day = _UK_TAX_YEAR_START
cutoff = date(d.year, month, day)
return cutoff if d.date() >= cutoff else date(d.year - 1, month, day)
def _split_ie_by_isa_cap(
activities: list[Activity],
*,
isa_cap: Decimal = _ISA_ANNUAL_CAP,
) -> list[Activity]:
"""Re-route IE BUYs: first `isa_cap` GBP of each UK tax year → ISA, rest → GIA.
Viktor's IE account has both an ISA and a GIA wrapper, and his trade
confirmation emails don't indicate which one a given buy hit. Empirically,
he fills the ISA allowance first each tax year (6 April) and any excess
lands in GIA. This function partitions an already-parsed batch of Activity
objects by that rule.
Rule for boundary buys: a BUY is assigned to ISA iff the running tax-year
total BEFORE it is still strictly below the cap; otherwise GIA. Whole-
activity assignment — no fractional splits.
Non-IE activities and non-BUYs are passed through unchanged.
"""
ie_buys = [
a for a in activities
if a.account_id == _IE_ISA_ACCOUNT_ID and a.activity_type is ActivityType.BUY
]
ie_buys.sort(key=lambda a: a.date)
cumulative: dict[date, Decimal] = {}
for a in ie_buys:
ty = _uk_tax_year_start(a.date)
running = cumulative.get(ty, Decimal(0))
trade_value = (a.quantity or Decimal(0)) * (a.unit_price or Decimal(0))
if running < isa_cap:
a.account_id = _IE_ISA_ACCOUNT_ID
a.account_type = AccountType.ISA
else:
a.account_id = _IE_GIA_ACCOUNT_ID
a.account_type = AccountType.GIA
cumulative[ty] = running + trade_value
return activities
log = logging.getLogger(__name__)
_IE_SENDERS = {"noreply@investengine.com", "hello@investengine.com"}
_SCHWAB_SENDERS = {
"equityawards@schwab.com",
"donotreply@schwab.com",
"wealthnotify@schwab.com",
}
_ADDR_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
class ImapCreds(NamedTuple):
host: str
user: str
password: str
directory: str
def _extract_sender(msg: Message) -> str:
raw = msg.get("From", "")
m = _ADDR_RE.search(raw)
return (m.group(0) if m else "").lower()
def _html_or_text(msg: Message) -> str:
"""Return the richest body available (prefer HTML)."""
if msg.is_multipart():
html = None
plain = None
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/html" and html is None:
html = part.get_payload(decode=True)
elif ct == "text/plain" and plain is None:
plain = part.get_payload(decode=True)
body = html or plain
else:
body = msg.get_payload(decode=True)
if body is None:
return ""
if isinstance(body, bytes):
charset = msg.get_content_charset() or "utf-8"
try:
return body.decode(charset, errors="replace")
except LookupError:
return body.decode("utf-8", errors="replace")
return str(body)
def _fetch_all(creds: ImapCreds) -> Iterator[bytes]:
ctx = ssl.create_default_context()
with imaplib.IMAP4_SSL(creds.host, ssl_context=ctx) as m:
m.login(creds.user, creds.password)
typ, _ = m.select(creds.directory, readonly=True)
if typ != "OK":
raise RuntimeError(f"IMAP select {creds.directory} failed: {typ}")
typ, data = m.search(None, "ALL")
if typ != "OK":
raise RuntimeError(f"IMAP search failed: {typ}")
ids = data[0].split()
log.info("imap: fetching %d messages from %s", len(ids), creds.directory)
for uid in ids:
typ, rsp = m.fetch(uid, "(RFC822)")
if typ != "OK" or not rsp or not rsp[0]:
continue
raw = rsp[0][1]
if isinstance(raw, bytes):
yield raw
def fetch_activities(creds: ImapCreds) -> list[Activity]:
out: list[Activity] = []
ie_parsed = schwab_parsed = ie_skipped = skipped = 0
exclude = {
p.strip().lower()
for p in os.environ.get("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "").split(",")
if p.strip()
}
for raw in _fetch_all(creds):
try:
msg = email.message_from_bytes(raw)
except Exception:
skipped += 1
continue
sender = _extract_sender(msg)
if sender in _IE_SENDERS or sender.endswith("@investengine.com"):
if "invest-engine" in exclude or "invest_engine" in exclude:
ie_skipped += 1
continue
out.extend(ie_parser.parse_invest_engine_email(raw))
ie_parsed += 1
elif (
sender in _SCHWAB_SENDERS
or sender.endswith("@schwab.com")
or sender.endswith(".schwab.com") # e.g. donotreply@mail.schwab.com
):
if "schwab" in exclude:
skipped += 1
continue
html = _html_or_text(msg)
out.extend(parse_schwab_email(html))
schwab_parsed += 1
else:
skipped += 1
log.info(
"imap: ie_parsed=%d ie_skipped=%d schwab_parsed=%d skipped=%d%d activities",
ie_parsed,
ie_skipped,
schwab_parsed,
skipped,
len(out),
)
return out
class ImapProvider:
"""Wraps the IMAP fetch + per-sender parse into the Provider protocol.
Yields both InvestEngine AND Schwab activities — downstream the
pipeline's dedup keyed on (provider, account, external_id) already
isolates them by account_id.
"""
name = "imap"
def __init__(self, creds: ImapCreds) -> None:
self._creds = creds
def accounts(self) -> list[Account]:
return [
Account(
id=_IE_ISA_ACCOUNT_ID,
name="InvestEngine ISA",
account_type=AccountType.ISA,
currency="GBP",
provider="invest-engine",
),
Account(
id=_IE_GIA_ACCOUNT_ID,
name="InvestEngine GIA",
account_type=AccountType.GIA,
currency="GBP",
provider="invest-engine",
),
Account(
id="schwab-workplace",
name="Schwab (US workplace)",
account_type=AccountType.GIA,
currency="USD",
provider="schwab",
),
]
async def fetch(
self,
*,
since: datetime | None = None,
before: datetime | None = None,
) -> AsyncIterator[Activity]:
# IMAP doesn't give us a server-side date range directly without
# constructing IMAP SEARCH criteria; filter client-side.
all_activities = fetch_activities(self._creds)
# Apply ISA/GIA £20k-cap routing in one batch-level pass so each UK tax
# year's cumulative total is computed consistently regardless of email
# order on the server.
routed = _split_ie_by_isa_cap(all_activities)
for a in routed:
if since is not None and a.date < since:
continue
if before is not None and a.date >= before:
continue
yield a
if __name__ == "__main__":
# Local smoke — invoked manually for debug, never from the CronJob.
import os
logging.basicConfig(level=logging.INFO)
c = ImapCreds(
host=os.environ["IMAP_HOST"],
user=os.environ["IMAP_USER"],
password=os.environ["IMAP_PASSWORD"],
directory=os.environ.get("IMAP_DIRECTORY", "INBOX"),
)
acts = fetch_activities(c)
print(f"total={len(acts)}")
for a in acts[:5]:
print(f" {a.activity_type} {a.symbol} {a.date.isoformat()}")