"""Performance metrics for backtesting results.

Computes standard risk and return metrics from the trade log and equity
curve produced by a backtest run.
"""

from __future__ import annotations

import logging
import math
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any

import pandas as pd

from shared.schemas.trading import OrderSide, TradeExecution

logger = logging.getLogger(__name__)


@dataclass
class BacktestResult:
    """Container for all computed backtest metrics.

    Attributes
    ----------
    total_return:
        ``(final - initial) / initial * 100`` as a percentage.
    annualized_return:
        Total return compounded to a yearly rate, as a percentage. The
        holding period is measured in elapsed calendar days between the
        first and last equity snapshot, divided by 365.25.
    sharpe_ratio:
        ``mean(daily_returns) / std(daily_returns) * sqrt(252)``
        (sample standard deviation, no risk-free rate).
    sortino_ratio:
        Like Sharpe but using only downside deviation.
    max_drawdown_pct:
        Maximum peak-to-trough decline as a percentage.
    max_drawdown_duration_days:
        Calendar days from the preceding peak to the trough of the deepest
        drawdown.
    win_rate:
        Percentage of winning trades (break-even trades count as losses).
    avg_win_loss_ratio:
        ``avg(winning_pnl) / abs(avg(losing_pnl))``.
    trade_count:
        Number of round-trip trades for ``TradeExecution`` logs; number of
        rows in the trade log for dict-shaped logs.
    avg_hold_duration:
        Mean hold duration across all round-trip trades (only populated for
        ``TradeExecution`` logs).
    equity_curve:
        List of ``(timestamp, equity)`` snapshots.
    trade_log:
        Raw trade log exactly as passed to :func:`compute_metrics`.
    """

    total_return: float = 0.0
    annualized_return: float = 0.0
    sharpe_ratio: float = 0.0
    sortino_ratio: float = 0.0
    max_drawdown_pct: float = 0.0
    max_drawdown_duration_days: float = 0.0
    win_rate: float = 0.0
    avg_win_loss_ratio: float = 0.0
    trade_count: int = 0
    avg_hold_duration: timedelta = field(default_factory=lambda: timedelta(0))
    equity_curve: list[tuple[datetime, Any]] = field(default_factory=list)
    trade_log: list[Any] = field(default_factory=list)

    # --- Kevin v2 extensions ---
    total_return_pct: float = 0.0  # alias for total_return (Kevin-style naming)
    trades: list[dict[str, Any]] = field(default_factory=list)
    alpha_vs_spy_pct: Decimal | None = None
    beta_vs_spy: Decimal | None = None
    avg_winner_pct: Decimal | None = None
    avg_loser_pct: Decimal | None = None
    best_trade: dict[str, Any] | None = None
    worst_trade: dict[str, Any] | None = None


