trading/services/trade_executor/slack_notifier.py
Viktor Barzin 065b634b99
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat(trade-executor): Slack bot-token transport + semver image tags
Two changes that ship together so a single CI run lands both:

1) SlackNotifier — support bot-token + channel transport
   - Previous version only supported a pinned webhook URL.
   - New mode uses chat.postMessage with bot_token + channel.
   - Channel can be changed via env var without rotating webhooks.
   - bot-token transport wins when both are set.
   - Fail-soft: ok=false (e.g. channel_not_found if the user
     hasn't created #trading-bot yet) is logged + skipped, not
     raised.
   - 5 new tests (10 total): bot-token wins, channel_not_found
     swallowed, headers/payload shape verified.

2) Image tags — switch from :${CI_PIPELINE_NUMBER} → :0.1.${N}
   - 3-part semver so Keel patch policy (cluster-wide default
     in inject-keel-annotations) is bounded to patch bumps
     within 0.1.x. Prior 1-part tags (:53) were technically
     parseable as major-only, which Keel patch wouldn't bump
     but could still resolve oddly under digest tracking.
   - Memory id=1935 documents Keel patch ≠ bulletproof for
     non-semver; semver tags are the safer mode.
   - update-deployment + verify-deploy steps updated to match.
   - :latest still pushed for cache-from + bootstrap.
2026-05-27 10:06:49 +00:00

130 lines
4.5 KiB
Python

"""Slack notifier for trade-executor.
Supports two transports, picked by what's configured:
1. **Bot token + channel** (preferred) — uses chat.postMessage. Channel
can be changed via env var without redeploying the Slack app or
rotating webhook URLs.
2. **Webhook URL** (legacy) — single-channel, pinned at webhook
creation time.
If both are set, the bot-token path wins. If neither, the notifier
is a no-op.
Designed to fail-soft: a Slack outage MUST NOT bubble up and crash
the consumer loop. The trade has already happened on Alpaca — Slack
is a downstream observer, not a transactional dependency.
"""
from __future__ import annotations
import logging
from typing import Iterable
import httpx
from shared.constants.kevin import KEVIN_STRATEGY_UUID
from shared.schemas.trading import OrderResult, TradeSignal
logger = logging.getLogger(__name__)
# Reasons we DON'T want to spam Slack about. outside_market_hours fires
# every poll when a fresh signal lands after-hours — silencing it keeps
# Slack signal-to-noise high.
_DEFAULT_QUIET = frozenset({"outside_market_hours"})
class SlackNotifier:
def __init__(
self,
webhook_url: str = "",
bot_token: str = "",
channel: str = "",
quiet_rejections: Iterable[str] | None = None,
) -> None:
self.webhook_url = webhook_url or ""
self.bot_token = bot_token or ""
self.channel = channel or ""
self.quiet_rejections = frozenset(
quiet_rejections if quiet_rejections is not None else _DEFAULT_QUIET
)
@property
def enabled(self) -> bool:
# Either transport must be fully configured.
if self.bot_token and self.channel:
return True
if self.webhook_url:
return True
return False
@property
def uses_bot_token(self) -> bool:
return bool(self.bot_token and self.channel)
async def notify_trade(self, signal: TradeSignal, result: OrderResult) -> None:
if not self.enabled:
return
text = self._format_trade(signal, result)
await self._post(text)
async def notify_rejection(self, signal: TradeSignal, reason: str) -> None:
if not self.enabled:
return
if reason in self.quiet_rejections:
return
text = self._format_rejection(signal, reason)
await self._post(text)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _strategy_tag(self, signal: TradeSignal) -> str:
if signal.strategy_id == KEVIN_STRATEGY_UUID:
return "Meet Kevin"
return "trading-bot"
def _format_trade(self, signal: TradeSignal, result: OrderResult) -> str:
tag = self._strategy_tag(signal)
price = (
f"${result.filled_price:.2f}"
if result.filled_price is not None
else ""
)
return (
f":chart_with_upwards_trend: *{tag}*: "
f"{result.side.value} {result.qty:g} {result.ticker} @ {price} "
f"(conviction {signal.strength:.2f}, status {result.status.value})"
)
def _format_rejection(self, signal: TradeSignal, reason: str) -> str:
tag = self._strategy_tag(signal)
return (
f":no_entry: *{tag}*: REJECTED {signal.ticker}{reason} "
f"(conviction {signal.strength:.2f})"
)
async def _post(self, text: str) -> None:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
if self.uses_bot_token:
resp = await client.post(
"https://slack.com/api/chat.postMessage",
headers={
"Authorization": f"Bearer {self.bot_token}",
"Content-Type": "application/json; charset=utf-8",
},
json={"channel": self.channel, "text": text},
)
body = resp.json()
if not body.get("ok"):
logger.warning(
"Slack chat.postMessage refused: %s (channel=%s)",
body.get("error"),
self.channel,
)
else:
await client.post(self.webhook_url, json={"text": text})
except Exception as exc:
logger.warning("Slack post failed (swallowed): %s", exc)