"""InvestEngine email parser.

IE mails the user after each trade batch. The body shape varies — over the
years IE has sent trade confirmations as plain-text RFC 2822 messages,
multipart HTML emails with a summary table, and (for older statements) CSV
attachments. This module tries the three strategies in order and returns the
first that yields at least one Activity.

Every parse strategy produces canonical `Activity` objects with:

- `account_id = "invest-engine-primary"` (sink remaps to Wealthfolio UUID)
- `account_type = AccountType.ISA` (Viktor's IE account is an ISA)
- `currency = "GBP"`
- `external_id = f"invest-engine:{fingerprint}"` where fingerprint hashes
  (date, symbol, quantity, unit_price) for deterministic dedup.
"""

from __future__ import annotations

import csv
import email
import hashlib
import io
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from email.message import Message

from bs4 import BeautifulSoup

from broker_sync.models import AccountType, Activity, ActivityType

_ACCOUNT_ID = "invest-engine-primary"
_CURRENCY_SIGN = "£"

# HTML trade summary rows have the shape "Bought N @ £P per share", where N is
# the quantity (group 1) and P the unit price (group 2).
_BOUGHT_RE = re.compile(
    r"Bought\s+([0-9]+(?:\.[0-9]+)?)\s*@\s*" + re.escape(_CURRENCY_SIGN) + r"([0-9]+(?:\.[0-9]+)?)",
    re.IGNORECASE,
)

# Ticker lines look like "Vanguard S&P 500: VUAG" — we want the last
# all-caps token after the colon.
_TICKER_RE = re.compile(r":\s*([A-Z][A-Z0-9]{1,9})\s*$")

# Date rows contain "Date: DD Month YYYY".
_DATE_RE = re.compile(
    r"Date:\s*([0-9]{1,2})\s+([A-Za-z]+)\s+([0-9]{4})",
    re.IGNORECASE,
)


def parse_invest_engine_email(raw_email: bytes) -> list[Activity]:
    """Parse an IE trade confirmation email into Activity records.

    Tries RFC 2822 body lines first, then HTML tables, then a CSV attachment,
    and returns the result of the first strategy that yields at least one
    Activity. Returns an empty list when nothing matches — never raises on
    malformed input.
    """
    msg = email.message_from_bytes(raw_email)

    text_body = _extract_part_body(msg, "text/plain")
    if text_body is not None:
        activities = _parse_rfc2822_lines(text_body)
        if activities:
            return activities

    html_body = _extract_part_body(msg, "text/html")
    if html_body is not None:
        activities = _parse_html_tables(html_body)
        if activities:
            return activities

    # Last resort; this is already an empty list when no CSV part matches.
    return _parse_csv_attachment(raw_email)


def _extract_part_body(msg: Message, content_type: str) -> str | None:
    """Return the decoded body of the first part with `content_type`, or None.

    `Message.walk()` yields the message itself first and then (for multipart
    messages) every sub-part, so one loop covers both the single-part and the
    multipart case.
    """
    for part in msg.walk():
        if part.get_content_type() == content_type:
            return _decode_payload(part)
    return None


def _decode_payload(part: Message) -> str | None:
    """Decode a MIME part's payload to text.

    Uses the part's declared charset (UTF-8 when none is declared) with
    undecodable bytes replaced. Returns None when the part has no decodable
    payload (e.g. a multipart container).
    """
    payload = part.get_payload(decode=True)
    if isinstance(payload, bytes):
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # The email declared a charset Python does not know; fall back
            # rather than crash on malformed input.
            return payload.decode("utf-8", errors="replace")
    if isinstance(payload, str):
        return payload
    return None


def _parse_rfc2822_lines(body: str) -> list[Activity]:
    """Try each line-based body format (v1/v2) and return matches.

    Corresponds to `_extract_position_v1` and `_extract_position_v2` in the
    upstream parser. Returns a one-element list on success, `[]` otherwise.
    v3/v4 are not ported — no surviving fixtures exist and the HTML fallback
    covers newer formats.
    """
    for parser in (_try_v2, _try_v1):
        result = parser(body)
        if result is not None:
            return [result]
    return []


