feat(trade-executor): defer outside_market_hours signals to next open

Kevin's signals are mid-long term (weeks/months) and he uploads almost
exclusively pre-market or evenings. Before this change, every such
signal hit RiskManager.outside_market_hours, got consumed off the
Redis stream, and was lost. End result: 71 emitted signals, 0 trades.

New behaviour: when RiskManager rejects with outside_market_hours,
push the signal into a Redis sorted-set keyed by next_market_open
(via Alpaca's clock API — handles weekends + holidays). A background
drain task polls the set every kevin_defer_drain_interval_s (60s);
any signal whose target <= now gets re-run through process_signal.

Safety:
  - kevin_max_defer_hours (default 72h) caps signal staleness so we
    don't trade on week-old views.
  - Other RiskManager rejections (cooldown, kill-switch, drawdown
    halt) fall through to the existing drop path.
  - kevin_defer_outside_market_hours toggle defaults True; flip to
    false for legacy behaviour.

Slack: new notify_deferred() emits "🕒 Meet Kevin: DEFERRED
NVDA until Mon 13:30 UTC (market closed; conviction 0.85)" instead
of the noisy outside_market_hours rejection spam.

Tests: 5 queue + 4 integration = 9 new, all 32 trade-executor tests
GREEN.
This commit is contained in:
Viktor Barzin 2026-06-01 19:01:37 +00:00
parent 00a40c9d2f
commit 2855e79af4
6 changed files with 440 additions and 0 deletions

View file

@ -30,4 +30,14 @@ class TradeExecutorConfig(BaseConfig):
slack_bot_token: str = "" slack_bot_token: str = ""
slack_channel: str = "" slack_channel: str = ""
# Kevin v2: defer signals that arrive outside US market hours into a
# Redis sorted-set and drain at next market open. Kevin's signals are
# mid/long-term — a Sunday-evening signal should turn into a Monday
# paper trade, not get dropped. Cap to 72h so we don't replay
# week-stale signals.
kevin_defer_outside_market_hours: bool = True
kevin_max_defer_hours: float = 72.0
# Drain task polls the deferred queue every kevin_defer_drain_interval_s.
kevin_defer_drain_interval_s: int = 60
model_config = {"env_prefix": "TRADING_"} model_config = {"env_prefix": "TRADING_"}

View file

@ -0,0 +1,76 @@
"""Defer signals that arrive outside US market hours, drain at next market open.
Kevin's signals are mid/long-term (weeks/months). A signal that lands at
Sunday 19:00 UTC should turn into a Monday-morning paper trade, not be
dropped on the floor. This queue holds (signal, target_submission_ts)
pairs in a Redis sorted-set keyed by target_submission_ts.
A background drain task in trade-executor's run() loop polls pop_due()
every ~60 s; signals whose target_submission_ts <= now are reprocessed
through the normal process_signal path.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from redis.asyncio import Redis
from shared.schemas.trading import TradeSignal
logger = logging.getLogger(__name__)
DEFERRED_KEY = "kevin:deferred_signals"
class DeferredSignalQueue:
"""Redis-sorted-set wrapper. Score = target_submission_ts (epoch seconds).
Member = JSON of {"signal": <TradeSignal.model_dump>, "queued_at": <ISO>}.
"""
def __init__(self, redis: Redis) -> None:
self.redis = redis
async def defer(self, signal: TradeSignal, target_ts: datetime) -> None:
"""Add a signal to the queue, scheduled for re-submission at
``target_ts`` (usually the next market-open timestamp).
"""
member = json.dumps(
{
"signal": signal.model_dump(mode="json"),
"queued_at": datetime.now(timezone.utc).isoformat(),
}
)
await self.redis.zadd(DEFERRED_KEY, {member: target_ts.timestamp()})
async def pop_due(
self, now: datetime | None = None
) -> list[tuple[TradeSignal, datetime]]:
"""Atomically pop and return every signal whose target_ts <= now.
Returns: list of (TradeSignal, queued_at) pairs.
"""
now = now or datetime.now(timezone.utc)
cutoff = now.timestamp()
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zrangebyscore(DEFERRED_KEY, min="-inf", max=cutoff)
pipe.zremrangebyscore(DEFERRED_KEY, min="-inf", max=cutoff)
members, _removed = await pipe.execute()
result: list[tuple[TradeSignal, datetime]] = []
for raw in members:
try:
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
payload = json.loads(raw)
signal = TradeSignal.model_validate(payload["signal"])
queued_at = datetime.fromisoformat(payload["queued_at"])
result.append((signal, queued_at))
except Exception:
logger.exception("Failed to deserialize deferred signal: %s", raw)
return result
async def size(self) -> int:
return await self.redis.zcard(DEFERRED_KEY)

View file

@ -13,11 +13,13 @@ import logging
import signal import signal
import time import time
import uuid import uuid
from datetime import datetime, timezone
from redis.asyncio import Redis from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import async_sessionmaker
from services.trade_executor.config import TradeExecutorConfig from services.trade_executor.config import TradeExecutorConfig
from services.trade_executor.deferred_queue import DeferredSignalQueue
from services.trade_executor.risk_manager import RiskManager from services.trade_executor.risk_manager import RiskManager
from services.trade_executor.slack_notifier import SlackNotifier from services.trade_executor.slack_notifier import SlackNotifier
from shared.broker.alpaca_broker import AlpacaBroker from shared.broker.alpaca_broker import AlpacaBroker
@ -39,6 +41,21 @@ from shared.telemetry import setup_telemetry
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def _next_market_open(broker: AlpacaBroker) -> datetime:
"""Alpaca's clock API knows weekends + holidays; use it instead of
hardcoding 9:30 ET."""
from datetime import timedelta
clock = await asyncio.to_thread(broker._client.get_clock)
next_open = getattr(clock, "next_open", None)
if next_open is None:
# Defensive fallback — defer by 1 hour and re-check.
return datetime.now(timezone.utc) + timedelta(hours=1)
if next_open.tzinfo is None:
next_open = next_open.replace(tzinfo=timezone.utc)
return next_open.astimezone(timezone.utc)
async def process_signal( async def process_signal(
signal: TradeSignal, signal: TradeSignal,
risk_manager: RiskManager, risk_manager: RiskManager,
@ -47,6 +64,8 @@ async def process_signal(
counters: dict, counters: dict,
db_session_factory: async_sessionmaker | None = None, db_session_factory: async_sessionmaker | None = None,
slack_notifier: SlackNotifier | None = None, slack_notifier: SlackNotifier | None = None,
deferred_queue: DeferredSignalQueue | None = None,
config: TradeExecutorConfig | None = None,
) -> None: ) -> None:
"""Process a single trade signal: risk check, order, record, publish. """Process a single trade signal: risk check, order, record, publish.
@ -68,6 +87,36 @@ async def process_signal(
# --- Step 1: risk check --- # --- Step 1: risk check ---
approved, reason = await risk_manager.check_risk(signal) approved, reason = await risk_manager.check_risk(signal)
if not approved: if not approved:
# v2: defer outside-market-hours instead of dropping. Kevin's
# signals are mid/long-term so a Sunday-evening signal should turn
# into a Monday paper trade. Skip if signal is already stale beyond
# kevin_max_defer_hours.
if (
reason == "outside_market_hours"
and deferred_queue is not None
and config is not None
and config.kevin_defer_outside_market_hours
):
age_seconds = (
datetime.now(timezone.utc) - signal.timestamp
).total_seconds()
max_defer_seconds = config.kevin_max_defer_hours * 3600
if age_seconds < max_defer_seconds:
target = await _next_market_open(broker)
await deferred_queue.defer(signal, target)
logger.info(
"Signal DEFERRED for %s until %s", signal.ticker, target.isoformat()
)
counters["rejections"].add(1, {"reason": "deferred"})
if slack_notifier is not None:
await slack_notifier.notify_deferred(signal, target)
return
logger.info(
"Signal NOT DEFERRED for %s — age %.1fh exceeds max_defer_hours %.1fh",
signal.ticker,
age_seconds / 3600,
config.kevin_max_defer_hours,
)
logger.info("Signal REJECTED for %s: %s", signal.ticker, reason) logger.info("Signal REJECTED for %s: %s", signal.ticker, reason)
counters["rejections"].add(1, {"reason": reason.split(" ")[0]}) counters["rejections"].add(1, {"reason": reason.split(" ")[0]})
if slack_notifier is not None: if slack_notifier is not None:
@ -228,12 +277,59 @@ async def run(config: TradeExecutorConfig | None = None) -> None:
logger.info("Consuming from signals:generated, publishing to trades:executed") logger.info("Consuming from signals:generated, publishing to trades:executed")
# --- Deferred-signal queue + drain task ---
deferred_queue = DeferredSignalQueue(redis)
if config.kevin_defer_outside_market_hours:
logger.info(
"Deferred-signal queue enabled (max_defer=%.1fh, drain_interval=%ds)",
config.kevin_max_defer_hours,
config.kevin_defer_drain_interval_s,
)
# Graceful shutdown on SIGTERM/SIGINT # Graceful shutdown on SIGTERM/SIGINT
shutdown_event = asyncio.Event() shutdown_event = asyncio.Event()
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT): for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, shutdown_event.set) loop.add_signal_handler(sig, shutdown_event.set)
async def _drain_deferred_loop() -> None:
"""Poll the deferred-signal sorted-set every drain_interval_s and
re-run process_signal for any due signals."""
while not shutdown_event.is_set():
try:
due = await deferred_queue.pop_due()
for sig_obj, _queued_at in due:
logger.info("Draining deferred signal for %s", sig_obj.ticker)
try:
await process_signal(
sig_obj,
risk_manager,
broker,
publisher,
counters,
db_session_factory,
slack_notifier,
deferred_queue,
config,
)
except Exception:
logger.exception("Drained signal processing failed")
except Exception:
logger.exception("Drain loop error")
try:
await asyncio.wait_for(
shutdown_event.wait(),
timeout=config.kevin_defer_drain_interval_s,
)
except asyncio.TimeoutError:
continue
drain_task = (
asyncio.create_task(_drain_deferred_loop())
if config.kevin_defer_outside_market_hours
else None
)
# --- Consume loop --- # --- Consume loop ---
try: try:
async for _msg_id, data in consumer.consume(): async for _msg_id, data in consumer.consume():
@ -249,10 +345,18 @@ async def run(config: TradeExecutorConfig | None = None) -> None:
counters, counters,
db_session_factory, db_session_factory,
slack_notifier, slack_notifier,
deferred_queue,
config,
) )
except Exception: except Exception:
logger.exception("Error processing signal: %s", data) logger.exception("Error processing signal: %s", data)
finally: finally:
if drain_task is not None:
drain_task.cancel()
try:
await drain_task
except (asyncio.CancelledError, Exception):
pass
await redis.aclose() await redis.aclose()
logger.info("Trade executor stopped gracefully") logger.info("Trade executor stopped gracefully")

View file

@ -76,6 +76,17 @@ class SlackNotifier:
text = self._format_rejection(signal, reason) text = self._format_rejection(signal, reason)
await self._post(text) await self._post(text)
async def notify_deferred(self, signal: TradeSignal, target_ts) -> None:
if not self.enabled:
return
tag = self._strategy_tag(signal)
when = target_ts.strftime("%a %H:%M UTC") if target_ts else "?"
text = (
f":clock3: *{tag}*: DEFERRED {signal.ticker} until {when} "
f"(market closed; conviction {signal.strength:.2f})"
)
await self._post(text)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal # Internal
# ------------------------------------------------------------------ # ------------------------------------------------------------------

View file

@ -0,0 +1,138 @@
"""process_signal integration: defer outside_market_hours instead of dropping."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import fakeredis.aioredis
import pytest
from services.trade_executor.config import TradeExecutorConfig
from services.trade_executor.deferred_queue import DeferredSignalQueue
from services.trade_executor.main import process_signal
from shared.schemas.trading import SignalDirection, TradeSignal
@pytest.fixture
def cfg() -> TradeExecutorConfig:
return TradeExecutorConfig(
kevin_defer_outside_market_hours=True,
kevin_max_defer_hours=72.0,
kevin_defer_drain_interval_s=60,
alpaca_api_key="t",
alpaca_secret_key="t",
)
@pytest.fixture
async def queue():
r = fakeredis.aioredis.FakeRedis()
try:
yield DeferredSignalQueue(r)
finally:
await r.aclose()
def _signal(age_hours: float = 0.0) -> TradeSignal:
return TradeSignal(
signal_id=uuid4(),
ticker="NVDA",
direction=SignalDirection.LONG,
strength=0.8,
strategy_sources=["meet_kevin:buy:0.8"],
timestamp=datetime.now(timezone.utc) - timedelta(hours=age_hours),
target_dollars=Decimal("2000"),
)
def _counters():
return {
"trades_executed": MagicMock(),
"rejections": MagicMock(),
"fill_latency": MagicMock(),
}
async def test_outside_market_hours_signal_is_deferred(cfg, queue):
rm = MagicMock()
rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours"))
broker = MagicMock()
next_open = datetime.now(timezone.utc) + timedelta(hours=12)
with patch(
"services.trade_executor.main._next_market_open",
AsyncMock(return_value=next_open),
):
await process_signal(
_signal(),
rm,
broker,
publisher=AsyncMock(),
counters=_counters(),
db_session_factory=None,
slack_notifier=None,
deferred_queue=queue,
config=cfg,
)
assert await queue.size() == 1
async def test_stale_signal_not_deferred(cfg, queue):
"""Signal older than max_defer_hours falls through to normal rejection."""
cfg = cfg.model_copy(update={"kevin_max_defer_hours": 24.0})
rm = MagicMock()
rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours"))
with patch(
"services.trade_executor.main._next_market_open",
AsyncMock(return_value=datetime.now(timezone.utc) + timedelta(hours=12)),
):
await process_signal(
_signal(age_hours=48), # 48h > 24h cap
rm,
broker=MagicMock(),
publisher=AsyncMock(),
counters=_counters(),
db_session_factory=None,
slack_notifier=None,
deferred_queue=queue,
config=cfg,
)
assert await queue.size() == 0
async def test_non_market_hours_rejection_not_deferred(cfg, queue):
"""Other rejections (e.g. cooldown, kill switch) don't defer."""
rm = MagicMock()
rm.check_risk = AsyncMock(return_value=(False, "cooldown_active (5m remaining)"))
await process_signal(
_signal(),
rm,
broker=MagicMock(),
publisher=AsyncMock(),
counters=_counters(),
db_session_factory=None,
slack_notifier=None,
deferred_queue=queue,
config=cfg,
)
assert await queue.size() == 0
async def test_defer_disabled_falls_through(cfg, queue):
cfg = cfg.model_copy(update={"kevin_defer_outside_market_hours": False})
rm = MagicMock()
rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours"))
await process_signal(
_signal(),
rm,
broker=MagicMock(),
publisher=AsyncMock(),
counters=_counters(),
db_session_factory=None,
slack_notifier=None,
deferred_queue=queue,
config=cfg,
)
assert await queue.size() == 0

View file

@ -0,0 +1,101 @@
"""Tests for the DeferredSignalQueue.
Kevin's signals are mid/long-term — a signal that lands outside US market
hours should be held in a Redis sorted-set and replayed at the next market
open instead of dropped.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from uuid import uuid4
import fakeredis.aioredis
import pytest
from services.trade_executor.deferred_queue import DeferredSignalQueue
from shared.schemas.trading import SignalDirection, TradeSignal
def _signal(ticker: str = "NVDA") -> TradeSignal:
return TradeSignal(
signal_id=uuid4(),
ticker=ticker,
direction=SignalDirection.LONG,
strength=0.85,
strategy_sources=["meet_kevin:buy:0.85"],
timestamp=datetime.now(timezone.utc),
target_dollars=Decimal("2000"),
)
@pytest.fixture
async def redis():
r = fakeredis.aioredis.FakeRedis()
try:
yield r
finally:
await r.aclose()
async def test_defer_then_pop_due_returns_signal(redis):
q = DeferredSignalQueue(redis)
s = _signal()
past = datetime.now(timezone.utc) - timedelta(minutes=1)
await q.defer(s, target_ts=past)
due = await q.pop_due()
assert len(due) == 1
popped_signal, _queued_at = due[0]
assert popped_signal.ticker == "NVDA"
assert popped_signal.signal_id == s.signal_id
async def test_pop_due_skips_future_targets(redis):
q = DeferredSignalQueue(redis)
s_now = _signal("NVDA")
s_future = _signal("AMD")
now = datetime.now(timezone.utc)
await q.defer(s_now, target_ts=now - timedelta(seconds=10))
await q.defer(s_future, target_ts=now + timedelta(hours=12))
due = await q.pop_due()
assert [s.ticker for s, _ in due] == ["NVDA"]
remaining = await q.size()
assert remaining == 1
async def test_pop_due_is_atomic_pop(redis):
"""Once popped, a signal must NOT come back on the next pop."""
q = DeferredSignalQueue(redis)
await q.defer(_signal(), target_ts=datetime.now(timezone.utc) - timedelta(seconds=1))
first = await q.pop_due()
assert len(first) == 1
second = await q.pop_due()
assert second == []
async def test_size_zero_when_empty(redis):
q = DeferredSignalQueue(redis)
assert await q.size() == 0
async def test_defer_preserves_signal_age(redis):
"""The original TradeSignal.timestamp (which the strategy uses for
mention-age cutoffs) must survive the round-trip."""
q = DeferredSignalQueue(redis)
orig_ts = datetime(2026, 6, 1, 15, 0, tzinfo=timezone.utc)
s = TradeSignal(
signal_id=uuid4(),
ticker="MSFT",
direction=SignalDirection.LONG,
strength=0.7,
strategy_sources=["meet_kevin"],
timestamp=orig_ts,
)
await q.defer(s, target_ts=datetime.now(timezone.utc) - timedelta(seconds=1))
[(popped, _)] = await q.pop_due()
assert popped.timestamp == orig_ts