trading/backtester/metrics.py
Viktor Barzin cd75c4ab7e
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
feat(backtester): extend compute_metrics with alpha/beta/winners/best
In-place extension (no fork). Existing tests still pass; new fields are
optional and None when no benchmark is supplied.
2026-05-24 00:57:42 +00:00

400 lines
13 KiB
Python

"""Performance metrics for backtesting results.
Computes standard risk and return metrics from the trade log and equity
curve produced by a backtest run.
"""
from __future__ import annotations
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any
import pandas as pd
from shared.schemas.trading import OrderSide, TradeExecution
logger = logging.getLogger(__name__)
@dataclass
class BacktestResult:
    """Bundle of every metric derived from a single backtest run.

    Every field has a default, so a bare ``BacktestResult()`` is a valid
    "nothing computed yet" value: numbers are zero, containers are empty
    and the optional benchmark / per-trade extensions are ``None``.

    Attributes
    ----------
    total_return:
        ``(final - initial) / initial * 100``, expressed in percent.
    annualized_return:
        Total growth compounded to a yearly rate, in percent. The span is
        measured in calendar days between the first and last equity
        snapshot and converted to years with ``days / 365.25``.
    sharpe_ratio:
        ``mean(returns) / std(returns) * sqrt(252)``.
    sortino_ratio:
        Same numerator as Sharpe, divided by the downside deviation.
    max_drawdown_pct:
        Deepest peak-to-trough decline of the equity curve, in percent.
    max_drawdown_duration_days:
        Calendar days between the running peak and the trough of the
        deepest drawdown.
    win_rate:
        Share of trades with a strictly positive result, in percent.
    avg_win_loss_ratio:
        ``avg(winning result) / abs(avg(losing result))``.
    trade_count:
        Number of round-trip trades, or the number of rows when the trade
        log is dict-shaped.
    avg_hold_duration:
        Mean hold time across round-trip trades.
    equity_curve:
        ``(timestamp, equity)`` snapshots the metrics were computed from.
    trade_log:
        The trade log exactly as it was passed in.
    total_return_pct:
        Alias of ``total_return``.
    trades:
        The dict-shaped trade rows, or an empty list for other log shapes.
    alpha_vs_spy_pct:
        Strategy total return minus benchmark total return, in percentage
        points; ``None`` when no benchmark was available.
    beta_vs_spy:
        Covariance of strategy and benchmark returns divided by the
        benchmark variance; ``None`` when it could not be computed.
    avg_winner_pct / avg_loser_pct:
        Mean ``pnl_pct`` of winning / non-winning dict-shaped trades.
    best_trade / worst_trade:
        ``{"symbol", "pnl_pct"}`` of the extreme dict-shaped trades.
    """

    # --- Return and risk figures (percent unless noted) ---
    total_return: float = 0.0
    annualized_return: float = 0.0
    sharpe_ratio: float = 0.0
    sortino_ratio: float = 0.0
    max_drawdown_pct: float = 0.0
    max_drawdown_duration_days: float = 0.0

    # --- Trade-level aggregates ---
    win_rate: float = 0.0
    avg_win_loss_ratio: float = 0.0
    trade_count: int = 0
    avg_hold_duration: timedelta = field(default_factory=timedelta)

    # --- Raw inputs kept alongside the metrics ---
    equity_curve: list[tuple[datetime, Any]] = field(default_factory=list)
    trade_log: list[Any] = field(default_factory=list)

    # --- Kevin v2 extensions ---
    total_return_pct: float = 0.0  # alias for total_return (Kevin-style naming)
    trades: list[dict[str, Any]] = field(default_factory=list)
    alpha_vs_spy_pct: Decimal | None = None
    beta_vs_spy: Decimal | None = None
    avg_winner_pct: Decimal | None = None
    avg_loser_pct: Decimal | None = None
    best_trade: dict[str, Any] | None = None
    worst_trade: dict[str, Any] | None = None
