finance.position (171 rows, 2020-06-07 to 2025-12-19) is the only source of InvestEngine + Schwab trade history pre-dating the broker-sync project. This provider reads it once and pushes every row into the correct WF account (.L tickers → IE ISA, others → Schwab). Dedup: external_id = 'finance-mysql:position:<PK>' — idempotent on re-run. Auth: aiomysql as MySQL root (user-authorized) against the standalone mysql:8.4 in-cluster service. New CLI: broker-sync finance-mysql-import New tests: 5 unit tests covering route, symbol normalise, BUY/SELL detection. poetry run pytest -q → 114 passed, 1 skipped poetry run mypy → clean (aiomysql shielded with type: ignore) poetry run ruff check → clean
144 lines
4.9 KiB
Python
144 lines
4.9 KiB
Python
"""Backfill-from-finance provider.
|
|
|
|
The retired `finance` app's MySQL has a `position` table with 5+ years of
|
|
InvestEngine + Schwab trade history (2020 onwards) that the broker-sync
|
|
pipeline otherwise can't reconstruct (IE's emails only go back to when
|
|
Viktor started receiving them; Schwab emails are sparse). This provider
|
|
reads that table once and emits canonical Activities so a full-history
|
|
backfill into Wealthfolio is possible.
|
|
|
|
Ticker routing to Wealthfolio accounts:
|
|
*.L (VUAG.L, VUSA.L, etc.) -> InvestEngine ISA (GBP)
|
|
everything else (META, *_US_EQ) -> Schwab (US workplace, USD)
|
|
|
|
Deduplication: the finance.position PK (a giant numeric string) goes into
|
|
external_id verbatim, so re-runs are idempotent against the sync_record
|
|
store.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections.abc import AsyncIterator
|
|
from datetime import UTC, datetime
|
|
from decimal import Decimal
|
|
from typing import NamedTuple
|
|
|
|
import aiomysql # type: ignore[import-untyped]
|
|
|
|
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
IE_ACCOUNT_ID = "invest-engine-primary"
|
|
SCHWAB_ACCOUNT_ID = "schwab-workplace"
|
|
|
|
|
|
class FinanceMySQLCreds(NamedTuple):
|
|
host: str
|
|
port: int
|
|
user: str
|
|
password: str
|
|
database: str
|
|
|
|
|
|
def _route(ticker: str) -> tuple[str, AccountType, str]:
|
|
"""Return (account_id, account_type, currency) for a raw ticker."""
|
|
if ticker.endswith(".L"):
|
|
return IE_ACCOUNT_ID, AccountType.ISA, "GBP"
|
|
return SCHWAB_ACCOUNT_ID, AccountType.GIA, "USD"
|
|
|
|
|
|
def _normalise_symbol(ticker: str) -> str:
|
|
"""Strip finance-app quirks so the output symbol matches T212/Wealthfolio."""
|
|
# VUAG.L -> VUAG (LSE handled by Wealthfolio's exchange_mic resolution)
|
|
if ticker.endswith(".L"):
|
|
return ticker[:-2]
|
|
# FLME_US_EQ -> FLME (Trading212-style suffix leaked into the old finance DB)
|
|
if ticker.endswith("_US_EQ"):
|
|
return ticker[:-6]
|
|
if ticker.endswith("_EQ"):
|
|
return ticker[:-3]
|
|
return ticker
|
|
|
|
|
|
def _row_to_activity(row: dict[str, object]) -> Activity:
|
|
ticker = str(row["ticker"])
|
|
account_id, account_type, default_ccy = _route(ticker)
|
|
raw_qty = Decimal(str(row["num_shares"]))
|
|
activity_type = ActivityType.BUY if raw_qty > 0 else ActivityType.SELL
|
|
# buy_date from MySQL comes back as datetime (aiomysql converts)
|
|
dt = row["buy_date"]
|
|
if isinstance(dt, datetime):
|
|
date = dt if dt.tzinfo else dt.replace(tzinfo=UTC)
|
|
else:
|
|
date = datetime.fromisoformat(str(dt)).replace(tzinfo=UTC)
|
|
currency_raw = row.get("currency")
|
|
currency = str(currency_raw) if currency_raw else default_ccy
|
|
return Activity(
|
|
external_id=f"finance-mysql:position:{row['id']}",
|
|
account_id=account_id,
|
|
account_type=account_type,
|
|
date=date,
|
|
activity_type=activity_type,
|
|
symbol=_normalise_symbol(ticker),
|
|
quantity=abs(raw_qty),
|
|
unit_price=Decimal(str(row["buy_price"])),
|
|
currency=currency,
|
|
notes=f"finance-mysql:{ticker}",
|
|
)
|
|
|
|
|
|
class FinanceMySQLProvider:
|
|
"""Read-only backfill from the retired finance MySQL `position` table."""
|
|
name = "finance-mysql"
|
|
|
|
def __init__(self, creds: FinanceMySQLCreds) -> None:
|
|
self._creds = creds
|
|
|
|
def accounts(self) -> list[Account]:
|
|
return [
|
|
Account(
|
|
id=IE_ACCOUNT_ID,
|
|
name="InvestEngine ISA",
|
|
account_type=AccountType.ISA,
|
|
currency="GBP",
|
|
provider="invest-engine",
|
|
),
|
|
Account(
|
|
id=SCHWAB_ACCOUNT_ID,
|
|
name="Schwab (US workplace)",
|
|
account_type=AccountType.GIA,
|
|
currency="USD",
|
|
provider="schwab",
|
|
),
|
|
]
|
|
|
|
async def fetch(
|
|
self,
|
|
*,
|
|
since: datetime | None = None,
|
|
before: datetime | None = None,
|
|
) -> AsyncIterator[Activity]:
|
|
conn = await aiomysql.connect(
|
|
host=self._creds.host,
|
|
port=self._creds.port,
|
|
user=self._creds.user,
|
|
password=self._creds.password,
|
|
db=self._creds.database,
|
|
autocommit=True,
|
|
)
|
|
try:
|
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
await cur.execute("SELECT id, ticker, buy_price, num_shares, currency, buy_date, "
|
|
"account_id FROM position ORDER BY buy_date ASC")
|
|
rows = await cur.fetchall()
|
|
log.info("finance-mysql: %d position rows", len(rows))
|
|
for row in rows:
|
|
activity = _row_to_activity(row)
|
|
if since is not None and activity.date < since:
|
|
continue
|
|
if before is not None and activity.date >= before:
|
|
continue
|
|
yield activity
|
|
finally:
|
|
conn.close()
|