Kevin's signals are mid-long term (weeks/months) and he uploads almost
exclusively pre-market or evenings. Before this change, every such
signal hit RiskManager.outside_market_hours, got consumed off the
Redis stream, and was lost. End result: 71 emitted signals, 0 trades.
New behaviour: when RiskManager rejects with outside_market_hours,
push the signal into a Redis sorted-set keyed by next_market_open
(via Alpaca's clock API — handles weekends + holidays). A background
drain task polls the set every kevin_defer_drain_interval_s (60s);
any signal whose target <= now gets re-run through process_signal.
Safety:
- kevin_max_defer_hours (default 72h) caps signal staleness so we
don't trade on week-old views.
- Other RiskManager rejections (cooldown, kill-switch, drawdown
halt) fall through to the existing drop path.
- kevin_defer_outside_market_hours toggle defaults True; flip to
false for legacy behaviour.
Slack: new notify_deferred() emits "🕒 Meet Kevin: DEFERRED
NVDA until Mon 13:30 UTC (market closed; conviction 0.85)" instead
of the noisy outside_market_hours rejection spam.
Tests: 5 queue + 4 integration = 9 new, all 32 trade-executor tests
GREEN.
370 lines
13 KiB
Python
370 lines
13 KiB
Python
"""Trade Executor service -- main entry point.
|
|
|
|
Consumes ``signals:generated`` from Redis Streams, runs risk checks,
|
|
submits orders via the brokerage abstraction layer, records trades
|
|
in the database, and publishes ``TradeExecution`` messages to
|
|
``trades:executed``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import signal
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from redis.asyncio import Redis
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker
|
|
|
|
from services.trade_executor.config import TradeExecutorConfig
|
|
from services.trade_executor.deferred_queue import DeferredSignalQueue
|
|
from services.trade_executor.risk_manager import RiskManager
|
|
from services.trade_executor.slack_notifier import SlackNotifier
|
|
from shared.broker.alpaca_broker import AlpacaBroker
|
|
from shared.db import create_db
|
|
from shared.models.trading import Trade as TradeModel
|
|
from shared.models.trading import TradeSide as TradeSideModel
|
|
from shared.models.trading import TradeStatus as TradeStatusModel
|
|
from shared.redis_streams import StreamConsumer, StreamPublisher
|
|
from shared.schemas.trading import (
|
|
OrderRequest,
|
|
OrderSide,
|
|
OrderStatus,
|
|
SignalDirection,
|
|
TradeExecution,
|
|
TradeSignal,
|
|
)
|
|
from shared.telemetry import setup_telemetry
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _next_market_open(broker: AlpacaBroker) -> datetime:
|
|
"""Alpaca's clock API knows weekends + holidays; use it instead of
|
|
hardcoding 9:30 ET."""
|
|
from datetime import timedelta
|
|
|
|
clock = await asyncio.to_thread(broker._client.get_clock)
|
|
next_open = getattr(clock, "next_open", None)
|
|
if next_open is None:
|
|
# Defensive fallback — defer by 1 hour and re-check.
|
|
return datetime.now(timezone.utc) + timedelta(hours=1)
|
|
if next_open.tzinfo is None:
|
|
next_open = next_open.replace(tzinfo=timezone.utc)
|
|
return next_open.astimezone(timezone.utc)
|
|
|
|
|
|
async def process_signal(
|
|
signal: TradeSignal,
|
|
risk_manager: RiskManager,
|
|
broker: AlpacaBroker,
|
|
publisher: StreamPublisher,
|
|
counters: dict,
|
|
db_session_factory: async_sessionmaker | None = None,
|
|
slack_notifier: SlackNotifier | None = None,
|
|
deferred_queue: DeferredSignalQueue | None = None,
|
|
config: TradeExecutorConfig | None = None,
|
|
) -> None:
|
|
"""Process a single trade signal: risk check, order, record, publish.
|
|
|
|
Parameters
|
|
----------
|
|
signal:
|
|
The trade signal to act on.
|
|
risk_manager:
|
|
Performs pre-trade risk checks and position sizing.
|
|
broker:
|
|
Brokerage adapter for submitting orders.
|
|
publisher:
|
|
Publishes execution results to ``trades:executed``.
|
|
counters:
|
|
Dict of OpenTelemetry counter/histogram instruments.
|
|
db_session_factory:
|
|
Optional async session factory for persisting trades to the DB.
|
|
"""
|
|
# --- Step 1: risk check ---
|
|
approved, reason = await risk_manager.check_risk(signal)
|
|
if not approved:
|
|
# v2: defer outside-market-hours instead of dropping. Kevin's
|
|
# signals are mid/long-term so a Sunday-evening signal should turn
|
|
# into a Monday paper trade. Skip if signal is already stale beyond
|
|
# kevin_max_defer_hours.
|
|
if (
|
|
reason == "outside_market_hours"
|
|
and deferred_queue is not None
|
|
and config is not None
|
|
and config.kevin_defer_outside_market_hours
|
|
):
|
|
age_seconds = (
|
|
datetime.now(timezone.utc) - signal.timestamp
|
|
).total_seconds()
|
|
max_defer_seconds = config.kevin_max_defer_hours * 3600
|
|
if age_seconds < max_defer_seconds:
|
|
target = await _next_market_open(broker)
|
|
await deferred_queue.defer(signal, target)
|
|
logger.info(
|
|
"Signal DEFERRED for %s until %s", signal.ticker, target.isoformat()
|
|
)
|
|
counters["rejections"].add(1, {"reason": "deferred"})
|
|
if slack_notifier is not None:
|
|
await slack_notifier.notify_deferred(signal, target)
|
|
return
|
|
logger.info(
|
|
"Signal NOT DEFERRED for %s — age %.1fh exceeds max_defer_hours %.1fh",
|
|
signal.ticker,
|
|
age_seconds / 3600,
|
|
config.kevin_max_defer_hours,
|
|
)
|
|
logger.info("Signal REJECTED for %s: %s", signal.ticker, reason)
|
|
counters["rejections"].add(1, {"reason": reason.split(" ")[0]})
|
|
if slack_notifier is not None:
|
|
await slack_notifier.notify_rejection(signal, reason)
|
|
return
|
|
|
|
# --- Step 2: calculate position size ---
|
|
account = await broker.get_account()
|
|
qty = risk_manager.calculate_position_size(signal, account)
|
|
if qty <= 0:
|
|
logger.info("Position size is zero for %s — skipping", signal.ticker)
|
|
counters["rejections"].add(1, {"reason": "zero_position_size"})
|
|
return
|
|
|
|
# --- Step 3: create order ---
|
|
side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL
|
|
order_request = OrderRequest(
|
|
ticker=signal.ticker,
|
|
side=side,
|
|
qty=float(qty),
|
|
)
|
|
|
|
# --- Step 4: submit order ---
|
|
start = time.monotonic()
|
|
result = await broker.submit_order(order_request)
|
|
elapsed = time.monotonic() - start
|
|
counters["fill_latency"].record(elapsed)
|
|
|
|
# --- Step 5: build trade execution ---
|
|
trade_id = uuid.uuid4()
|
|
execution = TradeExecution(
|
|
trade_id=trade_id,
|
|
ticker=signal.ticker,
|
|
side=side,
|
|
qty=result.qty,
|
|
price=result.filled_price or 0.0,
|
|
status=result.status,
|
|
signal_id=signal.signal_id,
|
|
strategy_id=None,
|
|
strategy_sources=signal.strategy_sources,
|
|
timestamp=result.timestamp,
|
|
)
|
|
|
|
# --- Step 6: persist trade to DB ---
|
|
if db_session_factory is not None:
|
|
try:
|
|
side_map = {
|
|
OrderSide.BUY: TradeSideModel.BUY,
|
|
OrderSide.SELL: TradeSideModel.SELL,
|
|
}
|
|
status_map = {
|
|
OrderStatus.PENDING: TradeStatusModel.PENDING,
|
|
OrderStatus.FILLED: TradeStatusModel.FILLED,
|
|
OrderStatus.CANCELLED: TradeStatusModel.CANCELLED,
|
|
OrderStatus.REJECTED: TradeStatusModel.REJECTED,
|
|
}
|
|
async with db_session_factory() as session:
|
|
db_trade = TradeModel(
|
|
id=trade_id,
|
|
ticker=signal.ticker,
|
|
side=side_map[side],
|
|
qty=result.qty,
|
|
price=result.filled_price or 0.0,
|
|
timestamp=str(result.timestamp),
|
|
signal_id=signal.signal_id,
|
|
status=status_map.get(result.status, TradeStatusModel.PENDING),
|
|
)
|
|
session.add(db_trade)
|
|
await session.commit()
|
|
logger.debug("Persisted trade %s to DB (signal_id=%s)", trade_id, signal.signal_id)
|
|
except Exception:
|
|
logger.exception("Failed to persist trade to DB")
|
|
|
|
# --- Step 7: publish to trades:executed ---
|
|
await publisher.publish(execution.model_dump(mode="json"))
|
|
counters["trades_executed"].add(1)
|
|
logger.info(
|
|
"Trade executed: %s %s %.0f shares @ %s status=%s",
|
|
side.value,
|
|
signal.ticker,
|
|
result.qty,
|
|
result.filled_price,
|
|
result.status.value,
|
|
)
|
|
|
|
# --- Step 8: notify slack (best-effort, fail-soft) ---
|
|
if slack_notifier is not None:
|
|
await slack_notifier.notify_trade(signal, result)
|
|
|
|
|
|
async def run(config: TradeExecutorConfig | None = None) -> None:
|
|
"""Main service loop.
|
|
|
|
Connects to Redis, initialises the broker and risk manager, then
|
|
continuously consumes from ``signals:generated`` and publishes
|
|
execution results to ``trades:executed``.
|
|
"""
|
|
if config is None:
|
|
config = TradeExecutorConfig()
|
|
|
|
logging.basicConfig(level=config.log_level)
|
|
logger.info("Starting Trade Executor service")
|
|
|
|
# --- Telemetry ---
|
|
meter = setup_telemetry("trade-executor", config.otel_metrics_port)
|
|
counters = {
|
|
"trades_executed": meter.create_counter(
|
|
"trades_executed",
|
|
description="Total trades successfully submitted",
|
|
),
|
|
"rejections": meter.create_counter(
|
|
"trade_rejections",
|
|
description="Signals rejected by risk checks",
|
|
),
|
|
"fill_latency": meter.create_histogram(
|
|
"order_fill_latency_seconds",
|
|
description="Time from order submission to response",
|
|
unit="s",
|
|
),
|
|
}
|
|
|
|
# --- Redis ---
|
|
redis = Redis.from_url(config.redis_url, decode_responses=False)
|
|
consumer = StreamConsumer(redis, "signals:generated", "trade-executor", "worker-1")
|
|
publisher = StreamPublisher(redis, "trades:executed")
|
|
|
|
# --- Broker ---
|
|
broker = AlpacaBroker(
|
|
api_key=config.alpaca_api_key,
|
|
secret_key=config.alpaca_secret_key,
|
|
paper=config.paper_trading,
|
|
)
|
|
|
|
# --- Risk manager ---
|
|
risk_manager = RiskManager(config, broker, redis=redis)
|
|
|
|
# --- Slack notifier (no-op when both transports are empty) ---
|
|
slack_notifier = SlackNotifier(
|
|
webhook_url=config.slack_webhook_url,
|
|
bot_token=config.slack_bot_token,
|
|
channel=config.slack_channel,
|
|
)
|
|
if slack_notifier.enabled:
|
|
transport = "bot-token" if slack_notifier.uses_bot_token else "webhook"
|
|
logger.info(
|
|
"Slack notifications enabled (%s%s)",
|
|
transport,
|
|
f", channel=#{slack_notifier.channel}" if slack_notifier.uses_bot_token else "",
|
|
)
|
|
|
|
# --- Database (for persisting trades) ---
|
|
db_session_factory = None
|
|
try:
|
|
_engine, db_session_factory = create_db(config)
|
|
logger.info("Database session factory initialised for trade persistence")
|
|
except Exception:
|
|
logger.exception("Failed to initialise DB — trades will NOT be persisted")
|
|
|
|
logger.info("Consuming from signals:generated, publishing to trades:executed")
|
|
|
|
# --- Deferred-signal queue + drain task ---
|
|
deferred_queue = DeferredSignalQueue(redis)
|
|
if config.kevin_defer_outside_market_hours:
|
|
logger.info(
|
|
"Deferred-signal queue enabled (max_defer=%.1fh, drain_interval=%ds)",
|
|
config.kevin_max_defer_hours,
|
|
config.kevin_defer_drain_interval_s,
|
|
)
|
|
|
|
# Graceful shutdown on SIGTERM/SIGINT
|
|
shutdown_event = asyncio.Event()
|
|
loop = asyncio.get_running_loop()
|
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
|
loop.add_signal_handler(sig, shutdown_event.set)
|
|
|
|
async def _drain_deferred_loop() -> None:
|
|
"""Poll the deferred-signal sorted-set every drain_interval_s and
|
|
re-run process_signal for any due signals."""
|
|
while not shutdown_event.is_set():
|
|
try:
|
|
due = await deferred_queue.pop_due()
|
|
for sig_obj, _queued_at in due:
|
|
logger.info("Draining deferred signal for %s", sig_obj.ticker)
|
|
try:
|
|
await process_signal(
|
|
sig_obj,
|
|
risk_manager,
|
|
broker,
|
|
publisher,
|
|
counters,
|
|
db_session_factory,
|
|
slack_notifier,
|
|
deferred_queue,
|
|
config,
|
|
)
|
|
except Exception:
|
|
logger.exception("Drained signal processing failed")
|
|
except Exception:
|
|
logger.exception("Drain loop error")
|
|
try:
|
|
await asyncio.wait_for(
|
|
shutdown_event.wait(),
|
|
timeout=config.kevin_defer_drain_interval_s,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
|
|
drain_task = (
|
|
asyncio.create_task(_drain_deferred_loop())
|
|
if config.kevin_defer_outside_market_hours
|
|
else None
|
|
)
|
|
|
|
# --- Consume loop ---
|
|
try:
|
|
async for _msg_id, data in consumer.consume():
|
|
if shutdown_event.is_set():
|
|
break
|
|
try:
|
|
signal_msg = TradeSignal.model_validate(data)
|
|
await process_signal(
|
|
signal_msg,
|
|
risk_manager,
|
|
broker,
|
|
publisher,
|
|
counters,
|
|
db_session_factory,
|
|
slack_notifier,
|
|
deferred_queue,
|
|
config,
|
|
)
|
|
except Exception:
|
|
logger.exception("Error processing signal: %s", data)
|
|
finally:
|
|
if drain_task is not None:
|
|
drain_task.cancel()
|
|
try:
|
|
await drain_task
|
|
except (asyncio.CancelledError, Exception):
|
|
pass
|
|
await redis.aclose()
|
|
logger.info("Trade executor stopped gracefully")
|
|
|
|
|
|
def main() -> None:
|
|
"""CLI entry point."""
|
|
asyncio.run(run())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|