trading/backtester/data_loader.py

99 lines
3.4 KiB
Python

"""Historical data loader for backtesting.
:class:`BacktestDataLoader` takes pre-loaded bar and sentiment data and
yields it in chronological order, making the backtester independent of
any database.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from typing import Any, AsyncIterator
from shared.schemas.trading import SentimentContext
class BacktestDataLoader:
    """Iterates over historical bars (and optional sentiment) chronologically.

    Both inputs are copied into new lists sorted by ``timestamp`` at
    construction time; the caller's lists are not modified.  ``sorted`` is
    stable, so records sharing a timestamp keep their input order.

    Parameters
    ----------
    bars:
        Pre-loaded OHLCV data.  Each dict must contain at minimum:
        ``timestamp``, ``ticker``, ``open``, ``high``, ``low``,
        ``close``, ``volume``.
    sentiments:
        Optional pre-loaded sentiment data.  Each dict must contain:
        ``timestamp``, ``ticker``, ``score``, ``confidence``.
    """

    # How many of the most recent scores are exposed as ``recent_scores``.
    _RECENT_SCORES = 10

    def __init__(
        self,
        bars: list[dict[str, Any]],
        sentiments: list[dict[str, Any]] | None = None,
    ) -> None:
        self._bars = sorted(bars, key=lambda b: b["timestamp"])
        self._sentiments = sorted(sentiments or [], key=lambda s: s["timestamp"])

    async def iterate(
        self,
    ) -> AsyncIterator[tuple[datetime, str, dict[str, Any], SentimentContext | None]]:
        """Yield ``(timestamp, ticker, bar_data, sentiment_context)`` in order.

        For each bar the loader aggregates all sentiment records for the
        same ticker whose timestamps are <= the current bar's timestamp,
        building a :class:`SentimentContext`.  If no such record exists yet
        for the ticker, ``None`` is yielded instead.

        Bars and sentiment records are both sorted by timestamp, so the
        matching records for a ticker always form a growing prefix of that
        ticker's list.  Each record is therefore read once, when the first
        bar at or after it arrives, instead of the whole list being
        re-filtered for every bar.  The averages are still computed with
        ``sum`` over the accumulated values so the numbers are identical to
        a full re-scan.

        Raises
        ------
        KeyError
            If a bar or a consumed sentiment record lacks a required key.
        """
        # Group the (already time-sorted) sentiment records per ticker.
        sentiment_by_ticker: dict[str, list[dict[str, Any]]] = defaultdict(list)
        for record in self._sentiments:
            sentiment_by_ticker[record["ticker"]].append(record)

        # Per-ticker values of the records consumed so far.  The length of
        # the score list doubles as the cursor into that ticker's records.
        scores_seen: dict[str, list[Any]] = defaultdict(list)
        confidences_seen: dict[str, list[Any]] = defaultdict(list)

        for bar in self._bars:
            ts = bar["timestamp"]
            ticker = bar["ticker"]

            # Build bar_data dict suitable for MarketDataManager.add_bar
            bar_data = {
                "timestamp": ts,
                "open": bar["open"],
                "high": bar["high"],
                "low": bar["low"],
                "close": bar["close"],
                "volume": bar["volume"],
            }

            sentiment_ctx = None
            # .get() avoids creating empty entries for tickers that have
            # no sentiment data at all.
            records = sentiment_by_ticker.get(ticker)
            if records:
                scores = scores_seen[ticker]
                confidences = confidences_seen[ticker]
                cursor = len(scores)
                # Consume every record that has become visible at ``ts``.
                while cursor < len(records) and records[cursor]["timestamp"] <= ts:
                    scores.append(records[cursor]["score"])
                    confidences.append(records[cursor]["confidence"])
                    cursor += 1
                sentiment_ctx = self._make_context(ticker, scores, confidences)

            yield ts, ticker, bar_data, sentiment_ctx

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _make_context(
        ticker: str,
        scores: list[Any],
        confidences: list[Any],
    ) -> SentimentContext | None:
        """Build a SentimentContext from parallel score/confidence lists.

        Returns ``None`` when ``scores`` is empty.  ``recent_scores`` is a
        fresh slice, so later appends to ``scores`` do not leak into a
        context that has already been handed out.
        """
        if not scores:
            return None
        return SentimentContext(
            ticker=ticker,
            avg_score=sum(scores) / len(scores),
            article_count=len(scores),
            recent_scores=scores[-BacktestDataLoader._RECENT_SCORES:],
            avg_confidence=sum(confidences) / len(confidences),
        )

    @staticmethod
    def _build_sentiment(
        ticker: str,
        up_to: datetime,
        records: list[dict[str, Any]],
    ) -> SentimentContext | None:
        """Build a SentimentContext from all records with timestamp <= up_to.

        One-shot variant that scans ``records`` in full; ``iterate`` uses an
        incremental cursor instead.  Returns ``None`` when no record
        qualifies.
        """
        relevant = [r for r in records if r["timestamp"] <= up_to]
        return BacktestDataLoader._make_context(
            ticker,
            [r["score"] for r in relevant],
            [r["confidence"] for r in relevant],
        )