diff --git a/broker_sync/cli.py b/broker_sync/cli.py
index ea7d8c9..3b4ff22 100644
--- a/broker_sync/cli.py
+++ b/broker_sync/cli.py
@@ -230,6 +230,71 @@ def invest_engine(
     asyncio.run(_run())
 
 
+@app.command("finance-mysql-import")
+def finance_mysql_import(
+    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
+    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
+    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
+    wf_session_path: str = typer.Option("/data/wealthfolio_session.json",
+                                        envvar="WF_SESSION_PATH"),
+    db_host: str = typer.Option(..., envvar="FINANCE_DB_HOST"),
+    db_port: int = typer.Option(3306, envvar="FINANCE_DB_PORT"),
+    db_user: str = typer.Option(..., envvar="FINANCE_DB_USER"),
+    db_password: str = typer.Option(..., envvar="FINANCE_DB_PASSWORD"),
+    db_name: str = typer.Option("finance", envvar="FINANCE_DB_NAME"),
+    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
+) -> None:
+    """One-shot backfill: read the retired finance app's MySQL position table
+    and push every row into the correct Wealthfolio account (IE for .L
+    tickers, Schwab for US tickers). Idempotent via dedup."""
+    from broker_sync.dedup import SyncRecordStore
+    from broker_sync.pipeline import sync_provider_to_wealthfolio
+    from broker_sync.providers.finance_mysql import (
+        FinanceMySQLCreds,
+        FinanceMySQLProvider,
+    )
+    from broker_sync.sinks.wealthfolio import WealthfolioSink
+
+    _setup_logging()
+    data = Path(data_dir)
+    data.mkdir(parents=True, exist_ok=True)
+
+    async def _run() -> None:
+        sink = WealthfolioSink(
+            base_url=wf_base_url,
+            username=wf_username,
+            password=wf_password,
+            session_path=wf_session_path,
+        )
+        provider = FinanceMySQLProvider(
+            FinanceMySQLCreds(
+                host=db_host,
+                port=db_port,
+                user=db_user,
+                password=db_password,
+                database=db_name,
+            ))
+        dedup = SyncRecordStore(data / "sync.db")
+        try:
+            if not Path(wf_session_path).exists():
+                await sink.login()
+            result = await sync_provider_to_wealthfolio(
+                provider=provider,
+                sink=sink,
+                dedup=dedup,
+            )
+        finally:
+            await sink.close()
+        typer.echo(f"finance-mysql: fetched={result.fetched} "
+                   f"new={result.new_after_dedup} "
+                   f"imported={result.imported} "
+                   f"failed={result.failed}")
+        if result.failed > 0:
+            sys.exit(1)
+
+    asyncio.run(_run())
+
+
 @app.command("imap-ingest")
 def imap_ingest(
     wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
diff --git a/broker_sync/providers/finance_mysql.py b/broker_sync/providers/finance_mysql.py
new file mode 100644
index 0000000..61eee7d
--- /dev/null
+++ b/broker_sync/providers/finance_mysql.py
@@ -0,0 +1,176 @@
+"""Backfill-from-finance provider.
+
+The retired `finance` app's MySQL has a `position` table with 5+ years of
+InvestEngine + Schwab trade history (2020 onwards) that the broker-sync
+pipeline otherwise can't reconstruct (IE's emails only go back to when
+Viktor started receiving them; Schwab emails are sparse). This provider
+reads that table once and emits canonical Activities so a full-history
+backfill into Wealthfolio is possible.
+
+Ticker routing to Wealthfolio accounts:
+  *.L (VUAG.L, VUSA.L, etc.)       -> InvestEngine ISA (GBP)
+  everything else (META, *_US_EQ)  -> Schwab (US workplace, USD)
+
+Deduplication: the finance.position PK (a giant numeric string) goes into
+external_id behind a fixed `finance-mysql:position:` prefix, so re-runs are
+idempotent against the sync_record store.
+"""
+from __future__ import annotations
+
+import logging
+from collections.abc import AsyncIterator
+from datetime import UTC, datetime
+from decimal import Decimal
+from typing import NamedTuple
+
+import aiomysql  # type: ignore[import-untyped]
+
+from broker_sync.models import Account, AccountType, Activity, ActivityType
+
+log = logging.getLogger(__name__)
+
+IE_ACCOUNT_ID = "invest-engine-primary"
+SCHWAB_ACCOUNT_ID = "schwab-workplace"
+
+
+class FinanceMySQLCreds(NamedTuple):
+    """Connection parameters for the finance MySQL database."""
+
+    host: str
+    port: int
+    user: str
+    password: str
+    database: str
+
+
+def _route(ticker: str) -> tuple[str, AccountType, str]:
+    """Return (account_id, account_type, currency) for a raw ticker."""
+    if ticker.endswith(".L"):
+        return IE_ACCOUNT_ID, AccountType.ISA, "GBP"
+    return SCHWAB_ACCOUNT_ID, AccountType.GIA, "USD"
+
+
+def _normalise_symbol(ticker: str) -> str:
+    """Strip finance-app quirks so the output symbol matches T212/Wealthfolio."""
+    # VUAG.L -> VUAG (LSE handled by Wealthfolio's exchange_mic resolution)
+    if ticker.endswith(".L"):
+        return ticker[:-2]
+    # FLME_US_EQ -> FLME (Trading212-style suffix leaked into the old finance DB)
+    if ticker.endswith("_US_EQ"):
+        return ticker[:-6]
+    if ticker.endswith("_EQ"):
+        return ticker[:-3]
+    return ticker
+
+
+def _as_utc(value: datetime) -> datetime:
+    """Return *value* as-is when it carries a tzinfo, else tagged as UTC."""
+    return value if value.tzinfo is not None else value.replace(tzinfo=UTC)
+
+
+def _row_to_activity(row: dict[str, object]) -> Activity:
+    """Convert one `position` row into a canonical Activity.
+
+    The sign of ``num_shares`` picks the side: a positive value is a BUY,
+    anything else a SELL, and the emitted quantity is always absolute. An
+    empty or missing ``currency`` falls back to the routed account's currency.
+    """
+    ticker = str(row["ticker"])
+    account_id, account_type, default_ccy = _route(ticker)
+    raw_qty = Decimal(str(row["num_shares"]))
+    activity_type = ActivityType.BUY if raw_qty > 0 else ActivityType.SELL
+    # buy_date from MySQL comes back as datetime (aiomysql converts); any other
+    # value is parsed from its ISO string. An explicit offset is preserved in
+    # both cases -- only naive timestamps are tagged as UTC.
+    raw_date = row["buy_date"]
+    if isinstance(raw_date, datetime):
+        date = _as_utc(raw_date)
+    else:
+        date = _as_utc(datetime.fromisoformat(str(raw_date)))
+    currency_raw = row.get("currency")
+    currency = str(currency_raw) if currency_raw else default_ccy
+    return Activity(
+        external_id=f"finance-mysql:position:{row['id']}",
+        account_id=account_id,
+        account_type=account_type,
+        date=date,
+        activity_type=activity_type,
+        symbol=_normalise_symbol(ticker),
+        quantity=abs(raw_qty),
+        unit_price=Decimal(str(row["buy_price"])),
+        currency=currency,
+        notes=f"finance-mysql:{ticker}",
+    )
+
+
+class FinanceMySQLProvider:
+    """Read-only backfill from the retired finance MySQL `position` table."""
+    name = "finance-mysql"
+
+    def __init__(self, creds: FinanceMySQLCreds) -> None:
+        self._creds = creds
+
+    def accounts(self) -> list[Account]:
+        """Return the two Wealthfolio accounts that rows are routed into."""
+        return [
+            Account(
+                id=IE_ACCOUNT_ID,
+                name="InvestEngine ISA",
+                account_type=AccountType.ISA,
+                currency="GBP",
+                provider="invest-engine",
+            ),
+            Account(
+                id=SCHWAB_ACCOUNT_ID,
+                name="Schwab (US workplace)",
+                account_type=AccountType.GIA,
+                currency="USD",
+                provider="schwab",
+            ),
+        ]
+
+    async def _load_rows(self) -> list[dict[str, object]]:
+        """Read every `position` row in buy_date order, then close the connection."""
+        conn = await aiomysql.connect(
+            host=self._creds.host,
+            port=self._creds.port,
+            user=self._creds.user,
+            password=self._creds.password,
+            db=self._creds.database,
+            autocommit=True,
+        )
+        try:
+            async with conn.cursor(aiomysql.DictCursor) as cur:
+                await cur.execute("SELECT id, ticker, buy_price, num_shares, currency, buy_date, "
+                                  "account_id FROM position ORDER BY buy_date ASC")
+                rows = await cur.fetchall()
+        finally:
+            conn.close()
+        return list(rows)
+
+    async def fetch(
+        self,
+        *,
+        since: datetime | None = None,
+        before: datetime | None = None,
+    ) -> AsyncIterator[Activity]:
+        """Yield every position row as an Activity, in buy_date order.
+
+        ``since`` is an inclusive lower bound and ``before`` an exclusive upper
+        bound on the activity date; naive bounds are interpreted as UTC. All
+        rows are read up front and the MySQL connection is closed before the
+        first Activity is yielded, so a slow consumer never holds it open.
+        """
+        rows = await self._load_rows()
+        log.info("finance-mysql: %d position rows", len(rows))
+        # _row_to_activity passes timezone-aware dates to Activity, and comparing
+        # an aware datetime with a naive bound raises TypeError, so tag them UTC.
+        lower = None if since is None else _as_utc(since)
+        upper = None if before is None else _as_utc(before)
+        for row in rows:
+            activity = _row_to_activity(row)
+            if lower is not None and activity.date < lower:
+                continue
+            if upper is not None and activity.date >= upper:
+                continue
+            yield activity
diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py
index e69cd73..4d73412 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -243,11 +243,9 @@ class WealthfolioSink: err_msg = summary.get("errorMessage") or "no errorMessage" skipped = int(summary.get("skipped", 0)) dupes = int(summary.get("duplicates", 0)) - raise ImportValidationError( - f"Wealthfolio /import persisted {imported_n}/{total_n} " - f"(skipped={skipped} duplicates={dupes}). " - f"errorMessage: {err_msg}" - ) + raise ImportValidationError(f"Wealthfolio /import persisted {imported_n}/{total_n} " + f"(skipped={skipped} duplicates={dupes}). " + f"errorMessage: {err_msg}") # Legacy silent-drop guard for no-summary responses. elif valid_rows and not got: first_warn = next( @@ -257,6 +255,6 @@ class WealthfolioSink: raise ImportValidationError( f"Wealthfolio /import silently dropped all {len(valid_rows)} rows. " f"First checked row: {checked[0] if checked else 'none'}. " - f"First warning: {first_warn}" - ) - return got + f"First warning: {first_warn}") + assert isinstance(got, list) + return [r for r in got if isinstance(r, dict)] diff --git a/poetry.lock b/poetry.lock index 73fc482..07fce53 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,24 @@ # This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +[[package]] +name = "aiomysql" +version = "0.3.2" +description = "MySQL driver for asyncio." 
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "aiomysql-0.3.2-py3-none-any.whl", hash = "sha256:c82c5ba04137d7afd5c693a258bea8ead2aad77101668044143a991e04632eb2"},
+    {file = "aiomysql-0.3.2.tar.gz", hash = "sha256:72d15ef5cfc34c03468eb41e1b90adb9fd9347b0b589114bd23ead569a02ac1a"},
+]
+
+[package.dependencies]
+PyMySQL = ">=1.0"
+
+[package.extras]
+rsa = ["PyMySQL[rsa] (>=1.0)"]
+sa = ["sqlalchemy (>=1.3,<1.4)"]
+
 [[package]]
 name = "anyio"
 version = "4.13.0"