def _try_v2(body: str) -> Activity | None:
    """Parse body with v2 layout: `Date: DD Month` on line 2, year on line 3.

    Line numbers are 0-based indexes into `body.splitlines()`; the trade line
    (symbol, "Bought" quantity, £ price) is line 4. Returns None when the body
    has fewer than six lines or any field fails to parse.
    """
    lines = body.splitlines()
    if len(lines) < 6:
        return None
    try:
        day_str, month = lines[2].split()[-2:]
        year = lines[3].split()[0]
        on_date = datetime.strptime(f"{day_str}-{month}-{year}", "%d-%B-%Y")
        symbol = lines[4].split(":")[1].split()[0].strip()
        unit_price = Decimal(lines[4].split(_CURRENCY_SIGN)[1].split()[0])
        quantity = Decimal(lines[4].split("Bought")[1].split()[0])
    except (ValueError, IndexError, InvalidOperation):
        # Decimal() signals bad numerals with InvalidOperation, which is not a
        # ValueError subclass, so it must be listed explicitly.
        return None
    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="rfc2822-v2",
        matched=lines[4],
    )


def _try_v1(body: str) -> Activity | None:
    """Parse body with v1 layout: `Date: DD` on line 2, `Month YYYY` on line 3.

    Line numbers are 0-based indexes into `body.splitlines()`; the trade line
    (symbol, "Bought" quantity, "@ £price") is line 4. Returns None when the
    body has fewer than six lines or any field fails to parse.
    """
    lines = body.splitlines()
    if len(lines) < 6:
        return None
    try:
        day = int(lines[2].split("Date: ")[1])
        # Month and year are the first two whitespace-separated tokens.
        # Unpacking raises ValueError when the line holds fewer than two.
        month, year = lines[3].split()[:2]
        on_date = datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y")
        symbol = lines[4].split(":")[1].split()[0].strip()
        after_bought = lines[4].split("Bought")[1]
        quantity = Decimal(after_bought.split()[0])
        price_str = after_bought.split("@")[1].split()[0].split(_CURRENCY_SIGN)[1]
        unit_price = Decimal(price_str)
    except (ValueError, IndexError, InvalidOperation):
        # Decimal() signals bad numerals with InvalidOperation, which is not a
        # ValueError subclass, so it must be listed explicitly.
        return None
    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="rfc2822-v1",
        matched=lines[4],
    )


def _parse_html_tables(body: str) -> list[Activity]:
    """Parse an HTML body with per-order nested summary tables.

    Walks every leaf table (a table with no child tables); each leaf carries
    one trade summary (ticker, bought line, total, ISIN + order id). Tables
    that don't contain the expected shape are skipped, so a partially
    corrupted email yields only its intact orders. Returns `[]` when the
    document has no parseable "Date: DD Month YYYY" text, since every trade
    in the email shares that date.
    """
    soup = BeautifulSoup(body, "html.parser")
    on_date = _extract_html_date(soup)
    if on_date is None:
        return []

    activities: list[Activity] = []
    for table in soup.find_all("table"):
        if table.find("table") is not None:
            # Structural wrapper; its leaf tables are visited on their own.
            continue
        activity = _try_html_summary_table(table, on_date)
        if activity is not None:
            activities.append(activity)
    return activities


def _extract_html_date(soup: BeautifulSoup) -> datetime | None:
    """Return the first "Date: DD Month YYYY" found in the document text, or None."""
    match = _DATE_RE.search(soup.get_text(" ", strip=True))
    if match is None:
        return None
    day, month, year = match.groups()
    try:
        return datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y")
    except ValueError:
        # The regex matched, but the text is not a real date or month name.
        return None


def _try_html_summary_table(nested: object, on_date: datetime) -> Activity | None:
    """Interpret a leaf table as a single trade summary.

    Returns None if the table is structural (no "Bought N @ £P" row) or any
    required field is missing.
    """
    get_text = getattr(nested, "get_text", None)
    if get_text is None:
        return None
    text = get_text(" ", strip=True)

    bought = _BOUGHT_RE.search(text)
    if bought is None:
        return None

    symbol = _extract_html_symbol(nested)
    if symbol is None:
        return None

    # Both groups are constrained by _BOUGHT_RE to plain decimal numerals, so
    # Decimal() cannot fail here.
    quantity = Decimal(bought.group(1))
    unit_price = Decimal(bought.group(2))
    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="html",
        matched=text[:200],
    )


