feat: add 6 new strategies (value, MACD, Bollinger, VWAP, liquidity, MA stack)
This commit is contained in:
parent
0530f496ca
commit
4d6bebe6f7
8 changed files with 1366 additions and 0 deletions
|
|
@ -10,16 +10,40 @@ MeanReversionStrategy
|
|||
RSI-based mean reversion strategy.
|
||||
NewsDrivenStrategy
|
||||
News sentiment driven strategy.
|
||||
ValueStrategy
|
||||
Fundamental valuation strategy.
|
||||
MACDCrossoverStrategy
|
||||
MACD / signal line crossover strategy.
|
||||
BollingerBreakoutStrategy
|
||||
Bollinger Band breakout strategy.
|
||||
VWAPStrategy
|
||||
VWAP crossover strategy.
|
||||
LiquidityStrategy
|
||||
Volume anomaly and divergence strategy.
|
||||
MAStackStrategy
|
||||
Moving average stack alignment strategy.
|
||||
"""
|
||||
|
||||
from shared.strategies.base import BaseStrategy
|
||||
from shared.strategies.bollinger_breakout import BollingerBreakoutStrategy
|
||||
from shared.strategies.liquidity import LiquidityStrategy
|
||||
from shared.strategies.macd_crossover import MACDCrossoverStrategy
|
||||
from shared.strategies.ma_stack import MAStackStrategy
|
||||
from shared.strategies.mean_reversion import MeanReversionStrategy
|
||||
from shared.strategies.momentum import MomentumStrategy
|
||||
from shared.strategies.news_driven import NewsDrivenStrategy
|
||||
from shared.strategies.value import ValueStrategy
|
||||
from shared.strategies.vwap import VWAPStrategy
|
||||
|
||||
__all__ = [
|
||||
"BaseStrategy",
|
||||
"BollingerBreakoutStrategy",
|
||||
"LiquidityStrategy",
|
||||
"MACDCrossoverStrategy",
|
||||
"MAStackStrategy",
|
||||
"MeanReversionStrategy",
|
||||
"MomentumStrategy",
|
||||
"NewsDrivenStrategy",
|
||||
"ValueStrategy",
|
||||
"VWAPStrategy",
|
||||
]
|
||||
|
|
|
|||
84
shared/strategies/bollinger_breakout.py
Normal file
84
shared/strategies/bollinger_breakout.py
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
"""Bollinger Band breakout strategy — trade on price breaching Bollinger Bands."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class BollingerBreakoutStrategy(BaseStrategy):
|
||||
"""Generate signals when price breaks through Bollinger Bands.
|
||||
|
||||
**Buy signal** (LONG):
|
||||
- Price > upper band AND volume > 1.5x average volume (momentum
|
||||
breakout), or
|
||||
- Price < lower band (mean reversion bounce).
|
||||
|
||||
**Sell signal**: None — this strategy only generates LONG signals.
|
||||
|
||||
Signal strength = distance from the relevant band / band_width,
|
||||
clamped to [0, 1].
|
||||
"""
|
||||
|
||||
name: str = "bollinger_breakout"
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if (
|
||||
market.bollinger_upper is None
|
||||
or market.bollinger_mid is None
|
||||
or market.bollinger_lower is None
|
||||
):
|
||||
return None
|
||||
|
||||
price = market.current_price
|
||||
upper = market.bollinger_upper
|
||||
lower = market.bollinger_lower
|
||||
band_width = upper - lower
|
||||
|
||||
if band_width <= 0:
|
||||
return None
|
||||
|
||||
avg_volume = self._avg_volume(market.bars)
|
||||
|
||||
direction: SignalDirection | None = None
|
||||
distance: float = 0.0
|
||||
|
||||
if price > upper and market.volume > 1.5 * avg_volume:
|
||||
# Momentum breakout above upper band on high volume.
|
||||
direction = SignalDirection.LONG
|
||||
distance = price - upper
|
||||
elif price < lower:
|
||||
# Mean reversion bounce off lower band.
|
||||
direction = SignalDirection.LONG
|
||||
distance = lower - price
|
||||
else:
|
||||
return None
|
||||
|
||||
raw_strength = distance / band_width
|
||||
strength = max(0.0, min(1.0, raw_strength))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _avg_volume(bars: list[dict]) -> float:
|
||||
"""Compute average volume from historical bars.
|
||||
|
||||
Returns a fallback of 0.0 if no bars are available so the volume
|
||||
comparison always works (volume > 1.5 * 0 is always true when
|
||||
volume > 0).
|
||||
"""
|
||||
if not bars:
|
||||
return 0.0
|
||||
volumes = [b.get("volume", 0) for b in bars if "volume" in b]
|
||||
return sum(volumes) / len(volumes) if volumes else 0.0
|
||||
77
shared/strategies/liquidity.py
Normal file
77
shared/strategies/liquidity.py
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
"""Liquidity strategy — trade on volume anomalies and volume-price divergence."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class LiquidityStrategy(BaseStrategy):
|
||||
"""Generate signals based on relative volume and volume-price relationships.
|
||||
|
||||
Requires at least 5 bars of historical data to compute average volume.
|
||||
|
||||
**No signal** if relative_volume < 1.0 (thin liquidity).
|
||||
|
||||
**Buy signal** (LONG):
|
||||
relative_volume >= 2.0 AND price is rising (close > open).
|
||||
|
||||
**Sell signal** (SHORT):
|
||||
- relative_volume >= 2.0 AND price is falling (close < open), or
|
||||
- Price is rising on declining volume (relative_volume < 0.7) —
|
||||
bearish divergence.
|
||||
|
||||
Signal strength = ``relative_volume / 4.0``, clamped to [0, 1].
|
||||
"""
|
||||
|
||||
name: str = "liquidity"
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if len(market.bars) < 5:
|
||||
return None
|
||||
|
||||
# Compute average volume from bars.
|
||||
volumes = [b.get("volume", 0) for b in market.bars if "volume" in b]
|
||||
if not volumes:
|
||||
return None
|
||||
avg_volume = sum(volumes) / len(volumes)
|
||||
if avg_volume <= 0:
|
||||
return None
|
||||
|
||||
relative_volume = market.volume / avg_volume
|
||||
|
||||
price_rising = market.close > market.open
|
||||
|
||||
direction: SignalDirection | None = None
|
||||
|
||||
# Bearish divergence: price rising on declining volume.
|
||||
if price_rising and relative_volume < 0.7:
|
||||
direction = SignalDirection.SHORT
|
||||
elif relative_volume < 1.0:
|
||||
# Thin liquidity — no signal.
|
||||
return None
|
||||
elif relative_volume >= 2.0:
|
||||
if price_rising:
|
||||
direction = SignalDirection.LONG
|
||||
elif market.close < market.open:
|
||||
direction = SignalDirection.SHORT
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
raw_strength = relative_volume / 4.0
|
||||
strength = max(0.0, min(1.0, raw_strength))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
114
shared/strategies/ma_stack.py
Normal file
114
shared/strategies/ma_stack.py
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
"""Moving Average Stack strategy — trade on alignment of multiple moving averages."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class MAStackStrategy(BaseStrategy):
|
||||
"""Generate signals based on moving average alignment (stacking).
|
||||
|
||||
Checks whether the price and four moving averages (EMA-9, EMA-21,
|
||||
SMA-50, SMA-200) are aligned in bullish or bearish order. Also
|
||||
detects golden/death cross (SMA-50 vs SMA-200).
|
||||
|
||||
**Buy signal** (LONG):
|
||||
Bull alignment score >= 3 (at least 3 of 4 ordering conditions met).
|
||||
|
||||
**Sell signal** (SHORT):
|
||||
Bear alignment score >= 3.
|
||||
|
||||
Signal strength = ``score / 4.0 + cross_bonus``, clamped to [0, 1].
|
||||
Cross bonus = 0.15 for golden cross, -0.15 for death cross (applied
|
||||
only when direction agrees).
|
||||
"""
|
||||
|
||||
name: str = "ma_stack"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._prev_sma_50: dict[str, float] = {}
|
||||
self._prev_sma_200: dict[str, float] = {}
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if (
|
||||
market.ema_9 is None
|
||||
or market.ema_21 is None
|
||||
or market.sma_50 is None
|
||||
or market.sma_200 is None
|
||||
):
|
||||
return None
|
||||
|
||||
price = market.current_price
|
||||
ema_9 = market.ema_9
|
||||
ema_21 = market.ema_21
|
||||
sma_50 = market.sma_50
|
||||
sma_200 = market.sma_200
|
||||
|
||||
# Count bull alignment: price > ema_9 > ema_21 > sma_50 > sma_200
|
||||
bull_score = 0
|
||||
if price > ema_9:
|
||||
bull_score += 1
|
||||
if ema_9 > ema_21:
|
||||
bull_score += 1
|
||||
if ema_21 > sma_50:
|
||||
bull_score += 1
|
||||
if sma_50 > sma_200:
|
||||
bull_score += 1
|
||||
|
||||
# Count bear alignment: price < ema_9 < ema_21 < sma_50 < sma_200
|
||||
bear_score = 0
|
||||
if price < ema_9:
|
||||
bear_score += 1
|
||||
if ema_9 < ema_21:
|
||||
bear_score += 1
|
||||
if ema_21 < sma_50:
|
||||
bear_score += 1
|
||||
if sma_50 < sma_200:
|
||||
bear_score += 1
|
||||
|
||||
# Detect golden/death cross (SMA-50 vs SMA-200).
|
||||
cross_bonus = 0.0
|
||||
if ticker in self._prev_sma_50:
|
||||
prev_50 = self._prev_sma_50[ticker]
|
||||
prev_200 = self._prev_sma_200[ticker]
|
||||
# Golden cross: SMA-50 crosses above SMA-200.
|
||||
if prev_50 <= prev_200 and sma_50 > sma_200:
|
||||
cross_bonus = 0.15
|
||||
# Death cross: SMA-50 crosses below SMA-200.
|
||||
elif prev_50 >= prev_200 and sma_50 < sma_200:
|
||||
cross_bonus = -0.15
|
||||
|
||||
# Update stored state.
|
||||
self._prev_sma_50[ticker] = sma_50
|
||||
self._prev_sma_200[ticker] = sma_200
|
||||
|
||||
# Determine direction.
|
||||
if bull_score >= 3:
|
||||
direction = SignalDirection.LONG
|
||||
score = bull_score
|
||||
# Only apply positive cross bonus for LONG.
|
||||
bonus = max(0.0, cross_bonus)
|
||||
elif bear_score >= 3:
|
||||
direction = SignalDirection.SHORT
|
||||
score = bear_score
|
||||
# Only apply negative cross bonus (as positive value) for SHORT.
|
||||
bonus = abs(min(0.0, cross_bonus))
|
||||
else:
|
||||
return None
|
||||
|
||||
raw_strength = score / 4.0 + bonus
|
||||
strength = max(0.0, min(1.0, raw_strength))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
82
shared/strategies/macd_crossover.py
Normal file
82
shared/strategies/macd_crossover.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""MACD crossover strategy — trade on MACD/signal line crossovers."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class MACDCrossoverStrategy(BaseStrategy):
|
||||
"""Detect MACD / signal line crossovers for entry signals.
|
||||
|
||||
Tracks previous MACD and signal values per ticker. On the first call
|
||||
for any ticker the strategy stores state and returns None.
|
||||
|
||||
**Buy signal** (LONG):
|
||||
Bullish crossover — previous ``macd - signal <= 0`` and current
|
||||
``macd - signal > 0``.
|
||||
|
||||
**Sell signal** (SHORT):
|
||||
Bearish crossover — previous ``macd - signal >= 0`` and current
|
||||
``macd - signal < 0``.
|
||||
|
||||
Signal strength = ``abs(histogram) / atr`` (or ``abs(histogram) / 2.0``
|
||||
when ATR is unavailable), clamped to [0, 1].
|
||||
"""
|
||||
|
||||
name: str = "macd_crossover"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._prev_macd: dict[str, float] = {}
|
||||
self._prev_signal: dict[str, float] = {}
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if market.macd is None or market.macd_signal is None:
|
||||
return None
|
||||
|
||||
macd = market.macd
|
||||
signal = market.macd_signal
|
||||
|
||||
# First call for this ticker — store state only.
|
||||
if ticker not in self._prev_macd:
|
||||
self._prev_macd[ticker] = macd
|
||||
self._prev_signal[ticker] = signal
|
||||
return None
|
||||
|
||||
prev_diff = self._prev_macd[ticker] - self._prev_signal[ticker]
|
||||
curr_diff = macd - signal
|
||||
|
||||
# Update stored state.
|
||||
self._prev_macd[ticker] = macd
|
||||
self._prev_signal[ticker] = signal
|
||||
|
||||
direction: SignalDirection | None = None
|
||||
|
||||
if prev_diff <= 0 and curr_diff > 0:
|
||||
direction = SignalDirection.LONG
|
||||
elif prev_diff >= 0 and curr_diff < 0:
|
||||
direction = SignalDirection.SHORT
|
||||
else:
|
||||
return None
|
||||
|
||||
# Compute strength.
|
||||
histogram = market.macd_histogram if market.macd_histogram is not None else curr_diff
|
||||
if market.atr is not None and market.atr > 0:
|
||||
raw_strength = abs(histogram) / market.atr
|
||||
else:
|
||||
raw_strength = abs(histogram) / 2.0
|
||||
|
||||
strength = max(0.0, min(1.0, raw_strength))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
98
shared/strategies/value.py
Normal file
98
shared/strategies/value.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
"""Value strategy — trade on fundamental valuation metrics."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class ValueStrategy(BaseStrategy):
|
||||
"""Generate signals from fundamental financial data.
|
||||
|
||||
Computes a composite score from PEG ratio, P/E ratio, EPS, revenue
|
||||
growth, profit margin, and debt-to-equity ratio.
|
||||
|
||||
**Buy signal** (LONG):
|
||||
Composite score > 0.3 (undervalued).
|
||||
|
||||
**Sell signal** (SHORT):
|
||||
Composite score < -0.3 (overvalued).
|
||||
|
||||
Signal strength = ``abs(score) / 2.0``, clamped to [0, 1].
|
||||
"""
|
||||
|
||||
name: str = "value"
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if market.fundamentals is None:
|
||||
return None
|
||||
|
||||
f = market.fundamentals
|
||||
|
||||
if f.peg_ratio is None or f.pe_ratio is None:
|
||||
return None
|
||||
|
||||
score = 0.0
|
||||
|
||||
# PEG ratio scoring
|
||||
if f.peg_ratio < 1.0:
|
||||
score += 0.3
|
||||
elif f.peg_ratio > 3.0:
|
||||
score -= 0.3
|
||||
|
||||
# P/E ratio scoring
|
||||
if f.pe_ratio < 15:
|
||||
score += 0.3
|
||||
elif f.pe_ratio > 40:
|
||||
score -= 0.3
|
||||
|
||||
# EPS scoring
|
||||
if f.eps_ttm is not None:
|
||||
if f.eps_ttm > 0:
|
||||
score += 0.2
|
||||
elif f.eps_ttm < 0:
|
||||
score -= 0.3
|
||||
|
||||
# Revenue growth scoring
|
||||
if f.revenue_growth_yoy is not None:
|
||||
if f.revenue_growth_yoy > 0.1:
|
||||
score += 0.2
|
||||
elif f.revenue_growth_yoy < -0.1:
|
||||
score -= 0.2
|
||||
|
||||
# Profit margin scoring
|
||||
if f.profit_margin is not None:
|
||||
if f.profit_margin > 0.15:
|
||||
score += 0.1
|
||||
elif f.profit_margin < 0:
|
||||
score -= 0.2
|
||||
|
||||
# Debt-to-equity scoring
|
||||
if f.debt_to_equity is not None:
|
||||
if f.debt_to_equity > 3.0:
|
||||
score -= 0.2
|
||||
elif f.debt_to_equity < 0.5:
|
||||
score += 0.1
|
||||
|
||||
# Determine direction
|
||||
if score > 0.3:
|
||||
direction = SignalDirection.LONG
|
||||
elif score < -0.3:
|
||||
direction = SignalDirection.SHORT
|
||||
else:
|
||||
return None
|
||||
|
||||
strength = max(0.0, min(1.0, abs(score) / 2.0))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
91
shared/strategies/vwap.py
Normal file
91
shared/strategies/vwap.py
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
"""VWAP crossover strategy — trade on price crossing the Volume Weighted Average Price."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
|
||||
from shared.strategies.base import BaseStrategy
|
||||
|
||||
|
||||
class VWAPStrategy(BaseStrategy):
|
||||
"""Generate signals when price crosses above or below VWAP.
|
||||
|
||||
Tracks previous price and VWAP per ticker. The first call for any
|
||||
ticker stores state and returns None.
|
||||
|
||||
**Buy signal** (LONG):
|
||||
Price crosses from below VWAP to above VWAP.
|
||||
|
||||
**Sell signal** (SHORT):
|
||||
Price crosses from above VWAP to below VWAP.
|
||||
|
||||
Signal strength = ``distance_pct * 20 * vol_ratio``, clamped to [0, 1],
|
||||
where ``distance_pct = abs(price - vwap) / vwap`` and ``vol_ratio``
|
||||
is a simple volume multiplier (1.0 by default).
|
||||
"""
|
||||
|
||||
name: str = "vwap"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._prev_price: dict[str, float] = {}
|
||||
self._prev_vwap: dict[str, float] = {}
|
||||
|
||||
async def evaluate(
|
||||
self,
|
||||
ticker: str,
|
||||
market: MarketSnapshot,
|
||||
sentiment: SentimentContext | None = None,
|
||||
) -> TradeSignal | None:
|
||||
if market.vwap is None:
|
||||
return None
|
||||
|
||||
price = market.current_price
|
||||
vwap = market.vwap
|
||||
|
||||
# First call for this ticker — store state only.
|
||||
if ticker not in self._prev_price:
|
||||
self._prev_price[ticker] = price
|
||||
self._prev_vwap[ticker] = vwap
|
||||
return None
|
||||
|
||||
prev_price = self._prev_price[ticker]
|
||||
prev_vwap = self._prev_vwap[ticker]
|
||||
|
||||
# Update stored state.
|
||||
self._prev_price[ticker] = price
|
||||
self._prev_vwap[ticker] = vwap
|
||||
|
||||
# Detect crossover.
|
||||
prev_above = prev_price > prev_vwap
|
||||
curr_above = price > vwap
|
||||
|
||||
if prev_above == curr_above:
|
||||
# No crossover.
|
||||
return None
|
||||
|
||||
if curr_above:
|
||||
direction = SignalDirection.LONG
|
||||
else:
|
||||
direction = SignalDirection.SHORT
|
||||
|
||||
# Compute strength.
|
||||
distance_pct = abs(price - vwap) / vwap if vwap != 0 else 0.0
|
||||
|
||||
# Volume ratio: use bars average volume if available.
|
||||
vol_ratio = 1.0
|
||||
if market.bars:
|
||||
volumes = [b.get("volume", 0) for b in market.bars if "volume" in b]
|
||||
if volumes:
|
||||
avg_vol = sum(volumes) / len(volumes)
|
||||
if avg_vol > 0:
|
||||
vol_ratio = market.volume / avg_vol
|
||||
|
||||
raw_strength = distance_pct * 20.0 * vol_ratio
|
||||
strength = max(0.0, min(1.0, raw_strength))
|
||||
|
||||
return TradeSignal(
|
||||
ticker=ticker,
|
||||
direction=direction,
|
||||
strength=strength,
|
||||
strategy_sources=[self.name],
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue