# trading/backtester/engine.py
#
# 164 lines
# 6.1 KiB
# Python

"""Main backtest engine that replays historical data through strategies.
Ties together the :class:`~backtester.data_loader.BacktestDataLoader`,
:class:`~backtester.simulated_broker.SimulatedBroker`,
:class:`~services.signal_generator.ensemble.WeightedEnsemble`, and
:class:`~services.signal_generator.market_data.MarketDataManager` to
produce a :class:`~backtester.metrics.BacktestResult`.
"""
from __future__ import annotations
import logging
from datetime import datetime
from backtester.config import BacktestConfig
from backtester.data_loader import BacktestDataLoader
from backtester.metrics import BacktestResult, compute_metrics
from backtester.simulated_broker import SimulatedBroker
from services.signal_generator.ensemble import WeightedEnsemble
from services.signal_generator.market_data import MarketDataManager
from shared.schemas.trading import (
OrderRequest,
OrderSide,
SignalDirection,
)
from shared.strategies.base import BaseStrategy
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class BacktestEngine:
    """Replays historical data through the trading pipeline.

    Parameters
    ----------
    config:
        Backtest configuration (dates, capital, slippage, weights, etc.).
    strategies:
        List of strategy instances to evaluate.
    """

    def __init__(
        self,
        config: BacktestConfig,
        strategies: list[BaseStrategy],
    ) -> None:
        self.config = config
        self.strategies = strategies

    async def run(self, data_loader: BacktestDataLoader) -> BacktestResult:
        """Execute the full backtest and return metrics.

        Steps
        -----
        1. Create SimulatedBroker, MarketDataManager, WeightedEnsemble.
        2. Iterate over ``data_loader.iterate()``, which yields
           ``(timestamp, ticker, bar_data, sentiment)`` tuples.
        3. For each bar: update market data, update broker prices,
           build snapshot, run ensemble, submit orders.
        4. Close remaining positions at final prices.
        5. Compute and return metrics.

        Notes
        -----
        * An equity point is recorded only for bars that produced a market
          snapshot; bars whose snapshot is ``None`` are skipped entirely.
        * After the final liquidation one extra equity point is appended,
          stamped with the timestamp of the last recorded point.
        """
        broker = SimulatedBroker(
            initial_capital=self.config.initial_capital,
            slippage_pct=self.config.slippage_pct,
            commission_per_trade=self.config.commission_per_trade,
        )
        market_data = MarketDataManager()
        ensemble = WeightedEnsemble(
            strategies=self.strategies,
            threshold=self.config.signal_threshold,
        )
        weights = self._resolve_weights()
        equity_curve: list[tuple[datetime, float]] = []

        # ---- Main replay loop ----
        async for timestamp, ticker, bar_data, sentiment in data_loader.iterate():
            close_price = bar_data["close"]

            # Feed the new bar to the market data manager and the broker.
            market_data.add_bar(ticker, bar_data)
            broker.set_current_prices({ticker: close_price})

            snapshot = market_data.get_snapshot(ticker)
            if snapshot is None:
                # No snapshot available for this ticker yet: nothing to evaluate.
                continue

            signal = await ensemble.evaluate(ticker, snapshot, sentiment, weights)
            if signal is not None:
                await self._act_on_signal(broker, ticker, signal, close_price)

            # Record equity after any order triggered by this bar.
            account = await broker.get_account()
            equity_curve.append((timestamp, account.equity))

        # ---- Close all remaining positions at final prices ----
        await self._close_all_positions(broker)

        # Record final equity after closing, reusing the last bar's timestamp.
        if equity_curve:
            final_account = await broker.get_account()
            equity_curve.append((equity_curve[-1][0], final_account.equity))

        # ---- Compute metrics ----
        trade_log = broker.get_trade_log()
        return compute_metrics(trade_log, equity_curve, self.config.initial_capital)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    async def _act_on_signal(self, broker, ticker, signal, current_price) -> None:
        """Translate an ensemble signal for *ticker* into at most one order.

        * ``LONG`` with no open position in *ticker*: buy
          ``int(equity * max_position_pct * strength / current_price)``
          shares, provided the price is positive and the quantity is non-zero.
        * ``SHORT`` with an open position in *ticker*: sell the whole position.
        * Anything else: no order. A ``SHORT`` signal never opens a new
          position, and a ``LONG`` signal never adds to an existing one.
        """
        account = await broker.get_account()
        positions = await broker.get_positions()
        # First open position in this ticker, if any.
        held = next((p for p in positions if p.ticker == ticker), None)

        if signal.direction == SignalDirection.LONG and held is None:
            # Size using max_position_pct * equity * strength.
            position_value = (
                account.equity * self.config.max_position_pct * signal.strength
            )
            if current_price > 0:
                qty = int(position_value / current_price)  # whole shares only
                if qty > 0:
                    await broker.submit_order(
                        OrderRequest(
                            ticker=ticker,
                            side=OrderSide.BUY,
                            qty=float(qty),
                        )
                    )
        elif signal.direction == SignalDirection.SHORT and held is not None:
            # Close the entire position.
            await broker.submit_order(
                OrderRequest(
                    ticker=ticker,
                    side=OrderSide.SELL,
                    qty=held.qty,
                )
            )

    async def _close_all_positions(self, broker) -> None:
        """Submit a SELL order for the full quantity of every open position."""
        for pos in await broker.get_positions():
            await broker.submit_order(
                OrderRequest(
                    ticker=pos.ticker,
                    side=OrderSide.SELL,
                    qty=pos.qty,
                )
            )

    def _resolve_weights(self) -> dict[str, float]:
        """Return strategy weights, defaulting to equal if none configured.

        Configured weights are returned as a shallow copy. Otherwise every
        strategy gets ``1 / n``, keyed by its ``name``; with no strategies
        the result is an empty dict.
        """
        if self.config.strategy_weights:
            return dict(self.config.strategy_weights)
        if not self.strategies:
            return {}
        # Deliberately unrounded: rounding to a fixed number of decimals makes
        # the weights sum to slightly less than 1.0 (e.g. 3 * 0.333333), so a
        # unanimous full-strength ensemble could miss a threshold of 1.0.
        equal_w = 1.0 / len(self.strategies)
        return {s.name: equal_w for s in self.strategies}