"""Trade Executor service -- main entry point. Consumes ``signals:generated`` from Redis Streams, runs risk checks, submits orders via the brokerage abstraction layer, records trades in the database, and publishes ``TradeExecution`` messages to ``trades:executed``. """ from __future__ import annotations import asyncio import logging import signal import time import uuid from datetime import datetime, timezone from redis.asyncio import Redis from sqlalchemy.ext.asyncio import async_sessionmaker from services.trade_executor.config import TradeExecutorConfig from services.trade_executor.deferred_queue import DeferredSignalQueue from services.trade_executor.risk_manager import RiskManager from services.trade_executor.slack_notifier import SlackNotifier from shared.broker.alpaca_broker import AlpacaBroker from shared.db import create_db from shared.models.trading import Trade as TradeModel from shared.models.trading import TradeSide as TradeSideModel from shared.models.trading import TradeStatus as TradeStatusModel from shared.redis_streams import StreamConsumer, StreamPublisher from shared.schemas.trading import ( OrderRequest, OrderSide, OrderStatus, SignalDirection, TradeExecution, TradeSignal, ) from shared.telemetry import setup_telemetry logger = logging.getLogger(__name__) async def _next_market_open(broker: AlpacaBroker) -> datetime: """Alpaca's clock API knows weekends + holidays; use it instead of hardcoding 9:30 ET.""" from datetime import timedelta clock = await asyncio.to_thread(broker._client.get_clock) next_open = getattr(clock, "next_open", None) if next_open is None: # Defensive fallback — defer by 1 hour and re-check. return datetime.now(timezone.utc) + timedelta(hours=1) if next_open.tzinfo is None: next_open = next_open.replace(tzinfo=timezone.utc) return next_open.astimezone(timezone.utc) async def process_signal( signal: TradeSignal, risk_manager: RiskManager, broker: AlpacaBroker, publisher: StreamPublisher, counters: dict, db_session_factory: async_sessionmaker | None = None, slack_notifier: SlackNotifier | None = None, deferred_queue: DeferredSignalQueue | None = None, config: TradeExecutorConfig | None = None, ) -> None: """Process a single trade signal: risk check, order, record, publish. Parameters ---------- signal: The trade signal to act on. risk_manager: Performs pre-trade risk checks and position sizing. broker: Brokerage adapter for submitting orders. publisher: Publishes execution results to ``trades:executed``. counters: Dict of OpenTelemetry counter/histogram instruments. db_session_factory: Optional async session factory for persisting trades to the DB. """ # --- Step 1: risk check --- approved, reason = await risk_manager.check_risk(signal) if not approved: # v2: defer outside-market-hours instead of dropping. Kevin's # signals are mid/long-term so a Sunday-evening signal should turn # into a Monday paper trade. Skip if signal is already stale beyond # kevin_max_defer_hours. if ( reason == "outside_market_hours" and deferred_queue is not None and config is not None and config.kevin_defer_outside_market_hours ): age_seconds = ( datetime.now(timezone.utc) - signal.timestamp ).total_seconds() max_defer_seconds = config.kevin_max_defer_hours * 3600 if age_seconds < max_defer_seconds: target = await _next_market_open(broker) await deferred_queue.defer(signal, target) logger.info( "Signal DEFERRED for %s until %s", signal.ticker, target.isoformat() ) counters["rejections"].add(1, {"reason": "deferred"}) if slack_notifier is not None: await slack_notifier.notify_deferred(signal, target) return logger.info( "Signal NOT DEFERRED for %s — age %.1fh exceeds max_defer_hours %.1fh", signal.ticker, age_seconds / 3600, config.kevin_max_defer_hours, ) logger.info("Signal REJECTED for %s: %s", signal.ticker, reason) counters["rejections"].add(1, {"reason": reason.split(" ")[0]}) if slack_notifier is not None: await slack_notifier.notify_rejection(signal, reason) return # --- Step 2: calculate position size --- account = await broker.get_account() qty = risk_manager.calculate_position_size(signal, account) if qty <= 0: logger.info("Position size is zero for %s — skipping", signal.ticker) counters["rejections"].add(1, {"reason": "zero_position_size"}) return # --- Step 3: create order --- side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL order_request = OrderRequest( ticker=signal.ticker, side=side, qty=float(qty), ) # --- Step 4: submit order --- start = time.monotonic() result = await broker.submit_order(order_request) elapsed = time.monotonic() - start counters["fill_latency"].record(elapsed) # --- Step 5: build trade execution --- trade_id = uuid.uuid4() execution = TradeExecution( trade_id=trade_id, ticker=signal.ticker, side=side, qty=result.qty, price=result.filled_price or 0.0, status=result.status, signal_id=signal.signal_id, strategy_id=None, strategy_sources=signal.strategy_sources, timestamp=result.timestamp, ) # --- Step 6: persist trade to DB --- if db_session_factory is not None: try: side_map = { OrderSide.BUY: TradeSideModel.BUY, OrderSide.SELL: TradeSideModel.SELL, } status_map = { OrderStatus.PENDING: TradeStatusModel.PENDING, OrderStatus.FILLED: TradeStatusModel.FILLED, OrderStatus.CANCELLED: TradeStatusModel.CANCELLED, OrderStatus.REJECTED: TradeStatusModel.REJECTED, } async with db_session_factory() as session: db_trade = TradeModel( id=trade_id, ticker=signal.ticker, side=side_map[side], qty=result.qty, price=result.filled_price or 0.0, timestamp=str(result.timestamp), signal_id=signal.signal_id, status=status_map.get(result.status, TradeStatusModel.PENDING), ) session.add(db_trade) await session.commit() logger.debug("Persisted trade %s to DB (signal_id=%s)", trade_id, signal.signal_id) except Exception: logger.exception("Failed to persist trade to DB") # --- Step 7: publish to trades:executed --- await publisher.publish(execution.model_dump(mode="json")) counters["trades_executed"].add(1) logger.info( "Trade executed: %s %s %.0f shares @ %s status=%s", side.value, signal.ticker, result.qty, result.filled_price, result.status.value, ) # --- Step 8: notify slack (best-effort, fail-soft) --- if slack_notifier is not None: await slack_notifier.notify_trade(signal, result) async def run(config: TradeExecutorConfig | None = None) -> None: """Main service loop. Connects to Redis, initialises the broker and risk manager, then continuously consumes from ``signals:generated`` and publishes execution results to ``trades:executed``. """ if config is None: config = TradeExecutorConfig() logging.basicConfig(level=config.log_level) logger.info("Starting Trade Executor service") # --- Telemetry --- meter = setup_telemetry("trade-executor", config.otel_metrics_port) counters = { "trades_executed": meter.create_counter( "trades_executed", description="Total trades successfully submitted", ), "rejections": meter.create_counter( "trade_rejections", description="Signals rejected by risk checks", ), "fill_latency": meter.create_histogram( "order_fill_latency_seconds", description="Time from order submission to response", unit="s", ), } # --- Redis --- redis = Redis.from_url(config.redis_url, decode_responses=False) consumer = StreamConsumer(redis, "signals:generated", "trade-executor", "worker-1") publisher = StreamPublisher(redis, "trades:executed") # --- Broker --- broker = AlpacaBroker( api_key=config.alpaca_api_key, secret_key=config.alpaca_secret_key, paper=config.paper_trading, ) # --- Risk manager --- risk_manager = RiskManager(config, broker, redis=redis) # --- Slack notifier (no-op when both transports are empty) --- slack_notifier = SlackNotifier( webhook_url=config.slack_webhook_url, bot_token=config.slack_bot_token, channel=config.slack_channel, ) if slack_notifier.enabled: transport = "bot-token" if slack_notifier.uses_bot_token else "webhook" logger.info( "Slack notifications enabled (%s%s)", transport, f", channel=#{slack_notifier.channel}" if slack_notifier.uses_bot_token else "", ) # --- Database (for persisting trades) --- db_session_factory = None try: _engine, db_session_factory = create_db(config) logger.info("Database session factory initialised for trade persistence") except Exception: logger.exception("Failed to initialise DB — trades will NOT be persisted") logger.info("Consuming from signals:generated, publishing to trades:executed") # --- Deferred-signal queue + drain task --- deferred_queue = DeferredSignalQueue(redis) if config.kevin_defer_outside_market_hours: logger.info( "Deferred-signal queue enabled (max_defer=%.1fh, drain_interval=%ds)", config.kevin_max_defer_hours, config.kevin_defer_drain_interval_s, ) # Graceful shutdown on SIGTERM/SIGINT shutdown_event = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, shutdown_event.set) async def _drain_deferred_loop() -> None: """Poll the deferred-signal sorted-set every drain_interval_s and re-run process_signal for any due signals.""" while not shutdown_event.is_set(): try: due = await deferred_queue.pop_due() for sig_obj, _queued_at in due: logger.info("Draining deferred signal for %s", sig_obj.ticker) try: await process_signal( sig_obj, risk_manager, broker, publisher, counters, db_session_factory, slack_notifier, deferred_queue, config, ) except Exception: logger.exception("Drained signal processing failed") except Exception: logger.exception("Drain loop error") try: await asyncio.wait_for( shutdown_event.wait(), timeout=config.kevin_defer_drain_interval_s, ) except asyncio.TimeoutError: continue drain_task = ( asyncio.create_task(_drain_deferred_loop()) if config.kevin_defer_outside_market_hours else None ) # --- Consume loop --- try: async for _msg_id, data in consumer.consume(): if shutdown_event.is_set(): break try: signal_msg = TradeSignal.model_validate(data) await process_signal( signal_msg, risk_manager, broker, publisher, counters, db_session_factory, slack_notifier, deferred_queue, config, ) except Exception: logger.exception("Error processing signal: %s", data) finally: if drain_task is not None: drain_task.cancel() try: await drain_task except (asyncio.CancelledError, Exception): pass await redis.aclose() logger.info("Trade executor stopped gracefully") def main() -> None: """CLI entry point.""" asyncio.run(run()) if __name__ == "__main__": main()