@@ -459,6 +478,22 @@ files = [
 [package.extras]
 windows-terminal = ["colorama (>=0.4.6)"]
 
+[[package]]
+name = "pymysql"
+version = "1.1.2"
+description = "Pure Python MySQL Driver"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "pymysql-1.1.2-py3-none-any.whl", hash = "sha256:e6b1d89711dd51f8f74b1631fe08f039e7d76cf67a42a323d3178f0f25762ed9"},
+    {file = "pymysql-1.1.2.tar.gz", hash = "sha256:4961d3e165614ae65014e361811a724e2044ad3ea3739de9903ae7c21f539f03"},
+]
+
+[package.extras]
+ed25519 = ["PyNaCl (>=1.4.0)"]
+rsa = ["cryptography"]
+
 [[package]]
 name = "pytest"
 version = "8.4.2"
@@ -670,4 +705,4 @@ platformdirs = ">=3.5.1"
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.11,<3.13"
-content-hash = "04a3e24fe45c75f975140aff6076af0a156772a1a8e82eba30ee2345ac1d8bd6"
+content-hash = "dcc5b4eadd0a8df900e74674acf33215091dcb9bd0fffcefb03607dde2408a16"
diff --git a/pyproject.toml b/pyproject.toml
index 0a25a66..680f5ee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,6 +13,7 @@ beautifulsoup4 = "^4.12"
 python-dateutil = "^2.9"
 typer = "^0.12"
 click = "<8.2" # typer 0.12 uses make_metavar() without ctx; click 8.2 made ctx required
