feat(kevin): mention-driven backtest mini-engine
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled

Walks mentions chronologically, T+1 entry, time-based exit per
KevinStrategy. Reuses backtester/metrics::compute_metrics for headline
numbers. KevinPriceLoader fronts market_data + Alpaca.
This commit is contained in:
Viktor Barzin 2026-05-24 00:56:57 +00:00
parent 7dcce5ea0e
commit 23ce45a4f2
6 changed files with 794 additions and 41 deletions

View file

@ -6,13 +6,19 @@ curve produced by a backtest run.
from __future__ import annotations
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any
import pandas as pd
from shared.schemas.trading import OrderSide, TradeExecution
logger = logging.getLogger(__name__)
@dataclass
class BacktestResult:
@ -56,49 +62,72 @@ class BacktestResult:
avg_win_loss_ratio: float = 0.0
trade_count: int = 0
avg_hold_duration: timedelta = field(default_factory=lambda: timedelta(0))
equity_curve: list[tuple[datetime, float]] = field(default_factory=list)
trade_log: list[TradeExecution] = field(default_factory=list)
equity_curve: list[tuple[datetime, Any]] = field(default_factory=list)
trade_log: list[Any] = field(default_factory=list)
# --- Kevin v2 extensions ---
total_return_pct: float = 0.0 # alias for total_return (Kevin-style naming)
trades: list[dict[str, Any]] = field(default_factory=list)
alpha_vs_spy_pct: Decimal | None = None
beta_vs_spy: Decimal | None = None
avg_winner_pct: Decimal | None = None
avg_loser_pct: Decimal | None = None
best_trade: dict[str, Any] | None = None
worst_trade: dict[str, Any] | None = None
def compute_metrics(
trade_log: list[TradeExecution],
equity_curve: list[tuple[datetime, float]],
initial_capital: float = 100_000.0,
trade_log: list[Any],
equity_curve: list[tuple[datetime, Any]],
initial_capital: float | Decimal = 100_000.0,
benchmark_bars: pd.DataFrame | None = None,
) -> BacktestResult:
"""Compute all performance metrics from a backtest run.
Parameters
----------
trade_log:
Chronological list of every executed trade (buys and sells).
Chronological list of every executed trade. Accepts either
``TradeExecution`` instances (legacy bar-driven engine) or
dict-shaped per-position rows (Kevin mention-driven engine).
equity_curve:
List of ``(timestamp, portfolio_equity)`` snapshots.
initial_capital:
Starting capital used to compute total return.
benchmark_bars:
Optional benchmark price series (e.g. SPY) used to compute
alpha + beta.
Returns
-------
BacktestResult
Populated metrics dataclass.
"""
is_dict_trades = bool(trade_log) and isinstance(trade_log[0], dict)
initial_float = float(initial_capital)
result = BacktestResult(
equity_curve=equity_curve,
trade_log=trade_log,
trades=trade_log if is_dict_trades else [],
)
if not equity_curve:
if is_dict_trades:
_populate_dict_trade_aggregates(result, trade_log)
_populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)
return result
# ----- Total return -----
final_equity = equity_curve[-1][1]
result.total_return = (final_equity - initial_capital) / initial_capital * 100.0
final_equity = float(equity_curve[-1][1])
result.total_return = (final_equity - initial_float) / initial_float * 100.0
result.total_return_pct = result.total_return
# ----- Annualized return -----
if len(equity_curve) >= 2:
total_days = (equity_curve[-1][0] - equity_curve[0][0]).days
if total_days > 0:
trading_years = total_days / 365.25
growth_factor = final_equity / initial_capital
growth_factor = final_equity / initial_float
if growth_factor > 0:
result.annualized_return = (
(growth_factor ** (1.0 / trading_years)) - 1.0
@ -119,42 +148,113 @@ def compute_metrics(
result.max_drawdown_duration_days = dd_duration
# ----- Round-trip trade analysis -----
round_trips = _build_round_trips(trade_log)
result.trade_count = len(round_trips)
if is_dict_trades:
_populate_dict_trade_aggregates(result, trade_log)
else:
round_trips = _build_round_trips(trade_log)
result.trade_count = len(round_trips)
if round_trips:
pnls = [rt["pnl"] for rt in round_trips]
wins = [p for p in pnls if p > 0]
losses = [p for p in pnls if p <= 0]
if round_trips:
pnls = [rt["pnl"] for rt in round_trips]
wins = [p for p in pnls if p > 0]
losses = [p for p in pnls if p <= 0]
result.win_rate = (len(wins) / len(pnls)) * 100.0 if pnls else 0.0
result.win_rate = (len(wins) / len(pnls)) * 100.0 if pnls else 0.0
avg_win = sum(wins) / len(wins) if wins else 0.0
avg_loss = sum(losses) / len(losses) if losses else 0.0
if avg_loss != 0:
result.avg_win_loss_ratio = abs(avg_win / avg_loss)
elif avg_win > 0:
result.avg_win_loss_ratio = float("inf")
avg_win = sum(wins) / len(wins) if wins else 0.0
avg_loss = sum(losses) / len(losses) if losses else 0.0
if avg_loss != 0:
result.avg_win_loss_ratio = abs(avg_win / avg_loss)
elif avg_win > 0:
result.avg_win_loss_ratio = float("inf")
durations = [rt["duration"] for rt in round_trips]
result.avg_hold_duration = sum(durations, timedelta()) / len(durations)
durations = [rt["duration"] for rt in round_trips]
result.avg_hold_duration = sum(durations, timedelta()) / len(durations)
# ----- Benchmark metrics (Kevin extensions) -----
_populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)
return result
def _populate_dict_trade_aggregates(
    result: BacktestResult, trade_log: list[dict[str, Any]]
) -> None:
    """Fill ``result`` with win/loss statistics from dict-shaped trade rows.

    ``trade_count`` is always set to the number of rows.  Only rows whose
    ``"pnl_pct"`` entry is present and not ``None`` feed the win-rate and
    win/loss-ratio figures; when there are no such rows those two
    attributes are left untouched.

    Parameters
    ----------
    result:
        Metrics object mutated in place (``trade_count``, ``win_rate``,
        ``avg_win_loss_ratio``).
    trade_log:
        Per-position rows; each may carry a ``"pnl_pct"`` value that is
        convertible with ``float()``.
    """
    result.trade_count = len(trade_log)

    # Keep only rows that actually report a P&L percentage.
    realised = [
        float(value)
        for value in (row.get("pnl_pct") for row in trade_log)
        if value is not None
    ]
    if not realised:
        return

    # A zero P&L counts as a loss; a NaN lands in neither bucket.
    winners = [pct for pct in realised if pct > 0]
    losers = [pct for pct in realised if pct <= 0]

    result.win_rate = (len(winners) / len(realised)) * 100.0

    mean_win = sum(winners) / len(winners) if winners else 0.0
    mean_loss = sum(losers) / len(losers) if losers else 0.0

    if mean_loss != 0:
        result.avg_win_loss_ratio = abs(mean_win / mean_loss)
        return
    # No (non-zero) losses: the ratio is unbounded as soon as there is a win.
    if mean_win > 0:
        result.avg_win_loss_ratio = float("inf")
def _populate_benchmark_metrics(
    result: BacktestResult,
    equity_curve: list[tuple[datetime, Any]],
    benchmark_bars: pd.DataFrame | None,
    initial_capital: float,
) -> None:
    """Set ``beta_vs_spy`` and ``alpha_vs_spy_pct`` on ``result``.

    Does nothing when no benchmark frame is supplied, the frame is empty,
    or the equity curve has fewer than two points.

    * ``beta_vs_spy`` is cov(strategy, benchmark) / var(benchmark) over the
      period-to-period returns that share an index label, rounded to four
      decimals.  It is only written when at least two aligned returns exist
      and the benchmark variance is positive.
    * ``alpha_vs_spy_pct`` is the strategy's total return (last equity vs.
      ``initial_capital``) minus the benchmark's total return (last vs.
      first ``close`` of the supplied frame), both in percent, rounded to
      four decimals.

    Parameters
    ----------
    result:
        Metrics object mutated in place.
    equity_curve:
        ``(timestamp, equity)`` snapshots; equity must be ``float()``-able.
    benchmark_bars:
        Frame with a ``"close"`` column.
        NOTE(review): alignment relies on this frame's index matching the
        equity-curve timestamps - confirm against the caller.
    initial_capital:
        Starting capital used as the base of the strategy's total return.
    """
    benchmark_missing = benchmark_bars is None or benchmark_bars.empty
    if benchmark_missing or len(equity_curve) < 2:
        return

    try:
        # Strategy period returns, indexed by snapshot timestamp.
        snapshots = [(stamp, float(value)) for stamp, value in equity_curve]
        strategy_frame = pd.DataFrame(snapshots, columns=["timestamp", "equity"])
        strategy_frame = strategy_frame.set_index("timestamp")
        strategy_returns = strategy_frame["equity"].pct_change().dropna()

        # Benchmark period returns from the close column.
        closes = benchmark_bars["close"]
        benchmark_returns = closes.astype(float).pct_change().dropna()

        # Keep only index labels present in both return series.
        paired = pd.concat(
            [strategy_returns, benchmark_returns], axis=1, keys=["s", "spy"]
        ).dropna()

        if len(paired) >= 2:
            covariance = paired["s"].cov(paired["spy"])
            benchmark_variance = paired["spy"].var()
            if benchmark_variance > 0:
                beta = round(covariance / benchmark_variance, 4)
                result.beta_vs_spy = Decimal(str(beta))

        first_close = float(closes.iloc[0])
        last_close = float(closes.iloc[-1])
        benchmark_total_pct = (last_close / first_close - 1) * 100

        final_equity = float(equity_curve[-1][1])
        strategy_total_pct = (final_equity / initial_capital - 1) * 100

        excess_pct = round(strategy_total_pct - benchmark_total_pct, 4)
        result.alpha_vs_spy_pct = Decimal(str(excess_pct))
    except Exception:
        # Best effort: benchmark stats are optional, so log and move on.
        logger.exception("benchmark metrics failed")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _compute_daily_returns(equity_curve: list[tuple[datetime, float]]) -> list[float]:
def _compute_daily_returns(equity_curve: list[tuple[datetime, Any]]) -> list[float]:
"""Compute simple daily returns from the equity curve."""
if len(equity_curve) < 2:
return []
returns: list[float] = []
for i in range(1, len(equity_curve)):
prev = equity_curve[i - 1][1]
curr = equity_curve[i][1]
prev = float(equity_curve[i - 1][1])
curr = float(equity_curve[i][1])
if prev != 0:
returns.append((curr - prev) / prev)
else:
@ -198,7 +298,7 @@ def _compute_sortino(daily_returns: list[float]) -> float:
def _compute_max_drawdown(
equity_curve: list[tuple[datetime, float]],
equity_curve: list[tuple[datetime, Any]],
) -> tuple[float, float]:
"""Compute max drawdown percentage and duration in days.
@ -210,17 +310,18 @@ def _compute_max_drawdown(
if len(equity_curve) < 2:
return 0.0, 0.0
peak = equity_curve[0][1]
peak = float(equity_curve[0][1])
peak_ts = equity_curve[0][0]
max_dd = 0.0
max_dd_duration = 0.0
for ts, equity in equity_curve[1:]:
if equity >= peak:
peak = equity
eq = float(equity)
if eq >= peak:
peak = eq
peak_ts = ts
else:
dd = (peak - equity) / peak * 100.0 if peak > 0 else 0.0
dd = (peak - eq) / peak * 100.0 if peak > 0 else 0.0
duration = (ts - peak_ts).days
if dd > max_dd:
max_dd = dd