diff --git a/services/trade_executor/config.py b/services/trade_executor/config.py index 6e39b27..92a2421 100644 --- a/services/trade_executor/config.py +++ b/services/trade_executor/config.py @@ -30,4 +30,14 @@ class TradeExecutorConfig(BaseConfig): slack_bot_token: str = "" slack_channel: str = "" + # Kevin v2: defer signals that arrive outside US market hours into a + # Redis sorted-set and drain at next market open. Kevin's signals are + # mid/long-term — a Sunday-evening signal should turn into a Monday + # paper trade, not get dropped. Cap to 72h so we don't replay + # week-stale signals. + kevin_defer_outside_market_hours: bool = True + kevin_max_defer_hours: float = 72.0 + # Drain task polls the deferred queue every kevin_defer_drain_interval_s. + kevin_defer_drain_interval_s: int = 60 + model_config = {"env_prefix": "TRADING_"} diff --git a/services/trade_executor/deferred_queue.py b/services/trade_executor/deferred_queue.py new file mode 100644 index 0000000..125eb42 --- /dev/null +++ b/services/trade_executor/deferred_queue.py @@ -0,0 +1,76 @@ +"""Defer signals that arrive outside US market hours, drain at next market open. + +Kevin's signals are mid/long-term (weeks/months). A signal that lands at +Sunday 19:00 UTC should turn into a Monday-morning paper trade, not be +dropped on the floor. This queue holds (signal, target_submission_ts) +pairs in a Redis sorted-set keyed by target_submission_ts. + +A background drain task in trade-executor's run() loop polls pop_due() +every ~60 s; signals whose target_submission_ts <= now are reprocessed +through the normal process_signal path. +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone + +from redis.asyncio import Redis + +from shared.schemas.trading import TradeSignal + +logger = logging.getLogger(__name__) + +DEFERRED_KEY = "kevin:deferred_signals" + + +class DeferredSignalQueue: + """Redis-sorted-set wrapper. Score = target_submission_ts (epoch seconds). + Member = JSON of {"signal": , "queued_at": }. + """ + + def __init__(self, redis: Redis) -> None: + self.redis = redis + + async def defer(self, signal: TradeSignal, target_ts: datetime) -> None: + """Add a signal to the queue, scheduled for re-submission at + ``target_ts`` (usually the next market-open timestamp). + """ + member = json.dumps( + { + "signal": signal.model_dump(mode="json"), + "queued_at": datetime.now(timezone.utc).isoformat(), + } + ) + await self.redis.zadd(DEFERRED_KEY, {member: target_ts.timestamp()}) + + async def pop_due( + self, now: datetime | None = None + ) -> list[tuple[TradeSignal, datetime]]: + """Atomically pop and return every signal whose target_ts <= now. + + Returns: list of (TradeSignal, queued_at) pairs. + """ + now = now or datetime.now(timezone.utc) + cutoff = now.timestamp() + async with self.redis.pipeline(transaction=True) as pipe: + pipe.zrangebyscore(DEFERRED_KEY, min="-inf", max=cutoff) + pipe.zremrangebyscore(DEFERRED_KEY, min="-inf", max=cutoff) + members, _removed = await pipe.execute() + + result: list[tuple[TradeSignal, datetime]] = [] + for raw in members: + try: + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + payload = json.loads(raw) + signal = TradeSignal.model_validate(payload["signal"]) + queued_at = datetime.fromisoformat(payload["queued_at"]) + result.append((signal, queued_at)) + except Exception: + logger.exception("Failed to deserialize deferred signal: %s", raw) + return result + + async def size(self) -> int: + return await self.redis.zcard(DEFERRED_KEY) diff --git a/services/trade_executor/main.py b/services/trade_executor/main.py index 209c4ce..a241e7d 100644 --- a/services/trade_executor/main.py +++ b/services/trade_executor/main.py @@ -13,11 +13,13 @@ import logging import signal import time import uuid +from datetime import datetime, timezone from redis.asyncio import Redis from sqlalchemy.ext.asyncio import async_sessionmaker from services.trade_executor.config import TradeExecutorConfig +from services.trade_executor.deferred_queue import DeferredSignalQueue from services.trade_executor.risk_manager import RiskManager from services.trade_executor.slack_notifier import SlackNotifier from shared.broker.alpaca_broker import AlpacaBroker @@ -39,6 +41,21 @@ from shared.telemetry import setup_telemetry logger = logging.getLogger(__name__) +async def _next_market_open(broker: AlpacaBroker) -> datetime: + """Alpaca's clock API knows weekends + holidays; use it instead of + hardcoding 9:30 ET.""" + from datetime import timedelta + + clock = await asyncio.to_thread(broker._client.get_clock) + next_open = getattr(clock, "next_open", None) + if next_open is None: + # Defensive fallback — defer by 1 hour and re-check. + return datetime.now(timezone.utc) + timedelta(hours=1) + if next_open.tzinfo is None: + next_open = next_open.replace(tzinfo=timezone.utc) + return next_open.astimezone(timezone.utc) + + async def process_signal( signal: TradeSignal, risk_manager: RiskManager, @@ -47,6 +64,8 @@ async def process_signal( counters: dict, db_session_factory: async_sessionmaker | None = None, slack_notifier: SlackNotifier | None = None, + deferred_queue: DeferredSignalQueue | None = None, + config: TradeExecutorConfig | None = None, ) -> None: """Process a single trade signal: risk check, order, record, publish. @@ -68,6 +87,36 @@ async def process_signal( # --- Step 1: risk check --- approved, reason = await risk_manager.check_risk(signal) if not approved: + # v2: defer outside-market-hours instead of dropping. Kevin's + # signals are mid/long-term so a Sunday-evening signal should turn + # into a Monday paper trade. Skip if signal is already stale beyond + # kevin_max_defer_hours. + if ( + reason == "outside_market_hours" + and deferred_queue is not None + and config is not None + and config.kevin_defer_outside_market_hours + ): + age_seconds = ( + datetime.now(timezone.utc) - signal.timestamp + ).total_seconds() + max_defer_seconds = config.kevin_max_defer_hours * 3600 + if age_seconds < max_defer_seconds: + target = await _next_market_open(broker) + await deferred_queue.defer(signal, target) + logger.info( + "Signal DEFERRED for %s until %s", signal.ticker, target.isoformat() + ) + counters["rejections"].add(1, {"reason": "deferred"}) + if slack_notifier is not None: + await slack_notifier.notify_deferred(signal, target) + return + logger.info( + "Signal NOT DEFERRED for %s — age %.1fh exceeds max_defer_hours %.1fh", + signal.ticker, + age_seconds / 3600, + config.kevin_max_defer_hours, + ) logger.info("Signal REJECTED for %s: %s", signal.ticker, reason) counters["rejections"].add(1, {"reason": reason.split(" ")[0]}) if slack_notifier is not None: @@ -228,12 +277,59 @@ async def run(config: TradeExecutorConfig | None = None) -> None: logger.info("Consuming from signals:generated, publishing to trades:executed") + # --- Deferred-signal queue + drain task --- + deferred_queue = DeferredSignalQueue(redis) + if config.kevin_defer_outside_market_hours: + logger.info( + "Deferred-signal queue enabled (max_defer=%.1fh, drain_interval=%ds)", + config.kevin_max_defer_hours, + config.kevin_defer_drain_interval_s, + ) + # Graceful shutdown on SIGTERM/SIGINT shutdown_event = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, shutdown_event.set) + async def _drain_deferred_loop() -> None: + """Poll the deferred-signal sorted-set every drain_interval_s and + re-run process_signal for any due signals.""" + while not shutdown_event.is_set(): + try: + due = await deferred_queue.pop_due() + for sig_obj, _queued_at in due: + logger.info("Draining deferred signal for %s", sig_obj.ticker) + try: + await process_signal( + sig_obj, + risk_manager, + broker, + publisher, + counters, + db_session_factory, + slack_notifier, + deferred_queue, + config, + ) + except Exception: + logger.exception("Drained signal processing failed") + except Exception: + logger.exception("Drain loop error") + try: + await asyncio.wait_for( + shutdown_event.wait(), + timeout=config.kevin_defer_drain_interval_s, + ) + except asyncio.TimeoutError: + continue + + drain_task = ( + asyncio.create_task(_drain_deferred_loop()) + if config.kevin_defer_outside_market_hours + else None + ) + # --- Consume loop --- try: async for _msg_id, data in consumer.consume(): @@ -249,10 +345,18 @@ async def run(config: TradeExecutorConfig | None = None) -> None: counters, db_session_factory, slack_notifier, + deferred_queue, + config, ) except Exception: logger.exception("Error processing signal: %s", data) finally: + if drain_task is not None: + drain_task.cancel() + try: + await drain_task + except (asyncio.CancelledError, Exception): + pass await redis.aclose() logger.info("Trade executor stopped gracefully") diff --git a/services/trade_executor/slack_notifier.py b/services/trade_executor/slack_notifier.py index 933f639..ce82bd3 100644 --- a/services/trade_executor/slack_notifier.py +++ b/services/trade_executor/slack_notifier.py @@ -76,6 +76,17 @@ class SlackNotifier: text = self._format_rejection(signal, reason) await self._post(text) + async def notify_deferred(self, signal: TradeSignal, target_ts) -> None: + if not self.enabled: + return + tag = self._strategy_tag(signal) + when = target_ts.strftime("%a %H:%M UTC") if target_ts else "?" + text = ( + f":clock3: *{tag}*: DEFERRED {signal.ticker} until {when} " + f"(market closed; conviction {signal.strength:.2f})" + ) + await self._post(text) + # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ diff --git a/tests/services/trade_executor/test_defer_integration.py b/tests/services/trade_executor/test_defer_integration.py new file mode 100644 index 0000000..db6a34a --- /dev/null +++ b/tests/services/trade_executor/test_defer_integration.py @@ -0,0 +1,138 @@ +"""process_signal integration: defer outside_market_hours instead of dropping.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock, patch +from uuid import uuid4 + +import fakeredis.aioredis +import pytest + +from services.trade_executor.config import TradeExecutorConfig +from services.trade_executor.deferred_queue import DeferredSignalQueue +from services.trade_executor.main import process_signal +from shared.schemas.trading import SignalDirection, TradeSignal + + +@pytest.fixture +def cfg() -> TradeExecutorConfig: + return TradeExecutorConfig( + kevin_defer_outside_market_hours=True, + kevin_max_defer_hours=72.0, + kevin_defer_drain_interval_s=60, + alpaca_api_key="t", + alpaca_secret_key="t", + ) + + +@pytest.fixture +async def queue(): + r = fakeredis.aioredis.FakeRedis() + try: + yield DeferredSignalQueue(r) + finally: + await r.aclose() + + +def _signal(age_hours: float = 0.0) -> TradeSignal: + return TradeSignal( + signal_id=uuid4(), + ticker="NVDA", + direction=SignalDirection.LONG, + strength=0.8, + strategy_sources=["meet_kevin:buy:0.8"], + timestamp=datetime.now(timezone.utc) - timedelta(hours=age_hours), + target_dollars=Decimal("2000"), + ) + + +def _counters(): + return { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + +async def test_outside_market_hours_signal_is_deferred(cfg, queue): + rm = MagicMock() + rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours")) + broker = MagicMock() + next_open = datetime.now(timezone.utc) + timedelta(hours=12) + with patch( + "services.trade_executor.main._next_market_open", + AsyncMock(return_value=next_open), + ): + await process_signal( + _signal(), + rm, + broker, + publisher=AsyncMock(), + counters=_counters(), + db_session_factory=None, + slack_notifier=None, + deferred_queue=queue, + config=cfg, + ) + assert await queue.size() == 1 + + +async def test_stale_signal_not_deferred(cfg, queue): + """Signal older than max_defer_hours falls through to normal rejection.""" + cfg = cfg.model_copy(update={"kevin_max_defer_hours": 24.0}) + rm = MagicMock() + rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours")) + with patch( + "services.trade_executor.main._next_market_open", + AsyncMock(return_value=datetime.now(timezone.utc) + timedelta(hours=12)), + ): + await process_signal( + _signal(age_hours=48), # 48h > 24h cap + rm, + broker=MagicMock(), + publisher=AsyncMock(), + counters=_counters(), + db_session_factory=None, + slack_notifier=None, + deferred_queue=queue, + config=cfg, + ) + assert await queue.size() == 0 + + +async def test_non_market_hours_rejection_not_deferred(cfg, queue): + """Other rejections (e.g. cooldown, kill switch) don't defer.""" + rm = MagicMock() + rm.check_risk = AsyncMock(return_value=(False, "cooldown_active (5m remaining)")) + await process_signal( + _signal(), + rm, + broker=MagicMock(), + publisher=AsyncMock(), + counters=_counters(), + db_session_factory=None, + slack_notifier=None, + deferred_queue=queue, + config=cfg, + ) + assert await queue.size() == 0 + + +async def test_defer_disabled_falls_through(cfg, queue): + cfg = cfg.model_copy(update={"kevin_defer_outside_market_hours": False}) + rm = MagicMock() + rm.check_risk = AsyncMock(return_value=(False, "outside_market_hours")) + await process_signal( + _signal(), + rm, + broker=MagicMock(), + publisher=AsyncMock(), + counters=_counters(), + db_session_factory=None, + slack_notifier=None, + deferred_queue=queue, + config=cfg, + ) + assert await queue.size() == 0 diff --git a/tests/services/trade_executor/test_deferred_queue.py b/tests/services/trade_executor/test_deferred_queue.py new file mode 100644 index 0000000..9171f6b --- /dev/null +++ b/tests/services/trade_executor/test_deferred_queue.py @@ -0,0 +1,101 @@ +"""Tests for the DeferredSignalQueue. + +Kevin's signals are mid/long-term — a signal that lands outside US market +hours should be held in a Redis sorted-set and replayed at the next market +open instead of dropped. +""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from uuid import uuid4 + +import fakeredis.aioredis +import pytest + +from services.trade_executor.deferred_queue import DeferredSignalQueue +from shared.schemas.trading import SignalDirection, TradeSignal + + +def _signal(ticker: str = "NVDA") -> TradeSignal: + return TradeSignal( + signal_id=uuid4(), + ticker=ticker, + direction=SignalDirection.LONG, + strength=0.85, + strategy_sources=["meet_kevin:buy:0.85"], + timestamp=datetime.now(timezone.utc), + target_dollars=Decimal("2000"), + ) + + +@pytest.fixture +async def redis(): + r = fakeredis.aioredis.FakeRedis() + try: + yield r + finally: + await r.aclose() + + +async def test_defer_then_pop_due_returns_signal(redis): + q = DeferredSignalQueue(redis) + s = _signal() + past = datetime.now(timezone.utc) - timedelta(minutes=1) + await q.defer(s, target_ts=past) + + due = await q.pop_due() + assert len(due) == 1 + popped_signal, _queued_at = due[0] + assert popped_signal.ticker == "NVDA" + assert popped_signal.signal_id == s.signal_id + + +async def test_pop_due_skips_future_targets(redis): + q = DeferredSignalQueue(redis) + s_now = _signal("NVDA") + s_future = _signal("AMD") + now = datetime.now(timezone.utc) + await q.defer(s_now, target_ts=now - timedelta(seconds=10)) + await q.defer(s_future, target_ts=now + timedelta(hours=12)) + + due = await q.pop_due() + assert [s.ticker for s, _ in due] == ["NVDA"] + + remaining = await q.size() + assert remaining == 1 + + +async def test_pop_due_is_atomic_pop(redis): + """Once popped, a signal must NOT come back on the next pop.""" + q = DeferredSignalQueue(redis) + await q.defer(_signal(), target_ts=datetime.now(timezone.utc) - timedelta(seconds=1)) + + first = await q.pop_due() + assert len(first) == 1 + second = await q.pop_due() + assert second == [] + + +async def test_size_zero_when_empty(redis): + q = DeferredSignalQueue(redis) + assert await q.size() == 0 + + +async def test_defer_preserves_signal_age(redis): + """The original TradeSignal.timestamp (which the strategy uses for + mention-age cutoffs) must survive the round-trip.""" + q = DeferredSignalQueue(redis) + orig_ts = datetime(2026, 6, 1, 15, 0, tzinfo=timezone.utc) + s = TradeSignal( + signal_id=uuid4(), + ticker="MSFT", + direction=SignalDirection.LONG, + strength=0.7, + strategy_sources=["meet_kevin"], + timestamp=orig_ts, + ) + await q.defer(s, target_ts=datetime.now(timezone.utc) - timedelta(seconds=1)) + [(popped, _)] = await q.pop_due() + assert popped.timestamp == orig_ts