def compute_metrics(
    trade_log: list[Any],
    equity_curve: list[tuple[datetime, Any]],
    initial_capital: float | Decimal = 100_000.0,
    benchmark_bars: pd.DataFrame | None = None,
) -> BacktestResult:
    """Compute all performance metrics from a backtest run.

    Parameters
    ----------
    trade_log:
        Chronological list of every executed trade. Accepts either
        ``TradeExecution`` instances (legacy bar-driven engine) or
        dict-shaped per-position rows (Kevin mention-driven engine). The
        shape is detected from the first element only.
    equity_curve:
        List of ``(timestamp, portfolio_equity)`` snapshots.
    initial_capital:
        Starting capital used to compute total return.
    benchmark_bars:
        Optional benchmark price series (e.g. SPY) used to compute
        alpha + beta.

    Returns
    -------
    BacktestResult
        Populated metrics dataclass. With an empty ``equity_curve`` only
        the trade-level aggregates of dict-shaped logs are filled in.

    Raises
    ------
    ZeroDivisionError
        If ``initial_capital`` is zero and ``equity_curve`` is non-empty.
    """
    is_dict_trades = bool(trade_log) and isinstance(trade_log[0], dict)
    initial_float = float(initial_capital)

    result = BacktestResult(
        equity_curve=equity_curve,
        trade_log=trade_log,
        trades=trade_log if is_dict_trades else [],
    )

    if not equity_curve:
        if is_dict_trades:
            _populate_dict_trade_aggregates(result, trade_log)
        _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)
        return result

    # ----- Total return -----
    final_equity = float(equity_curve[-1][1])
    result.total_return = (final_equity - initial_float) / initial_float * 100.0
    result.total_return_pct = result.total_return

    # ----- Annualized return -----
    if len(equity_curve) >= 2:
        total_days = (equity_curve[-1][0] - equity_curve[0][0]).days
        if total_days > 0:
            # Calendar-day based: elapsed days are converted with 365.25.
            years = total_days / 365.25
            growth_factor = final_equity / initial_float
            # A non-positive growth factor has no real-valued root; leave 0.0.
            if growth_factor > 0:
                result.annualized_return = (
                    (growth_factor ** (1.0 / years)) - 1.0
                ) * 100.0

    # ----- Daily returns -----
    daily_returns = _compute_daily_returns(equity_curve)

    # ----- Sharpe ratio -----
    result.sharpe_ratio = _compute_sharpe(daily_returns)

    # ----- Sortino ratio -----
    result.sortino_ratio = _compute_sortino(daily_returns)

    # ----- Max drawdown -----
    dd_pct, dd_duration = _compute_max_drawdown(equity_curve)
    result.max_drawdown_pct = dd_pct
    result.max_drawdown_duration_days = dd_duration

    # ----- Round-trip trade analysis -----
    if is_dict_trades:
        _populate_dict_trade_aggregates(result, trade_log)
    else:
        round_trips = _build_round_trips(trade_log)
        result.trade_count = len(round_trips)
        if round_trips:
            # Coerce once to float so Decimal-valued P&L cannot be mixed with
            # the float fallbacks used in the win/loss arithmetic.
            _apply_win_loss_stats(
                result, [float(rt["pnl"]) for rt in round_trips]
            )
            durations = [rt["duration"] for rt in round_trips]
            result.avg_hold_duration = sum(durations, timedelta()) / len(durations)

    # ----- Benchmark metrics (Kevin extensions) -----
    _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float)

    return result


def _apply_win_loss_stats(result: BacktestResult, pnls: list[float]) -> None:
    """Set ``win_rate`` and ``avg_win_loss_ratio`` on *result* from *pnls*.

    A trade wins when its value is strictly positive; zero counts as a loss.
    ``avg_win_loss_ratio`` is left untouched when there are neither wins nor
    a non-zero average loss, and is ``inf`` when there are wins but the
    average loss is zero. Does nothing for an empty list.
    """
    if not pnls:
        return

    wins = [p for p in pnls if p > 0]
    losses = [p for p in pnls if p <= 0]

    result.win_rate = (len(wins) / len(pnls)) * 100.0

    avg_win = sum(wins) / len(wins) if wins else 0.0
    avg_loss = sum(losses) / len(losses) if losses else 0.0
    if avg_loss != 0:
        result.avg_win_loss_ratio = abs(avg_win / avg_loss)
    elif avg_win > 0:
        result.avg_win_loss_ratio = float("inf")


def _populate_dict_trade_aggregates(
    result: BacktestResult, trade_log: list[dict[str, Any]]
) -> None:
    """Populate trade-level aggregates when trade_log is dict-shaped.

    ``trade_count`` is the number of rows in *trade_log*. All other
    aggregates only consider rows whose ``"pnl_pct"`` is not ``None``; if
    there are none, nothing else is set.
    """
    result.trade_count = len(trade_log)
    if not trade_log:
        return

    closed = [t for t in trade_log if t.get("pnl_pct") is not None]
    if not closed:
        return

    _apply_win_loss_stats(result, [float(t["pnl_pct"]) for t in closed])

    # Kevin extensions: winners / losers / best / worst.
    # Convert via str() once per trade so float inputs keep their printed
    # value instead of their binary expansion.
    pcts = [Decimal(str(t["pnl_pct"])) for t in closed]

    winner_pcts = [p for p in pcts if p > 0]
    loser_pcts = [p for p in pcts if p <= 0]
    if winner_pcts:
        result.avg_winner_pct = sum(winner_pcts) / Decimal(len(winner_pcts))
    if loser_pcts:
        result.avg_loser_pct = sum(loser_pcts) / Decimal(len(loser_pcts))

    # max()/min() return the first extreme on ties, i.e. the earliest trade.
    indices = range(len(closed))
    best_idx = max(indices, key=pcts.__getitem__)
    worst_idx = min(indices, key=pcts.__getitem__)
    result.best_trade = {
        "symbol": closed[best_idx]["symbol"],
        "pnl_pct": pcts[best_idx],
    }
    result.worst_trade = {
        "symbol": closed[worst_idx]["symbol"],
        "pnl_pct": pcts[worst_idx],
    }