def compute_metrics(
    trade_log: list[Any],
    equity_curve: list[tuple[datetime, Any]],
    initial_capital: float | Decimal = 100_000.0,
    benchmark_bars: pd.DataFrame | None = None,
) -> BacktestResult:
    """Compute all performance metrics from a backtest run.

    Parameters
    ----------
    trade_log:
        Chronological list of every executed trade. Accepts either
        ``TradeExecution`` instances (legacy bar-driven engine) or
        dict-shaped per-position rows (Kevin mention-driven engine). The
        shape is decided from the first element only.
    equity_curve:
        List of ``(timestamp, portfolio_equity)`` snapshots. When empty,
        only trade-level aggregates of a dict-shaped log are populated.
    initial_capital:
        Starting capital used to compute total return. When it is zero the
        total and annualized returns cannot be defined; they are left at
        ``0.0`` and a warning is logged instead of raising
        ``ZeroDivisionError``.
    benchmark_bars:
        Optional benchmark price series (e.g. SPY) used to compute
        alpha + beta.

    Returns
    -------
    BacktestResult
        Populated metrics dataclass.
    """
    is_dict_trades = bool(trade_log) and isinstance(trade_log[0], dict)
    initial_float = float(initial_capital)

    result = BacktestResult(
        equity_curve=equity_curve,
        trade_log=trade_log,
        trades=trade_log if is_dict_trades else [],
    )

    if not equity_curve:
        # Without equity snapshots only trade-level aggregates can be derived.
        if is_dict_trades:
            _populate_dict_trade_aggregates(result, trade_log)
        _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)
        return result

    # ----- Total and annualized return -----
    final_equity = float(equity_curve[-1][1])
    if initial_float == 0:
        # Growth relative to nothing is undefined; keep the zero defaults.
        logger.warning(
            "initial_capital is zero; total and annualized return left at 0.0"
        )
    else:
        result.total_return = (final_equity - initial_float) / initial_float * 100.0
        result.total_return_pct = result.total_return
        result.annualized_return = _annualized_return_pct(
            equity_curve, final_equity / initial_float
        )

    # ----- Risk ratios from per-snapshot returns -----
    daily_returns = _compute_daily_returns(equity_curve)
    result.sharpe_ratio = _compute_sharpe(daily_returns)
    result.sortino_ratio = _compute_sortino(daily_returns)

    # ----- Max drawdown -----
    dd_pct, dd_duration = _compute_max_drawdown(equity_curve)
    result.max_drawdown_pct = dd_pct
    result.max_drawdown_duration_days = dd_duration

    # ----- Trade-level aggregates -----
    if is_dict_trades:
        _populate_dict_trade_aggregates(result, trade_log)
    else:
        _populate_round_trip_aggregates(result, trade_log)

    # ----- Benchmark metrics (Kevin extensions) -----
    _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)
    return result


def _annualized_return_pct(
    equity_curve: list[tuple[datetime, Any]], growth_factor: float
) -> float:
    """Compound ``growth_factor`` to a yearly rate (percent) over the curve's span."""
    if len(equity_curve) < 2:
        return 0.0
    total_days = (equity_curve[-1][0] - equity_curve[0][0]).days
    if total_days <= 0:
        return 0.0
    # ``not (x > 0)`` rather than ``x <= 0`` so a NaN factor also yields 0.0.
    if not growth_factor > 0:
        return 0.0
    years = total_days / 365.25
    try:
        annual_growth = growth_factor ** (1.0 / years)
    except OverflowError:
        # A large gain over a very short span exceeds the float range.
        return float("inf")
    return (annual_growth - 1.0) * 100.0


def _populate_round_trip_aggregates(
    result: BacktestResult, trade_log: list[Any]
) -> None:
    """Fill win/loss and hold-time aggregates from FIFO-matched round trips."""
    round_trips = _build_round_trips(trade_log)
    result.trade_count = len(round_trips)
    if not round_trips:
        return

    # Coerce once: P&L may arrive as Decimal, and mixing Decimal with the
    # float fallback 0.0 below would raise TypeError in the ratio.
    pnls = [float(rt["pnl"]) for rt in round_trips]
    wins = [p for p in pnls if p > 0]
    losses = [p for p in pnls if p <= 0]  # break-even counts as a loss
    result.win_rate = len(wins) / len(pnls) * 100.0

    avg_win = sum(wins) / len(wins) if wins else 0.0
    avg_loss = sum(losses) / len(losses) if losses else 0.0
    if avg_loss != 0:
        result.avg_win_loss_ratio = abs(avg_win / avg_loss)
    elif avg_win > 0:
        result.avg_win_loss_ratio = float("inf")

    durations = [rt["duration"] for rt in round_trips]
    result.avg_hold_duration = sum(durations, timedelta()) / len(durations)
