diff --git a/services/trade_executor/config.py b/services/trade_executor/config.py index 1b7b1a5..e33f3a2 100644 --- a/services/trade_executor/config.py +++ b/services/trade_executor/config.py @@ -22,4 +22,7 @@ class TradeExecutorConfig(BaseConfig): kevin_equity_drawdown_halt_pct: float = 0.20 # 20% drawdown → permanent pause kevin_daily_loss_circuit_pct: float = 0.05 # 5% daily loss → 24h pause + # Slack webhook for per-trade notifications (empty → notifier no-ops). + slack_webhook_url: str = "" + model_config = {"env_prefix": "TRADING_"} diff --git a/services/trade_executor/main.py b/services/trade_executor/main.py index 20ef7aa..87025ad 100644 --- a/services/trade_executor/main.py +++ b/services/trade_executor/main.py @@ -19,6 +19,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker from services.trade_executor.config import TradeExecutorConfig from services.trade_executor.risk_manager import RiskManager +from services.trade_executor.slack_notifier import SlackNotifier from shared.broker.alpaca_broker import AlpacaBroker from shared.db import create_db from shared.models.trading import Trade as TradeModel @@ -45,6 +46,7 @@ async def process_signal( publisher: StreamPublisher, counters: dict, db_session_factory: async_sessionmaker | None = None, + slack_notifier: SlackNotifier | None = None, ) -> None: """Process a single trade signal: risk check, order, record, publish. @@ -68,6 +70,8 @@ async def process_signal( if not approved: logger.info("Signal REJECTED for %s: %s", signal.ticker, reason) counters["rejections"].add(1, {"reason": reason.split(" ")[0]}) + if slack_notifier is not None: + await slack_notifier.notify_rejection(signal, reason) return # --- Step 2: calculate position size --- @@ -149,6 +153,10 @@ async def process_signal( result.status.value, ) + # --- Step 8: notify slack (best-effort, fail-soft) --- + if slack_notifier is not None: + await slack_notifier.notify_trade(signal, result) + async def run(config: TradeExecutorConfig | None = None) -> None: """Main service loop. @@ -196,6 +204,11 @@ async def run(config: TradeExecutorConfig | None = None) -> None: # --- Risk manager --- risk_manager = RiskManager(config, broker, redis=redis) + # --- Slack notifier (no-op when slack_webhook_url is empty) --- + slack_notifier = SlackNotifier(webhook_url=config.slack_webhook_url) + if slack_notifier.enabled: + logger.info("Slack notifications enabled") + # --- Database (for persisting trades) --- db_session_factory = None try: @@ -219,7 +232,15 @@ async def run(config: TradeExecutorConfig | None = None) -> None: break try: signal_msg = TradeSignal.model_validate(data) - await process_signal(signal_msg, risk_manager, broker, publisher, counters, db_session_factory) + await process_signal( + signal_msg, + risk_manager, + broker, + publisher, + counters, + db_session_factory, + slack_notifier, + ) except Exception: logger.exception("Error processing signal: %s", data) finally: diff --git a/services/trade_executor/slack_notifier.py b/services/trade_executor/slack_notifier.py new file mode 100644 index 0000000..1f7432d --- /dev/null +++ b/services/trade_executor/slack_notifier.py @@ -0,0 +1,94 @@ +"""Slack webhook notifier for trade-executor. + +Posts a short message on each successful order submit and on +notable risk rejections. No-op when the webhook URL is empty. + +Designed to fail-soft: a Slack outage MUST NOT bubble up and crash +the consumer loop. The trade has already happened on Alpaca — Slack +is a downstream observer, not a transactional dependency. +""" + +from __future__ import annotations + +import logging +from typing import Iterable + +import httpx + +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.trading import OrderResult, TradeSignal + +logger = logging.getLogger(__name__) + + +# Reasons we DON'T want to spam Slack about. outside_market_hours fires +# every poll when a fresh signal lands after-hours — silencing it keeps +# Slack signal-to-noise high. +_DEFAULT_QUIET = frozenset({"outside_market_hours"}) + + +class SlackNotifier: + def __init__( + self, + webhook_url: str, + quiet_rejections: Iterable[str] | None = None, + ) -> None: + self.webhook_url = webhook_url or "" + self.quiet_rejections = frozenset( + quiet_rejections if quiet_rejections is not None else _DEFAULT_QUIET + ) + + @property + def enabled(self) -> bool: + return bool(self.webhook_url) + + async def notify_trade(self, signal: TradeSignal, result: OrderResult) -> None: + if not self.enabled: + return + text = self._format_trade(signal, result) + await self._post(text) + + async def notify_rejection(self, signal: TradeSignal, reason: str) -> None: + if not self.enabled: + return + if reason in self.quiet_rejections: + return + text = self._format_rejection(signal, reason) + await self._post(text) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _strategy_tag(self, signal: TradeSignal) -> str: + if signal.strategy_id == KEVIN_STRATEGY_UUID: + return "Meet Kevin" + return "trading-bot" + + def _format_trade(self, signal: TradeSignal, result: OrderResult) -> str: + tag = self._strategy_tag(signal) + price = ( + f"${result.filled_price:.2f}" + if result.filled_price is not None + else "—" + ) + return ( + f":chart_with_upwards_trend: *{tag}*: " + f"{result.side.value} {result.qty:g} {result.ticker} @ {price} " + f"(conviction {signal.strength:.2f}, status {result.status.value})" + ) + + def _format_rejection(self, signal: TradeSignal, reason: str) -> str: + tag = self._strategy_tag(signal) + return ( + f":no_entry: *{tag}*: REJECTED {signal.ticker} — {reason} " + f"(conviction {signal.strength:.2f})" + ) + + async def _post(self, text: str) -> None: + payload = {"text": text} + try: + async with httpx.AsyncClient(timeout=5.0) as client: + await client.post(self.webhook_url, json=payload) + except Exception as exc: + logger.warning("Slack post failed (swallowed): %s", exc) diff --git a/tests/services/trade_executor/test_slack_notifier.py b/tests/services/trade_executor/test_slack_notifier.py new file mode 100644 index 0000000..1a1bce4 --- /dev/null +++ b/tests/services/trade_executor/test_slack_notifier.py @@ -0,0 +1,142 @@ +"""Tests for the SlackNotifier used by trade-executor.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from decimal import Decimal +from unittest.mock import AsyncMock, patch +from uuid import UUID, uuid4 + +import pytest + +from services.trade_executor.slack_notifier import SlackNotifier +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.trading import ( + OrderResult, + OrderSide, + OrderStatus, + SignalDirection, + TradeSignal, +) + + +def _signal(strategy_id=None, target_dollars=None) -> TradeSignal: + return TradeSignal( + ticker="NVDA", + direction=SignalDirection.LONG, + strength=0.85, + strategy_sources=["meet_kevin:buy:0.85"], + timestamp=datetime.now(timezone.utc), + strategy_id=strategy_id, + target_dollars=target_dollars, + ) + + +def _filled_order(ticker="NVDA", qty=10, price=217.50) -> OrderResult: + return OrderResult( + order_id=str(uuid4()), + ticker=ticker, + side=OrderSide.BUY, + qty=qty, + filled_price=price, + status=OrderStatus.FILLED, + timestamp=datetime.now(timezone.utc), + ) + + +class TestSlackNotifierNoWebhook: + """Empty webhook_url -> notifier is a no-op (returns without raising).""" + + @pytest.mark.asyncio + async def test_notify_trade_noop(self): + notifier = SlackNotifier(webhook_url="") + # should not raise even with no mock + await notifier.notify_trade(_signal(), _filled_order()) + + @pytest.mark.asyncio + async def test_notify_rejection_noop(self): + notifier = SlackNotifier(webhook_url="") + await notifier.notify_rejection(_signal(), "outside_market_hours") + + +class TestSlackNotifierTradePost: + @pytest.mark.asyncio + async def test_trade_post_calls_webhook(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client.post = mock_post + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + _filled_order(qty=10, price=217.50), + ) + mock_post.assert_called_once() + url, kwargs = mock_post.call_args.args[0], mock_post.call_args.kwargs + assert url == "https://hooks.slack.test/abc" + payload = kwargs["json"] + assert "NVDA" in payload["text"] + assert "10" in payload["text"] + assert "217.50" in payload["text"] + + @pytest.mark.asyncio + async def test_trade_post_strategy_tag_when_kevin(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + _filled_order(), + ) + payload = mock_client.post.call_args.kwargs["json"] + assert "Meet Kevin" in payload["text"] + + @pytest.mark.asyncio + async def test_trade_post_swallows_http_errors(self): + """A failed Slack post must NOT bubble up — the trade already + happened; we shouldn't crash the consumer loop because Slack is + having a bad day.""" + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=Exception("network down")) + mock_client_cls.return_value.__aenter__.return_value = mock_client + # should NOT raise + await notifier.notify_trade(_signal(), _filled_order()) + + +class TestSlackNotifierRejectionPost: + @pytest.mark.asyncio + async def test_rejection_post_calls_webhook(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_rejection( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + reason="kevin_daily_trade_cap", + ) + payload = mock_client.post.call_args.kwargs["json"] + assert "REJECTED" in payload["text"] + assert "kevin_daily_trade_cap" in payload["text"] + assert "NVDA" in payload["text"] + + @pytest.mark.asyncio + async def test_rejection_post_skips_noise(self): + """outside_market_hours fires every minute when the bot tries to + trade after-hours — we don't want a Slack barrage. The notifier + skips it.""" + notifier = SlackNotifier( + webhook_url="https://hooks.slack.test/abc", + quiet_rejections={"outside_market_hours"}, + ) + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock() + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_rejection(_signal(), "outside_market_hours") + mock_client.post.assert_not_called()