"""InvestEngine email parser.

IE mails the user after each trade batch. The body shape varies — over the
years IE has sent trade confirmations as plain-text RFC 2822 messages,
multipart HTML emails with a summary table, and (for older statements) CSV
attachments. This module tries the three strategies in order and returns the
first that yields at least one Activity.

NOTE(review): in the code below, `parse_invest_engine_email` only invokes the
plain-text (RFC 2822) strategy; no HTML or CSV strategy is called from it.

Every parse strategy produces canonical `Activity` objects with:
  - `account_id = "invest-engine-primary"` (sink remaps to Wealthfolio UUID)
  - `account_type = AccountType.ISA` (Viktor's IE account is an ISA)
  - `currency = "GBP"`
  - `external_id = f"invest-engine:{fingerprint}"` where fingerprint hashes
    (date, symbol, quantity, unit_price) for deterministic dedup.
"""

from __future__ import annotations

import email
import hashlib
from datetime import datetime
from decimal import Decimal, InvalidOperation
from email.message import Message

from broker_sync.models import AccountType, Activity, ActivityType

_ACCOUNT_ID = "invest-engine-primary"
_CURRENCY_SIGN = "£"


def parse_invest_engine_email(raw_email: bytes) -> list[Activity]:
    """Parse an IE trade confirmation email into Activity records.

    The message is parsed with the stdlib `email` package, its text body is
    extracted, and the line-based (v2, then v1) layouts are tried on it.

    Args:
        raw_email: The complete raw message, headers included.

    Returns:
        The parsed activities, or an empty list when the message has no text
        body or no layout matches. Malformed input is reported as "no match"
        (an empty list) rather than an exception.
    """
    msg = email.message_from_bytes(raw_email)
    body = _extract_text_body(msg)
    if body is None:
        return []
    return _parse_rfc2822_lines(body)


def _decode_payload(payload: bytes, charset: str | None) -> str:
    """Decode a MIME payload, tolerating a missing or unknown charset.

    Undecodable bytes are replaced rather than raising. If the declared
    charset is not a text encoding Python knows about, `bytes.decode` raises
    `LookupError`; in that case we fall back to UTF-8 so a bogus header cannot
    crash the parser.
    """
    try:
        return payload.decode(charset or "utf-8", errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def _extract_text_body(msg: Message) -> str | None:
    """Return the text/plain body of an email, or None if absent.

    For a multipart message the first `text/plain` part with a decodable
    payload wins. For a single-part message the payload is returned whatever
    its declared content type.
    """
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            payload = part.get_payload(decode=True)
            if isinstance(payload, bytes):
                return _decode_payload(payload, part.get_content_charset())
        return None

    payload = msg.get_payload(decode=True)
    if isinstance(payload, bytes):
        return _decode_payload(payload, msg.get_content_charset())
    if isinstance(payload, str):
        return payload
    return None


def _parse_rfc2822_lines(body: str) -> list[Activity]:
    """Try each line-based body format (v1/v2) and return matches.

    Corresponds to `_extract_position_v1` and `_extract_position_v2` in the
    upstream parser. The v2 layout is attempted first, then v1.

    Returns a one-element list on success, `[]` otherwise.
    """
    for parser in (_try_v2, _try_v1):
        result = parser(body)
        if result is not None:
            return [result]
    return []


def _parse_decimal(token: str) -> Decimal:
    """Convert `token` to a finite Decimal, raising ValueError on failure.

    `Decimal("garbage")` raises `decimal.InvalidOperation`, which derives from
    `ArithmeticError` and not `ValueError`, so it would slip past the
    `except (ValueError, IndexError)` guards in the layout parsers. It is
    translated here. NaN and Infinity, which `Decimal` accepts, are rejected
    too since they are never a valid quantity or price.
    """
    try:
        value = Decimal(token)
    except InvalidOperation as exc:
        raise ValueError(f"not a decimal number: {token!r}") from exc
    if not value.is_finite():
        raise ValueError(f"not a finite decimal number: {token!r}")
    return value


def _try_v2(body: str) -> Activity | None:
    """Parse body with v2 layout: `Date: DD Month` on line 2, year on line 3.

    Line indices are zero-based. Line 4 carries the trade itself: the symbol
    is the first token after the first colon, the quantity is the first token
    after `Bought`, and the unit price is the first token after the first
    currency sign.

    Returns None when the body has fewer than six lines or any field fails to
    parse.
    """
    lines = body.splitlines()
    if len(lines) < 6:
        return None

    trade_line = lines[4]
    try:
        day_str, month = lines[2].split()[-2:]
        year = lines[3].split()[0]
        on_date = datetime.strptime(f"{day_str}-{month}-{year}", "%d-%B-%Y")
        symbol = trade_line.split(":")[1].split()[0].strip()
        unit_price = _parse_decimal(trade_line.split(_CURRENCY_SIGN)[1].split()[0])
        quantity = _parse_decimal(trade_line.split("Bought")[1].split()[0])
    except (ValueError, IndexError):
        return None

    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="rfc2822-v2",
        matched=trade_line,
    )


