"""Schwab workplace-RSU email parser.

Two email shapes are handled:

1. Trade confirmations (sell-to-cover or user-initiated trades): HTML with
   five `<td>` cells holding date / direction / quantity / ticker / price.
   → one Activity.
2. Release Confirmations (RSU vest events): the raw email mentions
   "Release Confirmation", "Award Vesting" or "RSU Release"; body lists vest
   date, shares released, FMV, shares sold to cover, and USD tax withheld.
   → a `VestParseResult` carrying the gross vest (BUY at FMV), the
   sell-to-cover (SELL at FMV, only when shares were withheld), and a
   standalone VestEvent for the payslip-ingest reconciliation pipeline.

On any parse failure we return the neutral empty result (no Activities, no
VestEvent) — an unparseable email shouldn't crash the IMAP batch.
"""
from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation

from bs4 import BeautifulSoup
from dateutil import parser as dateparser

from broker_sync.models import AccountType, Activity, ActivityType, VestEvent

log = logging.getLogger(__name__)

_ACCOUNT_ID = "schwab-workplace"
_DEFAULT_CURRENCY = "USD"

# Vest-confirmation emails reliably include one of these phrases. Matching
# is case-insensitive and on the raw HTML (cheap — no DOM parse needed).
_VEST_SUBJECT_RE = re.compile(r"Release Confirmation|Award Vesting|RSU Release", re.IGNORECASE)


@dataclass
class VestParseResult:
    """Outcome of parsing one Schwab email."""

    # Activity rows extracted from the email; empty when nothing parsed.
    activities: list[Activity]
    # Set only for successfully parsed vest-release emails.
    vest_event: VestEvent | None


def parse_schwab_email(raw_html: str) -> list[Activity]:
    """Return the Activity rows parsed from the email, or an empty list.

    Trade confirmations yield a single Activity. Vest-confirmation emails
    yield the gross vest plus, when shares were withheld, the sell-to-cover
    row. Use `parse_schwab_email_full` when the caller also needs the
    VestEvent.
    """
    return parse_schwab_email_full(raw_html).activities


def parse_schwab_email_full(raw_html: str) -> VestParseResult:
    """Full parse — returns activities + optional VestEvent.

    Dispatches: emails matching `_VEST_SUBJECT_RE` → `_parse_vest_release`;
    everything else → the legacy single-row confirmation parser. A detected
    vest email whose fields cannot be extracted is logged and produces the
    empty result; it is NOT retried with the trade-confirmation parser.
    """
    if _VEST_SUBJECT_RE.search(raw_html):
        result = _parse_vest_release(raw_html)
        if result is not None:
            return result
        log.warning("schwab: detected vest email but could not extract fields; "
                    "add a real fixture to broker-sync/tests/fixtures/")
        return VestParseResult(activities=[], vest_event=None)
    return VestParseResult(activities=_parse_trade_confirmation(raw_html), vest_event=None)


def _parse_trade_confirmation(raw_html: str) -> list[Activity]:
    """Legacy 5-cell trade confirmation parser.

    Reads the first five right-aligned `dark-background-body` cells as
    date / direction / quantity / ticker / price.

    Returns:
        A one-element list on success; an empty list when fewer than five
        cells are present or any field fails to parse.
    """
    try:
        soup = BeautifulSoup(raw_html, "html.parser")
        cells = [
            td.get_text(strip=True)
            for td in soup.find_all("td", {
                "class": "dark-background-body",
                "align": "right"
            })
        ]
        if len(cells) < 5:
            return []
        date_txt, direction_txt, qty_txt, ticker, price_txt = cells[:5]

        trade_date = dateparser.parse(date_txt)
        # Only the literal "sold" maps to SELL; every other label is a BUY.
        direction = (ActivityType.SELL
                     if direction_txt.strip().lower() == "sold" else ActivityType.BUY)
        quantity = Decimal(qty_txt.replace(",", "").strip())
        # Price like "$123.45" — drop known currency signs/codes and the
        # thousands separators, then parse what is left as a decimal.
        # NOTE(review): the Activity is recorded in _DEFAULT_CURRENCY even
        # when a "£"/"€" sign was stripped here — confirm that is intended.
        price_clean = price_txt
        for sign in ("$", "£", "€", "USD", "GBP", "EUR"):
            price_clean = price_clean.replace(sign, "")
        unit_price = Decimal(price_clean.replace(",", "").strip())

        external_id = (f"schwab:{trade_date.date().isoformat()}:{ticker}:"
                       f"{direction.value}:{quantity}")
        return [
            Activity(
                external_id=external_id,
                account_id=_ACCOUNT_ID,
                account_type=AccountType.GIA,
                date=trade_date,
                activity_type=direction,
                symbol=ticker.strip(),
                quantity=quantity,
                unit_price=unit_price,
                currency=_DEFAULT_CURRENCY,
                notes=f"schwab-email:{direction_txt}",
            )
        ]
    except (ValueError, OverflowError, InvalidOperation, IndexError, AttributeError):
        # dateutil raises ParserError (a ValueError) for unparseable text and
        # OverflowError for out-of-range date components; either way the
        # email is unparseable and must not take down the batch.
        return []


# Heuristic extractors for vest-release emails. Labels observed in public
# Schwab RSU release samples; real fixture needed to tighten these.
_VEST_DATE_RE = re.compile(
    r"(?:Release Date|Vest Date|Vesting Date)\s*[:<][^0-9]*"
    r"(\d{1,2}[\s/\-][A-Za-z]{3}[\s/\-]\d{2,4}|\d{2}/\d{2}/\d{4}|\d{4}-\d{2}-\d{2})",
    re.IGNORECASE)
