feat: add MACD, Bollinger, VWAP, ATR, EMA, SMA-200 indicator computations
This commit is contained in:
parent
6f512cf91f
commit
6c909d12c3
2 changed files with 421 additions and 1 deletions
|
|
@ -1,11 +1,13 @@
|
|||
"""In-memory market data manager with rolling OHLCV windows.
|
||||
|
||||
Maintains a per-ticker deque of recent bars and computes technical
|
||||
indicators (SMA, RSI) on demand when building ``MarketSnapshot`` objects.
|
||||
indicators (SMA, RSI, EMA, MACD, Bollinger Bands, VWAP, ATR) on demand
|
||||
when building ``MarketSnapshot`` objects.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from collections import deque
|
||||
from typing import Any
|
||||
|
||||
|
|
@ -60,6 +62,17 @@ class MarketDataManager:
|
|||
|
||||
latest = bars[-1]
|
||||
closes = [b.close for b in bars]
|
||||
bar_list = list(bars)
|
||||
|
||||
macd_result = self._compute_macd(closes)
|
||||
macd_val = macd_signal = macd_hist = None
|
||||
if macd_result:
|
||||
macd_val, macd_signal, macd_hist = macd_result
|
||||
|
||||
boll_result = self._compute_bollinger(closes)
|
||||
boll_upper = boll_mid = boll_lower = None
|
||||
if boll_result:
|
||||
boll_upper, boll_mid, boll_lower = boll_result
|
||||
|
||||
return MarketSnapshot(
|
||||
ticker=ticker,
|
||||
|
|
@ -71,7 +84,18 @@ class MarketDataManager:
|
|||
volume=latest.volume,
|
||||
sma_20=self._compute_sma(closes, 20),
|
||||
sma_50=self._compute_sma(closes, 50),
|
||||
sma_200=self._compute_sma(closes, 200),
|
||||
rsi=self._compute_rsi(closes, _RSI_PERIOD),
|
||||
ema_9=self._compute_ema(closes, 9),
|
||||
ema_21=self._compute_ema(closes, 21),
|
||||
macd=macd_val,
|
||||
macd_signal=macd_signal,
|
||||
macd_histogram=macd_hist,
|
||||
bollinger_upper=boll_upper,
|
||||
bollinger_mid=boll_mid,
|
||||
bollinger_lower=boll_lower,
|
||||
vwap=self._compute_vwap(bar_list),
|
||||
atr=self._compute_atr(bar_list),
|
||||
bars=[b.model_dump(mode="json") for b in bars],
|
||||
)
|
||||
|
||||
|
|
@ -120,3 +144,136 @@ class MarketDataManager:
|
|||
rs = avg_gain / avg_loss
|
||||
rsi = 100.0 - (100.0 / (1.0 + rs))
|
||||
return round(rsi, 4)
|
||||
|
||||
@staticmethod
|
||||
def _compute_ema(closes: list[float], period: int) -> float | None:
|
||||
"""Compute the exponential moving average over *closes*.
|
||||
|
||||
Seeds with the SMA of the first *period* closes, then applies the
|
||||
EMA multiplier ``2 / (period + 1)`` for each subsequent close.
|
||||
Returns ``None`` if there are fewer than *period* data points.
|
||||
"""
|
||||
if len(closes) < period:
|
||||
return None
|
||||
|
||||
multiplier = 2.0 / (period + 1)
|
||||
# Seed with SMA of first `period` values
|
||||
ema = sum(closes[:period]) / period
|
||||
for price in closes[period:]:
|
||||
ema = (price - ema) * multiplier + ema
|
||||
return round(ema, 6)
|
||||
|
||||
@staticmethod
|
||||
def _compute_macd(
|
||||
closes: list[float],
|
||||
) -> tuple[float, float, float] | None:
|
||||
"""Compute MACD(12, 26, 9).
|
||||
|
||||
Needs at least 35 closes (26 for slow EMA seed + 9 for signal
|
||||
line). Returns ``(macd_line, signal_line, histogram)`` or
|
||||
``None``.
|
||||
"""
|
||||
if len(closes) < 35:
|
||||
return None
|
||||
|
||||
# Helper to compute a running EMA series from closes
|
||||
def _ema_series(data: list[float], period: int) -> list[float]:
|
||||
multiplier = 2.0 / (period + 1)
|
||||
ema = sum(data[:period]) / period
|
||||
result = [ema]
|
||||
for val in data[period:]:
|
||||
ema = (val - ema) * multiplier + ema
|
||||
result.append(ema)
|
||||
return result
|
||||
|
||||
ema12_series = _ema_series(closes, 12)
|
||||
ema26_series = _ema_series(closes, 26)
|
||||
|
||||
# Align: ema12_series starts at index 12, ema26_series at index 26
|
||||
# After ema26 seed, we have len(closes)-26 subsequent values (+1 for seed)
|
||||
# We need to align ema12 to the same time window as ema26
|
||||
# ema26 has values for indices 26..len(closes)-1 (total: len-26+1 entries incl seed at 26)
|
||||
# ema12 has values for indices 12..len(closes)-1 (total: len-12+1 entries incl seed at 12)
|
||||
# Offset into ema12 for index 26 = 26 - 12 = 14
|
||||
offset = 26 - 12
|
||||
macd_values = [
|
||||
ema12_series[offset + i] - ema26_series[i]
|
||||
for i in range(len(ema26_series))
|
||||
]
|
||||
|
||||
if len(macd_values) < 9:
|
||||
return None
|
||||
|
||||
# Signal line = EMA-9 of MACD values
|
||||
signal_series = _ema_series(macd_values, 9)
|
||||
|
||||
macd_line = round(macd_values[-1], 6)
|
||||
signal_line = round(signal_series[-1], 6)
|
||||
histogram = round(macd_line - signal_line, 6)
|
||||
return (macd_line, signal_line, histogram)
|
||||
|
||||
@staticmethod
|
||||
def _compute_bollinger(
|
||||
closes: list[float], period: int = 20, num_std: float = 2.0
|
||||
) -> tuple[float, float, float] | None:
|
||||
"""Compute Bollinger Bands (SMA ± *num_std* standard deviations).
|
||||
|
||||
Returns ``(upper, mid, lower)`` or ``None`` if insufficient data.
|
||||
"""
|
||||
if len(closes) < period:
|
||||
return None
|
||||
|
||||
window = closes[-period:]
|
||||
mid = sum(window) / period
|
||||
variance = sum((x - mid) ** 2 for x in window) / period
|
||||
std = math.sqrt(variance)
|
||||
|
||||
upper = round(mid + num_std * std, 6)
|
||||
mid_r = round(mid, 6)
|
||||
lower = round(mid - num_std * std, 6)
|
||||
return (upper, mid_r, lower)
|
||||
|
||||
@staticmethod
|
||||
def _compute_vwap(bars: list[OHLCVBar]) -> float | None:
|
||||
"""Compute the cumulative Volume-Weighted Average Price.
|
||||
|
||||
Typical price = (high + low + close) / 3.
|
||||
Returns ``None`` if no bars are provided.
|
||||
"""
|
||||
if not bars:
|
||||
return None
|
||||
|
||||
cum_tp_vol = 0.0
|
||||
cum_vol = 0.0
|
||||
for bar in bars:
|
||||
typical = (bar.high + bar.low + bar.close) / 3.0
|
||||
cum_tp_vol += typical * bar.volume
|
||||
cum_vol += bar.volume
|
||||
|
||||
if cum_vol == 0:
|
||||
return None
|
||||
|
||||
return round(cum_tp_vol / cum_vol, 6)
|
||||
|
||||
@staticmethod
|
||||
def _compute_atr(bars: list[OHLCVBar], period: int = 14) -> float | None:
|
||||
"""Compute the Average True Range.
|
||||
|
||||
Needs at least ``period + 1`` bars (to compute ``period`` true
|
||||
ranges). True range = max(H-L, |H-prevC|, |L-prevC|).
|
||||
Returns ``None`` if insufficient data.
|
||||
"""
|
||||
if len(bars) < period + 1:
|
||||
return None
|
||||
|
||||
true_ranges: list[float] = []
|
||||
for i in range(1, len(bars)):
|
||||
h = bars[i].high
|
||||
l = bars[i].low # noqa: E741
|
||||
prev_c = bars[i - 1].close
|
||||
tr = max(h - l, abs(h - prev_c), abs(l - prev_c))
|
||||
true_ranges.append(tr)
|
||||
|
||||
# Simple average of the last `period` true ranges
|
||||
recent = true_ranges[-period:]
|
||||
return round(sum(recent) / period, 6)
|
||||
|
|
|
|||
263
tests/test_indicators.py
Normal file
263
tests/test_indicators.py
Normal file
|
|
@ -0,0 +1,263 @@
|
|||
"""Tests for extended technical indicator computations in MarketDataManager."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from services.signal_generator.market_data import MarketDataManager
|
||||
from shared.schemas.trading import OHLCVBar
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _bar(
|
||||
close: float,
|
||||
volume: float = 1000.0,
|
||||
high: float | None = None,
|
||||
low: float | None = None,
|
||||
open_: float | None = None,
|
||||
) -> OHLCVBar:
|
||||
"""Create a single OHLCVBar with sensible defaults."""
|
||||
return OHLCVBar(
|
||||
timestamp=datetime.now(tz=timezone.utc),
|
||||
open=open_ if open_ is not None else close,
|
||||
high=high if high is not None else close,
|
||||
low=low if low is not None else close,
|
||||
close=close,
|
||||
volume=volume,
|
||||
)
|
||||
|
||||
|
||||
def _add_bars(
|
||||
mgr: MarketDataManager,
|
||||
ticker: str,
|
||||
closes: list[float],
|
||||
volume: float = 1000.0,
|
||||
) -> None:
|
||||
"""Add multiple bars (one per close) to the manager."""
|
||||
for c in closes:
|
||||
mgr.add_bar(ticker, _bar(c, volume=volume))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# EMA
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEMA:
|
||||
"""Tests for exponential moving average computation."""
|
||||
|
||||
def test_ema_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 5)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.ema_9 is None
|
||||
|
||||
def test_ema_9_with_exact_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 9)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.ema_9 == pytest.approx(100.0, abs=0.01)
|
||||
|
||||
def test_ema_responds_to_recent_prices(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 20 + [110.0])
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.ema_9 is not None
|
||||
assert 100.0 < snap.ema_9 < 110.0
|
||||
|
||||
def test_ema_21_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 15)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.ema_21 is None
|
||||
|
||||
def test_ema_21_computed_with_enough_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 25)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.ema_21 == pytest.approx(100.0, abs=0.01)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SMA-200
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSMA200:
|
||||
"""Tests for 200-period simple moving average."""
|
||||
|
||||
def test_sma_200_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 100)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.sma_200 is None
|
||||
|
||||
def test_sma_200_computed_with_enough_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 200)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.sma_200 == pytest.approx(100.0, abs=0.01)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MACD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMACD:
|
||||
"""Tests for MACD(12,26,9) computation."""
|
||||
|
||||
def test_macd_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 20)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.macd is None
|
||||
|
||||
def test_macd_computed_with_enough_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 40)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.macd == pytest.approx(0.0, abs=0.01)
|
||||
|
||||
def test_macd_positive_in_uptrend(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
rising = [50.0 + i for i in range(50)]
|
||||
_add_bars(mgr, "X", rising)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.macd is not None
|
||||
assert snap.macd > 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bollinger Bands
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBollingerBands:
|
||||
"""Tests for Bollinger Bands (SMA-20 +/- 2 std dev)."""
|
||||
|
||||
def test_bollinger_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 10)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.bollinger_upper is None
|
||||
|
||||
def test_bollinger_computed_with_enough_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr, "X", [100.0] * 25)
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.bollinger_mid == pytest.approx(100.0, abs=0.01)
|
||||
assert snap.bollinger_upper is not None
|
||||
assert snap.bollinger_lower is not None
|
||||
assert snap.bollinger_upper >= snap.bollinger_mid
|
||||
assert snap.bollinger_lower <= snap.bollinger_mid
|
||||
|
||||
def test_bollinger_width_increases_with_volatility(self) -> None:
|
||||
# Stable prices -> narrow bands
|
||||
mgr_stable = MarketDataManager(max_bars=300)
|
||||
_add_bars(mgr_stable, "X", [100.0] * 25)
|
||||
snap_stable = mgr_stable.get_snapshot("X")
|
||||
|
||||
# Alternating prices -> wider bands
|
||||
mgr_volatile = MarketDataManager(max_bars=300)
|
||||
alternating = [100.0 + (10.0 if i % 2 == 0 else -10.0) for i in range(25)]
|
||||
_add_bars(mgr_volatile, "X", alternating)
|
||||
snap_volatile = mgr_volatile.get_snapshot("X")
|
||||
|
||||
assert snap_stable is not None and snap_volatile is not None
|
||||
assert snap_stable.bollinger_upper is not None
|
||||
assert snap_stable.bollinger_lower is not None
|
||||
assert snap_volatile.bollinger_upper is not None
|
||||
assert snap_volatile.bollinger_lower is not None
|
||||
|
||||
width_stable = snap_stable.bollinger_upper - snap_stable.bollinger_lower
|
||||
width_volatile = snap_volatile.bollinger_upper - snap_volatile.bollinger_lower
|
||||
assert width_volatile > width_stable
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VWAP
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestVWAP:
|
||||
"""Tests for Volume-Weighted Average Price."""
|
||||
|
||||
def test_vwap_computed(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
for _ in range(5):
|
||||
mgr.add_bar("X", _bar(close=100.0, high=100.0, low=100.0, volume=1000.0))
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.vwap == pytest.approx(100.0, abs=0.01)
|
||||
|
||||
def test_vwap_weighted_by_volume(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
# Bar 1: typical price = (110+90+100)/3 = 100, volume = 100
|
||||
mgr.add_bar("X", _bar(close=100.0, high=110.0, low=90.0, volume=100.0))
|
||||
# Bar 2: typical price = (220+180+200)/3 = 200, volume = 900
|
||||
mgr.add_bar("X", _bar(close=200.0, high=220.0, low=180.0, volume=900.0))
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.vwap is not None
|
||||
# Expected: (100*100 + 200*900) / (100+900) = 190
|
||||
assert snap.vwap == pytest.approx(190.0, abs=0.01)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ATR
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestATR:
|
||||
"""Tests for Average True Range."""
|
||||
|
||||
def test_atr_returns_none_insufficient_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
for _ in range(5):
|
||||
mgr.add_bar("X", _bar(close=100.0, high=105.0, low=95.0))
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.atr is None
|
||||
|
||||
def test_atr_computed_with_enough_data(self) -> None:
|
||||
mgr = MarketDataManager(max_bars=300)
|
||||
for _ in range(15):
|
||||
mgr.add_bar("X", _bar(close=100.0, high=105.0, low=95.0))
|
||||
snap = mgr.get_snapshot("X")
|
||||
assert snap is not None
|
||||
assert snap.atr is not None
|
||||
assert snap.atr >= 0
|
||||
|
||||
def test_atr_increases_with_volatility(self) -> None:
|
||||
# Tight range
|
||||
mgr_tight = MarketDataManager(max_bars=300)
|
||||
for _ in range(15):
|
||||
mgr_tight.add_bar("X", _bar(close=100.0, high=101.0, low=99.0))
|
||||
snap_tight = mgr_tight.get_snapshot("X")
|
||||
|
||||
# Wide range
|
||||
mgr_wide = MarketDataManager(max_bars=300)
|
||||
for _ in range(15):
|
||||
mgr_wide.add_bar("X", _bar(close=100.0, high=120.0, low=80.0))
|
||||
snap_wide = mgr_wide.get_snapshot("X")
|
||||
|
||||
assert snap_tight is not None and snap_wide is not None
|
||||
assert snap_tight.atr is not None and snap_wide.atr is not None
|
||||
assert snap_wide.atr > snap_tight.atr
|
||||
Loading…
Add table
Add a link
Reference in a new issue