Merge ie-email-parser: HTML + CSV fallbacks + failure-mode tests
# Conflicts: # broker_sync/providers/parsers/invest_engine.py # tests/providers/parsers/test_invest_engine.py
This commit is contained in:
commit
1aa60ce348
6 changed files with 390 additions and 15 deletions
|
|
@ -16,43 +16,77 @@ Every parse strategy produces canonical `Activity` objects with:
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import email
|
||||
import hashlib
|
||||
import io
|
||||
import re
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from decimal import Decimal, InvalidOperation
|
||||
from email.message import Message
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from broker_sync.models import AccountType, Activity, ActivityType
|
||||
|
||||
_ACCOUNT_ID = "invest-engine-primary"
_CURRENCY_SIGN = "£"

# Trade summary text in the HTML body reads
# "Bought <qty> @ £<price> per share". Group 1 captures the quantity and
# group 2 the unit price; both are plain decimals with an optional
# fractional part.
_BOUGHT_RE = re.compile(
    r"Bought\s+([0-9]+(?:\.[0-9]+)?)\s*@\s*"
    + re.escape(_CURRENCY_SIGN)
    + r"([0-9]+(?:\.[0-9]+)?)",
    flags=re.IGNORECASE,
)

# A ticker cell looks like "Vanguard S&P 500: VUAG". Group 1 is the
# trailing token after the colon: an upper-case letter followed by 1-9
# upper-case letters or digits, anchored at the end of the text.
_TICKER_RE = re.compile(r":\s*([A-Z][A-Z0-9]{1,9})\s*$")

# The trade date appears as "Date: DD Month YYYY"; groups are
# (day, month name, year).
_DATE_RE = re.compile(
    r"Date:\s*([0-9]{1,2})\s+([A-Za-z]+)\s+([0-9]{4})",
    flags=re.IGNORECASE,
)
|
||||
|
||||
|
||||
def parse_invest_engine_email(raw_email: bytes) -> list[Activity]:
    """Parse an IE trade confirmation email into Activity records.

    Strategies are attempted in a fixed order and the first one that
    yields at least one activity wins:

    1. RFC 2822 body lines taken from the ``text/plain`` part,
    2. summary tables taken from the ``text/html`` part,
    3. a CSV attachment.

    Returns an empty list when nothing matches — never raises on
    malformed input.
    """
    msg = email.message_from_bytes(raw_email)
    body_strategies = (
        ("text/plain", _parse_rfc2822_lines),
        ("text/html", _parse_html_tables),
    )
    for content_type, parse_body in body_strategies:
        # Each body is only extracted once the earlier strategy has
        # come up empty.
        body = _extract_part_body(msg, content_type)
        if body is None:
            continue
        found = parse_body(body)
        if found:
            return found
    # Last resort: the CSV strategy takes the raw bytes and walks the
    # MIME parts itself.
    return _parse_csv_attachment(raw_email) or []
|
||||
|
||||
|
||||
def _extract_part_body(msg: Message, content_type: str) -> str | None:
    """Return the decoded body of the first part of ``content_type``.

    For a multipart message every part reached by ``msg.walk()`` is a
    candidate; for a single-part message only the message itself is.
    Returns None when no candidate has the requested content type, or
    when the matching part's payload cannot be decoded to text.
    """
    candidates = msg.walk() if msg.is_multipart() else iter((msg,))
    wanted = next(
        (part for part in candidates if part.get_content_type() == content_type),
        None,
    )
    return None if wanted is None else _decode_payload(wanted)
|
||||
|
||||
|
||||
def _decode_payload(part: Message) -> str | None:
    """Return the transfer-decoded payload of ``part`` as text.

    Bytes payloads are decoded with the part's declared charset
    (UTF-8 when none is declared), replacing undecodable bytes. A
    charset label Python does not recognise falls back to UTF-8 rather
    than raising, so a malformed Content-Type header cannot abort the
    parse. Returns None when the payload is neither bytes nor str
    (``get_payload(decode=True)`` yields None for multipart containers).
    """
    payload = part.get_payload(decode=True)
    if isinstance(payload, bytes):
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Unknown or non-text codec name in the charset parameter.
            return payload.decode("utf-8", errors="replace")
    if isinstance(payload, str):
        return payload
    return None