def _extract_html_symbol(nested: object) -> str | None:
    """Return the ticker from the first `td` whose text ends in ": TICKER", or None."""
    find_all = getattr(nested, "find_all", None)
    if find_all is None:
        return None
    for cell in find_all("td"):
        cell_text = cell.get_text(" ", strip=True)
        m = _TICKER_RE.search(cell_text)
        if m is not None:
            return m.group(1)
    return None


_CSV_CONTENT_TYPES = {"text/csv", "application/csv", "application/vnd.ms-excel"}

# Required columns for the CSV attachment strategy. IE has not (yet) sent
# CSV-attached statements in production — the column set here mirrors the
# upstream _extract_positions_csv contract (ticker, buy_price, num_shares,
# buy_date, currency) with modern names.
_CSV_COLUMNS = {"ticker", "unit_price", "quantity", "date", "currency"}


def _parse_csv_attachment(raw_email: bytes) -> list[Activity]:
    """Parse a CSV attachment from the email into Activity records.

    Walks every MIME part, picks the first one with a CSV-ish content type OR
    a `.csv` filename, and iterates its rows. Returns `[]` when there is no
    such part or its header lacks a required column. Rows missing a required
    value or with an unparseable number/date are skipped.
    """
    msg = email.message_from_bytes(raw_email)
    csv_text = _extract_csv_attachment_text(msg)
    if csv_text is None:
        return []

    reader = csv.DictReader(io.StringIO(csv_text))
    fieldnames = set(reader.fieldnames or [])
    if not _CSV_COLUMNS.issubset(fieldnames):
        return []

    activities: list[Activity] = []
    for row in reader:
        activity = _csv_row_to_activity(row)
        if activity is not None:
            activities.append(activity)
    return activities


def _extract_csv_attachment_text(msg: Message) -> str | None:
    """Return the decoded text of the first CSV-looking part, or None."""
    for part in msg.walk():
        if not _looks_like_csv_part(part):
            continue
        text = _decode_payload(part)
        if text is not None:
            return text
    return None


def _looks_like_csv_part(part: Message) -> bool:
    """Return True for a CSV content type or a filename ending in `.csv`."""
    if part.get_content_type() in _CSV_CONTENT_TYPES:
        return True
    filename = part.get_filename()
    return isinstance(filename, str) and filename.lower().endswith(".csv")


def _csv_row_to_activity(row: dict[str, str]) -> Activity | None:
    """Convert one CSV row to an Activity, or None if the row is unusable.

    A row is skipped when a required cell is absent, the date is not
    `YYYY-MM-DD`, a number does not parse, the ticker is blank, or the
    currency is anything other than GBP (a blank currency counts as GBP).
    """
    # csv.DictReader pads short rows with None, which would otherwise surface
    # as TypeError/AttributeError from strptime(), strip() or Decimal().
    if any(row.get(column) is None for column in _CSV_COLUMNS):
        return None
    try:
        on_date = datetime.strptime(row["date"], "%Y-%m-%d")
        symbol = row["ticker"].strip()
        quantity = Decimal(row["quantity"])
        unit_price = Decimal(row["unit_price"])
        currency = row["currency"].strip() or "GBP"
    except (KeyError, ValueError, InvalidOperation):
        return None
    if not symbol or currency != "GBP":
        return None
    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="csv",
        matched=f"{symbol},{unit_price},{quantity},{row['date']}",
    )


def _build_activity(
    *,
    on_date: datetime,
    symbol: str,
    quantity: Decimal,
    unit_price: Decimal,
    strategy: str,
    matched: str,
) -> Activity:
    """Build the canonical BUY Activity shared by every parse strategy.

    `strategy` and `matched` only feed the `notes` field, recording which
    parser produced the record and the text it matched.
    """
    fingerprint = _fingerprint(on_date, symbol, quantity, unit_price)
    return Activity(
        external_id=f"invest-engine:{fingerprint}",
        account_id=_ACCOUNT_ID,
        account_type=AccountType.ISA,
        date=on_date,
        activity_type=ActivityType.BUY,
        currency="GBP",
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        notes=f"[{strategy}] {matched.strip()}",
    )


def _fingerprint(date: datetime, symbol: str, quantity: Decimal, unit_price: Decimal) -> str:
    """Return a 16-hex-character SHA-256 prefix identifying a trade.

    The key uses the Decimal values' own string form, so `1.5` and `1.50`
    hash differently. That is left as-is on purpose: changing the key would
    change every previously generated external id.
    """
    key = f"{date.isoformat()}|{symbol}|{quantity}|{unit_price}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]