def _populate_benchmark_metrics(
    result: BacktestResult,
    equity_curve: list[tuple[datetime, Any]],
    benchmark_bars: pd.DataFrame | None,
    initial_capital: float,
) -> None:
    """Fill ``beta_vs_spy`` and ``alpha_vs_spy_pct`` on *result*.

    Does nothing without benchmark data or with fewer than two equity
    snapshots. Beta is ``cov(strategy, benchmark) / var(benchmark)`` over
    the period-over-period returns that share an index label. Alpha is the
    strategy's total return minus the benchmark's total return, in
    percentage points. Both are rounded to 4 decimals.

    Reads the ``"close"`` column of *benchmark_bars*.
    NOTE(review): beta only works if *benchmark_bars* is indexed by
    timestamps comparable to those in *equity_curve* - confirm with callers.

    Best-effort: any exception is logged and swallowed so a bad benchmark
    cannot fail the whole metrics computation.
    """
    if benchmark_bars is None or benchmark_bars.empty or len(equity_curve) < 2:
        return

    try:
        equity_df = pd.DataFrame(
            [(ts, float(eq)) for ts, eq in equity_curve],
            columns=["timestamp", "equity"],
        ).set_index("timestamp")
        equity_ret = equity_df["equity"].pct_change().dropna()
        spy_close = benchmark_bars["close"].astype(float).pct_change().dropna()

        # Outer-join on the index, then keep only rows present in both.
        aligned = pd.concat(
            [equity_ret, spy_close], axis=1, keys=["s", "spy"]
        ).dropna()
        if len(aligned) >= 2:
            cov = aligned["s"].cov(aligned["spy"])
            var = aligned["spy"].var()
            if var > 0:
                result.beta_vs_spy = Decimal(str(round(cov / var, 4)))

        spy_total_return = (
            float(benchmark_bars["close"].iloc[-1])
            / float(benchmark_bars["close"].iloc[0])
            - 1
        ) * 100
        strategy_total_return = (
            float(equity_curve[-1][1]) / initial_capital - 1
        ) * 100
        result.alpha_vs_spy_pct = Decimal(
            str(round(strategy_total_return - spy_total_return, 4))
        )
    except Exception:
        logger.exception("benchmark metrics failed")


# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------


def _compute_daily_returns(equity_curve: list[tuple[datetime, Any]]) -> list[float]:
    """Compute simple returns between consecutive equity snapshots.

    A step whose previous equity is zero contributes ``0.0`` rather than
    dividing by zero. Returns an empty list for fewer than two snapshots.
    """
    if len(equity_curve) < 2:
        return []

    equities = [float(eq) for _, eq in equity_curve]
    return [
        (curr - prev) / prev if prev != 0 else 0.0
        for prev, curr in zip(equities, equities[1:])
    ]


def _compute_sharpe(daily_returns: list[float]) -> float:
    """Sharpe ratio: mean / std * sqrt(252).

    Uses the sample standard deviation (``n - 1``). Returns ``0.0`` for
    fewer than two returns or zero dispersion.
    """
    count = len(daily_returns)
    if count < 2:
        return 0.0

    mean_ret = sum(daily_returns) / count
    variance = sum((r - mean_ret) ** 2 for r in daily_returns) / (count - 1)
    std_ret = math.sqrt(variance)
    if std_ret == 0:
        return 0.0

    return (mean_ret / std_ret) * math.sqrt(252)


