"""Main backtest engine that replays historical data through strategies.

Ties together the :class:`~backtester.data_loader.BacktestDataLoader`,
:class:`~backtester.simulated_broker.SimulatedBroker`,
:class:`~services.signal_generator.ensemble.WeightedEnsemble`, and
:class:`~services.signal_generator.market_data.MarketDataManager` to produce
a :class:`~backtester.metrics.BacktestResult`.
"""

from __future__ import annotations

import logging
from datetime import datetime

from backtester.config import BacktestConfig
from backtester.data_loader import BacktestDataLoader
from backtester.metrics import BacktestResult, compute_metrics
from backtester.simulated_broker import SimulatedBroker
from services.signal_generator.ensemble import WeightedEnsemble
from services.signal_generator.market_data import MarketDataManager
from shared.schemas.trading import (
    OrderRequest,
    OrderSide,
    SignalDirection,
)
from shared.strategies.base import BaseStrategy

logger = logging.getLogger(__name__)


class BacktestEngine:
    """Feed historical bars to a strategy ensemble and score the result.

    Parameters
    ----------
    config:
        Backtest settings. This class reads ``initial_capital``,
        ``slippage_pct``, ``commission_per_trade``, ``signal_threshold``,
        ``max_position_pct`` and ``strategy_weights`` from it.
    strategies:
        Strategy instances handed to the ensemble; each one's ``name`` is
        used as its key when equal weights have to be generated.
    """

    def __init__(
        self,
        config: BacktestConfig,
        strategies: list[BaseStrategy],
    ) -> None:
        self.config = config
        self.strategies = strategies

    async def run(self, data_loader: BacktestDataLoader) -> BacktestResult:
        """Replay every bar from *data_loader* and return the computed metrics.

        Steps
        -----
        1. Build a fresh SimulatedBroker, MarketDataManager and
           WeightedEnsemble from the configuration.
        2. Consume ``data_loader.iterate()`` in the order it yields.
        3. Per bar: store the bar, push its close to the broker, fetch the
           snapshot, evaluate the ensemble, trade on any signal, then record
           account equity.
        4. Sell whatever positions are still open.
        5. Append one last equity point and compute metrics from the broker's
           trade log and the equity curve.

        Bars for which no snapshot is available are skipped entirely, so
        they contribute no point to the equity curve.
        """
        sim = SimulatedBroker(
            initial_capital=self.config.initial_capital,
            slippage_pct=self.config.slippage_pct,
            commission_per_trade=self.config.commission_per_trade,
        )
        history = MarketDataManager()
        voter = WeightedEnsemble(
            strategies=self.strategies,
            threshold=self.config.signal_threshold,
        )
        strategy_weights = self._resolve_weights()

        curve: list[tuple[datetime, float]] = []

        async for stamp, ticker, bar, sentiment in data_loader.iterate():
            # The bar is stored before its close is forwarded to the broker.
            history.add_bar(ticker, bar)
            sim.set_current_prices({ticker: bar["close"]})

            view = history.get_snapshot(ticker)
            if view is None:
                # Nothing to evaluate yet; no equity point is recorded either.
                continue

            signal = await voter.evaluate(ticker, view, sentiment, strategy_weights)
            if signal is not None:
                await self._act_on_signal(sim, signal, ticker, bar["close"])

            # Equity is sampled after any order placed on this bar.
            snapshot_account = await sim.get_account()
            curve.append((stamp, snapshot_account.equity))

        await self._liquidate(sim)

        # The closing point reuses the timestamp of the last recorded bar.
        # With an empty curve there is no timestamp to reuse, so none is added.
        if curve:
            closing_account = await sim.get_account()
            last_stamp = curve[-1][0]
            curve.append((last_stamp, closing_account.equity))

        return compute_metrics(
            sim.get_trade_log(),
            curve,
            self.config.initial_capital,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _act_on_signal(self, broker: SimulatedBroker, signal, ticker, price) -> None:
        """Turn one ensemble signal into at most one order.

        A LONG signal on a ticker that is not currently held buys a whole
        number of units sized as ``equity * max_position_pct * strength``
        divided by *price*. A SHORT signal on a held ticker sells that
        position's full quantity. Every other combination is a no-op.
        """
        account = await broker.get_account()
        open_positions = await broker.get_positions()
        # First position matching the ticker, or None when it is not held.
        held = next((p for p in open_positions if p.ticker == ticker), None)

        if signal.direction == SignalDirection.LONG and held is None:
            budget = account.equity * self.config.max_position_pct * signal.strength
            # A price that is not strictly positive yields no order.
            units = int(budget / price) if price > 0 else 0
            if units > 0:
                await broker.submit_order(
                    OrderRequest(
                        ticker=ticker,
                        side=OrderSide.BUY,
                        qty=float(units),
                    )
                )
        elif signal.direction == SignalDirection.SHORT and held is not None:
            await broker.submit_order(
                OrderRequest(
                    ticker=ticker,
                    side=OrderSide.SELL,
                    qty=held.qty,
                )
            )

    async def _liquidate(self, broker: SimulatedBroker) -> None:
        """Submit a SELL for the full quantity of every position still open."""
        for leftover in await broker.get_positions():
            await broker.submit_order(
                OrderRequest(
                    ticker=leftover.ticker,
                    side=OrderSide.SELL,
                    qty=leftover.qty,
                )
            )

    def _resolve_weights(self) -> dict[str, float]:
        """Return the per-strategy weights to pass to the ensemble.

        A truthy ``config.strategy_weights`` is copied into a new dict and
        returned. Otherwise every strategy gets ``1 / n`` rounded to six
        decimals, keyed by its ``name``; with no strategies the result is an
        empty dict.
        """
        configured = self.config.strategy_weights
        if configured:
            return dict(configured)

        if not self.strategies:
            return {}

        share = round(1.0 / len(self.strategies), 6)
        return dict.fromkeys((s.name for s in self.strategies), share)