99 lines
3.4 KiB
Python
99 lines
3.4 KiB
Python
"""Historical data loader for backtesting.

:class:`BacktestDataLoader` takes pre-loaded bar and sentiment data and
yields it in chronological order, making the backtester independent of
any database.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Any, AsyncIterator
|
|
|
|
from shared.schemas.trading import SentimentContext
|
|
|
|
|
|
class BacktestDataLoader:
    """Iterates over historical bars (and optional sentiment) chronologically.

    Parameters
    ----------
    bars:
        Pre-loaded OHLCV data.  Each dict must contain at minimum:
        ``timestamp``, ``ticker``, ``open``, ``high``, ``low``,
        ``close``, ``volume``.
    sentiments:
        Optional pre-loaded sentiment data.  Each dict must contain:
        ``timestamp``, ``ticker``, ``score``, ``confidence``.

    Notes
    -----
    Both inputs are copied into new lists sorted by ``timestamp``; the
    caller's lists are never reordered.  The sort is stable, so records
    that share a timestamp keep their input order.
    """

    def __init__(
        self,
        bars: list[dict[str, Any]],
        sentiments: list[dict[str, Any]] | None = None,
    ) -> None:
        self._bars = sorted(bars, key=lambda b: b["timestamp"])
        # ``None`` and an empty list both mean "no sentiment data".
        self._sentiments = sorted(sentiments or [], key=lambda s: s["timestamp"])

    async def iterate(
        self,
    ) -> AsyncIterator[tuple[datetime, str, dict[str, Any], SentimentContext | None]]:
        """Yield ``(timestamp, ticker, bar_data, sentiment_context)`` in order.

        ``bar_data`` is a fresh dict holding only ``timestamp``, ``open``,
        ``high``, ``low``, ``close`` and ``volume`` (the shape expected by
        ``MarketDataManager.add_bar``); any other keys of the input bar are
        dropped.

        For each bar the loader aggregates all sentiment records for the
        same ticker whose timestamps are <= the current bar's timestamp,
        building a :class:`SentimentContext`.  If the ticker has no
        sentiment record at or before the bar's timestamp, ``None`` is
        yielded instead.

        Raises
        ------
        KeyError
            If a bar or sentiment record lacks one of the required keys.
            Sentiment records are validated up front, on the first step of
            the iteration.
        """
        from bisect import bisect_right

        # Split each ticker's sentiment into parallel columns once.
        # ``self._sentiments`` is already sorted by timestamp, so every
        # column is in chronological order and the records relevant to a
        # bar are always a prefix of its ticker's columns.  Locating that
        # prefix with a binary search avoids re-filtering the ticker's
        # whole sentiment history for every single bar.
        stamps_by_ticker: dict[str, list[Any]] = defaultdict(list)
        scores_by_ticker: dict[str, list[Any]] = defaultdict(list)
        confidences_by_ticker: dict[str, list[Any]] = defaultdict(list)
        for record in self._sentiments:
            symbol = record["ticker"]
            stamps_by_ticker[symbol].append(record["timestamp"])
            scores_by_ticker[symbol].append(record["score"])
            confidences_by_ticker[symbol].append(record["confidence"])

        for bar in self._bars:
            ts = bar["timestamp"]
            ticker = bar["ticker"]

            # Build bar_data dict suitable for MarketDataManager.add_bar
            bar_data = {
                "timestamp": ts,
                "open": bar["open"],
                "high": bar["high"],
                "low": bar["low"],
                "close": bar["close"],
                "volume": bar["volume"],
            }

            # ``.get`` keeps the defaultdicts from growing an empty entry
            # for tickers that have bars but no sentiment at all.
            stamps = stamps_by_ticker.get(ticker)
            if stamps is None:
                sentiment_ctx = None
            else:
                # Number of sentiment records with timestamp <= ts.
                cutoff = bisect_right(stamps, ts)
                sentiment_ctx = self._summarise(
                    ticker,
                    scores_by_ticker[ticker][:cutoff],
                    confidences_by_ticker[ticker][:cutoff],
                )

            yield ts, ticker, bar_data, sentiment_ctx

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _summarise(
        ticker: str,
        scores: list[Any],
        confidences: list[Any],
    ) -> SentimentContext | None:
        """Build a SentimentContext from parallel score/confidence lists.

        Returns ``None`` when ``scores`` is empty.  Otherwise the context
        carries the arithmetic means of both lists, the number of records,
        and (at most) the last 10 scores in the order given.
        """
        if not scores:
            return None

        return SentimentContext(
            ticker=ticker,
            avg_score=sum(scores) / len(scores),
            article_count=len(scores),
            recent_scores=scores[-10:],  # last 10 scores
            avg_confidence=sum(confidences) / len(confidences),
        )

    @staticmethod
    def _build_sentiment(
        ticker: str,
        up_to: datetime,
        records: list[dict[str, Any]],
    ) -> SentimentContext | None:
        """Build a SentimentContext from all records with timestamp <= up_to.

        ``records`` need not be sorted: every record is checked against
        ``up_to`` and the survivors are aggregated in their given order.
        Returns ``None`` when no record qualifies.
        """
        relevant = [r for r in records if r["timestamp"] <= up_to]
        return BacktestDataLoader._summarise(
            ticker,
            [r["score"] for r in relevant],
            [r["confidence"] for r in relevant],
        )
|