Add ECB FX fetcher + cache population

Context
-------
Phase 1 needs live FX rates for USD-denominated RSU vestings (Schwab),
EUR-denominated deposits, and any multi-currency dividend. The FxCache
from an earlier commit stores (currency, date) → rate_to_gbp but was
intentionally left empty — this commit wires the ingestion path.

Design decisions
----------------
- ECB publishes EUR→X, not X→GBP. Everything pivots through EUR:
      rate(X→GBP) = rate(EUR→GBP) / rate(EUR→X)
  GBP goes into the result at 1.0 so callers iterating the dict get
  a consistent shape; `populate_fx_cache` then skips GBP because
  `convert_to_gbp` has a dedicated passthrough branch.
- `on_date` parameter is accepted for API symmetry with the future
  historical fetcher even though the daily endpoint only serves the
  most recent publication. The docstring calls this out explicitly.
- XML is parsed with stdlib `xml.etree.ElementTree`. No `lxml` —
  the file is 30 lines, no performance concern, and keeping deps
  minimal matters for the container image.
- The HTTP layer takes an optional `httpx.AsyncBaseTransport` the same
  way WealthfolioSink does — MockTransport drives all tests, the
  production caller just leaves it None.

This change
-----------
- broker_sync/fx_ecb.py:
  * `fetch_ecb_rates(on_date, *, transport=None)` — GETs the daily
    XML, parses, pivots through EUR, returns `{ccy: rate_to_gbp}`.
    Raises `RuntimeError` on non-2xx or if GBP is missing (cannot
    pivot). No retries — caller handles resilience.
  * `populate_fx_cache(cache, rates, on_date)` — writes every
    non-GBP rate with `FxRateSource.ECB_LIVE`.
  * `fetch_ecb_rates_historical(start, end)` — `NotImplementedError`
    stub; filed as beads task code-thw.2.2. Needed for backfilling
    years of T212 history (daily endpoint only covers today).
- tests/fixtures/ecb_2026-04-01.xml: realistic 5-currency ECB snapshot.
- tests/test_fx_ecb.py: fixture-driven tests covering the pivot math,
  the 503 failure path, the cache-skip-GBP rule, and the NotImplemented
  guard on the historical stub.

Test plan
---------
## Automated
- poetry run pytest -q  →  52 passed in 0.50s
- poetry run mypy broker_sync tests  →  Success: no issues found in 26 source files
- poetry run ruff check .  →  All checks passed!

## Manual Verification
Live daily endpoint hit deferred to the CLI integration commit — the
fetcher is pure + transport-injectable, so the unit tests cover the
parsing and pivot logic, and the CLI wiring will be the place where
the live call is exercised end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-04-17 19:32:23 +00:00
parent 18d8241c85
commit 56f3624344
3 changed files with 194 additions and 0 deletions

100
broker_sync/fx_ecb.py Normal file
View file

@ -0,0 +1,100 @@
"""ECB daily reference-rate fetcher.
The ECB publishes today's rates as an XML document at
https://www.ecb.europa.eu/stats/eurofxref/eurofxref-daily.xml. Rates are
given as EURX, so deriving rate(XGBP) needs a pivot through EUR:
rate(XGBP) = rate(EURGBP) / rate(EURX)
The daily file only contains the most recent publication historical
backfill needs the 90-day (`eurofxref-hist-90d.xml`) or full-history
(`eurofxref-hist.xml`) endpoints. Those are not wired yet because
backfilling years of T212 history is a Phase 1.1 concern; for now
`fetch_ecb_rates_historical` raises NotImplementedError.
"""
from __future__ import annotations
from datetime import date
from decimal import Decimal
from xml.etree import ElementTree as ET
import httpx
from broker_sync.fx import FxCache
from broker_sync.models import FxRateSource
_ECB_DAILY_URL = "https://www.ecb.europa.eu/stats/eurofxref/eurofxref-daily.xml"
_ECB_NS = "{http://www.ecb.int/vocabulary/2002-08-01/eurofxref}"
async def fetch_ecb_rates(
on_date: date,
*,
transport: httpx.AsyncBaseTransport | None = None,
) -> dict[str, Decimal]:
"""Return `{currency: rate_to_gbp}` for every currency the ECB publishes.
GBP is present in the result as rate 1.0 (the pivot target). EUR is
present with its direct EURGBP rate. Everything else is pivoted
through EUR.
`on_date` is accepted for API symmetry with the historical fetcher
but the daily endpoint always returns the latest publication the
caller is responsible for only invoking this for today's date.
"""
del on_date
async with httpx.AsyncClient(transport=transport, timeout=30.0) as c:
resp = await c.get(_ECB_DAILY_URL)
if resp.status_code >= 400:
raise RuntimeError(f"ECB daily fetch failed: HTTP {resp.status_code}")
eur_rates = _parse_eur_rates(resp.text)
if "GBP" not in eur_rates:
raise RuntimeError("ECB response missing GBP — cannot pivot")
eur_to_gbp = eur_rates["GBP"]
out: dict[str, Decimal] = {"GBP": Decimal("1"), "EUR": eur_to_gbp}
for ccy, eur_to_ccy in eur_rates.items():
if ccy == "GBP":
continue
out[ccy] = eur_to_gbp / eur_to_ccy
return out
def _parse_eur_rates(xml_text: str) -> dict[str, Decimal]:
root = ET.fromstring(xml_text)
rates: dict[str, Decimal] = {}
for cube in root.iter(f"{_ECB_NS}Cube"):
ccy = cube.get("currency")
rate = cube.get("rate")
if ccy and rate:
rates[ccy] = Decimal(rate)
return rates
async def populate_fx_cache(
cache: FxCache,
rates: dict[str, Decimal],
on_date: date,
) -> None:
"""Write non-GBP rates into the FxCache tagged `ECB_LIVE`.
GBP is intentionally skipped `convert_to_gbp` short-circuits on
GBP without touching the cache.
"""
for ccy, rate in rates.items():
if ccy == "GBP":
continue
cache.put(ccy, on_date, rate, FxRateSource.ECB_LIVE)
async def fetch_ecb_rates_historical(
start: date,
end: date,
) -> dict[date, dict[str, Decimal]]:
"""Placeholder for the 90-day / full-history ECB endpoints.
Tracked by beads task: see parent Phase 1 epic.
"""
raise NotImplementedError("Historical ECB fetch not wired; needs eurofxref-hist-90d.xml or "
"eurofxref-hist.xml — filed as a follow-up beads task.")