# NOTE(review): IGNORECASE applies to the capture group too, so a lowercase
# or mixed-case word after the label is accepted as the ticker.
_VEST_TICKER_RE = re.compile(r"(?:Ticker|Symbol)\s*[:<]\s*([A-Z]{2,5})", re.IGNORECASE)
_VEST_SHARES_RELEASED_RE = re.compile(
    r"(?:Shares Released|Total Shares (?:Released|Vested))\s*[:<]\s*"
    r"([\d,]+(?:\.\d+)?)", re.IGNORECASE)
_VEST_SHARES_WITHHELD_RE = re.compile(
    r"(?:Shares (?:Withheld|Sold)(?: for Taxes)?)\s*[:<]\s*"
    r"([\d,]+(?:\.\d+)?)", re.IGNORECASE)
_VEST_FMV_RE = re.compile(
    r"(?:Market Price|FMV|Fair Market Value)\s*[:<]\s*"
    r"\$?\s*([\d,]+(?:\.\d+)?)", re.IGNORECASE)
_VEST_TAX_USD_RE = re.compile(
    r"(?:Tax Withholding Amount|Total Tax Withholding|Tax Withheld)\s*[:<]\s*"
    r"\$?\s*([\d,]+(?:\.\d+)?)", re.IGNORECASE)


def _parse_vest_release(raw_html: str) -> VestParseResult | None:
    """Best-effort extraction from a Schwab Release Confirmation email.

    Runs label regexes on the plain-text view of the HTML.

    Returns:
        A `VestParseResult` with the gross-vest BUY, an optional
        sell-to-cover SELL (only when a positive withheld-share count was
        found), and the VestEvent. Returns None if the core four fields
        (date, ticker, shares released, FMV) don't all resolve, or if any
        matched field cannot be converted — that's a strong signal the
        heuristics need a real fixture before they can be trusted on a live
        email. `parse_schwab_email_full` turns None into a logged warning
        and an empty result.
    """
    try:
        soup = BeautifulSoup(raw_html, "html.parser")
        text = soup.get_text(" ", strip=True)
    except Exception:
        # Deliberate best-effort: any HTML parsing problem means "no data".
        return None

    date_str = _search_group(_VEST_DATE_RE, text)
    ticker = _search_group(_VEST_TICKER_RE, text)
    shares_released_str = _search_group(_VEST_SHARES_RELEASED_RE, text)
    fmv_str = _search_group(_VEST_FMV_RE, text)
    # Optional fields: their absence is fine.
    shares_sold_str = _search_group(_VEST_SHARES_WITHHELD_RE, text)
    tax_usd_str = _search_group(_VEST_TAX_USD_RE, text)

    if not (date_str and ticker and shares_released_str and fmv_str):
        return None

    try:
        vest_date = dateparser.parse(date_str)
        shares_vested = Decimal(shares_released_str.replace(",", ""))
        fmv = Decimal(fmv_str.replace(",", ""))
        # The numeric patterns accept a bare "," run, which Decimal rejects,
        # so the optional conversions must sit under the same guard as the
        # core ones instead of raising into the IMAP batch.
        shares_sold_to_cover = (Decimal(shares_sold_str.replace(",", ""))
                                if shares_sold_str else None)
        tax_withheld_usd = (Decimal(tax_usd_str.replace(",", ""))
                            if tax_usd_str else None)
    except (ValueError, OverflowError, InvalidOperation):
        # dateutil signals bad dates with ParserError (a ValueError) or
        # OverflowError; Decimal signals bad numbers with InvalidOperation.
        return None

    external_id = (f"schwab:{vest_date.date().isoformat()}:{ticker}:VEST:"
                   f"{shares_vested}")

    vest_event = VestEvent(
        external_id=external_id,
        vest_date=vest_date,
        ticker=ticker,
        shares_vested=shares_vested,
        shares_sold_to_cover=shares_sold_to_cover,
        fmv_at_vest_usd=fmv,
        tax_withheld_usd=tax_withheld_usd,
        source="schwab_email",
        # Keep the untouched matched strings alongside the parsed values.
        raw={
            "date": date_str,
            "ticker": ticker,
            "shares_released": shares_released_str,
            "fmv": fmv_str,
            "shares_withheld": shares_sold_str or "",
            "tax_withheld": tax_usd_str or "",
        },
    )

    # Sibling Activities for Wealthfolio: full vest as BUY, sell-to-cover
    # slice as SELL, both at the same FMV so net cash = 0 on that day.
    activities: list[Activity] = [
        Activity(
            external_id=f"{external_id}:BUY",
            account_id=_ACCOUNT_ID,
            account_type=AccountType.GIA,
            date=vest_date,
            activity_type=ActivityType.BUY,
            symbol=ticker,
            quantity=shares_vested,
            unit_price=fmv,
            currency=_DEFAULT_CURRENCY,
            notes="schwab-vest-release",
        )
    ]
    if shares_sold_to_cover is not None and shares_sold_to_cover > 0:
        activities.append(
            Activity(
                external_id=f"{external_id}:SELL_TO_COVER",
                account_id=_ACCOUNT_ID,
                account_type=AccountType.GIA,
                date=vest_date,
                activity_type=ActivityType.SELL,
                symbol=ticker,
                quantity=shares_sold_to_cover,
                unit_price=fmv,
                currency=_DEFAULT_CURRENCY,
                notes="schwab-sell-to-cover",
            ))

    return VestParseResult(activities=activities, vest_event=vest_event)


def _search_group(pattern: re.Pattern[str], text: str) -> str | None:
    """Return the stripped first capture group of `pattern` in `text`.

    Returns None when the pattern does not match anywhere in `text`.
    """
    m = pattern.search(text)
    return m.group(1).strip() if m else None