def _compute_sortino(daily_returns: list[float]) -> float:
    """Sortino ratio: mean / downside_deviation * sqrt(252).

    Downside deviation is the root-mean-square of the negative returns
    only (divided by the number of negative returns). With no negative
    returns the result is ``inf``, or ``0.0`` when the mean is exactly zero.
    Returns ``0.0`` for fewer than two returns.
    """
    if len(daily_returns) < 2:
        return 0.0

    mean_ret = sum(daily_returns) / len(daily_returns)
    downside = [r for r in daily_returns if r < 0]
    if not downside:
        return 0.0 if mean_ret == 0 else float("inf")

    downside_variance = sum(r ** 2 for r in downside) / len(downside)
    downside_dev = math.sqrt(downside_variance)
    if downside_dev == 0:
        return 0.0

    return (mean_ret / downside_dev) * math.sqrt(252)


def _compute_max_drawdown(
    equity_curve: list[tuple[datetime, Any]],
) -> tuple[float, float]:
    """Compute max drawdown percentage and duration in days.

    The duration is the number of whole days between the running peak and
    the snapshot at which the deepest drawdown was reached.

    Returns
    -------
    tuple[float, float]
        ``(max_drawdown_pct, max_drawdown_duration_days)``; ``(0.0, 0.0)``
        for fewer than two snapshots.
    """
    if len(equity_curve) < 2:
        return 0.0, 0.0

    peak = float(equity_curve[0][1])
    peak_ts = equity_curve[0][0]
    max_dd = 0.0
    max_dd_duration = 0.0

    for ts, equity in equity_curve[1:]:
        eq = float(equity)
        if eq >= peak:
            # New (or equal) high: the drawdown clock restarts here.
            peak = eq
            peak_ts = ts
            continue

        dd = (peak - eq) / peak * 100.0 if peak > 0 else 0.0
        if dd > max_dd:
            max_dd = dd
            max_dd_duration = (ts - peak_ts).days

    return max_dd, max_dd_duration


def _build_round_trips(
    trade_log: list[TradeExecution],
) -> list[dict[str, Any]]:
    """Match buys with sells to produce round-trip P&L and duration.

    Uses a simple FIFO approach: each BUY opens (or adds to) a position;
    each SELL closes (reduces) it. A SELL that spans several open lots
    yields one round trip per lot. SELL quantity with no open lot left to
    match is ignored, and lots still open at the end produce no round trip.

    Returns
    -------
    list[dict[str, Any]]
        One dict per matched lot with keys ``ticker``, ``qty``,
        ``entry_price``, ``exit_price``, ``pnl`` and ``duration``.
    """
    # ticker -> FIFO queue of {"qty": float, "price": float, "timestamp": datetime}
    open_positions: dict[str, deque[dict[str, Any]]] = defaultdict(deque)
    round_trips: list[dict[str, Any]] = []

    for trade in trade_log:
        ticker = trade.ticker

        if trade.side == OrderSide.BUY:
            open_positions[ticker].append({
                "qty": trade.qty,
                "price": trade.price,
                "timestamp": trade.timestamp,
            })
        elif trade.side == OrderSide.SELL:
            # .get() avoids creating an empty queue for unknown tickers.
            lots = open_positions.get(ticker)
            if not lots:
                continue

            remaining_sell_qty = trade.qty
            while remaining_sell_qty > 0 and lots:
                entry = lots[0]
                matched_qty = min(remaining_sell_qty, entry["qty"])

                round_trips.append({
                    "ticker": ticker,
                    "qty": matched_qty,
                    "entry_price": entry["price"],
                    "exit_price": trade.price,
                    "pnl": (trade.price - entry["price"]) * matched_qty,
                    "duration": trade.timestamp - entry["timestamp"],
                })

                entry["qty"] -= matched_qty
                remaining_sell_qty -= matched_qty

                if entry["qty"] <= 0:
                    lots.popleft()

    return round_trips