diff --git a/services/api_gateway/config.py b/services/api_gateway/config.py index da88df9..d2fac0d 100644 --- a/services/api_gateway/config.py +++ b/services/api_gateway/config.py @@ -25,6 +25,13 @@ class ApiGatewayConfig(BaseConfig): paper_trading: bool = True snapshot_interval_seconds: int = 60 + # Slack — close notifications from the trade-reconcile loop. The channel + # defaults here (rather than "") because the deployment env carries only + # the bot token; chat:write.public lets the bot post without an invite. + slack_webhook_url: str = "" + slack_bot_token: str = "" + slack_channel: str = "trading-bot" + # CORS settings cors_origins: list[str] = ["http://localhost:5173"] diff --git a/services/api_gateway/tasks/trade_reconcile.py b/services/api_gateway/tasks/trade_reconcile.py index 22118c7..cdec883 100644 --- a/services/api_gateway/tasks/trade_reconcile.py +++ b/services/api_gateway/tasks/trade_reconcile.py @@ -32,6 +32,7 @@ from shared.broker.base import BaseBroker from shared.constants.kevin import KEVIN_STRATEGY_UUID from shared.models.trading import Trade, TradeSide, TradeStatus from shared.schemas.trading import BrokerOrder, OrderResult, OrderStatus +from shared.slack_notifier import SlackNotifier logger = logging.getLogger(__name__) @@ -63,32 +64,31 @@ async def _already_booked(session: AsyncSession, leg_order_id: str) -> bool: async def _reconcile_trade( session: AsyncSession, entry: Trade, order: BrokerOrder -) -> bool: +) -> Trade | None: """Apply one trade's reconciliation. - Returns ``True`` if a closing trade was booked. Books an auto-close when a - stop-loss / take-profit leg has filled (idempotent on the leg order id); - otherwise syncs a non-terminal local status from the parent order. + Returns the booked closing ``Trade`` when a stop-loss / take-profit leg + has filled (idempotent on the leg order id); otherwise syncs a + non-terminal local status from the parent order and returns ``None``. """ leg = _filled_leg(order) if leg is not None and leg.filled_price is not None: if await _already_booked(session, leg.order_id): - return False + return None fill_price = leg.filled_price pnl = (fill_price - entry.price) * leg.qty - session.add( - Trade( - ticker=entry.ticker, - side=TradeSide.SELL, - qty=leg.qty, - price=fill_price, - status=TradeStatus.FILLED, - strategy_id=KEVIN_STRATEGY_UUID, - signal_id=entry.signal_id, - broker_order_id=leg.order_id, - pnl=pnl, - ) + close = Trade( + ticker=entry.ticker, + side=TradeSide.SELL, + qty=leg.qty, + price=fill_price, + status=TradeStatus.FILLED, + strategy_id=KEVIN_STRATEGY_UUID, + signal_id=entry.signal_id, + broker_order_id=leg.order_id, + pnl=pnl, ) + session.add(close) logger.info( "Reconciled auto-close for %s: leg %s filled @ %.2f, pnl=%.2f", entry.ticker, @@ -96,7 +96,7 @@ async def _reconcile_trade( leg.filled_price, pnl, ) - return True + return close # No filled exit leg — sync a non-terminal local status from the parent. if entry.status == TradeStatus.PENDING: @@ -111,12 +111,13 @@ async def _reconcile_trade( entry.broker_order_id, mapped.value, ) - return False + return None async def reconcile_once( broker: BaseBroker, session_factory: async_sessionmaker, + notifier: SlackNotifier | None = None, ) -> None: """Perform a single reconciliation cycle over open Kevin entries.""" async with session_factory() as session: @@ -162,14 +163,30 @@ async def reconcile_once( ) continue try: - if await _reconcile_trade(session, entry, order): - booked += 1 + close = await _reconcile_trade(session, entry, order) except Exception: logger.exception( "reconcile: booking failed for %s (%s) — skipping row", entry.ticker, entry.broker_order_id, ) + continue + if close is None: + continue + booked += 1 + if notifier is not None: + # Slack is an observer — its failure must not lose the row. + try: + await notifier.notify_close( + ticker=close.ticker, + qty=close.qty, + price=close.price, + pnl=close.pnl or 0.0, + strategy_id=close.strategy_id, + reason="bracket leg filled at broker", + ) + except Exception: + logger.warning("reconcile: close notification failed", exc_info=True) await session.commit() @@ -197,17 +214,23 @@ async def trade_reconcile_loop( secret_key=config.alpaca_secret_key, paper=config.paper_trading, ) + notifier = SlackNotifier( + webhook_url=config.slack_webhook_url, + bot_token=config.slack_bot_token, + channel=config.slack_channel, + ) logger.info( - "Trade reconcile started (interval=%ds, paper=%s)", + "Trade reconcile started (interval=%ds, paper=%s, slack=%s)", config.snapshot_interval_seconds, config.paper_trading, + "on" if notifier.enabled else "off", ) while True: try: if is_market_open(): - await reconcile_once(broker, session_factory) + await reconcile_once(broker, session_factory, notifier=notifier) else: logger.debug("Market closed — skipping trade reconcile") except asyncio.CancelledError: diff --git a/services/trade_executor/main.py b/services/trade_executor/main.py index f70775e..5f23ec9 100644 --- a/services/trade_executor/main.py +++ b/services/trade_executor/main.py @@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker from services.trade_executor.config import TradeExecutorConfig from services.trade_executor.deferred_queue import DeferredSignalQueue from services.trade_executor.risk_manager import RiskManager -from services.trade_executor.slack_notifier import SlackNotifier +from shared.slack_notifier import SlackNotifier from shared.broker.alpaca_broker import AlpacaBroker from shared.db import create_db from shared.models.trading import Trade as TradeModel diff --git a/shared/slack_notifier.py b/shared/slack_notifier.py new file mode 100644 index 0000000..5778f18 --- /dev/null +++ b/shared/slack_notifier.py @@ -0,0 +1,164 @@ +"""Slack notifier for trade-executor. + +Supports two transports, picked by what's configured: + 1. **Bot token + channel** (preferred) — uses chat.postMessage. Channel + can be changed via env var without redeploying the Slack app or + rotating webhook URLs. + 2. **Webhook URL** (legacy) — single-channel, pinned at webhook + creation time. + +If both are set, the bot-token path wins. If neither, the notifier +is a no-op. + +Designed to fail-soft: a Slack outage MUST NOT bubble up and crash +the consumer loop. The trade has already happened on Alpaca — Slack +is a downstream observer, not a transactional dependency. +""" + +from __future__ import annotations + +import logging +from typing import Iterable +from uuid import UUID + +import httpx + +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.trading import OrderResult, TradeSignal + +logger = logging.getLogger(__name__) + + +# Reasons we DON'T want to spam Slack about. outside_market_hours fires +# every poll when a fresh signal lands after-hours — silencing it keeps +# Slack signal-to-noise high. +_DEFAULT_QUIET = frozenset({"outside_market_hours"}) + + +class SlackNotifier: + def __init__( + self, + webhook_url: str = "", + bot_token: str = "", + channel: str = "", + quiet_rejections: Iterable[str] | None = None, + ) -> None: + self.webhook_url = webhook_url or "" + self.bot_token = bot_token or "" + self.channel = channel or "" + self.quiet_rejections = frozenset( + quiet_rejections if quiet_rejections is not None else _DEFAULT_QUIET + ) + + @property + def enabled(self) -> bool: + # Either transport must be fully configured. + if self.bot_token and self.channel: + return True + if self.webhook_url: + return True + return False + + @property + def uses_bot_token(self) -> bool: + return bool(self.bot_token and self.channel) + + async def notify_trade(self, signal: TradeSignal, result: OrderResult) -> None: + if not self.enabled: + return + text = self._format_trade(signal, result) + await self._post(text) + + async def notify_rejection(self, signal: TradeSignal, reason: str) -> None: + if not self.enabled: + return + if reason in self.quiet_rejections: + return + text = self._format_rejection(signal, reason) + await self._post(text) + + async def notify_deferred(self, signal: TradeSignal, target_ts) -> None: + if not self.enabled: + return + tag = self._strategy_tag(signal) + when = target_ts.strftime("%a %H:%M UTC") if target_ts else "?" + text = ( + f":clock3: *{tag}*: DEFERRED {signal.ticker} until {when} " + f"(market closed; conviction {signal.strength:.2f})" + ) + await self._post(text) + + async def notify_close( + self, + *, + ticker: str, + qty: float, + price: float, + pnl: float, + strategy_id: UUID | None, + reason: str, + ) -> None: + """Position close booked outside the executor (e.g. a bracket + stop-loss / take-profit leg that filled at the broker).""" + if not self.enabled: + return + tag = "Meet Kevin" if strategy_id == KEVIN_STRATEGY_UUID else "trading-bot" + emoji = ":moneybag:" if pnl >= 0 else ":small_red_triangle_down:" + text = ( + f"{emoji} *{tag}*: CLOSED {qty:g} {ticker} @ ${price:.2f} — " + f"P&L {'+' if pnl >= 0 else '-'}${abs(pnl):.2f} ({reason})" + ) + await self._post(text) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _strategy_tag(self, signal: TradeSignal) -> str: + if signal.strategy_id == KEVIN_STRATEGY_UUID: + return "Meet Kevin" + return "trading-bot" + + def _format_trade(self, signal: TradeSignal, result: OrderResult) -> str: + tag = self._strategy_tag(signal) + price = ( + f"${result.filled_price:.2f}" + if result.filled_price is not None + else "—" + ) + return ( + f":chart_with_upwards_trend: *{tag}*: " + f"{result.side.value} {result.qty:g} {result.ticker} @ {price} " + f"(conviction {signal.strength:.2f}, status {result.status.value})" + ) + + def _format_rejection(self, signal: TradeSignal, reason: str) -> str: + tag = self._strategy_tag(signal) + return ( + f":no_entry: *{tag}*: REJECTED {signal.ticker} — {reason} " + f"(conviction {signal.strength:.2f})" + ) + + async def _post(self, text: str) -> None: + try: + async with httpx.AsyncClient(timeout=5.0) as client: + if self.uses_bot_token: + resp = await client.post( + "https://slack.com/api/chat.postMessage", + headers={ + "Authorization": f"Bearer {self.bot_token}", + "Content-Type": "application/json; charset=utf-8", + }, + json={"channel": self.channel, "text": text}, + ) + body = resp.json() + if not body.get("ok"): + logger.warning( + "Slack chat.postMessage refused: %s (channel=%s)", + body.get("error"), + self.channel, + ) + else: + await client.post(self.webhook_url, json={"text": text}) + except Exception as exc: + logger.warning("Slack post failed (swallowed): %s", exc) diff --git a/tests/services/test_trade_reconcile.py b/tests/services/test_trade_reconcile.py index 5880d9f..8afe5b6 100644 --- a/tests/services/test_trade_reconcile.py +++ b/tests/services/test_trade_reconcile.py @@ -257,6 +257,60 @@ class TestReconcileBooksClose: assert session.added == [] + async def test_booked_close_notifies_slack(self) -> None: + """A reconcile-booked close must post to Slack with realized P&L.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + tp = _leg(order_id="tp-1", status=OrderStatus.PENDING, filled_price=None) + sl = _leg(order_id="sl-1", status=OrderStatus.FILLED, filled_price=92.0) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp, sl]) + ) + session = _FakeSession([entry]) + notifier = AsyncMock() + + await reconcile_once(broker, _factory(session), notifier=notifier) + + notifier.notify_close.assert_awaited_once() + kwargs = notifier.notify_close.await_args.kwargs + assert kwargs["ticker"] == "NVDA" + assert kwargs["qty"] == 10.0 + assert kwargs["price"] == 92.0 + assert kwargs["pnl"] == pytest.approx(-80.0) + assert kwargs["strategy_id"] == KEVIN_STRATEGY_UUID + + async def test_no_notification_when_nothing_booked(self) -> None: + entry = _entry_trade(broker_order_id="parent-1") + tp = _leg(order_id="tp-1", status=OrderStatus.PENDING, filled_price=None) + sl = _leg(order_id="sl-1", status=OrderStatus.PENDING, filled_price=None) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp, sl]) + ) + session = _FakeSession([entry]) + notifier = AsyncMock() + + await reconcile_once(broker, _factory(session), notifier=notifier) + + notifier.notify_close.assert_not_awaited() + + async def test_notifier_failure_does_not_block_booking(self) -> None: + """Slack is an observer — its failure must not lose the Trade row.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + sl = _leg(order_id="sl-1", status=OrderStatus.FILLED, filled_price=92.0) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[sl]) + ) + session = _FakeSession([entry]) + notifier = AsyncMock() + notifier.notify_close = AsyncMock(side_effect=RuntimeError("slack down")) + + await reconcile_once(broker, _factory(session), notifier=notifier) + + assert len(session.added) == 1 + assert session.committed + # --------------------------------------------------------------------------- # Idempotency @@ -405,7 +459,7 @@ class TestReconcileLoop: cfg = _config() calls = 0 - async def boom(broker, sf): # noqa: ANN001 + async def boom(broker, sf, notifier=None): # noqa: ANN001 nonlocal calls calls += 1 if calls == 1: diff --git a/tests/shared/test_slack_notifier.py b/tests/shared/test_slack_notifier.py new file mode 100644 index 0000000..9d732c5 --- /dev/null +++ b/tests/shared/test_slack_notifier.py @@ -0,0 +1,243 @@ +"""Tests for the shared SlackNotifier (trade-executor + reconcile loop).""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch +from uuid import uuid4 + +import pytest + +from shared.slack_notifier import SlackNotifier +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.trading import ( + OrderResult, + OrderSide, + OrderStatus, + SignalDirection, + TradeSignal, +) + + +def _signal(strategy_id=None, target_dollars=None) -> TradeSignal: + return TradeSignal( + ticker="NVDA", + direction=SignalDirection.LONG, + strength=0.85, + strategy_sources=["meet_kevin:buy:0.85"], + timestamp=datetime.now(timezone.utc), + strategy_id=strategy_id, + target_dollars=target_dollars, + ) + + +def _filled_order(ticker="NVDA", qty=10, price=217.50) -> OrderResult: + return OrderResult( + order_id=str(uuid4()), + ticker=ticker, + side=OrderSide.BUY, + qty=qty, + filled_price=price, + status=OrderStatus.FILLED, + timestamp=datetime.now(timezone.utc), + ) + + +class TestSlackNotifierNoWebhook: + """Empty webhook_url -> notifier is a no-op (returns without raising).""" + + @pytest.mark.asyncio + async def test_notify_trade_noop(self): + notifier = SlackNotifier(webhook_url="") + # should not raise even with no mock + await notifier.notify_trade(_signal(), _filled_order()) + + @pytest.mark.asyncio + async def test_notify_rejection_noop(self): + notifier = SlackNotifier(webhook_url="") + await notifier.notify_rejection(_signal(), "outside_market_hours") + + +class TestSlackNotifierTradePost: + @pytest.mark.asyncio + async def test_trade_post_calls_webhook(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client.post = mock_post + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + _filled_order(qty=10, price=217.50), + ) + mock_post.assert_called_once() + url, kwargs = mock_post.call_args.args[0], mock_post.call_args.kwargs + assert url == "https://hooks.slack.test/abc" + payload = kwargs["json"] + assert "NVDA" in payload["text"] + assert "10" in payload["text"] + assert "217.50" in payload["text"] + + @pytest.mark.asyncio + async def test_trade_post_strategy_tag_when_kevin(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + _filled_order(), + ) + payload = mock_client.post.call_args.kwargs["json"] + assert "Meet Kevin" in payload["text"] + + @pytest.mark.asyncio + async def test_trade_post_swallows_http_errors(self): + """A failed Slack post must NOT bubble up — the trade already + happened; we shouldn't crash the consumer loop because Slack is + having a bad day.""" + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=Exception("network down")) + mock_client_cls.return_value.__aenter__.return_value = mock_client + # should NOT raise + await notifier.notify_trade(_signal(), _filled_order()) + + +class TestSlackNotifierBotToken: + @pytest.mark.asyncio + async def test_bot_token_calls_chat_postmessage(self): + notifier = SlackNotifier(bot_token="xoxb-test", channel="trading-bot") + assert notifier.uses_bot_token + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_resp = AsyncMock() + mock_resp.json = lambda: {"ok": True, "ts": "1.2"} + mock_client.post = AsyncMock(return_value=mock_resp) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade(_signal(), _filled_order()) + url = mock_client.post.call_args.args[0] + assert url == "https://slack.com/api/chat.postMessage" + kwargs = mock_client.post.call_args.kwargs + assert kwargs["headers"]["Authorization"] == "Bearer xoxb-test" + body = kwargs["json"] + assert body["channel"] == "trading-bot" + assert "NVDA" in body["text"] + + @pytest.mark.asyncio + async def test_bot_token_swallows_channel_not_found(self): + """When the user hasn't created #trading-bot yet, the API returns + ok=false / error=channel_not_found. We log and continue.""" + notifier = SlackNotifier(bot_token="xoxb-test", channel="nonexistent") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_resp = AsyncMock() + mock_resp.json = lambda: {"ok": False, "error": "channel_not_found"} + mock_client.post = AsyncMock(return_value=mock_resp) + mock_client_cls.return_value.__aenter__.return_value = mock_client + # should not raise + await notifier.notify_trade(_signal(), _filled_order()) + + @pytest.mark.asyncio + async def test_bot_token_wins_when_both_set(self): + notifier = SlackNotifier( + webhook_url="https://hooks.slack.test/abc", + bot_token="xoxb-test", + channel="trading-bot", + ) + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=AsyncMock(json=lambda: {"ok": True})) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_trade(_signal(), _filled_order()) + assert mock_client.post.call_args.args[0] == "https://slack.com/api/chat.postMessage" + + +class TestSlackNotifierRejectionPost: + @pytest.mark.asyncio + async def test_rejection_post_calls_webhook(self): + notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc") + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200)) + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_rejection( + _signal(strategy_id=KEVIN_STRATEGY_UUID), + reason="kevin_daily_trade_cap", + ) + payload = mock_client.post.call_args.kwargs["json"] + assert "REJECTED" in payload["text"] + assert "kevin_daily_trade_cap" in payload["text"] + assert "NVDA" in payload["text"] + + @pytest.mark.asyncio + async def test_rejection_post_skips_noise(self): + """outside_market_hours fires every minute when the bot tries to + trade after-hours — we don't want a Slack barrage. The notifier + skips it.""" + notifier = SlackNotifier( + webhook_url="https://hooks.slack.test/abc", + quiet_rejections={"outside_market_hours"}, + ) + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.post = AsyncMock() + mock_client_cls.return_value.__aenter__.return_value = mock_client + await notifier.notify_rejection(_signal(), "outside_market_hours") + mock_client.post.assert_not_called() + + +class TestSlackNotifierClosePost: + """notify_close — reconcile-booked exits (bracket legs that fill at + Alpaca without passing through the executor) must reach Slack.""" + + @pytest.mark.asyncio + async def test_close_posts_loss_with_pnl(self): + notifier = SlackNotifier(bot_token="xoxb-test", channel="trading-bot") + with patch.object(notifier, "_post", new=AsyncMock()) as post: + await notifier.notify_close( + ticker="MRVL", + qty=9, + price=263.61, + pnl=-243.87, + strategy_id=KEVIN_STRATEGY_UUID, + reason="bracket leg filled", + ) + text = post.call_args[0][0] + assert "Meet Kevin" in text + assert "MRVL" in text + assert "263.61" in text + assert "-$243.87" in text + assert "bracket leg filled" in text + + @pytest.mark.asyncio + async def test_close_posts_win_with_positive_pnl(self): + notifier = SlackNotifier(bot_token="xoxb-test", channel="trading-bot") + with patch.object(notifier, "_post", new=AsyncMock()) as post: + await notifier.notify_close( + ticker="TSM", + qty=6, + price=440.0, + pnl=83.06, + strategy_id=KEVIN_STRATEGY_UUID, + reason="take-profit filled", + ) + text = post.call_args[0][0] + assert "+$83.06" in text + + @pytest.mark.asyncio + async def test_close_noop_when_disabled(self): + notifier = SlackNotifier() + with patch.object(notifier, "_post", new=AsyncMock()) as post: + await notifier.notify_close( + ticker="TSM", + qty=6, + price=440.0, + pnl=1.0, + strategy_id=None, + reason="x", + ) + post.assert_not_called()