88 lines
2.7 KiB
Python
88 lines
2.7 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import sqlite3
|
||
|
|
from datetime import date
|
||
|
|
from decimal import Decimal
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from broker_sync.models import FxRateSource
|
||
|
|
|
||
|
|
_SCHEMA = """
|
||
|
|
CREATE TABLE IF NOT EXISTS fx_rate (
|
||
|
|
currency TEXT NOT NULL,
|
||
|
|
on_date TEXT NOT NULL,
|
||
|
|
rate_gbp TEXT NOT NULL,
|
||
|
|
source TEXT NOT NULL,
|
||
|
|
PRIMARY KEY (currency, on_date)
|
||
|
|
);
|
||
|
|
"""
|
||
|
|
|
||
|
|
|
||
|
|
class FxCache:
|
||
|
|
"""SQLite-backed FX rate store.
|
||
|
|
|
||
|
|
Rate meaning: 1 unit of `currency` = `rate_gbp` GBP.
|
||
|
|
Source is upgradable — a later reconciliation run can replace an
|
||
|
|
ECB_LIVE row with the HMRC_MONTHLY value for the same date.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, db_path: Path | str) -> None:
|
||
|
|
self._path = Path(db_path)
|
||
|
|
self._path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
with self._conn() as c:
|
||
|
|
c.executescript(_SCHEMA)
|
||
|
|
|
||
|
|
def _conn(self) -> sqlite3.Connection:
|
||
|
|
return sqlite3.connect(self._path)
|
||
|
|
|
||
|
|
def get(self, currency: str, on_date: date) -> tuple[Decimal, FxRateSource] | None:
|
||
|
|
with self._conn() as c:
|
||
|
|
row = c.execute(
|
||
|
|
"SELECT rate_gbp, source FROM fx_rate WHERE currency=? AND on_date=?",
|
||
|
|
(currency.upper(), on_date.isoformat()),
|
||
|
|
).fetchone()
|
||
|
|
if row is None:
|
||
|
|
return None
|
||
|
|
return Decimal(row[0]), FxRateSource(row[1])
|
||
|
|
|
||
|
|
def put(
|
||
|
|
self,
|
||
|
|
currency: str,
|
||
|
|
on_date: date,
|
||
|
|
rate_gbp: Decimal,
|
||
|
|
source: FxRateSource,
|
||
|
|
) -> None:
|
||
|
|
with self._conn() as c:
|
||
|
|
c.execute(
|
||
|
|
"INSERT INTO fx_rate (currency, on_date, rate_gbp, source) "
|
||
|
|
"VALUES (?, ?, ?, ?) "
|
||
|
|
"ON CONFLICT(currency, on_date) DO UPDATE SET "
|
||
|
|
"rate_gbp=excluded.rate_gbp, source=excluded.source",
|
||
|
|
(currency.upper(), on_date.isoformat(), str(rate_gbp), str(source)),
|
||
|
|
)
|
||
|
|
c.commit()
|
||
|
|
|
||
|
|
|
||
|
|
def convert_to_gbp(
|
||
|
|
amount: Decimal,
|
||
|
|
currency: str,
|
||
|
|
on_date: date,
|
||
|
|
cache: FxCache | None,
|
||
|
|
) -> tuple[Decimal, Decimal, FxRateSource]:
|
||
|
|
"""Return (amount_gbp, rate_used, source).
|
||
|
|
|
||
|
|
- GBP passes through with rate=1 and source=ECB_LIVE (conventional).
|
||
|
|
- Otherwise the cache must have a rate for (currency, on_date). The
|
||
|
|
caller is responsible for populating the cache from ECB (live) or
|
||
|
|
HMRC (reconciliation) upstream.
|
||
|
|
"""
|
||
|
|
if currency.upper() == "GBP":
|
||
|
|
return amount, Decimal("1"), FxRateSource.ECB_LIVE
|
||
|
|
if cache is None:
|
||
|
|
raise LookupError(f"No FX cache available for {currency} {on_date.isoformat()}")
|
||
|
|
hit = cache.get(currency, on_date)
|
||
|
|
if hit is None:
|
||
|
|
raise LookupError(f"No FX rate cached for {currency.upper()} {on_date.isoformat()}")
|
||
|
|
rate, source = hit
|
||
|
|
return amount * rate, rate, source
|