imap: skip InvestEngine emails via BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS
Some checks failed
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
ci/woodpecker/push/build Pipeline failed

The IMAP IE parser and the bearer-token IE API path generate different
external_ids for the same fill, so running both produces duplicate BUYs
in Wealthfolio. With IE now served by the API path (broker-sync invest-engine),
we keep the IMAP path live for Schwab and gate IE off via env var.

Setting BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS=invest-engine on the imap CronJob
stops new dupes; Schwab routing is unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-05-26 21:16:28 +00:00
parent d5dbeb96af
commit 68d4832c2e
2 changed files with 51 additions and 2 deletions

View file

@ -16,6 +16,7 @@ from __future__ import annotations
import email import email
import imaplib import imaplib
import logging import logging
import os
import re import re
import ssl import ssl
from collections.abc import AsyncIterator, Iterator from collections.abc import AsyncIterator, Iterator
@ -152,7 +153,12 @@ def _fetch_all(creds: ImapCreds) -> Iterator[bytes]:
def fetch_activities(creds: ImapCreds) -> list[Activity]: def fetch_activities(creds: ImapCreds) -> list[Activity]:
out: list[Activity] = [] out: list[Activity] = []
ie_parsed = schwab_parsed = skipped = 0 ie_parsed = schwab_parsed = ie_skipped = skipped = 0
exclude = {
p.strip().lower()
for p in os.environ.get("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "").split(",")
if p.strip()
}
for raw in _fetch_all(creds): for raw in _fetch_all(creds):
try: try:
msg = email.message_from_bytes(raw) msg = email.message_from_bytes(raw)
@ -161,6 +167,9 @@ def fetch_activities(creds: ImapCreds) -> list[Activity]:
continue continue
sender = _extract_sender(msg) sender = _extract_sender(msg)
if sender in _IE_SENDERS or sender.endswith("@investengine.com"): if sender in _IE_SENDERS or sender.endswith("@investengine.com"):
if "invest-engine" in exclude or "invest_engine" in exclude:
ie_skipped += 1
continue
out.extend(ie_parser.parse_invest_engine_email(raw)) out.extend(ie_parser.parse_invest_engine_email(raw))
ie_parsed += 1 ie_parsed += 1
elif ( elif (
@ -168,14 +177,18 @@ def fetch_activities(creds: ImapCreds) -> list[Activity]:
or sender.endswith("@schwab.com") or sender.endswith("@schwab.com")
or sender.endswith(".schwab.com") # e.g. donotreply@mail.schwab.com or sender.endswith(".schwab.com") # e.g. donotreply@mail.schwab.com
): ):
if "schwab" in exclude:
skipped += 1
continue
html = _html_or_text(msg) html = _html_or_text(msg)
out.extend(parse_schwab_email(html)) out.extend(parse_schwab_email(html))
schwab_parsed += 1 schwab_parsed += 1
else: else:
skipped += 1 skipped += 1
log.info( log.info(
"imap: ie_parsed=%d schwab_parsed=%d skipped=%d%d activities", "imap: ie_parsed=%d ie_skipped=%d schwab_parsed=%d skipped=%d%d activities",
ie_parsed, ie_parsed,
ie_skipped,
schwab_parsed, schwab_parsed,
skipped, skipped,
len(out), len(out),

View file

@ -101,6 +101,42 @@ def test_non_ie_activities_passed_through_unchanged() -> None:
assert routed[0].account_type is AccountType.GIA assert routed[0].account_type is AccountType.GIA
def test_exclude_invest_engine_skips_ie_emails(monkeypatch) -> None:
    """BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS=invest-engine should skip IE messages
    so we don't duplicate IE buys already ingested via the bearer-token API path.
    Schwab routing must remain unaffected.

    The parser stubs return distinct sentinels so the assertions pin *which*
    provider's activity was emitted, not merely how many activities came back.
    """
    from broker_sync.providers import imap as imap_mod

    ie_email = (
        b"From: noreply@investengine.com\r\n"
        b"Subject: VUAG Bought\r\n"
        b"Content-Type: text/plain\r\n\r\n"
        b"Vanguard S&P 500: VUAG Bought 10.0 @ 100.0 per share Total: 1000.00\r\n"
    )
    schwab_email = (
        b"From: donotreply@schwab.com\r\n"
        b"Subject: Order Confirmed\r\n"
        b"Content-Type: text/html\r\n\r\n"
        b"<html><body>no-op</body></html>\r\n"
    )

    # One sentinel per provider: a length-only check would still pass if the
    # exclusion dropped the Schwab message and kept the InvestEngine one.
    ie_activity = object()
    schwab_activity = object()

    monkeypatch.setattr(imap_mod, "_fetch_all", lambda _: [ie_email, schwab_email])
    monkeypatch.setattr(
        imap_mod.ie_parser, "parse_invest_engine_email", lambda raw: [ie_activity]
    )
    monkeypatch.setattr(imap_mod, "parse_schwab_email", lambda html: [schwab_activity])

    creds = imap_mod.ImapCreds(host="h", user="u", password="p", directory="d")

    monkeypatch.setenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", "invest-engine")
    out_excluded = imap_mod.fetch_activities(creds)
    # IE skipped → only the schwab activity is emitted
    assert out_excluded == [schwab_activity]

    # The underscore alias is accepted too, and entries are stripped and
    # lower-cased (empty entries ignored) before matching.
    monkeypatch.setenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", " Invest_Engine , ")
    out_alias = imap_mod.fetch_activities(creds)
    assert out_alias == [schwab_activity]

    monkeypatch.delenv("BROKER_SYNC_IMAP_EXCLUDE_PROVIDERS", raising=False)
    out_default = imap_mod.fetch_activities(creds)
    # Both providers fire when env unset, in mailbox order
    assert out_default == [ie_activity, schwab_activity]
def test_schwab_subdomain_sender_matches() -> None: def test_schwab_subdomain_sender_matches() -> None:
"""Real Schwab trade emails come from `donotreply@mail.schwab.com` """Real Schwab trade emails come from `donotreply@mail.schwab.com`
(subdomain), not just `donotreply@schwab.com`. The matcher must (subdomain), not just `donotreply@schwab.com`. The matcher must