def _try_v1(body: str) -> Activity | None:
    """Parse body with v1 layout: `Date: DD` on line 2, `Month YYYY` on line 3.

    Line indices are zero-based. Line 4 carries the trade itself: the symbol
    is the first token after the first colon, the quantity is the first token
    after `Bought`, and the unit price follows the `@` and the currency sign.

    Returns None when the body has fewer than six lines or any field fails to
    parse.
    """
    lines = body.splitlines()
    if len(lines) < 6:
        return None

    trade_line = lines[4]
    try:
        day = int(lines[2].split("Date: ")[1])
        # NOTE(review): the text before the first plain space is split again
        # on any whitespace, so month and year must be separated by something
        # other than a plain space (e.g. NBSP or tab) for this to unpack.
        # Confirm the delimiter against real v1 samples.
        month, year = (lines[3].split(" ")[0]).split()
        on_date = datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y")
        symbol = trade_line.split(":")[1].split()[0].strip()
        after_bought = trade_line.split("Bought")[1]
        quantity = _parse_decimal(after_bought.split()[0])
        price_str = after_bought.split("@")[1].split()[0].split(_CURRENCY_SIGN)[1]
        unit_price = _parse_decimal(price_str)
    except (ValueError, IndexError):
        return None

    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="rfc2822-v1",
        matched=trade_line,
    )


def _build_activity(
    *,
    on_date: datetime,
    symbol: str,
    quantity: Decimal,
    unit_price: Decimal,
    strategy: str,
    matched: str,
) -> Activity:
    """Assemble the canonical BUY `Activity` for one parsed trade line.

    Args:
        on_date: Trade date as parsed from the email body.
        symbol: Instrument symbol taken from the trade line.
        quantity: Number of units bought.
        unit_price: Price per unit.
        strategy: Name of the layout that matched; recorded in `notes`.
        matched: The raw trade line; stored (stripped) in `notes` for audit.
    """
    fingerprint = _fingerprint(on_date, symbol, quantity, unit_price)
    return Activity(
        external_id=f"invest-engine:{fingerprint}",
        account_id=_ACCOUNT_ID,
        account_type=AccountType.ISA,
        date=on_date,
        activity_type=ActivityType.BUY,
        currency="GBP",
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        notes=f"[{strategy}] {matched.strip()}",
    )


def _fingerprint(date: datetime, symbol: str, quantity: Decimal, unit_price: Decimal) -> str:
    """Return a 16-hex-character SHA-256 digest identifying a trade.

    The key uses the Decimals' string form as-is, so `1.0` and `1.00` hash
    differently. The format is left untouched so that previously issued
    `external_id` values remain stable.
    """
    key = f"{date.isoformat()}|{symbol}|{quantity}|{unit_price}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]