rsu_vest_events: schema + ORM for Schwab vest ground truth (Phase D)

Migration 0008 + ORM model for payslip_ingest.rsu_vest_events.

Purpose: broker-sync (separate repo) will parse Schwab "Release
Confirmation" emails and populate this table, enabling Panel 15 of
the UK payslip dashboard to reconcile:
  payslip.rsu_vest   ↔ SUM(rsu_vest_events.gross_value_gbp)
  RSU-attributed PAYE ↔ SUM(rsu_vest_events.tax_withheld_gbp)

Schema carries both the raw USD figures (fmv_at_vest_usd,
tax_withheld_usd, shares_*) and the GBP-translated values
(gross_value_gbp, tax_withheld_gbp) plus the FX rate used — the
dashboard joins on GBP, audits keep USD.

Idempotent on `external_id` — broker-sync emits a stable
`schwab:{date}:{ticker}:VEST:{shares_vested}` for each vest event.

The broker-sync postgres sink that writes here is pending a real email
fixture (current parser is heuristic-only) and a cross-service DB grant
for broker-sync's K8s ServiceAccount. Follow-up under code-860.

Part of: code-860
This commit is contained in:
Viktor Barzin 2026-04-19 18:27:41 +00:00
parent 08f28ad581
commit 3a62a38069
2 changed files with 89 additions and 0 deletions

View file

@ -0,0 +1,61 @@
"""Add rsu_vest_events for Schwab vest ground-truth reconciliation.
Schwab emails a "Release Confirmation" on each RSU vest, listing the vest
date, shares released at FMV, shares sold to cover tax, and the USD
withholding amount. broker-sync will parse these emails and populate
this table; Panel 15 of the dashboard reconciles
(payslip.rsu_vest, payslip.rsu_income_tax)
(rsu_vest_events.gross_value_gbp, rsu_vest_events.tax_withheld_gbp)
to validate the parser's RSU-split correctness.
Idempotent on `external_id` re-running the IMAP sync doesn't create
duplicates. USD-denominated raw values are retained alongside the
GBP-converted values for audit.
"""
import sqlalchemy as sa
from alembic import op
revision = "0008"
down_revision = "0007"
branch_labels = None
depends_on = None
SCHEMA = "payslip_ingest"
def upgrade() -> None:
op.create_table(
"rsu_vest_events",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("external_id", sa.String(), nullable=False, unique=True),
sa.Column("vest_date", sa.Date(), nullable=False),
sa.Column("ticker", sa.String(), nullable=False),
sa.Column("shares_vested", sa.Numeric(14, 4), nullable=False),
sa.Column("shares_sold_to_cover", sa.Numeric(14, 4), nullable=True),
sa.Column("fmv_at_vest_usd", sa.Numeric(12, 4), nullable=False),
sa.Column("tax_withheld_usd", sa.Numeric(12, 2), nullable=True),
sa.Column("fx_rate_gbp", sa.Numeric(10, 6), nullable=True),
sa.Column("gross_value_gbp", sa.Numeric(12, 2), nullable=True),
sa.Column("tax_withheld_gbp", sa.Numeric(12, 2), nullable=True),
sa.Column("source", sa.String(length=32), nullable=False),
sa.Column("raw_extraction", sa.JSON(), nullable=True),
sa.Column("created_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
nullable=False),
schema=SCHEMA,
)
op.create_index(
"ix_rsu_vest_events_vest_date",
"rsu_vest_events",
["vest_date"],
schema=SCHEMA,
)
def downgrade() -> None:
op.drop_index("ix_rsu_vest_events_vest_date",
table_name="rsu_vest_events",
schema=SCHEMA)
op.drop_table("rsu_vest_events", schema=SCHEMA)

View file

@ -99,6 +99,34 @@ class ExternalMetaDeposit(Base):
server_default=text("now()"))
class RsuVestEvent(Base):
"""Schwab RSU vest event — ground truth against payslip.rsu_vest.
One row per vest. `external_id` is stable across IMAP re-runs
(`schwab:{date}:{ticker}:VEST:{shares_vested}`). USD GBP conversion
happens at write time using the daily ECB rate.
"""
__tablename__ = "rsu_vest_events"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
external_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
vest_date: Mapped[date] = mapped_column(Date, nullable=False)
ticker: Mapped[str] = mapped_column(String, nullable=False)
shares_vested: Mapped[Decimal] = mapped_column(Numeric(14, 4), nullable=False)
shares_sold_to_cover: Mapped[Decimal | None] = mapped_column(Numeric(14, 4), nullable=True)
fmv_at_vest_usd: Mapped[Decimal] = mapped_column(Numeric(12, 4), nullable=False)
tax_withheld_usd: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
fx_rate_gbp: Mapped[Decimal | None] = mapped_column(Numeric(10, 6), nullable=True)
gross_value_gbp: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
tax_withheld_gbp: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
source: Mapped[str] = mapped_column(String(32), nullable=False)
raw_extraction: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=text("now()"))
class P60Reference(Base):
"""HMRC-issued annual P60. One row per (tax_year, employer).