Add finance_mysql provider + CLI for historical backfill

finance.position (171 rows, 2020-06-07 to 2025-12-19) is the only source
of InvestEngine + Schwab trade history pre-dating the broker-sync project.
This provider reads it once and pushes every row into the correct WF
account (.L tickers → IE ISA, others → Schwab).

Dedup: external_id = 'finance-mysql:position:<PK>' — idempotent on re-run.
Auth: aiomysql as MySQL root (user-authorized) against the standalone
mysql:8.4 in-cluster service.

New CLI: broker-sync finance-mysql-import
New tests: 5 unit tests covering route, symbol normalise, BUY/SELL
detection.

poetry run pytest -q   →  114 passed, 1 skipped
poetry run mypy        →  clean (aiomysql shielded with type: ignore)
poetry run ruff check  →  clean
This commit is contained in:
Viktor Barzin 2026-04-17 22:38:21 +00:00
parent 74b2179c83
commit a190875f63
6 changed files with 318 additions and 9 deletions

View file

@ -230,6 +230,71 @@ def invest_engine(
asyncio.run(_run())
@app.command("finance-mysql-import")
def finance_mysql_import(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
envvar="WF_SESSION_PATH"),
db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"),
db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"),
db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"),
db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"),
db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"),
data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
) -> None:
"""One-shot backfill: read the retired finance app's MySQL position table
and push every row into the correct Wealthfolio account (IE for .L
tickers, Schwab for US tickers). Idempotent via dedup."""
from broker_sync.dedup import SyncRecordStore
from broker_sync.pipeline import sync_provider_to_wealthfolio
from broker_sync.providers.finance_mysql import (
FinanceMySQLCreds,
FinanceMySQLProvider,
)
from broker_sync.sinks.wealthfolio import WealthfolioSink
_setup_logging()
data = Path(data_dir)
data.mkdir(parents=True, exist_ok=True)
async def _run() -> None:
sink = WealthfolioSink(
base_url=wf_base_url,
username=wf_username,
password=wf_password,
session_path=wf_session_path,
)
provider = FinanceMySQLProvider(
FinanceMySQLCreds(
host=db_host,
port=db_port,
user=db_user,
password=db_password,
database=db_name,
))
dedup = SyncRecordStore(data / "sync.db")
try:
if not Path(wf_session_path).exists():
await sink.login()
result = await sync_provider_to_wealthfolio(
provider=provider,
sink=sink,
dedup=dedup,
)
finally:
await sink.close()
typer.echo(f"finance-mysql: fetched={result.fetched} "
f"new={result.new_after_dedup} "
f"imported={result.imported} "
f"failed={result.failed}")
if result.failed > 0:
sys.exit(1)
asyncio.run(_run())
@app.command("imap-ingest")
def imap_ingest(
wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),

View file

