trading/services/api_gateway/routes/backtest.py

158 lines
5.2 KiB
Python
Raw Normal View History

"""Backtest endpoints — run backtests and retrieve results."""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from services.api_gateway.auth.middleware import get_current_user
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/backtest", tags=["backtest"])
# Store references to background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task] = set()
class BacktestRequest(BaseModel):
"""Request body for starting a new backtest."""
start_date: datetime
end_date: datetime
initial_capital: float = Field(default=100_000.0, gt=0)
commission_per_trade: float = Field(default=0.0, ge=0)
slippage_pct: float = Field(default=0.001, ge=0)
strategy_weights: dict[str, float] = Field(default_factory=dict)
max_position_pct: float = Field(default=0.05, gt=0, le=1.0)
signal_threshold: float = Field(default=0.3, ge=0, le=1.0)
@router.post("/run")
async def run_backtest(
body: BacktestRequest,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Start a backtest with the given configuration.
Returns a ``run_id`` immediately. The backtest runs in a background
task and its results can be retrieved via ``GET /api/backtest/{run_id}``.
"""
run_id = str(uuid.uuid4())
redis = request.app.state.redis
# Store initial status
await redis.setex(
f"backtest:{run_id}",
86400, # 24h TTL
json.dumps({
"status": "running",
"config": body.model_dump(mode="json"),
"started_at": datetime.now(tz=timezone.utc).isoformat(),
}),
)
# Launch background task (stored in set to prevent GC)
task = asyncio.create_task(_run_backtest_task(run_id, body, redis))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return {"run_id": run_id, "status": "running"}
async def _run_backtest_task(
run_id: str,
config: BacktestRequest,
redis,
) -> None:
"""Execute the backtest in the background and store results in Redis."""
try:
from backtester.config import BacktestConfig
from backtester.engine import BacktestEngine
from shared.strategies.momentum import MomentumStrategy
from shared.strategies.mean_reversion import MeanReversionStrategy
from shared.strategies.news_driven import NewsDrivenStrategy
bt_config = BacktestConfig(
start_date=config.start_date,
end_date=config.end_date,
initial_capital=config.initial_capital,
commission_per_trade=config.commission_per_trade,
slippage_pct=config.slippage_pct,
strategy_weights=config.strategy_weights,
max_position_pct=config.max_position_pct,
signal_threshold=config.signal_threshold,
)
strategies = [
MomentumStrategy(),
MeanReversionStrategy(),
NewsDrivenStrategy(),
]
engine = BacktestEngine(config=bt_config, strategies=strategies)
# Use an empty data loader for now; a full implementation
# would load historical bars from TimescaleDB.
from backtester.data_loader import BacktestDataLoader
data_loader = BacktestDataLoader(bars=[], sentiments=[])
result = await engine.run(data_loader)
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"status": "completed",
"config": config.model_dump(mode="json"),
"result": {
"total_return": result.total_return,
"annualized_return": result.annualized_return,
"sharpe_ratio": result.sharpe_ratio,
"sortino_ratio": result.sortino_ratio,
"max_drawdown_pct": result.max_drawdown_pct,
"max_drawdown_duration_days": result.max_drawdown_duration_days,
"win_rate": result.win_rate,
"avg_win_loss_ratio": result.avg_win_loss_ratio,
"trade_count": result.trade_count,
"avg_hold_duration_seconds": result.avg_hold_duration.total_seconds(),
},
"completed_at": datetime.now(tz=timezone.utc).isoformat(),
}),
)
except Exception as exc:
logger.exception("Backtest %s failed", run_id)
await redis.setex(
f"backtest:{run_id}",
86400,
json.dumps({
"status": "failed",
"error": str(exc),
}),
)
@router.get("/{run_id}")
async def get_backtest(
run_id: str,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Get backtest results by run ID."""
redis = request.app.state.redis
raw = await redis.get(f"backtest:{run_id}")
if raw is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Backtest run not found",
)
return json.loads(raw)