# broker-sync/broker_sync/providers/parsers/schwab.py
"""Schwab workplace-RSU email parser.
Two email shapes are handled:
1. Trade confirmations (sell-to-cover or user-initiated trades): HTML
with five `<td class="dark-background-body" align="right">` cells
holding date / direction / quantity / ticker / price. one Activity.
2. Release Confirmations (RSU vest events): subject/body mentions
"Release Confirmation" or "Award Vesting"; body lists vest date,
shares released, FMV, shares sold to cover, and USD tax withheld.
(Activity, Activity, VestEvent) tuple: the gross vest (BUY at FMV),
the sell-to-cover (SELL at FMV), and a standalone VestEvent for the
payslip-ingest reconciliation pipeline.
On any parse failure we return the neutral empty result (no Activities,
no VestEvent) an unparseable email shouldn't crash the IMAP batch.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from broker_sync.models import AccountType, Activity, ActivityType, VestEvent
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Account identifier stamped on every Activity this parser emits.
_ACCOUNT_ID = "schwab-workplace"
# Currency recorded on every emitted Activity. The parsers in this module
# always use this value rather than deriving a currency from the email.
_DEFAULT_CURRENCY = "USD"
# Vest-confirmation emails reliably include one of these phrases. Matching
# is case-insensitive and on the raw HTML (cheap — no DOM parse needed).
# Because the whole raw HTML is searched, a phrase appearing anywhere in
# the message (not only the subject) selects the vest-release parser.
_VEST_SUBJECT_RE = re.compile(r"Release Confirmation|Award Vesting|RSU Release",
re.IGNORECASE)
@dataclass
class VestParseResult:
    """Outcome of parsing a single Schwab email.

    The field order below is also the positional order of the generated
    constructor, so it must not be rearranged.
    """

    # Activity rows extracted from the email; an empty list when nothing
    # could be parsed.
    activities: list[Activity]
    # Standalone vest record, or None when the email produced no vest
    # event (trade confirmations and failed parses).
    vest_event: VestEvent | None
def parse_schwab_email(raw_html: str) -> list[Activity]:
    """Parse a Schwab email and return only its Activity rows.

    Convenience wrapper around `parse_schwab_email_full` that discards the
    VestEvent. A trade confirmation yields a single-item list; a
    vest-confirmation email yields the gross-vest row followed by the
    sell-to-cover row when one was found; a failed parse yields an empty
    list. Call `parse_schwab_email_full` directly when the VestEvent is
    needed as well.
    """
    full_result = parse_schwab_email_full(raw_html)
    return full_result.activities
def parse_schwab_email_full(raw_html: str) -> VestParseResult:
    """Parse a Schwab email into Activities plus an optional VestEvent.

    Routing is decided by searching the raw HTML for a vest phrase:

    * no phrase found: the legacy single-row trade-confirmation parser
      runs and its rows are returned with ``vest_event=None``;
    * phrase found: the vest-release parser runs. If it cannot extract
      its required fields, a warning is logged and the empty result is
      returned. The trade-confirmation parser is NOT tried in that case.
    """
    # Anything that does not look like a vest email is a trade confirmation.
    if not _VEST_SUBJECT_RE.search(raw_html):
        trade_rows = _parse_trade_confirmation(raw_html)
        return VestParseResult(activities=trade_rows, vest_event=None)

    vest_result = _parse_vest_release(raw_html)
    if vest_result is None:
        log.warning("schwab: detected vest email but could not extract fields; "
                    "add a real fixture to broker-sync/tests/fixtures/")
        return VestParseResult(activities=[], vest_event=None)
    return vest_result
def _parse_trade_confirmation(raw_html: str) -> list[Activity]:
    """Legacy 5-cell trade confirmation parser.

    Collects the text of every ``<td class="dark-background-body"
    align="right">`` cell and interprets the first five, in order, as
    date / direction / quantity / ticker / price.

    Args:
        raw_html: Raw HTML body of the email.

    Returns:
        A single-item list holding the parsed Activity, or an empty list
        when fewer than five matching cells exist or any cell cannot be
        converted. This function never raises for malformed cell content.
    """
    try:
        soup = BeautifulSoup(raw_html, "html.parser")
        cells = [
            td.get_text(strip=True)
            for td in soup.find_all("td", {
                "class": "dark-background-body",
                "align": "right",
            })
        ]
        if len(cells) < 5:
            return []
        date_txt, direction_txt, qty_txt, ticker, price_txt = cells[:5]
        trade_date = dateparser.parse(date_txt)
        # NOTE(review): every direction other than "sold" is recorded as a
        # BUY — confirm the wording Schwab actually uses for sells.
        direction = (ActivityType.SELL
                     if direction_txt.strip().lower() == "sold" else ActivityType.BUY)
        quantity = Decimal(qty_txt.replace(",", "").strip())
        # Price like "$123.45" — remove known currency symbols/codes, then
        # thousands separators, and parse what is left. The euro sign was
        # previously an empty string here (a no-op), so "€"-prefixed prices
        # failed to parse and the email was silently dropped.
        price_clean = price_txt
        for sign in ("$", "£", "€", "USD", "GBP", "EUR"):
            price_clean = price_clean.replace(sign, "")
        unit_price = Decimal(price_clean.replace(",", "").strip())
        external_id = (f"schwab:{trade_date.date().isoformat()}:{ticker}:"
                       f"{direction.value}:{quantity}")
        return [
            Activity(
                external_id=external_id,
                account_id=_ACCOUNT_ID,
                account_type=AccountType.GIA,
                date=trade_date,
                activity_type=direction,
                symbol=ticker.strip(),
                quantity=quantity,
                unit_price=unit_price,
                currency=_DEFAULT_CURRENCY,
                notes=f"schwab-email:{direction_txt}",
            )
        ]
    except (ValueError, InvalidOperation, IndexError, AttributeError, OverflowError):
        # dateutil raises ParserError (a ValueError) for unparseable dates
        # and OverflowError for out-of-range ones; Decimal raises
        # InvalidOperation. All of them mean "not a trade confirmation we
        # understand", which must not abort the surrounding batch.
        log.debug("schwab: trade-confirmation cells could not be parsed",
                  exc_info=True)
        return []
# Heuristic extractors for vest-release emails. Labels observed in public
# Schwab RSU release samples; real fixture needed to tighten these.
# Every pattern below is case-insensitive, requires a ":" or "<" after the
# label, and exposes the value of interest as capture group 1.

# Vest date: after the label and separator, skips any run of non-digits,
# then captures one of three shapes — day + three-letter month + 2-4 digit
# year (separated by whitespace, "/" or "-"), NN/NN/NNNN, or YYYY-MM-DD.
# Numeric dates with single-digit day or month (e.g. 1/5/2024) do not match.
_VEST_DATE_RE = re.compile(
r"(?:Release Date|Vest Date|Vesting Date)\s*[:<][^0-9]*"
r"(\d{1,2}[\s/\-][A-Za-z]{3}[\s/\-]\d{2,4}|\d{2}/\d{2}/\d{4}|\d{4}-\d{2}-\d{2})",
re.IGNORECASE)
# Ticker: captures two to five letters following the label.
# NOTE(review): IGNORECASE also applies to [A-Z], so lower-case letters are
# accepted, and nothing anchors the end of the capture, so a longer word is
# truncated to its first five letters. One-letter symbols never match.
_VEST_TICKER_RE = re.compile(r"(?:Ticker|Symbol)\s*[:<]\s*([A-Z]{2,5})",
re.IGNORECASE)
# Shares released: captures digits/commas with an optional decimal part.
_VEST_SHARES_RELEASED_RE = re.compile(
r"(?:Shares Released|Total Shares (?:Released|Vested))\s*[:<]\s*"
r"([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
# Shares withheld or sold (optionally suffixed "for Taxes"): same numeric
# capture as above.
_VEST_SHARES_WITHHELD_RE = re.compile(
r"(?:Shares (?:Withheld|Sold)(?: for Taxes)?)\s*[:<]\s*"
r"([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
# Fair market value: an optional "$" may precede the captured number.
_VEST_FMV_RE = re.compile(
r"(?:Market Price|FMV|Fair Market Value)\s*[:<]\s*"
r"\$?\s*([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
# Tax withheld: an optional "$" may precede the captured number.
_VEST_TAX_USD_RE = re.compile(
r"(?:Tax Withholding Amount|Total Tax Withholding|Tax Withheld)\s*[:<]\s*"
r"\$?\s*([\d,]+(?:\.\d+)?)",
re.IGNORECASE)
def _optional_decimal(raw_value: str | None, field: str) -> Decimal | None:
    """Convert an optional captured number to Decimal, or None if absent/malformed."""
    if not raw_value:
        return None
    try:
        return Decimal(raw_value.replace(",", ""))
    except InvalidOperation:
        # The numeric patterns accept comma-only captures such as ","; treat
        # those as "field not present" instead of aborting the whole parse.
        log.warning("schwab: ignoring unparseable %s value %r", field, raw_value)
        return None


def _parse_vest_release(raw_html: str) -> VestParseResult | None:
    """Best-effort extraction from a Schwab Release Confirmation email.

    Runs the label regexes on the plain-text view of the HTML.

    Args:
        raw_html: Raw HTML body of the email.

    Returns:
        A VestParseResult holding the gross-vest BUY Activity, a
        sell-to-cover SELL Activity when a positive sold/withheld share
        count was found, and the VestEvent. Returns None if the core four
        fields (date, ticker, shares released, FMV) don't all resolve or
        cannot be converted — a strong signal the heuristics need a real
        fixture before they can be trusted on a live email. The caller
        decides how to report a None.

        The two optional fields (shares sold to cover, tax withheld) never
        fail the parse: a missing or malformed value becomes None.
    """
    try:
        soup = BeautifulSoup(raw_html, "html.parser")
        text = soup.get_text(" ", strip=True)
    except Exception:
        # Deliberately broad: whatever the HTML parser raises, one bad
        # email must not abort the batch.
        log.debug("schwab: vest email HTML could not be parsed", exc_info=True)
        return None

    date_str = _search_group(_VEST_DATE_RE, text)
    ticker = _search_group(_VEST_TICKER_RE, text)
    shares_released_str = _search_group(_VEST_SHARES_RELEASED_RE, text)
    fmv_str = _search_group(_VEST_FMV_RE, text)
    if not (date_str and ticker and shares_released_str and fmv_str):
        return None

    try:
        vest_date = dateparser.parse(date_str)
        shares_vested = Decimal(shares_released_str.replace(",", ""))
        fmv = Decimal(fmv_str.replace(",", ""))
    except (ValueError, InvalidOperation, OverflowError):
        # dateutil signals bad dates with ParserError (a ValueError) or
        # OverflowError; Decimal signals bad numbers with InvalidOperation.
        return None

    shares_sold_str = _search_group(_VEST_SHARES_WITHHELD_RE, text)
    shares_sold_to_cover = _optional_decimal(shares_sold_str, "shares-withheld")
    tax_usd_str = _search_group(_VEST_TAX_USD_RE, text)
    tax_withheld_usd = _optional_decimal(tax_usd_str, "tax-withheld")

    # NOTE(review): the ticker is used exactly as captured; the ticker
    # pattern is case-insensitive, so it may not be upper-case.
    external_id = (f"schwab:{vest_date.date().isoformat()}:{ticker}:VEST:"
                   f"{shares_vested}")
    vest_event = VestEvent(
        external_id=external_id,
        vest_date=vest_date,
        ticker=ticker,
        shares_vested=shares_vested,
        shares_sold_to_cover=shares_sold_to_cover,
        fmv_at_vest_usd=fmv,
        tax_withheld_usd=tax_withheld_usd,
        source="schwab_email",
        # Keep the captured strings verbatim so a bad conversion can be
        # diagnosed later from the stored event.
        raw={
            "date": date_str,
            "ticker": ticker,
            "shares_released": shares_released_str,
            "fmv": fmv_str,
            "shares_withheld": shares_sold_str or "",
            "tax_withheld": tax_usd_str or "",
        },
    )
    # Sibling Activities for Wealthfolio: full vest as BUY, sell-to-cover
    # slice as SELL, both at the same FMV so net cash = 0 on that day.
    activities: list[Activity] = [
        Activity(
            external_id=f"{external_id}:BUY",
            account_id=_ACCOUNT_ID,
            account_type=AccountType.GIA,
            date=vest_date,
            activity_type=ActivityType.BUY,
            symbol=ticker,
            quantity=shares_vested,
            unit_price=fmv,
            currency=_DEFAULT_CURRENCY,
            notes="schwab-vest-release",
        )
    ]
    if shares_sold_to_cover is not None and shares_sold_to_cover > 0:
        activities.append(
            Activity(
                external_id=f"{external_id}:SELL_TO_COVER",
                account_id=_ACCOUNT_ID,
                account_type=AccountType.GIA,
                date=vest_date,
                activity_type=ActivityType.SELL,
                symbol=ticker,
                quantity=shares_sold_to_cover,
                unit_price=fmv,
                currency=_DEFAULT_CURRENCY,
                notes="schwab-sell-to-cover",
            ))
    return VestParseResult(activities=activities, vest_event=vest_event)
def _search_group(pattern: re.Pattern[str], text: str) -> str | None:
m = pattern.search(text)
return m.group(1).strip() if m else None