feat(trade-executor): Slack notifications on trade + risk-rejection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
SlackNotifier posts a short message to a Slack incoming webhook on:
- trade-executor submits an order (filled or pending)
- RiskManager rejects a signal (except outside_market_hours, which
spams every poll when the bot tries to trade after-hours)
Key properties:
- No-op when slack_webhook_url is empty (fail-soft default).
- HTTP errors are swallowed — a Slack outage MUST NOT crash the
consumer loop; the trade already happened on Alpaca.
- Kevin-strategy signals tagged "Meet Kevin" in the message so I can
tell which strategy fired.
Wiring:
- TradeExecutorConfig.slack_webhook_url + TRADING_SLACK_WEBHOOK_URL
env var, sourced from Vault secret/trading-bot/slack_webhook_url
via existing ExternalSecret.
- SlackNotifier passed to process_signal; both rejection + post-trade
paths call it.
Tests: 7 new (no-op when disabled, post calls webhook with correct
text, Kevin strategy tag, swallows HTTP errors, suppresses noisy
rejections).
This commit is contained in:
parent
35707a5c8a
commit
382188a19b
4 changed files with 261 additions and 1 deletions
|
|
@ -22,4 +22,7 @@ class TradeExecutorConfig(BaseConfig):
|
||||||
kevin_equity_drawdown_halt_pct: float = 0.20 # 20% drawdown → permanent pause
|
kevin_equity_drawdown_halt_pct: float = 0.20 # 20% drawdown → permanent pause
|
||||||
kevin_daily_loss_circuit_pct: float = 0.05 # 5% daily loss → 24h pause
|
kevin_daily_loss_circuit_pct: float = 0.05 # 5% daily loss → 24h pause
|
||||||
|
|
||||||
|
# Slack webhook for per-trade notifications (empty → notifier no-ops).
|
||||||
|
slack_webhook_url: str = ""
|
||||||
|
|
||||||
model_config = {"env_prefix": "TRADING_"}
|
model_config = {"env_prefix": "TRADING_"}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||||
|
|
||||||
from services.trade_executor.config import TradeExecutorConfig
|
from services.trade_executor.config import TradeExecutorConfig
|
||||||
from services.trade_executor.risk_manager import RiskManager
|
from services.trade_executor.risk_manager import RiskManager
|
||||||
|
from services.trade_executor.slack_notifier import SlackNotifier
|
||||||
from shared.broker.alpaca_broker import AlpacaBroker
|
from shared.broker.alpaca_broker import AlpacaBroker
|
||||||
from shared.db import create_db
|
from shared.db import create_db
|
||||||
from shared.models.trading import Trade as TradeModel
|
from shared.models.trading import Trade as TradeModel
|
||||||
|
|
@ -45,6 +46,7 @@ async def process_signal(
|
||||||
publisher: StreamPublisher,
|
publisher: StreamPublisher,
|
||||||
counters: dict,
|
counters: dict,
|
||||||
db_session_factory: async_sessionmaker | None = None,
|
db_session_factory: async_sessionmaker | None = None,
|
||||||
|
slack_notifier: SlackNotifier | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process a single trade signal: risk check, order, record, publish.
|
"""Process a single trade signal: risk check, order, record, publish.
|
||||||
|
|
||||||
|
|
@ -68,6 +70,8 @@ async def process_signal(
|
||||||
if not approved:
|
if not approved:
|
||||||
logger.info("Signal REJECTED for %s: %s", signal.ticker, reason)
|
logger.info("Signal REJECTED for %s: %s", signal.ticker, reason)
|
||||||
counters["rejections"].add(1, {"reason": reason.split(" ")[0]})
|
counters["rejections"].add(1, {"reason": reason.split(" ")[0]})
|
||||||
|
if slack_notifier is not None:
|
||||||
|
await slack_notifier.notify_rejection(signal, reason)
|
||||||
return
|
return
|
||||||
|
|
||||||
# --- Step 2: calculate position size ---
|
# --- Step 2: calculate position size ---
|
||||||
|
|
@ -149,6 +153,10 @@ async def process_signal(
|
||||||
result.status.value,
|
result.status.value,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# --- Step 8: notify slack (best-effort, fail-soft) ---
|
||||||
|
if slack_notifier is not None:
|
||||||
|
await slack_notifier.notify_trade(signal, result)
|
||||||
|
|
||||||
|
|
||||||
async def run(config: TradeExecutorConfig | None = None) -> None:
|
async def run(config: TradeExecutorConfig | None = None) -> None:
|
||||||
"""Main service loop.
|
"""Main service loop.
|
||||||
|
|
@ -196,6 +204,11 @@ async def run(config: TradeExecutorConfig | None = None) -> None:
|
||||||
# --- Risk manager ---
|
# --- Risk manager ---
|
||||||
risk_manager = RiskManager(config, broker, redis=redis)
|
risk_manager = RiskManager(config, broker, redis=redis)
|
||||||
|
|
||||||
|
# --- Slack notifier (no-op when slack_webhook_url is empty) ---
|
||||||
|
slack_notifier = SlackNotifier(webhook_url=config.slack_webhook_url)
|
||||||
|
if slack_notifier.enabled:
|
||||||
|
logger.info("Slack notifications enabled")
|
||||||
|
|
||||||
# --- Database (for persisting trades) ---
|
# --- Database (for persisting trades) ---
|
||||||
db_session_factory = None
|
db_session_factory = None
|
||||||
try:
|
try:
|
||||||
|
|
@ -219,7 +232,15 @@ async def run(config: TradeExecutorConfig | None = None) -> None:
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
signal_msg = TradeSignal.model_validate(data)
|
signal_msg = TradeSignal.model_validate(data)
|
||||||
await process_signal(signal_msg, risk_manager, broker, publisher, counters, db_session_factory)
|
await process_signal(
|
||||||
|
signal_msg,
|
||||||
|
risk_manager,
|
||||||
|
broker,
|
||||||
|
publisher,
|
||||||
|
counters,
|
||||||
|
db_session_factory,
|
||||||
|
slack_notifier,
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error processing signal: %s", data)
|
logger.exception("Error processing signal: %s", data)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
||||||
94
services/trade_executor/slack_notifier.py
Normal file
94
services/trade_executor/slack_notifier.py
Normal file
|
|
@ -0,0 +1,94 @@
|
||||||
|
"""Slack webhook notifier for trade-executor.
|
||||||
|
|
||||||
|
Posts a short message on each successful order submit and on
|
||||||
|
notable risk rejections. No-op when the webhook URL is empty.
|
||||||
|
|
||||||
|
Designed to fail-soft: a Slack outage MUST NOT bubble up and crash
|
||||||
|
the consumer loop. The trade has already happened on Alpaca — Slack
|
||||||
|
is a downstream observer, not a transactional dependency.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
||||||
|
from shared.schemas.trading import OrderResult, TradeSignal
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Reasons we DON'T want to spam Slack about. outside_market_hours fires
|
||||||
|
# every poll when a fresh signal lands after-hours — silencing it keeps
|
||||||
|
# Slack signal-to-noise high.
|
||||||
|
_DEFAULT_QUIET = frozenset({"outside_market_hours"})
|
||||||
|
|
||||||
|
|
||||||
|
class SlackNotifier:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
webhook_url: str,
|
||||||
|
quiet_rejections: Iterable[str] | None = None,
|
||||||
|
) -> None:
|
||||||
|
self.webhook_url = webhook_url or ""
|
||||||
|
self.quiet_rejections = frozenset(
|
||||||
|
quiet_rejections if quiet_rejections is not None else _DEFAULT_QUIET
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enabled(self) -> bool:
|
||||||
|
return bool(self.webhook_url)
|
||||||
|
|
||||||
|
async def notify_trade(self, signal: TradeSignal, result: OrderResult) -> None:
|
||||||
|
if not self.enabled:
|
||||||
|
return
|
||||||
|
text = self._format_trade(signal, result)
|
||||||
|
await self._post(text)
|
||||||
|
|
||||||
|
async def notify_rejection(self, signal: TradeSignal, reason: str) -> None:
|
||||||
|
if not self.enabled:
|
||||||
|
return
|
||||||
|
if reason in self.quiet_rejections:
|
||||||
|
return
|
||||||
|
text = self._format_rejection(signal, reason)
|
||||||
|
await self._post(text)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Internal
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _strategy_tag(self, signal: TradeSignal) -> str:
|
||||||
|
if signal.strategy_id == KEVIN_STRATEGY_UUID:
|
||||||
|
return "Meet Kevin"
|
||||||
|
return "trading-bot"
|
||||||
|
|
||||||
|
def _format_trade(self, signal: TradeSignal, result: OrderResult) -> str:
|
||||||
|
tag = self._strategy_tag(signal)
|
||||||
|
price = (
|
||||||
|
f"${result.filled_price:.2f}"
|
||||||
|
if result.filled_price is not None
|
||||||
|
else "—"
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
f":chart_with_upwards_trend: *{tag}*: "
|
||||||
|
f"{result.side.value} {result.qty:g} {result.ticker} @ {price} "
|
||||||
|
f"(conviction {signal.strength:.2f}, status {result.status.value})"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _format_rejection(self, signal: TradeSignal, reason: str) -> str:
|
||||||
|
tag = self._strategy_tag(signal)
|
||||||
|
return (
|
||||||
|
f":no_entry: *{tag}*: REJECTED {signal.ticker} — {reason} "
|
||||||
|
f"(conviction {signal.strength:.2f})"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _post(self, text: str) -> None:
|
||||||
|
payload = {"text": text}
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||||
|
await client.post(self.webhook_url, json=payload)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Slack post failed (swallowed): %s", exc)
|
||||||
142
tests/services/trade_executor/test_slack_notifier.py
Normal file
142
tests/services/trade_executor/test_slack_notifier.py
Normal file
|
|
@ -0,0 +1,142 @@
|
||||||
|
"""Tests for the SlackNotifier used by trade-executor."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from decimal import Decimal
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from services.trade_executor.slack_notifier import SlackNotifier
|
||||||
|
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
||||||
|
from shared.schemas.trading import (
|
||||||
|
OrderResult,
|
||||||
|
OrderSide,
|
||||||
|
OrderStatus,
|
||||||
|
SignalDirection,
|
||||||
|
TradeSignal,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _signal(strategy_id=None, target_dollars=None) -> TradeSignal:
|
||||||
|
return TradeSignal(
|
||||||
|
ticker="NVDA",
|
||||||
|
direction=SignalDirection.LONG,
|
||||||
|
strength=0.85,
|
||||||
|
strategy_sources=["meet_kevin:buy:0.85"],
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
strategy_id=strategy_id,
|
||||||
|
target_dollars=target_dollars,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _filled_order(ticker="NVDA", qty=10, price=217.50) -> OrderResult:
|
||||||
|
return OrderResult(
|
||||||
|
order_id=str(uuid4()),
|
||||||
|
ticker=ticker,
|
||||||
|
side=OrderSide.BUY,
|
||||||
|
qty=qty,
|
||||||
|
filled_price=price,
|
||||||
|
status=OrderStatus.FILLED,
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlackNotifierNoWebhook:
|
||||||
|
"""Empty webhook_url -> notifier is a no-op (returns without raising)."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_notify_trade_noop(self):
|
||||||
|
notifier = SlackNotifier(webhook_url="")
|
||||||
|
# should not raise even with no mock
|
||||||
|
await notifier.notify_trade(_signal(), _filled_order())
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_notify_rejection_noop(self):
|
||||||
|
notifier = SlackNotifier(webhook_url="")
|
||||||
|
await notifier.notify_rejection(_signal(), "outside_market_hours")
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlackNotifierTradePost:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_trade_post_calls_webhook(self):
|
||||||
|
notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc")
|
||||||
|
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_post = AsyncMock(return_value=AsyncMock(status_code=200))
|
||||||
|
mock_client.post = mock_post
|
||||||
|
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||||
|
await notifier.notify_trade(
|
||||||
|
_signal(strategy_id=KEVIN_STRATEGY_UUID),
|
||||||
|
_filled_order(qty=10, price=217.50),
|
||||||
|
)
|
||||||
|
mock_post.assert_called_once()
|
||||||
|
url, kwargs = mock_post.call_args.args[0], mock_post.call_args.kwargs
|
||||||
|
assert url == "https://hooks.slack.test/abc"
|
||||||
|
payload = kwargs["json"]
|
||||||
|
assert "NVDA" in payload["text"]
|
||||||
|
assert "10" in payload["text"]
|
||||||
|
assert "217.50" in payload["text"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_trade_post_strategy_tag_when_kevin(self):
|
||||||
|
notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc")
|
||||||
|
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200))
|
||||||
|
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||||
|
await notifier.notify_trade(
|
||||||
|
_signal(strategy_id=KEVIN_STRATEGY_UUID),
|
||||||
|
_filled_order(),
|
||||||
|
)
|
||||||
|
payload = mock_client.post.call_args.kwargs["json"]
|
||||||
|
assert "Meet Kevin" in payload["text"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_trade_post_swallows_http_errors(self):
|
||||||
|
"""A failed Slack post must NOT bubble up — the trade already
|
||||||
|
happened; we shouldn't crash the consumer loop because Slack is
|
||||||
|
having a bad day."""
|
||||||
|
notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc")
|
||||||
|
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.post = AsyncMock(side_effect=Exception("network down"))
|
||||||
|
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||||
|
# should NOT raise
|
||||||
|
await notifier.notify_trade(_signal(), _filled_order())
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlackNotifierRejectionPost:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rejection_post_calls_webhook(self):
|
||||||
|
notifier = SlackNotifier(webhook_url="https://hooks.slack.test/abc")
|
||||||
|
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.post = AsyncMock(return_value=AsyncMock(status_code=200))
|
||||||
|
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||||
|
await notifier.notify_rejection(
|
||||||
|
_signal(strategy_id=KEVIN_STRATEGY_UUID),
|
||||||
|
reason="kevin_daily_trade_cap",
|
||||||
|
)
|
||||||
|
payload = mock_client.post.call_args.kwargs["json"]
|
||||||
|
assert "REJECTED" in payload["text"]
|
||||||
|
assert "kevin_daily_trade_cap" in payload["text"]
|
||||||
|
assert "NVDA" in payload["text"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rejection_post_skips_noise(self):
|
||||||
|
"""outside_market_hours fires every minute when the bot tries to
|
||||||
|
trade after-hours — we don't want a Slack barrage. The notifier
|
||||||
|
skips it."""
|
||||||
|
notifier = SlackNotifier(
|
||||||
|
webhook_url="https://hooks.slack.test/abc",
|
||||||
|
quiet_rejections={"outside_market_hours"},
|
||||||
|
)
|
||||||
|
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.post = AsyncMock()
|
||||||
|
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||||
|
await notifier.notify_rejection(_signal(), "outside_market_hours")
|
||||||
|
mock_client.post.assert_not_called()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue