feat(api): /api/meet-kevin/backtest/* routes
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled

This commit is contained in:
Viktor Barzin 2026-05-24 01:10:34 +00:00
parent db103df9b1
commit 886dbaec86
4 changed files with 478 additions and 0 deletions

View file

@ -93,6 +93,9 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
from services.api_gateway.routes.controls import router as controls_router
from services.api_gateway.routes.backtest import router as backtest_router
from services.api_gateway.routes.meet_kevin import router as meet_kevin_router
from services.api_gateway.routes.meet_kevin_backtest import (
router as meet_kevin_backtest_router,
)
app.include_router(portfolio_router)
app.include_router(trades_router)
@ -102,6 +105,7 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
app.include_router(controls_router)
app.include_router(backtest_router)
app.include_router(meet_kevin_router)
app.include_router(meet_kevin_backtest_router)
# WebSocket
from services.api_gateway.ws import router as ws_router

View file

@ -0,0 +1,337 @@
"""Meet Kevin backtest API — run, list, get, latest."""
from __future__ import annotations
import asyncio
import logging
import uuid
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from sqlalchemy import select
from services.api_gateway.auth.middleware import get_current_user
from shared.models.meet_kevin import KevinStockMention
from shared.models.meet_kevin_trading import (
KevinBacktestRun,
KevinBacktestRunStatus,
KevinBacktestTrade,
TriggerSource,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/meet-kevin/backtest", tags=["meet-kevin-backtest"])
_background_tasks: set[asyncio.Task[Any]] = set()
class KevinBacktestRunRequest(BaseModel):
"""Request body for /api/meet-kevin/backtest/run."""
initial_capital: float = Field(default=100_000.0, gt=0)
slippage_pct: float = Field(default=0.0005, ge=0)
commission_per_trade: float = Field(default=0.0, ge=0)
dedupe_policy: str = Field(default="roll", pattern="^(roll|ignore)$")
class KevinBacktestRunSummary(BaseModel):
run_uuid: str
status: str
started_at: datetime | None = None
finished_at: datetime | None = None
total_return_pct: float | None = None
trade_count: int | None = None
def _summary_from_row(row: KevinBacktestRun) -> KevinBacktestRunSummary:
metrics = row.metrics_json or {}
return KevinBacktestRunSummary(
run_uuid=str(row.run_uuid),
status=row.status.value if hasattr(row.status, "value") else str(row.status),
started_at=row.started_at,
finished_at=row.finished_at,
total_return_pct=metrics.get("total_return_pct"),
trade_count=metrics.get("trade_count"),
)
@router.post("/run", status_code=status.HTTP_202_ACCEPTED)
async def run_backtest(
body: KevinBacktestRunRequest,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict[str, str]:
"""Kick off a Kevin backtest in the background. Returns run_uuid."""
run_uuid = uuid.uuid4()
session_factory = request.app.state.db_session_factory
async with session_factory() as session:
row = KevinBacktestRun(
run_uuid=run_uuid,
status=KevinBacktestRunStatus.RUNNING,
trigger_source=TriggerSource.MANUAL,
params_json=body.model_dump(),
)
session.add(row)
await session.commit()
task = asyncio.create_task(
_execute_backtest(run_uuid, body, session_factory)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return {"run_uuid": str(run_uuid), "status": "running"}
async def _execute_backtest(
run_uuid: uuid.UUID,
params: KevinBacktestRunRequest,
session_factory: Any,
) -> None:
"""Background task that loads mentions, runs the engine, persists results."""
from backtester.kevin_backtest import KevinBacktestParams, KevinBacktestRunner
from backtester.kevin_price_loader import KevinPriceLoader
from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig
try:
async with session_factory() as session:
mentions = (
(
await session.execute(
select(KevinStockMention).order_by(
KevinStockMention.created_at.asc()
)
)
)
.scalars()
.all()
)
strategy_cfg = KevinStrategyConfig(
min_conviction=Decimal("0.6"),
max_mention_age_hours=48 * 365,
base_position_pct=Decimal("0.04"),
min_trade_usd=Decimal("500"),
max_trade_usd=Decimal("5000"),
max_per_ticker_usd=Decimal("7500"),
hold_days_by_horizon={
"days": 3,
"weeks": 10,
"months": 45,
"long_term": 90,
"unspecified": 10,
},
avoid_closes_longs=True,
avoid_blocks_days=7,
)
strategy = KevinStrategy(strategy_cfg)
# Use an Alpaca-less price loader for now — backtest will fall back
# to whatever is already in market_data. A real run would inject the
# AlpacaBroker here.
class _NoAlpaca:
async def fetch_daily_bars(self, *a: Any, **kw: Any) -> Any:
import pandas as pd
return pd.DataFrame()
async def is_asset_tradable(self, *a: Any, **kw: Any) -> bool:
return True
price_loader = KevinPriceLoader(
session_factory=session_factory, alpaca_fetcher=_NoAlpaca()
)
runner = KevinBacktestRunner(strategy, price_loader)
result = await runner.run(
mentions,
KevinBacktestParams(
initial_capital=Decimal(str(params.initial_capital)),
slippage_pct=Decimal(str(params.slippage_pct)),
commission_per_trade=Decimal(str(params.commission_per_trade)),
dedupe_policy=params.dedupe_policy,
),
)
metrics = {
"total_return_pct": float(result.total_return_pct),
"annualized_return_pct": result.annualized_return,
"sharpe_ratio": result.sharpe_ratio,
"max_drawdown_pct": result.max_drawdown_pct,
"win_rate": result.win_rate,
"trade_count": result.trade_count,
"alpha_vs_spy_pct": (
float(result.alpha_vs_spy_pct)
if result.alpha_vs_spy_pct is not None
else None
),
"beta_vs_spy": (
float(result.beta_vs_spy)
if result.beta_vs_spy is not None
else None
),
"avg_winner_pct": (
float(result.avg_winner_pct)
if result.avg_winner_pct is not None
else None
),
"avg_loser_pct": (
float(result.avg_loser_pct)
if result.avg_loser_pct is not None
else None
),
}
equity_curve_serialised: list[list[Any]] = [
[ts.isoformat(), float(eq)] for ts, eq in result.equity_curve
]
async with session_factory() as session:
row = (
await session.execute(
select(KevinBacktestRun).where(
KevinBacktestRun.run_uuid == run_uuid
)
)
).scalar_one()
row.status = KevinBacktestRunStatus.COMPLETED
row.finished_at = datetime.now(timezone.utc)
row.metrics_json = metrics
row.equity_curve_json = equity_curve_serialised
for t in result.trades:
session.add(
KevinBacktestTrade(
run_id=row.id,
symbol=t["symbol"],
source_mention_id=t.get("source_mention_id"),
entry_at=t["entry_at"],
entry_price=t["entry_price"],
exit_at=t.get("exit_at"),
exit_price=t.get("exit_price"),
qty=t["qty"],
pnl_usd=t.get("pnl_usd"),
pnl_pct=t.get("pnl_pct"),
holding_days_actual=t.get("holding_days_actual"),
)
)
await session.commit()
except Exception as exc:
logger.exception("Kevin backtest %s failed", run_uuid)
async with session_factory() as session:
row = (
await session.execute(
select(KevinBacktestRun).where(
KevinBacktestRun.run_uuid == run_uuid
)
)
).scalar_one()
row.status = KevinBacktestRunStatus.FAILED
row.finished_at = datetime.now(timezone.utc)
row.error_message = str(exc)
await session.commit()
@router.get("/runs")
async def list_runs(
request: Request,
limit: int = Query(20, ge=1, le=200),
_user: dict = Depends(get_current_user),
) -> list[KevinBacktestRunSummary]:
session_factory = request.app.state.db_session_factory
async with session_factory() as session:
rows = (
(
await session.execute(
select(KevinBacktestRun)
.order_by(KevinBacktestRun.started_at.desc())
.limit(limit)
)
)
.scalars()
.all()
)
return [_summary_from_row(r) for r in rows]
@router.get("/runs/{run_uuid}")
async def get_run(
run_uuid: uuid.UUID,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict[str, Any]:
session_factory = request.app.state.db_session_factory
async with session_factory() as session:
row = (
await session.execute(
select(KevinBacktestRun).where(KevinBacktestRun.run_uuid == run_uuid)
)
).scalar_one_or_none()
if row is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="run not found"
)
trades = (
(
await session.execute(
select(KevinBacktestTrade).where(
KevinBacktestTrade.run_id == row.id
)
)
)
.scalars()
.all()
)
return {
"run_uuid": str(row.run_uuid),
"status": row.status.value if hasattr(row.status, "value") else str(row.status),
"params": row.params_json,
"metrics": row.metrics_json,
"equity_curve": row.equity_curve_json,
"benchmark_curve": row.benchmark_curve_json,
"error_message": row.error_message,
"trades": [
{
"symbol": t.symbol,
"entry_at": t.entry_at.isoformat() if t.entry_at else None,
"entry_price": float(t.entry_price) if t.entry_price else None,
"exit_at": t.exit_at.isoformat() if t.exit_at else None,
"exit_price": float(t.exit_price) if t.exit_price else None,
"qty": float(t.qty),
"pnl_pct": float(t.pnl_pct) if t.pnl_pct is not None else None,
"pnl_usd": float(t.pnl_usd) if t.pnl_usd is not None else None,
"holding_days_actual": t.holding_days_actual,
}
for t in trades
],
}
@router.get("/latest")
async def latest_run(
request: Request,
_user: dict = Depends(get_current_user),
) -> dict[str, Any]:
session_factory = request.app.state.db_session_factory
async with session_factory() as session:
row = (
await session.execute(
select(KevinBacktestRun)
.where(KevinBacktestRun.status == KevinBacktestRunStatus.COMPLETED)
.order_by(KevinBacktestRun.started_at.desc())
.limit(1)
)
).scalar_one_or_none()
if row is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="no completed runs yet",
)
return await get_run(row.run_uuid, request, _user)

View file

View file

@ -0,0 +1,137 @@
"""Tests for /api/meet-kevin/backtest/* routes (mocked DB)."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi.testclient import TestClient
from services.api_gateway.auth.jwt import create_access_token
from services.api_gateway.auth.middleware import get_config, get_current_user
from services.api_gateway.config import ApiGatewayConfig
from services.api_gateway.main import create_app
@pytest.fixture
def config() -> ApiGatewayConfig:
return ApiGatewayConfig(
jwt_secret_key="test-secret-for-routes-kevin-backtest",
database_url="sqlite+aiosqlite:///:memory:",
redis_url="redis://localhost:6379/0",
)
@pytest.fixture
def auth_headers(config: ApiGatewayConfig) -> dict[str, str]:
token = create_access_token("user-test", "tester", config)
return {"Authorization": f"Bearer {token}"}
def _make_session() -> tuple[MagicMock, AsyncMock]:
session = AsyncMock()
session.__aenter__ = AsyncMock(return_value=session)
session.__aexit__ = AsyncMock(return_value=False)
factory = MagicMock()
factory.return_value = session
return factory, session
def _make_execute_result(scalars: list, scalar_one_or_none=None):
result = MagicMock()
result.scalars.return_value.all.return_value = scalars
result.scalar_one_or_none.return_value = scalar_one_or_none
return result
@pytest.fixture
def app(config: ApiGatewayConfig):
factory, session = _make_session()
app = create_app(config)
app.dependency_overrides[get_current_user] = lambda: {
"sub": "user-test",
"username": "tester",
"type": "access",
}
app.dependency_overrides[get_config] = lambda: config
app.state.db_session_factory = factory
app.state.config = config
app.state.redis = AsyncMock()
app.state.db_engine = MagicMock()
app._test_session = session # type: ignore[attr-defined]
return app
def test_list_runs_returns_empty_initially(app, auth_headers):
session: AsyncMock = app._test_session # type: ignore[attr-defined]
session.execute = AsyncMock(return_value=_make_execute_result([]))
client = TestClient(app, raise_server_exceptions=False)
resp = client.get("/api/meet-kevin/backtest/runs", headers=auth_headers)
assert resp.status_code == 200
assert resp.json() == []
def test_get_run_by_uuid_returns_404_if_unknown(app, auth_headers):
session: AsyncMock = app._test_session # type: ignore[attr-defined]
session.execute = AsyncMock(
return_value=_make_execute_result([], scalar_one_or_none=None)
)
client = TestClient(app, raise_server_exceptions=False)
unknown = uuid.uuid4()
resp = client.get(
f"/api/meet-kevin/backtest/runs/{unknown}", headers=auth_headers
)
assert resp.status_code == 404
def test_list_runs_returns_persisted_row(app, auth_headers):
"""When the DB has a row, the route serialises it correctly."""
fake_row = MagicMock()
fake_row.run_uuid = uuid.uuid4()
fake_row.status = MagicMock(value="completed")
fake_row.started_at = datetime.now(timezone.utc)
fake_row.finished_at = datetime.now(timezone.utc)
fake_row.metrics_json = {"total_return_pct": 3.5, "trade_count": 2}
session: AsyncMock = app._test_session # type: ignore[attr-defined]
session.execute = AsyncMock(return_value=_make_execute_result([fake_row]))
client = TestClient(app, raise_server_exceptions=False)
resp = client.get("/api/meet-kevin/backtest/runs", headers=auth_headers)
assert resp.status_code == 200
rows = resp.json()
assert len(rows) == 1
assert rows[0]["status"] == "completed"
assert rows[0]["total_return_pct"] == 3.5
def test_latest_returns_404_if_no_completed(app, auth_headers):
session: AsyncMock = app._test_session # type: ignore[attr-defined]
session.execute = AsyncMock(
return_value=_make_execute_result([], scalar_one_or_none=None)
)
client = TestClient(app, raise_server_exceptions=False)
resp = client.get("/api/meet-kevin/backtest/latest", headers=auth_headers)
assert resp.status_code == 404
def test_run_backtest_returns_run_uuid(app, auth_headers):
session: AsyncMock = app._test_session # type: ignore[attr-defined]
session.commit = AsyncMock()
session.add = MagicMock()
client = TestClient(app, raise_server_exceptions=False)
resp = client.post(
"/api/meet-kevin/backtest/run",
headers=auth_headers,
json={"initial_capital": 100000.0, "slippage_pct": 0.0005},
)
assert resp.status_code == 202
data = resp.json()
assert "run_uuid" in data
assert data["status"] == "running"