broker-sync/broker_sync/providers/finance_mysql.py
Viktor Barzin a190875f63 Add finance_mysql provider + CLI for historical backfill
finance.position (171 rows, 2020-06-07 to 2025-12-19) is the only source
of InvestEngine + Schwab trade history pre-dating the broker-sync project.
This provider reads it once and pushes every row into the correct WF
account (.L tickers → IE ISA, others → Schwab).

Dedup: external_id = 'finance-mysql:position:<PK>' — idempotent on re-run.
Auth: aiomysql as MySQL root (user-authorized) against the standalone
mysql:8.4 in-cluster service.

New CLI: broker-sync finance-mysql-import
New tests: 5 unit tests covering route, symbol normalise, BUY/SELL
detection.

poetry run pytest -q   →  114 passed, 1 skipped
poetry run mypy        →  clean (aiomysql shielded with type: ignore)
poetry run ruff check  →  clean
2026-04-17 22:38:21 +00:00

144 lines
4.9 KiB
Python

"""Backfill-from-finance provider.
The retired `finance` app's MySQL has a `position` table with 5+ years of
InvestEngine + Schwab trade history (2020 onwards) that the broker-sync
pipeline otherwise can't reconstruct (IE's emails only go back to when
Viktor started receiving them; Schwab emails are sparse). This provider
reads that table once and emits canonical Activities so a full-history
backfill into Wealthfolio is possible.
Ticker routing to Wealthfolio accounts:
*.L (VUAG.L, VUSA.L, etc.) -> InvestEngine ISA (GBP)
everything else (META, *_US_EQ) -> Schwab (US workplace, USD)
Deduplication: the finance.position PK (a giant numeric string) is embedded
verbatim in external_id as `finance-mysql:position:<PK>`, so re-runs are
idempotent against the sync_record store.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from decimal import Decimal
from typing import NamedTuple
import aiomysql # type: ignore[import-untyped]
from broker_sync.models import Account, AccountType, Activity, ActivityType
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Account ids that backfilled rows are routed to: `.L` tickers go to the
# InvestEngine account, every other ticker to the Schwab account (see _route).
IE_ACCOUNT_ID = "invest-engine-primary"
SCHWAB_ACCOUNT_ID = "schwab-workplace"
class FinanceMySQLCreds(NamedTuple):
    """Connection parameters for the finance MySQL database.

    ``FinanceMySQLProvider.fetch`` hands each field straight to
    ``aiomysql.connect``.
    """

    host: str  # server hostname or address
    port: int  # TCP port of the MySQL service
    user: str  # account to authenticate as
    password: str  # password for ``user``
    database: str  # schema to select (passed to aiomysql as ``db``)
def _route(ticker: str) -> tuple[str, AccountType, str]:
    """Pick the destination account for a raw finance-DB ticker.

    Tickers carrying a ``.L`` suffix are routed to the InvestEngine ISA in
    GBP; every other ticker is routed to the Schwab account in USD.

    Returns:
        A ``(account_id, account_type, currency)`` tuple.
    """
    has_lse_suffix = ticker.endswith(".L")
    if not has_lse_suffix:
        return SCHWAB_ACCOUNT_ID, AccountType.GIA, "USD"
    return IE_ACCOUNT_ID, AccountType.ISA, "GBP"
def _normalise_symbol(ticker: str) -> str:
"""Strip finance-app quirks so the output symbol matches T212/Wealthfolio."""
# VUAG.L -> VUAG (LSE handled by Wealthfolio's exchange_mic resolution)
if ticker.endswith(".L"):
return ticker[:-2]
# FLME_US_EQ -> FLME (Trading212-style suffix leaked into the old finance DB)
if ticker.endswith("_US_EQ"):
return ticker[:-6]
if ticker.endswith("_EQ"):
return ticker[:-3]
return ticker
def _row_to_activity(row: dict[str, object]) -> Activity:
    """Convert one ``position`` row into a canonical ``Activity``.

    Keys read from ``row``: ``id``, ``ticker``, ``num_shares``, ``buy_price``,
    ``buy_date`` and (optionally) ``currency``.

    * The ticker decides the target account and the fallback currency
      (see ``_route``); the emitted symbol is the normalised ticker.
    * A positive ``num_shares`` becomes a BUY; anything else (negative, and
      also zero) becomes a SELL. The emitted quantity is always the
      absolute value.
    * ``buy_date`` may be a ``datetime`` or anything whose ``str()`` is an
      ISO-8601 date/datetime. Naive values are taken to be UTC; values that
      already carry a timezone keep it.
    * A missing or empty ``currency`` falls back to the account's currency.

    Raises:
        KeyError: if a required key is absent from ``row``.
        ValueError: if ``buy_date`` is not a datetime and not ISO-parseable.
        decimal.InvalidOperation: if a numeric column cannot be parsed.
    """
    ticker = str(row["ticker"])
    account_id, account_type, default_ccy = _route(ticker)

    raw_qty = Decimal(str(row["num_shares"]))
    activity_type = ActivityType.BUY if raw_qty > 0 else ActivityType.SELL

    # buy_date from MySQL normally comes back as a datetime (aiomysql
    # converts); fall back to ISO parsing for any other representation.
    raw_date = row["buy_date"]
    if isinstance(raw_date, datetime):
        parsed = raw_date
    else:
        parsed = datetime.fromisoformat(str(raw_date))
    # Only stamp UTC onto naive values. Overwriting an existing offset with
    # replace() would silently shift the instant, so aware values are kept.
    date = parsed.replace(tzinfo=UTC) if parsed.tzinfo is None else parsed

    currency_raw = row.get("currency")
    currency = str(currency_raw) if currency_raw else default_ccy

    return Activity(
        external_id=f"finance-mysql:position:{row['id']}",
        account_id=account_id,
        account_type=account_type,
        date=date,
        activity_type=activity_type,
        symbol=_normalise_symbol(ticker),
        quantity=abs(raw_qty),
        unit_price=Decimal(str(row["buy_price"])),
        currency=currency,
        notes=f"finance-mysql:{ticker}",
    )
class FinanceMySQLProvider:
    """Read-only backfill from the retired finance MySQL `position` table."""

    # Provider identifier string.
    name = "finance-mysql"

    def __init__(self, creds: FinanceMySQLCreds) -> None:
        """Store the connection parameters; no connection is opened here."""
        self._creds = creds

    def accounts(self) -> list[Account]:
        """Return the two accounts that backfilled activities are routed to."""
        return [
            Account(
                id=IE_ACCOUNT_ID,
                name="InvestEngine ISA",
                account_type=AccountType.ISA,
                currency="GBP",
                provider="invest-engine",
            ),
            Account(
                id=SCHWAB_ACCOUNT_ID,
                name="Schwab (US workplace)",
                account_type=AccountType.GIA,
                currency="USD",
                provider="schwab",
            ),
        ]

    async def fetch(
        self,
        *,
        since: datetime | None = None,
        before: datetime | None = None,
    ) -> AsyncIterator[Activity]:
        """Yield every `position` row as an Activity, oldest first.

        Args:
            since: if given, activities dated strictly before it are skipped
                (inclusive lower bound).
            before: if given, activities dated at or after it are skipped
                (exclusive upper bound).

        The whole table is read with a single query and the connection is
        closed before the first activity is yielded, so a slow consumer, or
        one that stops iterating early, never keeps a MySQL connection open.
        Date filtering happens client-side on the converted activities.
        """
        conn = await aiomysql.connect(
            host=self._creds.host,
            port=self._creds.port,
            user=self._creds.user,
            password=self._creds.password,
            db=self._creds.database,
            autocommit=True,
        )
        try:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(
                    "SELECT id, ticker, buy_price, num_shares, currency, buy_date, "
                    "account_id FROM position ORDER BY buy_date ASC"
                )
                rows = await cur.fetchall()
        finally:
            # Rows are fully materialised by fetchall(), so release the
            # connection now instead of holding it across the yields below.
            conn.close()

        log.info("finance-mysql: %d position rows", len(rows))
        for row in rows:
            activity = _row_to_activity(row)
            if since is not None and activity.date < since:
                continue
            if before is not None and activity.date >= before:
                continue
            yield activity