trading/services/api_gateway/routes/backtest.py
Viktor Barzin 2a56727267
fix: resolve 8 critical issues from code review
C1: Fix BacktestDataLoader constructor args (was passing wrong kwargs)
C2: Fix BacktestResult attribute names (max_drawdown_pct, avg_hold_duration)
C3: Remove insecure JWT secret default (now required via env var)
C4: Fix .env.example to use TRADING_ prefix for all config vars
C5: Add missing fields to portfolio endpoint (daily_pnl_pct, total_pnl, trading_active)
C6: Add missing /portfolio/metrics endpoint
C7: Add 'value' field to equity curve response for frontend compatibility
C8: Add 6M/ALL periods and case-insensitive period enum parsing
Also: make app creation lazy to avoid config validation at import time
2026-02-22 17:48:40 +00:00

152 lines
5 KiB
Python

"""Backtest endpoints — run backtests and retrieve results."""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from services.api_gateway.auth.middleware import get_current_user
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that collects the backtest endpoints; every route registered on it
# is served under the ``/api/backtest`` prefix and tagged ``backtest``.
router = APIRouter(prefix="/api/backtest", tags=["backtest"])
class BacktestRequest(BaseModel):
    """Payload accepted when a client asks for a new backtest to be started.

    Only ``start_date`` and ``end_date`` are mandatory; every other field
    falls back to the default declared below when omitted.
    """

    # Boundaries of the backtest window; both must be supplied by the caller.
    start_date: datetime
    end_date: datetime

    # Must be strictly positive.
    initial_capital: float = Field(100_000.0, gt=0)
    # Must be zero or greater.
    commission_per_trade: float = Field(0.0, ge=0)
    # Must be zero or greater.
    slippage_pct: float = Field(0.001, ge=0)
    # String-keyed float weights; a fresh empty dict is used when omitted.
    strategy_weights: dict[str, float] = Field(default_factory=dict)
    # Constrained to the half-open interval (0, 1].
    max_position_pct: float = Field(0.05, gt=0, le=1.0)
    # Constrained to the closed interval [0, 1].
    signal_threshold: float = Field(0.3, ge=0, le=1.0)
# Strong references to in-flight backtest tasks. The event loop itself only
# keeps weak references to tasks, so a task whose handle is dropped can be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