|
||||
|
|
@ -63,7 +97,8 @@ def _parse_rfc2822_lines(body: str) -> list[Activity]:
|
|||
|
||||
Corresponds to `_extract_position_v1` and `_extract_position_v2` in
|
||||
the upstream parser. Returns a one-element list on success, `[]`
|
||||
otherwise.
|
||||
otherwise. v3/v4 are not ported — no surviving fixtures exist and
|
||||
the HTML fallback covers newer formats.
|
||||
"""
|
||||
for parser in (_try_v2, _try_v1):
|
||||
result = parser(body)
|
||||
|
|
@ -121,6 +156,150 @@ def _try_v1(body: str) -> Activity | None:
|
|||
)
|
||||
|
||||
|
||||
def _parse_html_tables(body: str) -> list[Activity]:
    """Extract one Activity per trade-summary table in an HTML body.

    Only leaf tables (a <table> containing no further <table>) are
    inspected; each is handed to `_try_html_summary_table` together
    with the single trade date found in the document. Tables that do
    not yield an activity are skipped, so a partially corrupted email
    still produces its intact orders. Returns `[]` when no date can be
    found in the document.
    """
    soup = BeautifulSoup(body, "html.parser")
    on_date = _extract_html_date(soup)
    if on_date is None:
        return []
    leaf_tables = (
        table for table in soup.find_all("table") if table.find("table") is None
    )
    attempts = (_try_html_summary_table(table, on_date) for table in leaf_tables)
    return [activity for activity in attempts if activity is not None]
|
||||
|
||||
|
||||
def _extract_html_date(soup: BeautifulSoup) -> datetime | None:
    """Find the first "Date: DD Month YYYY" in the document text.

    Returns the parsed date as a naive datetime, or None when no such
    text exists or the matched pieces do not form a valid date under
    the ``%d-%B-%Y`` format (full month name).
    """
    page_text = soup.get_text(" ", strip=True)
    found = _DATE_RE.search(page_text)
    if found is None:
        return None
    day, month, year = found.groups()
    try:
        parsed = datetime.strptime(f"{day}-{month}-{year}", "%d-%B-%Y")
    except ValueError:
        # The regex accepts any alphabetic month token and any 1-2 digit
        # day, so strptime is the real validator here.
        return None
    return parsed
|
||||
|
||||
|
||||
def _try_html_summary_table(nested: object, on_date: datetime) -> Activity | None:
    """Turn one leaf <table> into an Activity, if it is a trade summary.

    Returns None when ``nested`` has no ``get_text`` method, when its
    text has no "Bought N @ £P" fragment (a purely structural table),
    or when no ticker symbol can be found in its cells.
    """
    text_of = getattr(nested, "get_text", None)
    if text_of is None:
        return None
    flat_text = text_of(" ", strip=True)
    hit = _BOUGHT_RE.search(flat_text)
    if hit is None:
        return None
    ticker = _extract_html_symbol(nested)
    if ticker is None:
        return None
    # Both captures are digit-only decimals by construction of the
    # regex, so the Decimal conversions cannot fail.
    shares = Decimal(hit.group(1))
    price = Decimal(hit.group(2))
    return _build_activity(
        on_date=on_date,
        symbol=ticker,
        quantity=shares,
        unit_price=price,
        strategy="html",
        matched=flat_text[:200],
    )
|
||||
|
||||
|
||||
def _extract_html_symbol(nested: object) -> str | None:
    """Return the ticker from the first <td> whose text ends in ": TICKER".

    Returns None when ``nested`` has no ``find_all`` method or when no
    cell matches the ticker pattern.
    """
    find_cells = getattr(nested, "find_all", None)
    if find_cells is None:
        return None
    matches = (
        _TICKER_RE.search(cell.get_text(" ", strip=True)) for cell in find_cells("td")
    )
    return next((m.group(1) for m in matches if m is not None), None)
|
||||
|
||||
|
||||
# MIME types that mark a part as a CSV attachment.
_CSV_CONTENT_TYPES = {
    "text/csv",
    "application/csv",
    "application/vnd.ms-excel",
}

# Columns the CSV attachment strategy requires in the header row. IE has
# not (yet) sent CSV-attached statements in production; this set mirrors
# the upstream _extract_positions_csv contract (ticker, buy_price,
# num_shares, buy_date, currency) under modern names.
_CSV_COLUMNS = {
    "ticker",
    "unit_price",
    "quantity",
    "date",
    "currency",
}
|
||||
|
||||
|
||||
def _parse_csv_attachment(raw_email: bytes) -> list[Activity]:
    """Parse a CSV attachment from the email into Activity records.

    The raw bytes are parsed into a message and the text of a CSV-looking
    part is obtained from `_extract_csv_attachment_text`; its rows are
    then converted one by one. Returns `[]` when there is no such part
    or the header row lacks any of the required columns. Rows that
    cannot be converted are skipped.

    A ``csv.Error`` raised by the reader (for example an oversized
    field) never propagates: rows converted before the error are kept
    and the rest of the attachment is abandoned.
    """
    msg = email.message_from_bytes(raw_email)
    csv_text = _extract_csv_attachment_text(msg)
    if csv_text is None:
        return []
    # newline="" lets the csv module see the original line endings, as
    # its documentation requires for file-like inputs.
    reader = csv.DictReader(io.StringIO(csv_text, newline=""))
    activities: list[Activity] = []
    try:
        # Accessing fieldnames reads the header row, which can itself
        # raise csv.Error, so it lives inside the try as well.
        if not _CSV_COLUMNS.issubset(reader.fieldnames or ()):
            return []
        for row in reader:
            activity = _csv_row_to_activity(row)
            if activity is not None:
                activities.append(activity)
    except csv.Error:
        # Structurally broken CSV: keep what was parsed so far.
        pass
    return activities
|
||||
|
||||
|
||||
def _extract_csv_attachment_text(msg: Message) -> str | None:
    """Return the text of the first CSV-looking part that has a payload.

    Parts are visited in ``msg.walk()`` order and filtered with
    `_looks_like_csv_part`. Bytes payloads are decoded with the part's
    declared charset (UTF-8 when none is declared), replacing
    undecodable bytes; an unrecognised charset label falls back to
    UTF-8 instead of raising. A leading byte-order mark is removed so
    it cannot become part of the first CSV header name. Returns None
    when no CSV-looking part carries a bytes or str payload.
    """
    for part in msg.walk():
        if not _looks_like_csv_part(part):
            continue
        payload = part.get_payload(decode=True)
        if isinstance(payload, bytes):
            charset = part.get_content_charset() or "utf-8"
            try:
                text = payload.decode(charset, errors="replace")
            except LookupError:
                # Unknown or non-text codec name in the charset parameter.
                text = payload.decode("utf-8", errors="replace")
        elif isinstance(payload, str):
            text = payload
        else:
            # e.g. a multipart container: nothing to decode here.
            continue
        return text[1:] if text.startswith("\ufeff") else text
    return None
|
||||
|
||||
|
||||
def _looks_like_csv_part(part: Message) -> bool:
    """Tell whether a MIME part should be treated as a CSV attachment.

    True when the part's content type is one of `_CSV_CONTENT_TYPES`,
    or failing that when it has a filename ending in ".csv"
    (case-insensitive).
    """
    declared_csv = part.get_content_type() in _CSV_CONTENT_TYPES
    if declared_csv:
        return True
    name = part.get_filename()
    if not isinstance(name, str):
        return False
    return name.lower().endswith(".csv")
|
||||
|
||||
|
||||
def _csv_row_to_activity(row: dict[str, str]) -> Activity | None:
    """Convert one CSV row into an Activity, or None if it is unusable.

    A row is rejected when:

    * a required column is absent, or present but ``None`` (which is
      how ``csv.DictReader`` fills rows shorter than the header),
    * the date is not ``YYYY-MM-DD`` or a number does not parse,
    * quantity or unit price is NaN or infinite,
    * the ticker is blank, or
    * the currency is anything other than GBP (a blank currency is
      treated as GBP).
    """
    try:
        on_date = datetime.strptime(row["date"], "%Y-%m-%d")
        symbol = row["ticker"].strip()
        quantity = Decimal(row["quantity"])
        unit_price = Decimal(row["unit_price"])
        currency = row["currency"].strip() or "GBP"
    except (KeyError, TypeError, AttributeError, ValueError, InvalidOperation):
        # TypeError / AttributeError cover None cells from short rows.
        return None
    if not symbol or currency != "GBP":
        return None
    # Decimal() happily parses "NaN" and "Infinity"; neither is a trade.
    if not (quantity.is_finite() and unit_price.is_finite()):
        return None
    return _build_activity(
        on_date=on_date,
        symbol=symbol,
        quantity=quantity,
        unit_price=unit_price,
        strategy="csv",
        matched=f"{symbol},{unit_price},{quantity},{row['date']}",
    )
|
||||
|
||||
|
||||
def _build_activity(
|
||||
*,
|
||||
on_date: datetime,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue