trading/services/api_gateway/routes/backtest.py
Viktor Barzin a2c08743ac
feat: make backtest work end-to-end with Alpaca bars, ticker selection, all 9 strategies
- Change BacktestRequest from strategy_weights dict to strategies list to match frontend
- Add tickers field so users can select which stocks to backtest
- Fetch historical bars from Alpaca StockHistoricalDataClient instead of empty data loader
- Register all 9 strategies (momentum, mean_reversion, news_driven, value, macd_crossover,
  bollinger_breakout, vwap, liquidity, ma_stack) filtered by user selection
- Fix response format: use frontend field names (max_drawdown, total_trades, win_rate as
  0-1 decimal), include equity_curve and run_id in response
- Add ticker selector with checkboxes and custom ticker input to dashboard
- Add alpaca-py to api dependency group in pyproject.toml
2026-02-23 22:25:41 +00:00

283 lines
9.1 KiB
Python

"""Backtest endpoints — run backtests and retrieve results."""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from services.api_gateway.auth.middleware import get_current_user
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/backtest", tags=["backtest"])
# Store references to background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task] = set()
# All available strategy classes keyed by name
_STRATEGY_REGISTRY: dict[str, type] | None = None
def _get_strategy_registry() -> dict[str, type]:
"""Lazy-load strategy classes to avoid import-time side effects."""
global _STRATEGY_REGISTRY
if _STRATEGY_REGISTRY is None:
from shared.strategies import (
MomentumStrategy,
MeanReversionStrategy,
NewsDrivenStrategy,
ValueStrategy,
MACDCrossoverStrategy,
BollingerBreakoutStrategy,
VWAPStrategy,
LiquidityStrategy,
MAStackStrategy,
)
_STRATEGY_REGISTRY = {
"momentum": MomentumStrategy,
"mean_reversion": MeanReversionStrategy,
"news_driven": NewsDrivenStrategy,
"value": ValueStrategy,
"macd_crossover": MACDCrossoverStrategy,
"bollinger_breakout": BollingerBreakoutStrategy,
"vwap": VWAPStrategy,
"liquidity": LiquidityStrategy,
"ma_stack": MAStackStrategy,
}
return _STRATEGY_REGISTRY
class BacktestRequest(BaseModel):
"""Request body for starting a new backtest."""
start_date: datetime
end_date: datetime
initial_capital: float = Field(default=100_000.0, gt=0)
commission_per_trade: float = Field(default=0.0, ge=0)
slippage_pct: float = Field(default=0.001, ge=0)
strategies: list[str] = Field(default_factory=list)
tickers: list[str] = Field(default_factory=lambda: ["AAPL", "TSLA", "NVDA", "MSFT", "GOOGL"])
max_position_pct: float = Field(default=0.05, gt=0, le=1.0)
signal_threshold: float = Field(default=0.3, ge=0, le=1.0)
@router.post("/run")
async def run_backtest(
body: BacktestRequest,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Start a backtest with the given configuration.
Returns a ``run_id`` immediately. The backtest runs in a background
task and its results can be retrieved via ``GET /api/backtest/{run_id}``.
"""
run_id = str(uuid.uuid4())
redis = request.app.state.redis
config = request.app.state.config
# Store initial status
await redis.setex(
f"backtest:{run_id}",
86400, # 24h TTL
json.dumps({
"run_id": run_id,
"status": "running",
"config": body.model_dump(mode="json"),
"started_at": datetime.now(tz=timezone.utc).isoformat(),
}),
)
# Launch background task (stored in set to prevent GC)
task = asyncio.create_task(_run_backtest_task(run_id, body, redis, config))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return {"run_id": run_id, "status": "running"}
async def _run_backtest_task(
run_id: str,
config: BacktestRequest,
redis,
app_config,
) -> None:
"""Execute the backtest in the background and store results in Redis."""
try:
from backtester.config import BacktestConfig
from backtester.data_loader import BacktestDataLoader
from backtester.engine import BacktestEngine
# ---- Fetch historical bars from Alpaca ----
bars = await _fetch_alpaca_bars(
tickers=config.tickers,
start=config.start_date,
end=config.end_date,
api_key=app_config.alpaca_api_key,
secret_key=app_config.alpaca_secret_key,
)
if not bars:
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"run_id": run_id,
"status": "failed",
"error": "No historical bar data returned from Alpaca. Check tickers and date range.",
}),
)
return
data_loader = BacktestDataLoader(bars=bars)
# ---- Build strategy list ----
registry = _get_strategy_registry()
strategy_names = config.strategies or list(registry.keys())
strategies = [
registry[name]()
for name in strategy_names
if name in registry
]
if not strategies:
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"run_id": run_id,
"status": "failed",
"error": f"No valid strategies selected. Available: {list(registry.keys())}",
}),
)
return
bt_config = BacktestConfig(
start_date=config.start_date,
end_date=config.end_date,
initial_capital=config.initial_capital,
commission_per_trade=config.commission_per_trade,
slippage_pct=config.slippage_pct,
strategy_weights={}, # equal weights
max_position_pct=config.max_position_pct,
signal_threshold=config.signal_threshold,
)
engine = BacktestEngine(config=bt_config, strategies=strategies)
result = await engine.run(data_loader)
# ---- Build response matching frontend expectations ----
equity_curve = [
{"timestamp": ts.isoformat(), "value": eq}
for ts, eq in result.equity_curve
]
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"run_id": run_id,
"status": "completed",
"equity_curve": equity_curve,
"metrics": {
"total_return": result.total_return,
"annualized_return": result.annualized_return,
"sharpe_ratio": result.sharpe_ratio,
"max_drawdown": result.max_drawdown_pct,
"win_rate": result.win_rate / 100.0,
"total_trades": result.trade_count,
"avg_hold_duration": str(result.avg_hold_duration),
},
"completed_at": datetime.now(tz=timezone.utc).isoformat(),
}),
)
except Exception as exc:
logger.exception("Backtest %s failed", run_id)
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"run_id": run_id,
"status": "failed",
"error": str(exc),
}),
)
async def _fetch_alpaca_bars(
tickers: list[str],
start: datetime,
end: datetime,
api_key: str,
secret_key: str,
) -> list[dict]:
"""Fetch historical bars from Alpaca's market data API.
Runs the synchronous Alpaca SDK call in a thread executor to avoid
blocking the event loop.
"""
if not api_key or not secret_key:
raise ValueError("Alpaca API credentials not configured (TRADING_ALPACA_API_KEY / TRADING_ALPACA_SECRET_KEY)")
def _fetch() -> list[dict]:
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
client = StockHistoricalDataClient(api_key, secret_key)
# Ensure timezone-aware datetimes
start_dt = start if start.tzinfo else start.replace(tzinfo=timezone.utc)
end_dt = end if end.tzinfo else end.replace(tzinfo=timezone.utc)
req = StockBarsRequest(
symbol_or_symbols=tickers,
timeframe=TimeFrame.Day,
start=start_dt,
end=end_dt,
)
bars_response = client.get_stock_bars(req)
all_bars: list[dict] = []
for ticker in tickers:
ticker_bars = bars_response.get(ticker, []) if bars_response else []
for bar in ticker_bars:
all_bars.append({
"timestamp": bar.timestamp,
"ticker": ticker,
"open": float(bar.open),
"high": float(bar.high),
"low": float(bar.low),
"close": float(bar.close),
"volume": int(bar.volume),
})
return all_bars
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _fetch)
@router.get("/{run_id}")
async def get_backtest(
run_id: str,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Get backtest results by run ID."""
redis = request.app.state.redis
raw = await redis.get(f"backtest:{run_id}")
if raw is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Backtest run not found",
)
return json.loads(raw)