+aiomysql = "^0.3.2"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.3"
diff --git a/tests/providers/test_finance_mysql.py b/tests/providers/test_finance_mysql.py
new file mode 100644
index 0000000..2887694
--- /dev/null
+++ b/tests/providers/test_finance_mysql.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from datetime import UTC, datetime
+from decimal import Decimal
+
+from broker_sync.models import AccountType, ActivityType
+from broker_sync.providers.finance_mysql import _normalise_symbol, _route, _row_to_activity
+
+
+def test_lse_ticker_routes_to_investengine() -> None:  # ".L" suffix -> InvestEngine ISA, GBP
+    acct, t, ccy = _route("VUAG.L")
+    assert acct == "invest-engine-primary"
+    assert t is AccountType.ISA
+    assert ccy == "GBP"
+
+
+def test_us_ticker_routes_to_schwab() -> None:  # tickers without ".L" -> Schwab GIA, USD
+    assert _route("META") == ("schwab-workplace", AccountType.GIA, "USD")
+    assert _route("FLME_US_EQ") == ("schwab-workplace", AccountType.GIA, "USD")
+
+
+def test_normalise_symbol() -> None:  # ".L", "_US_EQ" and "_EQ" suffixes are stripped
+    assert _normalise_symbol("VUAG.L") == "VUAG"
+    assert _normalise_symbol("VUSA.L") == "VUSA"
+    assert _normalise_symbol("META") == "META"
+    assert _normalise_symbol("FLME_US_EQ") == "FLME"
+    assert _normalise_symbol("FOO_EQ") == "FOO"
+
+
+def test_row_to_buy_activity() -> None:  # positive num_shares -> BUY; naive buy_date -> UTC
+    row = {
+        "id": "123456",
+        "ticker": "VUAG.L",
+        "buy_price": 85.5,
+        "num_shares": 10.0,
+        "currency": "GBP",
+        "buy_date": datetime(2022, 3, 15, 10, 30),
+        "account_id": 1,
+    }
+    a = _row_to_activity(row)
+    assert a.external_id == "finance-mysql:position:123456"
+    assert a.account_id == "invest-engine-primary"
+    assert a.account_type is AccountType.ISA
+    assert a.activity_type is ActivityType.BUY
+    assert a.symbol == "VUAG"  # .L stripped
+    assert a.quantity == Decimal("10.0")
+    assert a.unit_price == Decimal("85.5")
+    assert a.currency == "GBP"
+    assert a.date == datetime(2022, 3, 15, 10, 30, tzinfo=UTC)
+
+
+def test_row_to_sell_when_qty_negative() -> None:  # negative num_shares -> SELL, abs quantity
+    row = {
+        "id": "x",
+        "ticker": "META",
+        "buy_price": 450.0,
+        "num_shares": -2.5,  # sell
+        "currency": "USD",
+        "buy_date": datetime(2024, 8, 5),
+        "account_id": 1,
+    }
+    a = _row_to_activity(row)
+    assert a.activity_type is ActivityType.SELL
+    assert a.quantity == Decimal("2.5")  # absolute
+    assert a.account_id == "schwab-workplace"
+    assert a.symbol == "META"