def _populate_dict_trade_aggregates(
    result: BacktestResult, trade_log: list[dict[str, Any]]
) -> None:
    """Fill trade-level aggregates on ``result`` from dict-shaped trade rows.

    ``trade_count`` counts every row. All other aggregates use only rows
    whose ``"pnl_pct"`` is not ``None``; if there are none, nothing else
    is written. A ``pnl_pct`` of exactly zero is grouped with the losers.
    """
    result.trade_count = len(trade_log)

    settled = [row for row in trade_log if row.get("pnl_pct") is not None]
    if not settled:
        return

    # Float-based statistics: win rate and average win / average loss.
    pct_floats = [float(row["pnl_pct"]) for row in settled]
    gains = [pct for pct in pct_floats if pct > 0]
    setbacks = [pct for pct in pct_floats if pct <= 0]
    result.win_rate = (len(gains) / len(pct_floats)) * 100.0

    mean_gain = sum(gains) / len(gains) if gains else 0.0
    mean_setback = sum(setbacks) / len(setbacks) if setbacks else 0.0
    if mean_setback != 0:
        result.avg_win_loss_ratio = abs(mean_gain / mean_setback)
    elif mean_gain > 0:
        result.avg_win_loss_ratio = float("inf")

    # Kevin extensions are kept in Decimal; parse each row once, going
    # through ``str`` so float inputs keep their printed value.
    scored = [(Decimal(str(row["pnl_pct"])), row) for row in settled]
    winner_pcts = [pct for pct, _ in scored if pct > 0]
    loser_pcts = [pct for pct, _ in scored if pct <= 0]
    if winner_pcts:
        result.avg_winner_pct = sum(winner_pcts) / Decimal(len(winner_pcts))
    if loser_pcts:
        result.avg_loser_pct = sum(loser_pcts) / Decimal(len(loser_pcts))

    # max/min return the first extreme row when several tie.
    top_pct, top_row = max(scored, key=lambda pair: pair[0])
    bottom_pct, bottom_row = min(scored, key=lambda pair: pair[0])
    result.best_trade = {
        "symbol": top_row["symbol"],
        "pnl_pct": top_pct,
    }
    result.worst_trade = {
        "symbol": bottom_row["symbol"],
        "pnl_pct": bottom_pct,
    }
def _populate_benchmark_metrics(
    result: BacktestResult,
    equity_curve: list[tuple[datetime, Any]],
    benchmark_bars: pd.DataFrame | None,
    initial_capital: float,
) -> None:
    """Fill ``beta_vs_spy`` and ``alpha_vs_spy_pct`` on ``result``, best effort.

    Does nothing when there is no benchmark, the benchmark frame is empty,
    or the equity curve has fewer than two snapshots. Any exception raised
    while computing is logged and swallowed, so whatever was not yet
    written stays at its previous value.

    Beta is ``cov(strategy, benchmark) / var(benchmark)`` over per-row
    percentage changes, rounded to 4 decimals. The two return series are
    joined on their index labels, so only rows whose benchmark index equals
    an equity timestamp take part.
    NOTE(review): assumes ``benchmark_bars`` is indexed by timestamps
    comparable to the equity curve's -- confirm against the caller.

    Alpha here is the plain difference between strategy total return and
    benchmark total return (first to last ``close``), in percentage points.
    """
    if benchmark_bars is None:
        return
    if benchmark_bars.empty or len(equity_curve) < 2:
        return

    try:
        curve_frame = pd.DataFrame(
            [(stamp, float(value)) for stamp, value in equity_curve],
            columns=["timestamp", "equity"],
        ).set_index("timestamp")
        strategy_moves = curve_frame["equity"].pct_change().dropna()

        closes = benchmark_bars["close"]
        benchmark_moves = closes.astype(float).pct_change().dropna()

        # Column-wise concat aligns on index; dropna keeps shared rows only.
        paired = pd.concat(
            [strategy_moves, benchmark_moves], axis=1, keys=["s", "spy"]
        ).dropna()

        if len(paired) >= 2:
            covariance = paired["s"].cov(paired["spy"])
            benchmark_variance = paired["spy"].var()
            if benchmark_variance > 0:
                beta = round(covariance / benchmark_variance, 4)
                result.beta_vs_spy = Decimal(str(beta))

        benchmark_total_pct = (
            float(closes.iloc[-1]) / float(closes.iloc[0]) - 1
        ) * 100
        strategy_total_pct = (
            float(equity_curve[-1][1]) / initial_capital - 1
        ) * 100
        excess_pct = round(strategy_total_pct - benchmark_total_pct, 4)
        result.alpha_vs_spy_pct = Decimal(str(excess_pct))
    except Exception:
        logger.exception("benchmark metrics failed")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _compute_daily_returns(equity_curve: list[tuple[datetime, Any]]) -> list[float]:
    """Return the simple return between each pair of consecutive snapshots.

    The result has one entry fewer than ``equity_curve`` (empty when the
    curve has fewer than two points). A step that starts from an equity of
    zero contributes ``0.0`` instead of dividing by zero. Timestamps are
    not inspected, so the returns are "daily" only if the snapshots are.
    """
    if len(equity_curve) < 2:
        return []

    levels = [float(snapshot[1]) for snapshot in equity_curve]
    return [
        (after - before) / before if before != 0 else 0.0
        for before, after in zip(levels, levels[1:])
    ]
