diff --git a/backtester/kevin_backtest.py b/backtester/kevin_backtest.py new file mode 100644 index 0000000..fe787a8 --- /dev/null +++ b/backtester/kevin_backtest.py @@ -0,0 +1,367 @@ +"""Mention-driven backtest mini-engine for the Kevin strategy. + +Parallel to the bar-driven BacktestEngine. Walks mentions chronologically, +entry at T+1 open, exit at entry_session + holding_days open. Calls the +shared KevinStrategy.evaluate_mention so backtest and live agree. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from typing import Any, Protocol + +import pandas as pd + +from backtester.metrics import BacktestResult, compute_metrics +from shared.schemas.kevin import ( + KevinAccountState, + KevinDecision, + KevinDecisionType, +) +from shared.strategies.kevin import KevinStrategy + +logger = logging.getLogger(__name__) + + +class PriceLoader(Protocol): + async def daily_bars( + self, symbol: str, start: datetime, end: datetime + ) -> pd.DataFrame: ... + + async def is_tradable(self, symbol: str) -> bool: ... + + async def benchmark_bars( + self, start: datetime, end: datetime + ) -> pd.DataFrame: ... + + +@dataclass +class KevinBacktestParams: + initial_capital: Decimal = Decimal("100000") + slippage_pct: Decimal = Decimal("0.0005") + commission_per_trade: Decimal = Decimal("0") + dedupe_policy: str = "roll" # "roll" | "ignore" + + +@dataclass +class _BacktestTrade: + symbol: str + source_mention_id: int + entry_at: datetime + entry_price: Decimal + qty: Decimal + target_exit_at: datetime + exit_at: datetime | None = None + exit_price: Decimal | None = None + pnl_usd: Decimal | None = None + pnl_pct: Decimal | None = None + holding_days_actual: int | None = None + + +@dataclass +class _Portfolio: + cash: Decimal + open_trades: dict[str, _BacktestTrade] = field(default_factory=dict) + closed_trades: list[_BacktestTrade] = field(default_factory=list) + blocklist_expiry: dict[str, datetime] = field(default_factory=dict) + + def equity_at(self, mark_prices: dict[str, Decimal]) -> Decimal: + total = self.cash + for symbol, trade in self.open_trades.items(): + price = mark_prices.get(symbol, trade.entry_price) + total += trade.qty * price + return total + + def held_dollars(self) -> dict[str, Decimal]: + return {s: t.qty * t.entry_price for s, t in self.open_trades.items()} + + def active_blocklist(self, now: datetime) -> set[str]: + return {s for s, exp in self.blocklist_expiry.items() if exp > now} + + +class KevinBacktestRunner: + def __init__(self, strategy: KevinStrategy, price_loader: PriceLoader) -> None: + self.strategy = strategy + self.price_loader = price_loader + + async def run( + self, mentions: list[Any], params: KevinBacktestParams + ) -> BacktestResult: + if not mentions: + return compute_metrics( + trade_log=[], equity_curve=[], initial_capital=params.initial_capital + ) + + sorted_mentions = sorted(mentions, key=lambda m: m.created_at) + start = sorted_mentions[0].created_at - timedelta(days=1) + end = max(m.created_at for m in sorted_mentions) + timedelta(days=120) + + symbols = sorted({m.symbol for m in sorted_mentions}) + bars: dict[str, pd.DataFrame] = {} + for sym in symbols: + df = await self.price_loader.daily_bars(sym, start, end) + if not df.empty: + bars[sym] = df + + spy_bars = await self.price_loader.benchmark_bars(start, end) + + portfolio = _Portfolio(cash=params.initial_capital) + equity_curve: list[tuple[datetime, Decimal]] = [] + all_dates = _trading_dates(spy_bars) + + for day in all_dates: + # 1. Apply mentions whose created_at falls on or before this trading session + for mention in [ + m for m in sorted_mentions if _entry_day(m.created_at, all_dates) == day + ]: + await self._apply_mention(mention, day, portfolio, bars, params) + + # 2. Roll exits whose target_exit_at <= day + _close_expired(day, portfolio, bars, params) + + # 3. Mark-to-market equity + mark = _mark_prices(bars, portfolio.open_trades, day) + equity_curve.append((day, portfolio.equity_at(mark))) + + # Close any still-open at the last day + if all_dates: + _close_all(all_dates[-1], portfolio, bars, params) + + trades_dict = [self._trade_to_dict(t) for t in portfolio.closed_trades] + return compute_metrics( + trade_log=trades_dict, + equity_curve=equity_curve, + initial_capital=params.initial_capital, + benchmark_bars=spy_bars, + ) + + async def _apply_mention( + self, + mention: Any, + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, + ) -> None: + symbol = mention.symbol + if symbol not in bars: + return # no price data — skip + + is_tradable = await self.price_loader.is_tradable(symbol) + mark = _mark_prices(bars, portfolio.open_trades, day) + equity = portfolio.equity_at(mark) + state = KevinAccountState( + equity_usd=equity, + cash_usd=portfolio.cash, + held_positions=portfolio.held_dollars(), + blocklisted_symbols=portfolio.active_blocklist(day), + daily_trade_count=0, # backtest doesn't enforce daily caps + daily_alloc_usd=Decimal("0"), + paused=False, + ) + + current_price = _price_at(bars[symbol], day, "open") + if current_price is None: + return + + decision = await self.strategy.evaluate_mention( + mention, + state, + effective_conviction=mention.conviction, + current_price=current_price, + is_tradable=is_tradable, + ) + + if decision.decision == KevinDecisionType.OPEN_LONG: + self._open_or_roll(decision, mention, day, portfolio, bars, params) + elif decision.decision == KevinDecisionType.CLOSE_LONG: + self._close_position(symbol, day, portfolio, bars, params) + if mention.action.value == "avoid": + portfolio.blocklist_expiry[symbol] = day + timedelta( + days=self.strategy.config.avoid_blocks_days + ) + + def _open_or_roll( + self, + decision: KevinDecision, + mention: Any, + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, + ) -> None: + symbol = decision.symbol + entry_price = _price_at(bars[symbol], day, "open") + if entry_price is None or decision.target_dollars is None: + return + entry_price *= Decimal("1") + params.slippage_pct + + qty = (decision.target_dollars / entry_price).quantize(Decimal("0.0001")) + if qty <= 0: + return + + cost = qty * entry_price + params.commission_per_trade + if cost > portfolio.cash: + return # insufficient cash in backtest + + # trading days -> calendar days approximation (~7/5 = 1.4) + hold_days = decision.holding_days or 5 + target_exit = day + timedelta(days=int(hold_days * 1.4)) + target_exit = _next_trading_day(target_exit, bars[symbol].index) + + if symbol in portfolio.open_trades: + if params.dedupe_policy == "roll": + portfolio.open_trades[symbol].target_exit_at = max( + portfolio.open_trades[symbol].target_exit_at, target_exit + ) + return # ignore: don't add second position + + portfolio.cash -= cost + portfolio.open_trades[symbol] = _BacktestTrade( + symbol=symbol, + source_mention_id=mention.id, + entry_at=day, + entry_price=entry_price, + qty=qty, + target_exit_at=target_exit, + ) + + def _close_position( + self, + symbol: str, + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, + ) -> None: + if symbol not in portfolio.open_trades: + return + trade = portfolio.open_trades.pop(symbol) + exit_price = _price_at(bars[symbol], day, "open") + if exit_price is None: + exit_price = trade.entry_price # last resort + exit_price *= Decimal("1") - params.slippage_pct + + proceeds = trade.qty * exit_price - params.commission_per_trade + portfolio.cash += proceeds + trade.exit_at = day + trade.exit_price = exit_price + trade.pnl_usd = (exit_price - trade.entry_price) * trade.qty + trade.pnl_pct = ( + (exit_price - trade.entry_price) / trade.entry_price * Decimal("100") + ) + trade.holding_days_actual = (day - trade.entry_at).days + portfolio.closed_trades.append(trade) + + def _trade_to_dict(self, t: _BacktestTrade) -> dict[str, Any]: + return { + "symbol": t.symbol, + "source_mention_id": t.source_mention_id, + "entry_at": t.entry_at, + "entry_price": t.entry_price, + "exit_at": t.exit_at, + "exit_price": t.exit_price, + "qty": t.qty, + "pnl_usd": t.pnl_usd, + "pnl_pct": t.pnl_pct, + "holding_days_actual": t.holding_days_actual, + } + + +# --- helpers --- + + +def _mark_prices( + bars: dict[str, pd.DataFrame], + open_trades: dict[str, _BacktestTrade], + day: datetime, +) -> dict[str, Decimal]: + out: dict[str, Decimal] = {} + for s in open_trades: + if s in bars: + p = _price_at(bars[s], day, "close") + if p is not None: + out[s] = p + return out + + +def _trading_dates(bars: pd.DataFrame) -> list[datetime]: + if bars is None or bars.empty: + return [] + return [d.to_pydatetime().replace(tzinfo=timezone.utc) for d in bars.index] + + +def _entry_day(created_at: datetime, dates: list[datetime]) -> datetime | None: + """Find next trading session AFTER mention's created_at (T+1).""" + target = created_at.date() + for d in dates: + if d.date() > target: + return d + return None + + +def _price_at(df: pd.DataFrame, day: datetime, col: str) -> Decimal | None: + if df is None or df.empty: + return None + matches = df[df.index.date <= day.date()] + if matches.empty: + return None + return Decimal(str(matches.iloc[-1][col])) + + +def _next_trading_day(target: datetime, index: pd.DatetimeIndex) -> datetime: + for d in index: + py_d: datetime = d.to_pydatetime().replace(tzinfo=timezone.utc) + if py_d >= target: + return py_d + last: datetime = index[-1].to_pydatetime().replace(tzinfo=timezone.utc) + return last + + +def _close_expired( + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, +) -> None: + for symbol in list(portfolio.open_trades.keys()): + trade = portfolio.open_trades[symbol] + if trade.target_exit_at <= day: + _force_close(symbol, day, portfolio, bars, params) + + +def _close_all( + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, +) -> None: + for symbol in list(portfolio.open_trades.keys()): + _force_close(symbol, day, portfolio, bars, params) + + +def _force_close( + symbol: str, + day: datetime, + portfolio: _Portfolio, + bars: dict[str, pd.DataFrame], + params: KevinBacktestParams, +) -> None: + trade = portfolio.open_trades.pop(symbol) + exit_price = _price_at(bars[symbol], day, "open") + if exit_price is None: + exit_price = trade.entry_price + exit_price *= Decimal("1") - params.slippage_pct + proceeds = trade.qty * exit_price - params.commission_per_trade + portfolio.cash += proceeds + trade.exit_at = day + trade.exit_price = exit_price + trade.pnl_usd = (exit_price - trade.entry_price) * trade.qty + trade.pnl_pct = ( + (exit_price - trade.entry_price) / trade.entry_price * Decimal("100") + ) + trade.holding_days_actual = (day - trade.entry_at).days + portfolio.closed_trades.append(trade) diff --git a/backtester/kevin_price_loader.py b/backtester/kevin_price_loader.py new file mode 100644 index 0000000..d78fefc --- /dev/null +++ b/backtester/kevin_price_loader.py @@ -0,0 +1,96 @@ +"""Daily bar loader for KevinBacktestRunner. + +Reads from market_data table first; falls back to Alpaca on cache miss +and writes through so subsequent runs are warm. +""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any + +import pandas as pd +from sqlalchemy import and_, select +from sqlalchemy.ext.asyncio import async_sessionmaker + +from shared.models.timeseries import MarketData + +logger = logging.getLogger(__name__) + + +class KevinPriceLoader: + def __init__( + self, + session_factory: async_sessionmaker, + alpaca_fetcher: Any, + ) -> None: + self.session_factory = session_factory + self.alpaca = alpaca_fetcher + + async def daily_bars( + self, symbol: str, start: datetime, end: datetime + ) -> pd.DataFrame: + async with self.session_factory() as session: + rows = ( + await session.execute( + select( + MarketData.timestamp, + MarketData.open, + MarketData.high, + MarketData.low, + MarketData.close, + MarketData.volume, + ) + .where( + and_( + MarketData.ticker == symbol, + MarketData.timestamp >= start, + MarketData.timestamp <= end, + ) + ) + .order_by(MarketData.timestamp) + ) + ).all() + + if rows: + df = pd.DataFrame( + rows, columns=["timestamp", "open", "high", "low", "close", "volume"] + ) + df = df.set_index("timestamp") + return df + + # cache miss — back-fetch from Alpaca, write through + try: + df = await self.alpaca.fetch_daily_bars(symbol, start, end) + if not df.empty: + await self._write_through(symbol, df) + return df + except Exception as e: + logger.warning("alpaca fetch failed for %s: %s", symbol, e) + return pd.DataFrame() + + async def benchmark_bars(self, start: datetime, end: datetime) -> pd.DataFrame: + return await self.daily_bars("SPY", start, end) + + async def is_tradable(self, symbol: str) -> bool: + try: + return bool(await self.alpaca.is_asset_tradable(symbol)) + except Exception: + return False + + async def _write_through(self, symbol: str, df: pd.DataFrame) -> None: + async with self.session_factory() as session: + for ts, row in df.iterrows(): + session.add( + MarketData( + ticker=symbol, + timestamp=ts.to_pydatetime(), + open=row["open"], + high=row["high"], + low=row["low"], + close=row["close"], + volume=row.get("volume", 0), + ) + ) + await session.commit() diff --git a/backtester/metrics.py b/backtester/metrics.py index b6367c9..ec912e6 100644 --- a/backtester/metrics.py +++ b/backtester/metrics.py @@ -6,13 +6,19 @@ curve produced by a backtest run. from __future__ import annotations +import logging import math from dataclasses import dataclass, field from datetime import datetime, timedelta +from decimal import Decimal from typing import Any +import pandas as pd + from shared.schemas.trading import OrderSide, TradeExecution +logger = logging.getLogger(__name__) + @dataclass class BacktestResult: @@ -56,49 +62,72 @@ class BacktestResult: avg_win_loss_ratio: float = 0.0 trade_count: int = 0 avg_hold_duration: timedelta = field(default_factory=lambda: timedelta(0)) - equity_curve: list[tuple[datetime, float]] = field(default_factory=list) - trade_log: list[TradeExecution] = field(default_factory=list) + equity_curve: list[tuple[datetime, Any]] = field(default_factory=list) + trade_log: list[Any] = field(default_factory=list) + + # --- Kevin v2 extensions --- + total_return_pct: float = 0.0 # alias for total_return (Kevin-style naming) + trades: list[dict[str, Any]] = field(default_factory=list) + alpha_vs_spy_pct: Decimal | None = None + beta_vs_spy: Decimal | None = None + avg_winner_pct: Decimal | None = None + avg_loser_pct: Decimal | None = None + best_trade: dict[str, Any] | None = None + worst_trade: dict[str, Any] | None = None def compute_metrics( - trade_log: list[TradeExecution], - equity_curve: list[tuple[datetime, float]], - initial_capital: float = 100_000.0, + trade_log: list[Any], + equity_curve: list[tuple[datetime, Any]], + initial_capital: float | Decimal = 100_000.0, + benchmark_bars: pd.DataFrame | None = None, ) -> BacktestResult: """Compute all performance metrics from a backtest run. Parameters ---------- trade_log: - Chronological list of every executed trade (buys and sells). + Chronological list of every executed trade. Accepts either + ``TradeExecution`` instances (legacy bar-driven engine) or + dict-shaped per-position rows (Kevin mention-driven engine). equity_curve: List of ``(timestamp, portfolio_equity)`` snapshots. initial_capital: Starting capital used to compute total return. + benchmark_bars: + Optional benchmark price series (e.g. SPY) used to compute + alpha + beta. Returns ------- BacktestResult Populated metrics dataclass. """ + is_dict_trades = bool(trade_log) and isinstance(trade_log[0], dict) + initial_float = float(initial_capital) result = BacktestResult( equity_curve=equity_curve, trade_log=trade_log, + trades=trade_log if is_dict_trades else [], ) if not equity_curve: + if is_dict_trades: + _populate_dict_trade_aggregates(result, trade_log) + _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float) return result # ----- Total return ----- - final_equity = equity_curve[-1][1] - result.total_return = (final_equity - initial_capital) / initial_capital * 100.0 + final_equity = float(equity_curve[-1][1]) + result.total_return = (final_equity - initial_float) / initial_float * 100.0 + result.total_return_pct = result.total_return # ----- Annualized return ----- if len(equity_curve) >= 2: total_days = (equity_curve[-1][0] - equity_curve[0][0]).days if total_days > 0: trading_years = total_days / 365.25 - growth_factor = final_equity / initial_capital + growth_factor = final_equity / initial_float if growth_factor > 0: result.annualized_return = ( (growth_factor ** (1.0 / trading_years)) - 1.0 @@ -119,42 +148,113 @@ def compute_metrics( result.max_drawdown_duration_days = dd_duration # ----- Round-trip trade analysis ----- - round_trips = _build_round_trips(trade_log) - result.trade_count = len(round_trips) + if is_dict_trades: + _populate_dict_trade_aggregates(result, trade_log) + else: + round_trips = _build_round_trips(trade_log) + result.trade_count = len(round_trips) - if round_trips: - pnls = [rt["pnl"] for rt in round_trips] - wins = [p for p in pnls if p > 0] - losses = [p for p in pnls if p <= 0] + if round_trips: + pnls = [rt["pnl"] for rt in round_trips] + wins = [p for p in pnls if p > 0] + losses = [p for p in pnls if p <= 0] - result.win_rate = (len(wins) / len(pnls)) * 100.0 if pnls else 0.0 + result.win_rate = (len(wins) / len(pnls)) * 100.0 if pnls else 0.0 - avg_win = sum(wins) / len(wins) if wins else 0.0 - avg_loss = sum(losses) / len(losses) if losses else 0.0 - if avg_loss != 0: - result.avg_win_loss_ratio = abs(avg_win / avg_loss) - elif avg_win > 0: - result.avg_win_loss_ratio = float("inf") + avg_win = sum(wins) / len(wins) if wins else 0.0 + avg_loss = sum(losses) / len(losses) if losses else 0.0 + if avg_loss != 0: + result.avg_win_loss_ratio = abs(avg_win / avg_loss) + elif avg_win > 0: + result.avg_win_loss_ratio = float("inf") - durations = [rt["duration"] for rt in round_trips] - result.avg_hold_duration = sum(durations, timedelta()) / len(durations) + durations = [rt["duration"] for rt in round_trips] + result.avg_hold_duration = sum(durations, timedelta()) / len(durations) + + # ----- Benchmark metrics (Kevin extensions) ----- + _populate_benchmark_metrics(result, equity_curve, benchmark_bars, initial_float) return result +def _populate_dict_trade_aggregates( + result: BacktestResult, trade_log: list[dict[str, Any]] +) -> None: + """Populate trade-level aggregates when trade_log is dict-shaped.""" + result.trade_count = len(trade_log) + if not trade_log: + return + + closed = [ + t for t in trade_log if t.get("pnl_pct") is not None + ] + if not closed: + return + + pnls = [float(t["pnl_pct"]) for t in closed] + wins = [p for p in pnls if p > 0] + losses = [p for p in pnls if p <= 0] + result.win_rate = (len(wins) / len(pnls)) * 100.0 if pnls else 0.0 + + avg_win = sum(wins) / len(wins) if wins else 0.0 + avg_loss = sum(losses) / len(losses) if losses else 0.0 + if avg_loss != 0: + result.avg_win_loss_ratio = abs(avg_win / avg_loss) + elif avg_win > 0: + result.avg_win_loss_ratio = float("inf") + + +def _populate_benchmark_metrics( + result: BacktestResult, + equity_curve: list[tuple[datetime, Any]], + benchmark_bars: pd.DataFrame | None, + initial_capital: float, +) -> None: + if benchmark_bars is None or benchmark_bars.empty or len(equity_curve) < 2: + return + try: + equity_df = pd.DataFrame( + [(ts, float(eq)) for ts, eq in equity_curve], + columns=["timestamp", "equity"], + ).set_index("timestamp") + equity_ret = equity_df["equity"].pct_change().dropna() + spy_close = benchmark_bars["close"].astype(float).pct_change().dropna() + aligned = pd.concat( + [equity_ret, spy_close], axis=1, keys=["s", "spy"] + ).dropna() + if len(aligned) >= 2: + cov = aligned["s"].cov(aligned["spy"]) + var = aligned["spy"].var() + if var > 0: + result.beta_vs_spy = Decimal(str(round(cov / var, 4))) + spy_total_return = ( + float(benchmark_bars["close"].iloc[-1]) + / float(benchmark_bars["close"].iloc[0]) + - 1 + ) * 100 + strategy_total_return = ( + float(equity_curve[-1][1]) / initial_capital - 1 + ) * 100 + result.alpha_vs_spy_pct = Decimal( + str(round(strategy_total_return - spy_total_return, 4)) + ) + except Exception: + logger.exception("benchmark metrics failed") + + # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ -def _compute_daily_returns(equity_curve: list[tuple[datetime, float]]) -> list[float]: +def _compute_daily_returns(equity_curve: list[tuple[datetime, Any]]) -> list[float]: """Compute simple daily returns from the equity curve.""" if len(equity_curve) < 2: return [] returns: list[float] = [] for i in range(1, len(equity_curve)): - prev = equity_curve[i - 1][1] - curr = equity_curve[i][1] + prev = float(equity_curve[i - 1][1]) + curr = float(equity_curve[i][1]) if prev != 0: returns.append((curr - prev) / prev) else: @@ -198,7 +298,7 @@ def _compute_sortino(daily_returns: list[float]) -> float: def _compute_max_drawdown( - equity_curve: list[tuple[datetime, float]], + equity_curve: list[tuple[datetime, Any]], ) -> tuple[float, float]: """Compute max drawdown percentage and duration in days. @@ -210,17 +310,18 @@ def _compute_max_drawdown( if len(equity_curve) < 2: return 0.0, 0.0 - peak = equity_curve[0][1] + peak = float(equity_curve[0][1]) peak_ts = equity_curve[0][0] max_dd = 0.0 max_dd_duration = 0.0 for ts, equity in equity_curve[1:]: - if equity >= peak: - peak = equity + eq = float(equity) + if eq >= peak: + peak = eq peak_ts = ts else: - dd = (peak - equity) / peak * 100.0 if peak > 0 else 0.0 + dd = (peak - eq) / peak * 100.0 if peak > 0 else 0.0 duration = (ts - peak_ts).days if dd > max_dd: max_dd = dd diff --git a/shared/strategies/kevin.py b/shared/strategies/kevin.py index 484242a..09915d2 100644 --- a/shared/strategies/kevin.py +++ b/shared/strategies/kevin.py @@ -57,8 +57,10 @@ class KevinStrategy: is_tradable: bool, ) -> KevinDecision: symbol = mention.symbol - action = mention.action - horizon = mention.time_horizon + # Normalize the action/horizon to their str value so the strategy works + # with both SQLAlchemy enum instances and lightweight stubs (backtest). + action_value = getattr(mention.action, "value", mention.action) + horizon_value = getattr(mention.time_horizon, "value", mention.time_horizon) # 1. Common no-trade gates if not is_tradable: @@ -76,15 +78,15 @@ class KevinStrategy: ) # 2. Action-specific gates - if action in (TickerAction.HOLD, TickerAction.WATCH): + if action_value in (TickerAction.HOLD.value, TickerAction.WATCH.value): return KevinDecision( decision=KevinDecisionType.NO_OP, symbol=symbol, - rationale=f"action={action.value} is UI-only, never trades", + rationale=f"action={action_value} is UI-only, never trades", ) # 3. SELL — close long if held, else no-op - if action == TickerAction.SELL: + if action_value == TickerAction.SELL.value: if account.is_held(symbol): return KevinDecision( decision=KevinDecisionType.CLOSE_LONG, @@ -99,7 +101,7 @@ class KevinStrategy: ) # 4. AVOID — close long if held + bridge will add blocklist (side effect) - if action == TickerAction.AVOID: + if action_value == TickerAction.AVOID.value: if account.is_held(symbol) and self.config.avoid_closes_longs: return KevinDecision( decision=KevinDecisionType.CLOSE_LONG, @@ -120,7 +122,7 @@ class KevinStrategy: ) # 5. BUY path — full filter stack - assert action == TickerAction.BUY + assert action_value == TickerAction.BUY.value if effective_conviction < self.config.min_conviction: return KevinDecision( @@ -132,7 +134,7 @@ class KevinStrategy: ), ) - if horizon == TimeHorizon.INTRADAY: + if horizon_value == TimeHorizon.INTRADAY.value: return KevinDecision( decision=KevinDecisionType.NO_OP, symbol=symbol, @@ -197,7 +199,7 @@ class KevinStrategy: target_dollars = target_dollars.quantize(Decimal("0.01")) holding_days = self.config.hold_days_by_horizon.get( - horizon.value, self.config.hold_days_by_horizon["unspecified"] + horizon_value, self.config.hold_days_by_horizon["unspecified"] ) return KevinDecision( diff --git a/tests/backtester/__init__.py b/tests/backtester/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/backtester/test_kevin_backtest.py b/tests/backtester/test_kevin_backtest.py new file mode 100644 index 0000000..5452782 --- /dev/null +++ b/tests/backtester/test_kevin_backtest.py @@ -0,0 +1,187 @@ +"""Tests for the mention-driven Kevin backtest mini-engine.""" + +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +import pandas as pd +import pytest + +from backtester.kevin_backtest import ( + KevinBacktestParams, + KevinBacktestRunner, +) +from backtester.metrics import BacktestResult +from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig + + +class _StubPriceLoader: + """In-memory bars; behaves like the real KevinPriceLoader.""" + + def __init__(self, bars_by_symbol: dict[str, pd.DataFrame]): + self.bars = bars_by_symbol + self.spy = bars_by_symbol.get("SPY") + + async def daily_bars(self, symbol, start, end): + return self.bars.get(symbol, pd.DataFrame()) + + async def is_tradable(self, symbol): + return symbol in self.bars + + async def benchmark_bars(self, start, end): + return self.spy if self.spy is not None else pd.DataFrame() + + +def _mention(symbol, action, conviction, horizon, days_ago): + return type( + "M", + (), + { + "id": days_ago, + "symbol": symbol, + "action": type("A", (), {"value": action})(), + "conviction": Decimal(conviction), + "time_horizon": type("H", (), {"value": horizon})(), + "created_at": datetime(2026, 5, 15, 14, 0, tzinfo=timezone.utc) + + timedelta(days=days_ago), + }, + ) + + +def _bars(symbol, start_date, prices): + """Build a daily-bar DataFrame indexed by date.""" + dates = pd.date_range(start_date, periods=len(prices), freq="B", tz="UTC") + return pd.DataFrame( + { + "open": prices, + "high": prices, + "low": prices, + "close": prices, + }, + index=dates, + ) + + +@pytest.fixture +def cfg() -> KevinStrategyConfig: + return KevinStrategyConfig( + min_conviction=Decimal("0.6"), + max_mention_age_hours=48 * 365, # effectively no age filter for backtest + base_position_pct=Decimal("0.04"), + min_trade_usd=Decimal("500"), + max_trade_usd=Decimal("5000"), + max_per_ticker_usd=Decimal("7500"), + hold_days_by_horizon={ + "days": 3, + "weeks": 5, + "months": 10, + "long_term": 15, + "unspecified": 5, + }, + avoid_closes_longs=True, + avoid_blocks_days=7, + ) + + +async def test_backtest_emits_winning_trade(cfg): + # NVDA: enters at $100 day 0, exits at $110 day 5 = +10% + bars = { + "NVDA": _bars("NVDA", "2026-05-15", [100, 102, 104, 106, 108, 110, 112]), + "SPY": _bars("SPY", "2026-05-15", [500, 501, 502, 503, 504, 505, 506]), + } + strategy = KevinStrategy(cfg) + mentions = [_mention("NVDA", "buy", "0.8", "weeks", 0)] + + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run( + mentions, + KevinBacktestParams( + initial_capital=Decimal("100000"), + slippage_pct=Decimal("0.0005"), + ), + ) + + assert isinstance(result, BacktestResult) + assert result.trade_count == 1 + assert result.total_return_pct > 0 + # exit was triggered by holding period (5 trading days) + + +async def test_backtest_filters_low_conviction(cfg): + bars = { + "NVDA": _bars("NVDA", "2026-05-15", [100, 105, 110, 115, 120, 125]), + "SPY": _bars("SPY", "2026-05-15", [500] * 6), + } + strategy = KevinStrategy(cfg) + mentions = [_mention("NVDA", "buy", "0.5", "weeks", 0)] # below 0.6 floor + + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run(mentions, KevinBacktestParams()) + assert result.trade_count == 0 + + +async def test_backtest_dedupe_roll_extends_exit(cfg): + # Two BUYs on same ticker within hold window; exit should extend + bars = { + "NVDA": _bars("NVDA", "2026-05-15", [100] * 20), + "SPY": _bars("SPY", "2026-05-15", [500] * 20), + } + strategy = KevinStrategy(cfg) + mentions = [ + _mention("NVDA", "buy", "0.7", "weeks", 0), + _mention("NVDA", "buy", "0.7", "weeks", 3), + ] + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run( + mentions, + KevinBacktestParams(dedupe_policy="roll"), + ) + # Exit at day 3 + 5 = 8, not day 0 + 5 = 5 + assert result.trade_count == 1 + closed = result.trades[0] + assert closed["holding_days_actual"] >= 5 + + +async def test_backtest_sell_mid_position_closes_early(cfg): + bars = { + "NVDA": _bars("NVDA", "2026-05-15", [100, 105, 110, 95, 90, 85, 80]), + "SPY": _bars("SPY", "2026-05-15", [500] * 7), + } + strategy = KevinStrategy(cfg) + mentions = [ + _mention("NVDA", "buy", "0.8", "weeks", 0), + _mention("NVDA", "sell", "0.9", "days", 2), + ] + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run(mentions, KevinBacktestParams()) + assert result.trade_count == 1 + assert result.trades[0]["holding_days_actual"] <= 5 + + +async def test_backtest_handles_missing_bars(cfg): + bars = { + "SPY": _bars("SPY", "2026-05-15", [500] * 5), + # NVDA missing + } + strategy = KevinStrategy(cfg) + mentions = [_mention("NVDA", "buy", "0.8", "weeks", 0)] + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run(mentions, KevinBacktestParams()) + # Mention skipped (no price data); no trade + assert result.trade_count == 0 + + +async def test_backtest_computes_alpha_vs_spy(cfg): + # NVDA +10%, SPY flat -> positive alpha + bars = { + "NVDA": _bars("NVDA", "2026-05-15", [100, 100, 100, 100, 100, 110, 110]), + "SPY": _bars("SPY", "2026-05-15", [500] * 7), + } + strategy = KevinStrategy(cfg) + mentions = [_mention("NVDA", "buy", "0.8", "weeks", 0)] + runner = KevinBacktestRunner(strategy, _StubPriceLoader(bars)) + result = await runner.run( + mentions, + KevinBacktestParams(initial_capital=Decimal("100000")), + ) + assert result.alpha_vs_spy_pct is not None + assert result.alpha_vs_spy_pct > 0