From 6c909d12c37ee6ac4144d469382c1991aadda1bd Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Mon, 23 Feb 2026 21:49:26 +0000 Subject: [PATCH] feat: add MACD, Bollinger, VWAP, ATR, EMA, SMA-200 indicator computations --- services/signal_generator/market_data.py | 159 +++++++++++++- tests/test_indicators.py | 263 +++++++++++++++++++++++ 2 files changed, 421 insertions(+), 1 deletion(-) create mode 100644 tests/test_indicators.py diff --git a/services/signal_generator/market_data.py b/services/signal_generator/market_data.py index c5898e6..4c69ef1 100644 --- a/services/signal_generator/market_data.py +++ b/services/signal_generator/market_data.py @@ -1,11 +1,13 @@ """In-memory market data manager with rolling OHLCV windows. Maintains a per-ticker deque of recent bars and computes technical -indicators (SMA, RSI) on demand when building ``MarketSnapshot`` objects. +indicators (SMA, RSI, EMA, MACD, Bollinger Bands, VWAP, ATR) on demand +when building ``MarketSnapshot`` objects. """ from __future__ import annotations +import math from collections import deque from typing import Any @@ -60,6 +62,17 @@ class MarketDataManager: latest = bars[-1] closes = [b.close for b in bars] + bar_list = list(bars) + + macd_result = self._compute_macd(closes) + macd_val = macd_signal = macd_hist = None + if macd_result: + macd_val, macd_signal, macd_hist = macd_result + + boll_result = self._compute_bollinger(closes) + boll_upper = boll_mid = boll_lower = None + if boll_result: + boll_upper, boll_mid, boll_lower = boll_result return MarketSnapshot( ticker=ticker, @@ -71,7 +84,18 @@ class MarketDataManager: volume=latest.volume, sma_20=self._compute_sma(closes, 20), sma_50=self._compute_sma(closes, 50), + sma_200=self._compute_sma(closes, 200), rsi=self._compute_rsi(closes, _RSI_PERIOD), + ema_9=self._compute_ema(closes, 9), + ema_21=self._compute_ema(closes, 21), + macd=macd_val, + macd_signal=macd_signal, + macd_histogram=macd_hist, + bollinger_upper=boll_upper, + bollinger_mid=boll_mid, + bollinger_lower=boll_lower, + vwap=self._compute_vwap(bar_list), + atr=self._compute_atr(bar_list), bars=[b.model_dump(mode="json") for b in bars], ) @@ -120,3 +144,136 @@ class MarketDataManager: rs = avg_gain / avg_loss rsi = 100.0 - (100.0 / (1.0 + rs)) return round(rsi, 4) + + @staticmethod + def _compute_ema(closes: list[float], period: int) -> float | None: + """Compute the exponential moving average over *closes*. + + Seeds with the SMA of the first *period* closes, then applies the + EMA multiplier ``2 / (period + 1)`` for each subsequent close. + Returns ``None`` if there are fewer than *period* data points. + """ + if len(closes) < period: + return None + + multiplier = 2.0 / (period + 1) + # Seed with SMA of first `period` values + ema = sum(closes[:period]) / period + for price in closes[period:]: + ema = (price - ema) * multiplier + ema + return round(ema, 6) + + @staticmethod + def _compute_macd( + closes: list[float], + ) -> tuple[float, float, float] | None: + """Compute MACD(12, 26, 9). + + Needs at least 35 closes (26 for slow EMA seed + 9 for signal + line). Returns ``(macd_line, signal_line, histogram)`` or + ``None``. + """ + if len(closes) < 35: + return None + + # Helper to compute a running EMA series from closes + def _ema_series(data: list[float], period: int) -> list[float]: + multiplier = 2.0 / (period + 1) + ema = sum(data[:period]) / period + result = [ema] + for val in data[period:]: + ema = (val - ema) * multiplier + ema + result.append(ema) + return result + + ema12_series = _ema_series(closes, 12) + ema26_series = _ema_series(closes, 26) + + # Align: ema12_series starts at index 12, ema26_series at index 26 + # After ema26 seed, we have len(closes)-26 subsequent values (+1 for seed) + # We need to align ema12 to the same time window as ema26 + # ema26 has values for indices 26..len(closes)-1 (total: len-26+1 entries incl seed at 26) + # ema12 has values for indices 12..len(closes)-1 (total: len-12+1 entries incl seed at 12) + # Offset into ema12 for index 26 = 26 - 12 = 14 + offset = 26 - 12 + macd_values = [ + ema12_series[offset + i] - ema26_series[i] + for i in range(len(ema26_series)) + ] + + if len(macd_values) < 9: + return None + + # Signal line = EMA-9 of MACD values + signal_series = _ema_series(macd_values, 9) + + macd_line = round(macd_values[-1], 6) + signal_line = round(signal_series[-1], 6) + histogram = round(macd_line - signal_line, 6) + return (macd_line, signal_line, histogram) + + @staticmethod + def _compute_bollinger( + closes: list[float], period: int = 20, num_std: float = 2.0 + ) -> tuple[float, float, float] | None: + """Compute Bollinger Bands (SMA ± *num_std* standard deviations). + + Returns ``(upper, mid, lower)`` or ``None`` if insufficient data. + """ + if len(closes) < period: + return None + + window = closes[-period:] + mid = sum(window) / period + variance = sum((x - mid) ** 2 for x in window) / period + std = math.sqrt(variance) + + upper = round(mid + num_std * std, 6) + mid_r = round(mid, 6) + lower = round(mid - num_std * std, 6) + return (upper, mid_r, lower) + + @staticmethod + def _compute_vwap(bars: list[OHLCVBar]) -> float | None: + """Compute the cumulative Volume-Weighted Average Price. + + Typical price = (high + low + close) / 3. + Returns ``None`` if no bars are provided. + """ + if not bars: + return None + + cum_tp_vol = 0.0 + cum_vol = 0.0 + for bar in bars: + typical = (bar.high + bar.low + bar.close) / 3.0 + cum_tp_vol += typical * bar.volume + cum_vol += bar.volume + + if cum_vol == 0: + return None + + return round(cum_tp_vol / cum_vol, 6) + + @staticmethod + def _compute_atr(bars: list[OHLCVBar], period: int = 14) -> float | None: + """Compute the Average True Range. + + Needs at least ``period + 1`` bars (to compute ``period`` true + ranges). True range = max(H-L, |H-prevC|, |L-prevC|). + Returns ``None`` if insufficient data. + """ + if len(bars) < period + 1: + return None + + true_ranges: list[float] = [] + for i in range(1, len(bars)): + h = bars[i].high + l = bars[i].low # noqa: E741 + prev_c = bars[i - 1].close + tr = max(h - l, abs(h - prev_c), abs(l - prev_c)) + true_ranges.append(tr) + + # Simple average of the last `period` true ranges + recent = true_ranges[-period:] + return round(sum(recent) / period, 6) diff --git a/tests/test_indicators.py b/tests/test_indicators.py new file mode 100644 index 0000000..9b14974 --- /dev/null +++ b/tests/test_indicators.py @@ -0,0 +1,263 @@ +"""Tests for extended technical indicator computations in MarketDataManager.""" + +from datetime import datetime, timezone + +import pytest + +from services.signal_generator.market_data import MarketDataManager +from shared.schemas.trading import OHLCVBar + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _bar( + close: float, + volume: float = 1000.0, + high: float | None = None, + low: float | None = None, + open_: float | None = None, +) -> OHLCVBar: + """Create a single OHLCVBar with sensible defaults.""" + return OHLCVBar( + timestamp=datetime.now(tz=timezone.utc), + open=open_ if open_ is not None else close, + high=high if high is not None else close, + low=low if low is not None else close, + close=close, + volume=volume, + ) + + +def _add_bars( + mgr: MarketDataManager, + ticker: str, + closes: list[float], + volume: float = 1000.0, +) -> None: + """Add multiple bars (one per close) to the manager.""" + for c in closes: + mgr.add_bar(ticker, _bar(c, volume=volume)) + + +# --------------------------------------------------------------------------- +# EMA +# --------------------------------------------------------------------------- + + +class TestEMA: + """Tests for exponential moving average computation.""" + + def test_ema_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 5) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.ema_9 is None + + def test_ema_9_with_exact_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 9) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.ema_9 == pytest.approx(100.0, abs=0.01) + + def test_ema_responds_to_recent_prices(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 20 + [110.0]) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.ema_9 is not None + assert 100.0 < snap.ema_9 < 110.0 + + def test_ema_21_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 15) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.ema_21 is None + + def test_ema_21_computed_with_enough_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 25) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.ema_21 == pytest.approx(100.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# SMA-200 +# --------------------------------------------------------------------------- + + +class TestSMA200: + """Tests for 200-period simple moving average.""" + + def test_sma_200_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 100) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.sma_200 is None + + def test_sma_200_computed_with_enough_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 200) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.sma_200 == pytest.approx(100.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# MACD +# --------------------------------------------------------------------------- + + +class TestMACD: + """Tests for MACD(12,26,9) computation.""" + + def test_macd_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 20) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.macd is None + + def test_macd_computed_with_enough_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 40) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.macd == pytest.approx(0.0, abs=0.01) + + def test_macd_positive_in_uptrend(self) -> None: + mgr = MarketDataManager(max_bars=300) + rising = [50.0 + i for i in range(50)] + _add_bars(mgr, "X", rising) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.macd is not None + assert snap.macd > 0 + + +# --------------------------------------------------------------------------- +# Bollinger Bands +# --------------------------------------------------------------------------- + + +class TestBollingerBands: + """Tests for Bollinger Bands (SMA-20 +/- 2 std dev).""" + + def test_bollinger_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 10) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.bollinger_upper is None + + def test_bollinger_computed_with_enough_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + _add_bars(mgr, "X", [100.0] * 25) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.bollinger_mid == pytest.approx(100.0, abs=0.01) + assert snap.bollinger_upper is not None + assert snap.bollinger_lower is not None + assert snap.bollinger_upper >= snap.bollinger_mid + assert snap.bollinger_lower <= snap.bollinger_mid + + def test_bollinger_width_increases_with_volatility(self) -> None: + # Stable prices -> narrow bands + mgr_stable = MarketDataManager(max_bars=300) + _add_bars(mgr_stable, "X", [100.0] * 25) + snap_stable = mgr_stable.get_snapshot("X") + + # Alternating prices -> wider bands + mgr_volatile = MarketDataManager(max_bars=300) + alternating = [100.0 + (10.0 if i % 2 == 0 else -10.0) for i in range(25)] + _add_bars(mgr_volatile, "X", alternating) + snap_volatile = mgr_volatile.get_snapshot("X") + + assert snap_stable is not None and snap_volatile is not None + assert snap_stable.bollinger_upper is not None + assert snap_stable.bollinger_lower is not None + assert snap_volatile.bollinger_upper is not None + assert snap_volatile.bollinger_lower is not None + + width_stable = snap_stable.bollinger_upper - snap_stable.bollinger_lower + width_volatile = snap_volatile.bollinger_upper - snap_volatile.bollinger_lower + assert width_volatile > width_stable + + +# --------------------------------------------------------------------------- +# VWAP +# --------------------------------------------------------------------------- + + +class TestVWAP: + """Tests for Volume-Weighted Average Price.""" + + def test_vwap_computed(self) -> None: + mgr = MarketDataManager(max_bars=300) + for _ in range(5): + mgr.add_bar("X", _bar(close=100.0, high=100.0, low=100.0, volume=1000.0)) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.vwap == pytest.approx(100.0, abs=0.01) + + def test_vwap_weighted_by_volume(self) -> None: + mgr = MarketDataManager(max_bars=300) + # Bar 1: typical price = (110+90+100)/3 = 100, volume = 100 + mgr.add_bar("X", _bar(close=100.0, high=110.0, low=90.0, volume=100.0)) + # Bar 2: typical price = (220+180+200)/3 = 200, volume = 900 + mgr.add_bar("X", _bar(close=200.0, high=220.0, low=180.0, volume=900.0)) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.vwap is not None + # Expected: (100*100 + 200*900) / (100+900) = 190 + assert snap.vwap == pytest.approx(190.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# ATR +# --------------------------------------------------------------------------- + + +class TestATR: + """Tests for Average True Range.""" + + def test_atr_returns_none_insufficient_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + for _ in range(5): + mgr.add_bar("X", _bar(close=100.0, high=105.0, low=95.0)) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.atr is None + + def test_atr_computed_with_enough_data(self) -> None: + mgr = MarketDataManager(max_bars=300) + for _ in range(15): + mgr.add_bar("X", _bar(close=100.0, high=105.0, low=95.0)) + snap = mgr.get_snapshot("X") + assert snap is not None + assert snap.atr is not None + assert snap.atr >= 0 + + def test_atr_increases_with_volatility(self) -> None: + # Tight range + mgr_tight = MarketDataManager(max_bars=300) + for _ in range(15): + mgr_tight.add_bar("X", _bar(close=100.0, high=101.0, low=99.0)) + snap_tight = mgr_tight.get_snapshot("X") + + # Wide range + mgr_wide = MarketDataManager(max_bars=300) + for _ in range(15): + mgr_wide.add_bar("X", _bar(close=100.0, high=120.0, low=80.0)) + snap_wide = mgr_wide.get_snapshot("X") + + assert snap_tight is not None and snap_wide is not None + assert snap_tight.atr is not None and snap_wide.atr is not None + assert snap_wide.atr > snap_tight.atr