def _compute_sharpe(daily_returns: list[float]) -> float:
    """Annualized Sharpe ratio: ``mean / sample_std * sqrt(252)``.

    No risk-free rate is subtracted. Returns ``0.0`` when fewer than two
    returns are given or when the returns have zero spread.
    """
    count = len(daily_returns)
    if count < 2:
        return 0.0

    average = sum(daily_returns) / count
    # Sample standard deviation (n - 1 in the denominator).
    squared_spread = sum((value - average) ** 2 for value in daily_returns)
    sample_std = math.sqrt(squared_spread / (count - 1))
    if sample_std == 0:
        return 0.0
    return (average / sample_std) * math.sqrt(252)
def _compute_sortino(daily_returns: list[float]) -> float:
    """Annualized Sortino ratio: ``mean / downside_deviation * sqrt(252)``.

    The downside deviation is the root mean square of the strictly negative
    returns, averaged over the count of negative returns only (not over all
    observations). With no negative returns the result is ``inf``, or
    ``0.0`` if the mean is also zero. Fewer than two returns give ``0.0``.
    """
    count = len(daily_returns)
    if count < 2:
        return 0.0

    average = sum(daily_returns) / count
    negatives = [value for value in daily_returns if value < 0]
    if not negatives:
        if average == 0:
            return 0.0
        return float("inf")

    mean_square = sum(value ** 2 for value in negatives) / len(negatives)
    downside_dev = math.sqrt(mean_square)
    if downside_dev == 0:
        # Only reachable if the squares underflow to zero.
        return 0.0
    return (average / downside_dev) * math.sqrt(252)
def _compute_max_drawdown(
    equity_curve: list[tuple[datetime, Any]],
) -> tuple[float, float]:
    """Find the deepest peak-to-trough decline and how long it took.

    Walks the curve while tracking the running high-water mark. The
    reported duration is the number of whole days between that mark and
    the snapshot where the deepest decline was observed; among equally
    deep declines the earliest one wins.

    Returns
    -------
    tuple[float, float]
        ``(max_drawdown_pct, max_drawdown_duration_days)``; both are zero
        for curves with fewer than two points or without any decline.
    """
    if len(equity_curve) < 2:
        return 0.0, 0.0

    high_water = float(equity_curve[0][1])
    high_water_ts = equity_curve[0][0]
    worst_pct = 0.0
    worst_days = 0.0

    for stamp, value in equity_curve[1:]:
        level = float(value)
        if level >= high_water:
            # New (or matched) high: restart the drawdown clock here.
            high_water, high_water_ts = level, stamp
            continue

        # A non-positive peak cannot be expressed as a percentage decline.
        decline_pct = (high_water - level) / high_water * 100.0 if high_water > 0 else 0.0
        elapsed_days = (stamp - high_water_ts).days
        if decline_pct > worst_pct:
            worst_pct, worst_days = decline_pct, elapsed_days

    return worst_pct, worst_days
def _build_round_trips(
    trade_log: list[TradeExecution],
) -> list[dict[str, Any]]:
    """Pair SELL fills with earlier BUY lots, oldest lot first (FIFO).

    Each BUY opens a new lot for its ticker. Each SELL consumes lots from
    the front of that ticker's queue, emitting one round trip per lot it
    touches (so one SELL can produce several round trips, and one lot can
    be closed by several SELLs). A SELL for a ticker with no open lots is
    skipped, and any sell quantity beyond the open lots is dropped.

    Returns
    -------
    list[dict[str, Any]]
        One dict per matched slice with the keys ``ticker``, ``qty``,
        ``entry_price``, ``exit_price``, ``pnl`` and ``duration``.
    """
    # ticker -> open lots, oldest first; each lot tracks its unmatched qty
    open_lots: dict[str, list[dict[str, Any]]] = {}
    matched: list[dict[str, Any]] = []

    for fill in trade_log:
        symbol = fill.ticker

        if fill.side == OrderSide.BUY:
            open_lots.setdefault(symbol, []).append({
                "qty": fill.qty,
                "price": fill.price,
                "timestamp": fill.timestamp,
            })
            continue

        if fill.side != OrderSide.SELL:
            continue

        lots = open_lots.get(symbol)
        if not lots:
            continue

        to_close = fill.qty
        while to_close > 0 and lots:
            oldest = lots[0]
            slice_qty = min(to_close, oldest["qty"])
            matched.append({
                "ticker": symbol,
                "qty": slice_qty,
                "entry_price": oldest["price"],
                "exit_price": fill.price,
                "pnl": (fill.price - oldest["price"]) * slice_qty,
                "duration": fill.timestamp - oldest["timestamp"],
            })
            oldest["qty"] -= slice_qty
            to_close -= slice_qty
            if oldest["qty"] <= 0:
                # Lot fully consumed; the next-oldest becomes the head.
                lots.pop(0)

    return matched