@router.post("/run")
async def run_backtest(
    body: BacktestRequest,
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Start a backtest with the given configuration.

    Returns a ``run_id`` immediately. The backtest runs in a background
    task and its results can be retrieved via ``GET /api/backtest/{run_id}``.

    Args:
        body: Validated backtest configuration from the request body.
        request: Incoming request; the Redis client is read from
            ``request.app.state.redis``.
        _user: Result of the ``get_current_user`` dependency. Unused here;
            declared so the dependency runs for this route.

    Returns:
        A dict with the generated ``run_id`` and the status ``"running"``.
    """
    run_id = str(uuid.uuid4())
    redis = request.app.state.redis

    # Store initial status so the run can be polled right away.
    await redis.setex(
        f"backtest:{run_id}",
        86400,  # 24h TTL
        json.dumps({
            "status": "running",
            "config": body.model_dump(mode="json"),
            "started_at": datetime.now(tz=timezone.utc).isoformat(),
        }),
    )

    # Launch the background task and hold on to its handle until it is done;
    # the done-callback drops the reference again so the set does not grow.
    task = asyncio.create_task(
        _run_backtest_task(run_id, body, redis),
        name=f"backtest-{run_id}",
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {"run_id": run_id, "status": "running"}
def _finite_or_none(value):
    """Return *value* unchanged, or ``None`` when it is a NaN/infinite float."""
    import math

    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


async def _store_backtest_state(redis, run_id: str, payload: dict) -> None:
    """Write *payload* as JSON under the run's Redis key with a 24h TTL."""
    await redis.setex(
        f"backtest:{run_id}",
        86400,  # 24h TTL
        json.dumps(payload),
    )


async def _run_backtest_task(
    run_id: str,
    config: BacktestRequest,
    redis,
) -> None:
    """Execute the backtest in the background and store results in Redis.

    On success the record under ``backtest:{run_id}`` is replaced with a
    ``"completed"`` payload holding the config, the result metrics and a
    completion timestamp. On any ``Exception`` the record is replaced with a
    ``"failed"`` payload holding the config, the error text and a failure
    timestamp. This coroutine does not let an ``Exception`` propagate: it is
    run as a detached task, so nobody would retrieve it.

    Args:
        run_id: Identifier of the run; used to build the Redis key.
        config: The request that started the run.
        redis: Async Redis client exposing ``setex``.
    """
    config_json = config.model_dump(mode="json")
    try:
        # Imported lazily so an import problem is reported as a failed run
        # instead of breaking this module at import time.
        from backtester.config import BacktestConfig
        from backtester.data_loader import BacktestDataLoader
        from backtester.engine import BacktestEngine
        from shared.strategies.mean_reversion import MeanReversionStrategy
        from shared.strategies.momentum import MomentumStrategy
        from shared.strategies.news_driven import NewsDrivenStrategy

        bt_config = BacktestConfig(
            start_date=config.start_date,
            end_date=config.end_date,
            initial_capital=config.initial_capital,
            commission_per_trade=config.commission_per_trade,
            slippage_pct=config.slippage_pct,
            strategy_weights=config.strategy_weights,
            max_position_pct=config.max_position_pct,
            signal_threshold=config.signal_threshold,
        )
        strategies = [
            MomentumStrategy(),
            MeanReversionStrategy(),
            NewsDrivenStrategy(),
        ]
        engine = BacktestEngine(config=bt_config, strategies=strategies)

        # Use an empty data loader for now; a full implementation
        # would load historical bars from TimescaleDB.
        data_loader = BacktestDataLoader(bars=[], sentiments=[])
        result = await engine.run(data_loader)

        metrics = {
            "total_return": result.total_return,
            "annualized_return": result.annualized_return,
            "sharpe_ratio": result.sharpe_ratio,
            "sortino_ratio": result.sortino_ratio,
            "max_drawdown_pct": result.max_drawdown_pct,
            "max_drawdown_duration_days": result.max_drawdown_duration_days,
            "win_rate": result.win_rate,
            "avg_win_loss_ratio": result.avg_win_loss_ratio,
            "trade_count": result.trade_count,
            "avg_hold_duration_seconds": result.avg_hold_duration.total_seconds(),
        }
        # json.dumps would emit NaN/Infinity literals for non-finite floats.
        # json.loads reads them back, but Starlette's JSONResponse encodes
        # with allow_nan=False, so the GET endpoint would fail on them.
        # NOTE(review): whether the engine can actually produce non-finite
        # metrics is not visible here; this is defensive.
        metrics = {name: _finite_or_none(value) for name, value in metrics.items()}

        await _store_backtest_state(redis, run_id, {
            "status": "completed",
            "config": config_json,
            "result": metrics,
            "completed_at": datetime.now(tz=timezone.utc).isoformat(),
        })
    except Exception as exc:
        logger.exception("Backtest %s failed", run_id)
        try:
            await _store_backtest_state(redis, run_id, {
                "status": "failed",
                "config": config_json,
                "error": str(exc),
                "failed_at": datetime.now(tz=timezone.utc).isoformat(),
            })
        except Exception:
            # Redis itself may be what failed; log it rather than leaving a
            # never-retrieved exception on the detached task.
            logger.exception(
                "Could not record failure of backtest %s", run_id
            )
@router.get("/{run_id}")
async def get_backtest(
    run_id: str,
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Look up the stored record of a backtest run.

    The record is read from Redis under ``backtest:{run_id}`` and returned
    as the decoded JSON document. A missing key results in a 404 response.
    """
    stored = await request.app.state.redis.get(f"backtest:{run_id}")
    if stored is not None:
        return json.loads(stored)

    # Nothing stored under this run's key.
    raise HTTPException(
        detail="Backtest run not found",
        status_code=status.HTTP_404_NOT_FOUND,
    )