@ -0,0 +1,144 @@
"""Backfill-from-finance provider.
The retired `finance` app's MySQL has a `position` table with 5+ years of
InvestEngine + Schwab trade history (2020 onwards) that the broker-sync
pipeline otherwise can't reconstruct (IE's emails only go back to when
Viktor started receiving them; Schwab emails are sparse). This provider
reads that table once and emits canonical Activities so a full-history
backfill into Wealthfolio is possible.
Ticker routing to Wealthfolio accounts:
*.L (VUAG.L, VUSA.L, etc.) -> InvestEngine ISA (GBP)
everything else (META, *_US_EQ) -> Schwab (US workplace, USD)
Deduplication: the finance.position PK (a giant numeric string) goes into
external_id verbatim, so re-runs are idempotent against the sync_record
store.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from decimal import Decimal
from typing import NamedTuple
import aiomysql # type: ignore[import-untyped]
from broker_sync.models import Account, AccountType, Activity, ActivityType
log = logging.getLogger(__name__)
IE_ACCOUNT_ID = "invest-engine-primary"
SCHWAB_ACCOUNT_ID = "schwab-workplace"
class FinanceMySQLCreds(NamedTuple):
host: str
port: int
user: str
password: str
database: str
def _route(ticker: str) -> tuple[str, AccountType, str]:
"""Return (account_id, account_type, currency) for a raw ticker."""
if ticker.endswith(".L"):
return IE_ACCOUNT_ID, AccountType.ISA, "GBP"
return SCHWAB_ACCOUNT_ID, AccountType.GIA, "USD"
def _normalise_symbol(ticker: str) -> str:
"""Strip finance-app quirks so the output symbol matches T212/Wealthfolio."""
# VUAG.L -> VUAG (LSE handled by Wealthfolio's exchange_mic resolution)
if ticker.endswith(".L"):
return ticker[:-2]
# FLME_US_EQ -> FLME (Trading212-style suffix leaked into the old finance DB)
if ticker.endswith("_US_EQ"):
return ticker[:-6]
if ticker.endswith("_EQ"):
return ticker[:-3]
return ticker
def _row_to_activity(row: dict[str, object]) -> Activity:
ticker = str(row["ticker"])
account_id, account_type, default_ccy = _route(ticker)
raw_qty = Decimal(str(row["num_shares"]))
activity_type = ActivityType.BUY if raw_qty > 0 else ActivityType.SELL
# buy_date from MySQL comes back as datetime (aiomysql converts)
dt = row["buy_date"]
if isinstance(dt, datetime):
date = dt if dt.tzinfo else dt.replace(tzinfo=UTC)
else:
date = datetime.fromisoformat(str(dt)).replace(tzinfo=UTC)
currency_raw = row.get("currency")
currency = str(currency_raw) if currency_raw else default_ccy
return Activity(
external_id=f"finance-mysql:position:{row['id']}",
account_id=account_id,
account_type=account_type,
date=date,
activity_type=activity_type,
symbol=_normalise_symbol(ticker),
quantity=abs(raw_qty),
unit_price=Decimal(str(row["buy_price"])),
currency=currency,
notes=f"finance-mysql:{ticker}",
)
class FinanceMySQLProvider:
"""Read-only backfill from the retired finance MySQL `position` table."""
name = "finance-mysql"
def __init__(self, creds: FinanceMySQLCreds) -> None:
self._creds = creds
def accounts(self) -> list[Account]:
return [
Account(
id=IE_ACCOUNT_ID,
name="InvestEngine ISA",
account_type=AccountType.ISA,
currency="GBP",
provider="invest-engine",
),
Account(
id=SCHWAB_ACCOUNT_ID,
name="Schwab (US workplace)",
account_type=AccountType.GIA,
currency="USD",
provider="schwab",
),
]
async def fetch(
self,
*,
since: datetime | None = None,
before: datetime | None = None,
) -> AsyncIterator[Activity]:
conn = await aiomysql.connect(
host=self._creds.host,
port=self._creds.port,
user=self._creds.user,
password=self._creds.password,
db=self._creds.database,
autocommit=True,
)
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT id, ticker, buy_price, num_shares, currency, buy_date, "
"account_id FROM position ORDER BY buy_date ASC")
rows = await cur.fetchall()
log.info("finance-mysql: %d position rows", len(rows))
for row in rows:
activity = _row_to_activity(row)
if since is not None and activity.date < since:
continue
if before is not None and activity.date >= before:
continue
yield activity
finally:
conn.close()

View file

@ -243,11 +243,9 @@ class WealthfolioSink:
err_msg = summary.get("errorMessage") or "no errorMessage"
skipped = int(summary.get("skipped", 0))
dupes = int(summary.get("duplicates", 0))
raise ImportValidationError(
f"Wealthfolio /import persisted {imported_n}/{total_n} "
f"(skipped={skipped} duplicates={dupes}). "
f"errorMessage: {err_msg}"
)
raise ImportValidationError(f"Wealthfolio /import persisted {imported_n}/{total_n} "
f"(skipped={skipped} duplicates={dupes}). "
f"errorMessage: {err_msg}")
# Legacy silent-drop guard for no-summary responses.
elif valid_rows and not got:
first_warn = next(
@ -257,6 +255,6 @@ class WealthfolioSink:
raise ImportValidationError(
f"Wealthfolio /import silently dropped all {len(valid_rows)} rows. "
f"First checked row: {checked[0] if checked else 'none'}. "
f"First warning: {first_warn}"
)
return got
f"First warning: {first_warn}")
assert isinstance(got, list)
return [r for r in got if isinstance(r, dict)]

37
poetry.lock generated
View file

@ -1,5 +1,24 @@
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
[[package]]
name = "aiomysql"
version = "0.3.2"
description = "MySQL driver for asyncio."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "aiomysql-0.3.2-py3-none-any.whl", hash = "sha256:c82c5ba04137d7afd5c693a258bea8ead2aad77101668044143a991e04632eb2"},
{file = "aiomysql-0.3.2.tar.gz", hash = "sha256:72d15ef5cfc34c03468eb41e1b90adb9fd9347b0b589114bd23ead569a02ac1a"},
]
[package.dependencies]
PyMySQL = ">=1.0"
[package.extras]
rsa = ["PyMySQL[rsa] (>=1.0)"]
sa = ["sqlalchemy (>=1.3,<1.4)"]
[[package]]
name = "anyio"
version = "4.13.0"
@ -459,6 +478,22 @@ files = [
[package.extras]
windows-terminal = ["colorama (>=0.4.6)"]
[[package]]
name = "pymysql"
version = "1.1.2"
description = "Pure Python MySQL Driver"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "pymysql-1.1.2-py3-none-any.whl", hash = "sha256:e6b1d89711dd51f8f74b1631fe08f039e7d76cf67a42a323d3178f0f25762ed9"},
{file = "pymysql-1.1.2.tar.gz", hash = "sha256:4961d3e165614ae65014e361811a724e2044ad3ea3739de9903ae7c21f539f03"},
]
[package.extras]
ed25519 = ["PyNaCl (>=1.4.0)"]
rsa = ["cryptography"]
[[package]]
name = "pytest"
version = "8.4.2"
@ -670,4 +705,4 @@ platformdirs = ">=3.5.1"
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<3.13"
content-hash = "04a3e24fe45c75f975140aff6076af0a156772a1a8e82eba30ee2345ac1d8bd6"
content-hash = "dcc5b4eadd0a8df900e74674acf33215091dcb9bd0fffcefb03607dde2408a16"

View file

@ -13,6 +13,7 @@ beautifulsoup4 = "^4.12"
python-dateutil = "^2.9"
typer = "^0.12"
click = "<8.2" # typer 0.12 uses make_metavar() without ctx; click 8.2 made ctx required
aiomysql = "^0.3.2"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3"

View file

@ -0,0 +1,66 @@
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from broker_sync.models import AccountType, ActivityType
from broker_sync.providers.finance_mysql import _normalise_symbol, _route, _row_to_activity
def test_lse_ticker_routes_to_investengine() -> None:
acct, t, ccy = _route("VUAG.L")
assert acct == "invest-engine-primary"
assert t is AccountType.ISA
assert ccy == "GBP"
def test_us_ticker_routes_to_schwab() -> None:
assert _route("META") == ("schwab-workplace", AccountType.GIA, "USD")
assert _route("FLME_US_EQ") == ("schwab-workplace", AccountType.GIA, "USD")
def test_normalise_symbol() -> None:
assert _normalise_symbol("VUAG.L") == "VUAG"
assert _normalise_symbol("VUSA.L") == "VUSA"
assert _normalise_symbol("META") == "META"
assert _normalise_symbol("FLME_US_EQ") == "FLME"
assert _normalise_symbol("FOO_EQ") == "FOO"
def test_row_to_buy_activity() -> None:
row = {
"id": "123456",
"ticker": "VUAG.L",
"buy_price": 85.5,
"num_shares": 10.0,
"currency": "GBP",
"buy_date": datetime(2022, 3, 15, 10, 30),
"account_id": 1,
}
a = _row_to_activity(row)
assert a.external_id == "finance-mysql:position:123456"
assert a.account_id == "invest-engine-primary"
assert a.account_type is AccountType.ISA
assert a.activity_type is ActivityType.BUY
assert a.symbol == "VUAG" # .L stripped
assert a.quantity == Decimal("10.0")
assert a.unit_price == Decimal("85.5")
assert a.currency == "GBP"
assert a.date == datetime(2022, 3, 15, 10, 30, tzinfo=UTC)
def test_row_to_sell_when_qty_negative() -> None:
row = {
"id": "x",
"ticker": "META",
"buy_price": 450.0,
"num_shares": -2.5, # sell
"currency": "USD",
"buy_date": datetime(2024, 8, 5),
"account_id": 1,
}
a = _row_to_activity(row)
assert a.activity_type is ActivityType.SELL
assert a.quantity == Decimal("2.5") # absolute
assert a.account_id == "schwab-workplace"
assert a.symbol == "META"