trading/tests/services/trade_executor/test_defer_integration.py
Viktor Barzin 2855e79af4 feat(trade-executor): defer outside_market_hours signals to next open
Kevin's signals are mid- to long-term (weeks/months), and he uploads
almost exclusively pre-market or in the evenings. Before this change,
every such signal was rejected by RiskManager with outside_market_hours,
consumed off the Redis stream, and lost. End result: 71 emitted signals, 0 trades.

New behaviour: when RiskManager rejects with outside_market_hours,
push the signal into a Redis sorted-set keyed by next_market_open
(via Alpaca's clock API — handles weekends + holidays). A background
drain task polls the set every kevin_defer_drain_interval_s (60s);
any signal whose target <= now gets re-run through process_signal.

Safety:
  - kevin_max_defer_hours (default 72h) caps signal staleness so we
    don't trade on week-old views.
  - Other RiskManager rejections (cooldown, kill-switch, drawdown
    halt) fall through to the existing drop path.
  - kevin_defer_outside_market_hours toggle defaults True; flip to
    false for legacy behaviour.

Slack: new notify_deferred() emits "🕒 Meet Kevin: DEFERRED
NVDA until Mon 13:30 UTC (market closed; conviction 0.85)" instead
of the noisy outside_market_hours rejection spam.

Tests: 5 queue + 4 integration = 9 new, all 32 trade-executor tests
GREEN.
2026-06-01 19:01:37 +00:00

138 lines
4 KiB
Python

"""process_signal integration: defer outside_market_hours instead of dropping."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import fakeredis.aioredis
import pytest
from services.trade_executor.config import TradeExecutorConfig
from services.trade_executor.deferred_queue import DeferredSignalQueue
from services.trade_executor.main import process_signal
from shared.schemas.trading import SignalDirection, TradeSignal
@pytest.fixture
def cfg() -> TradeExecutorConfig:
    """Executor config with deferral switched on and placeholder Alpaca credentials.

    Individual tests derive variants from this baseline via ``model_copy``.
    """
    settings = {
        "kevin_defer_outside_market_hours": True,
        "kevin_max_defer_hours": 72.0,
        "kevin_defer_drain_interval_s": 60,
        # Dummy credentials: the tests never talk to the real broker API.
        "alpaca_api_key": "t",
        "alpaca_secret_key": "t",
    }
    return TradeExecutorConfig(**settings)
@pytest.fixture
async def queue():
    """Yield a ``DeferredSignalQueue`` on top of an in-memory fake Redis client.

    The fake client is closed on teardown, even when the test body raises.
    """
    fake_redis = fakeredis.aioredis.FakeRedis()
    deferred = DeferredSignalQueue(fake_redis)
    try:
        yield deferred
    finally:
        await fake_redis.aclose()
def _signal(age_hours: float = 0.0) -> TradeSignal:
    """Build a LONG NVDA signal whose timestamp lies ``age_hours`` in the past.

    Args:
        age_hours: How many hours before "now" (UTC) the signal is stamped.
            The default of 0 produces a fresh signal.

    Returns:
        A ``TradeSignal`` with a random ``signal_id`` and a 2000-dollar target.
    """
    issued_at = datetime.now(timezone.utc) - timedelta(hours=age_hours)
    fields = {
        "signal_id": uuid4(),
        "ticker": "NVDA",
        "direction": SignalDirection.LONG,
        "strength": 0.8,
        "strategy_sources": ["meet_kevin:buy:0.8"],
        "timestamp": issued_at,
        "target_dollars": Decimal("2000"),
    }
    return TradeSignal(**fields)
def _counters():
    """Return a fresh mapping of metric name to a stand-in ``MagicMock``.

    Every call creates new mocks, so tests never share recorded calls.
    """
    metric_names = ("trades_executed", "rejections", "fill_latency")
    return {metric: MagicMock() for metric in metric_names}
async def test_outside_market_hours_signal_is_deferred(cfg, queue):
    """An ``outside_market_hours`` rejection leaves exactly one entry in the queue."""
    risk_manager = MagicMock(
        check_risk=AsyncMock(return_value=(False, "outside_market_hours")),
    )
    broker = MagicMock()
    next_open = datetime.now(timezone.utc) + timedelta(hours=12)
    fake_next_open = AsyncMock(return_value=next_open)

    # Replace the market-clock lookup so the test controls the next open time.
    with patch("services.trade_executor.main._next_market_open", new=fake_next_open):
        await process_signal(
            _signal(),
            risk_manager,
            broker,
            publisher=AsyncMock(),
            counters=_counters(),
            db_session_factory=None,
            slack_notifier=None,
            deferred_queue=queue,
            config=cfg,
        )

    queued = await queue.size()
    assert queued == 1
async def test_stale_signal_not_deferred(cfg, queue):
    """A signal older than ``kevin_max_defer_hours`` is not queued for later."""
    capped_cfg = cfg.model_copy(update={"kevin_max_defer_hours": 24.0})
    risk_manager = MagicMock(
        check_risk=AsyncMock(return_value=(False, "outside_market_hours")),
    )
    next_open = datetime.now(timezone.utc) + timedelta(hours=12)

    with patch(
        "services.trade_executor.main._next_market_open",
        new=AsyncMock(return_value=next_open),
    ):
        await process_signal(
            _signal(age_hours=48),  # 48h old, beyond the 24h cap set above
            risk_manager,
            broker=MagicMock(),
            publisher=AsyncMock(),
            counters=_counters(),
            db_session_factory=None,
            slack_notifier=None,
            deferred_queue=queue,
            config=capped_cfg,
        )

    queued = await queue.size()
    assert queued == 0
async def test_non_market_hours_rejection_not_deferred(cfg, queue):
    """A rejection for any other reason (here: cooldown) leaves the queue empty."""
    rejection = (False, "cooldown_active (5m remaining)")
    risk_manager = MagicMock(check_risk=AsyncMock(return_value=rejection))

    await process_signal(
        _signal(),
        risk_manager,
        broker=MagicMock(),
        publisher=AsyncMock(),
        counters=_counters(),
        db_session_factory=None,
        slack_notifier=None,
        deferred_queue=queue,
        config=cfg,
    )

    queued = await queue.size()
    assert queued == 0
async def test_defer_disabled_falls_through(cfg, queue):
    """With the defer toggle off, an ``outside_market_hours`` rejection is not queued."""
    legacy_cfg = cfg.model_copy(update={"kevin_defer_outside_market_hours": False})
    rejection = (False, "outside_market_hours")
    risk_manager = MagicMock(check_risk=AsyncMock(return_value=rejection))

    await process_signal(
        _signal(),
        risk_manager,
        broker=MagicMock(),
        publisher=AsyncMock(),
        counters=_counters(),
        db_session_factory=None,
        slack_notifier=None,
        deferred_queue=queue,
        config=legacy_cfg,
    )

    queued = await queue.size()
    assert queued == 0