From adbd7f3c654b475843748a48a2eed6d0b83a8619 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 24 May 2026 00:59:56 +0000 Subject: [PATCH] feat(kevin_bridge): main orchestrator with dependency injection Composable: cursor/aggregator/strategy/publisher/audit_writer/broker all injected. Master kill-switch (kevin_enable_trading=false) routes to audit-only path. Cursor advances ONLY after XADD succeeds (race fix). Concrete collaborators wired in subsequent tasks. Also extends TradeSignal + SignalDirection.EXIT with the optional fields Kevin paths need (strategy_id, target_dollars, stop_loss_pct, take_profit_pct). --- services/kevin_signal_bridge/__init__.py | 0 services/kevin_signal_bridge/config.py | 63 ++++++ services/kevin_signal_bridge/main.py | 213 ++++++++++++++++++ shared/schemas/trading.py | 12 +- .../services/kevin_signal_bridge/__init__.py | 0 .../services/kevin_signal_bridge/test_main.py | 175 ++++++++++++++ 6 files changed, 461 insertions(+), 2 deletions(-) create mode 100644 services/kevin_signal_bridge/__init__.py create mode 100644 services/kevin_signal_bridge/config.py create mode 100644 services/kevin_signal_bridge/main.py create mode 100644 tests/services/kevin_signal_bridge/__init__.py create mode 100644 tests/services/kevin_signal_bridge/test_main.py diff --git a/services/kevin_signal_bridge/__init__.py b/services/kevin_signal_bridge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/kevin_signal_bridge/config.py b/services/kevin_signal_bridge/config.py new file mode 100644 index 0000000..3a6d105 --- /dev/null +++ b/services/kevin_signal_bridge/config.py @@ -0,0 +1,63 @@ +"""KevinBridgeConfig — env-var settings for the Kevin signal bridge service. + +All env vars use the TRADING_ prefix consumed by shared.config.BaseConfig. +""" + +from __future__ import annotations + +import json +from typing import Any + +from pydantic import field_validator + +from shared.config import BaseConfig + + +class KevinBridgeConfig(BaseConfig): + """Env-var driven settings for the bridge service.""" + + # Signal translation + kevin_min_conviction: float = 0.60 + kevin_max_mention_age_hours: int = 48 + kevin_hold_days: dict[str, int] = { + "days": 3, + "weeks": 10, + "months": 45, + "long_term": 90, + "unspecified": 10, + } + + # Sizing + kevin_base_position_pct: float = 0.04 + kevin_min_trade_usd: float = 500.0 + kevin_max_trade_usd: float = 5000.0 + kevin_max_per_ticker_usd: float = 7500.0 + + # Exits + kevin_stop_loss_pct: float = 0.08 + kevin_take_profit_pct: float = 0.20 + kevin_avoid_closes_longs: bool = True + kevin_avoid_blocks_days: int = 7 + + # Aggregation + kevin_mention_boost_per_repeat: float = 0.05 + kevin_max_mention_boost: float = 0.20 + + # Risk + kevin_max_position_pct: float = 0.075 + kevin_daily_trade_cap: int = 5 + kevin_daily_alloc_cap_usd: float = 15000.0 + kevin_daily_loss_circuit_pct: float = 0.03 + kevin_equity_drawdown_halt_pct: float = 0.80 + + # Plumbing + kevin_bridge_poll_interval_seconds: int = 60 + kevin_bridge_exit_scan_cron: str = "35 9 * * 1-5" + kevin_enable_trading: bool = False # master kill-switch + + @field_validator("kevin_hold_days", mode="before") + @classmethod + def _parse_hold_days(cls, v: Any) -> Any: + if isinstance(v, str): + return json.loads(v) + return v diff --git a/services/kevin_signal_bridge/main.py b/services/kevin_signal_bridge/main.py new file mode 100644 index 0000000..b2f9930 --- /dev/null +++ b/services/kevin_signal_bridge/main.py @@ -0,0 +1,213 @@ +"""Kevin signal bridge — polls kevin_stock_mentions, calls KevinStrategy, +publishes TradeSignal to signals:generated. + +Kill-switch (kevin_enable_trading=false) writes audit rows but skips +publish. +""" + +from __future__ import annotations + +import asyncio +import logging +from decimal import Decimal +from typing import Any + +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.kevin import KevinAccountState, KevinDecisionType +from shared.schemas.trading import SignalDirection, TradeSignal + +logger = logging.getLogger(__name__) + + +class KevinBridge: + """End-to-end orchestrator. Composed from injected collaborators + so it's unit-testable. + """ + + def __init__( + self, + config: Any, + cursor: Any, + publisher: Any, + aggregator: Any, + strategy: Any, + audit_writer: Any, + broker: Any, + blocklist: Any = None, + risk_counters: Any = None, + ) -> None: + self.config = config + self.cursor = cursor + self.publisher = publisher + self.aggregator = aggregator + self.strategy = strategy + self.audit_writer = audit_writer + self.broker = broker + self.blocklist = blocklist + self.risk_counters = risk_counters + + async def process_one_pass(self) -> int: + last_seen = await self.cursor.last_seen_id() + pending = await self.aggregator.fetch_pending(since_id=last_seen) + n_processed = 0 + for mention in pending: + try: + published_ok = await self._process_mention(mention) + # Race-fix: only advance cursor when the side-effect actually + # succeeded — for dry-run / no-op flows we also advance, for + # publish failures we do NOT. + if published_ok: + await self.cursor.advance(mention.id) + n_processed += 1 + except Exception: + logger.exception("bridge error on mention %s", mention.id) + return n_processed + + async def _process_mention(self, mention: Any) -> bool: + """Process one mention. Returns True if cursor should advance, + False if publish failed (so we retry next pass).""" + + effective_conviction = getattr( + mention, "effective_conviction", mention.conviction + ) + + account_state = await self._snapshot_account() + is_tradable = await self.broker.is_asset_tradable(mention.symbol) + current_price = await self.broker.get_latest_price(mention.symbol) + + decision = await self.strategy.evaluate_mention( + mention, + account_state, + effective_conviction=effective_conviction, + current_price=current_price, + is_tradable=is_tradable, + ) + + if decision.decision == KevinDecisionType.NO_OP: + status = self._classify_no_op(decision.rationale) + await self.audit_writer.write( + mention_id=mention.id, + bridge_status=status, + effective_conviction=effective_conviction, + signal_id=None, + trade_id=None, + notes=decision.rationale, + ) + return True # cursor advances; nothing to publish + + # Apply blocklist side-effect on AVOID + action_value = getattr(mention.action, "value", mention.action) + if self.blocklist and action_value == "avoid": + await self.blocklist.add( + mention.symbol, ttl_days=self.config.kevin_avoid_blocks_days + ) + + if not self.config.kevin_enable_trading: + await self.audit_writer.write( + mention_id=mention.id, + bridge_status="dry_run", + effective_conviction=effective_conviction, + signal_id=None, + trade_id=None, + notes=f"kill-switch off; would: {decision.rationale}", + ) + return True + + # Publish TradeSignal to Redis Stream — cursor only advances if XADD ok + signal = TradeSignal( + ticker=decision.symbol, + direction=( + SignalDirection.LONG + if decision.decision == KevinDecisionType.OPEN_LONG + else SignalDirection.EXIT + ), + strength=float(decision.effective_conviction or 1.0), + strategy_id=KEVIN_STRATEGY_UUID, + strategy_sources=[ + f"kevin:{action_value}:{effective_conviction}", + ], + target_dollars=decision.target_dollars, + stop_loss_pct=Decimal(str(self.config.kevin_stop_loss_pct)), + take_profit_pct=Decimal(str(self.config.kevin_take_profit_pct)), + ) + + try: + stream_id = await self.publisher.publish(signal) + except Exception: + # Record broker_rejected audit, do NOT advance cursor + await self.audit_writer.write( + mention_id=mention.id, + bridge_status="broker_rejected", + effective_conviction=effective_conviction, + signal_id=signal.signal_id, + trade_id=None, + notes="publish failed; will retry next pass", + ) + return False + + await self.audit_writer.write( + mention_id=mention.id, + bridge_status="emitted", + effective_conviction=effective_conviction, + signal_id=signal.signal_id, + trade_id=None, + notes=f"published to stream as {stream_id}", + ) + return True + + async def _snapshot_account(self) -> KevinAccountState: + acct = await self.broker.get_account() + positions = await self.broker.get_positions() + held = {p.symbol: Decimal(str(getattr(p, "cost_basis", 0))) for p in positions} + blocklist = ( + await self.blocklist.active_set() if self.blocklist else set() + ) + daily_trades = ( + await self.risk_counters.get_daily_trades() if self.risk_counters else 0 + ) + daily_alloc = ( + await self.risk_counters.get_daily_alloc() + if self.risk_counters + else Decimal("0") + ) + return KevinAccountState( + equity_usd=Decimal(str(acct.equity)), + cash_usd=Decimal(str(acct.cash)), + held_positions=held, + blocklisted_symbols=blocklist, + daily_trade_count=daily_trades, + daily_alloc_usd=daily_alloc, + paused=await self._is_paused(), + ) + + async def _is_paused(self) -> bool: + if self.risk_counters: + return bool(await self.risk_counters.is_trading_paused()) + return False + + @staticmethod + def _classify_no_op(rationale: str) -> str: + r = rationale.lower() + if "tradable" in r: + return "skipped_non_tradable" + if "blocklist" in r: + return "skipped_blocklist" + if "cap" in r or "paused" in r or "halt" in r: + return "skipped_caps" + return "deferred" + + +# --- service entry point (Task 11 will fill this in) --- + + +async def run() -> None: + """Boot the bridge with concrete collaborators + main loop. + + Filled in by Task 11 wiring concrete cursor / aggregator / blocklist / + risk_counters / audit_writer. + """ + raise NotImplementedError("Task 11 will wire concrete collaborators") + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/shared/schemas/trading.py b/shared/schemas/trading.py index 64299e3..56cc751 100644 --- a/shared/schemas/trading.py +++ b/shared/schemas/trading.py @@ -1,6 +1,7 @@ """Trading-related Pydantic schemas for Redis Streams messages and API payloads.""" -from datetime import datetime +from datetime import UTC, datetime +from decimal import Decimal from enum import Enum from typing import Any from uuid import UUID, uuid4 @@ -30,6 +31,7 @@ class SignalDirection(str, Enum): LONG = "LONG" SHORT = "SHORT" NEUTRAL = "NEUTRAL" + EXIT = "EXIT" # --------------------------------------------------------------------------- @@ -102,7 +104,13 @@ class TradeSignal(BaseModel): strength: float = Field(ge=0.0, le=1.0) strategy_sources: list[str] sentiment_context: dict[str, Any] | None = None - timestamp: datetime + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + + # --- Kevin v2 extensions (optional) --- + strategy_id: UUID | None = None + target_dollars: Decimal | None = None + stop_loss_pct: Decimal | None = None + take_profit_pct: Decimal | None = None model_config = {"from_attributes": True} diff --git a/tests/services/kevin_signal_bridge/__init__.py b/tests/services/kevin_signal_bridge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/services/kevin_signal_bridge/test_main.py b/tests/services/kevin_signal_bridge/test_main.py new file mode 100644 index 0000000..a926302 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_main.py @@ -0,0 +1,175 @@ +"""Smoke-level orchestrator tests for the Kevin signal bridge.""" + +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +from services.kevin_signal_bridge.main import KevinBridge + + +async def test_bridge_dry_run_writes_audit_does_not_publish(): + """When TRADING_KEVIN_ENABLE_TRADING=false the bridge writes an audit row + and never calls publisher.publish.""" + config = MagicMock(kevin_enable_trading=False, kevin_stop_loss_pct=0.08, kevin_take_profit_pct=0.20, kevin_avoid_blocks_days=7) + cursor = AsyncMock() + cursor.last_seen_id.return_value = 0 + publisher = AsyncMock() + aggregator = AsyncMock() + aggregator.fetch_pending.return_value = [ + MagicMock( + id=1, + symbol="NVDA", + action=MagicMock(value="buy"), + conviction=Decimal("0.8"), + time_horizon=MagicMock(value="weeks"), + ), + ] + strategy = AsyncMock() + strategy.evaluate_mention.return_value = MagicMock( + decision=MagicMock(value="open_long", name="OPEN_LONG"), + symbol="NVDA", + target_dollars=Decimal("3000"), + holding_days=10, + effective_conviction=Decimal("0.8"), + rationale="ok", + ) + # Make the .decision attribute compare against KevinDecisionType.OPEN_LONG + from shared.schemas.kevin import KevinDecisionType + strategy.evaluate_mention.return_value.decision = KevinDecisionType.OPEN_LONG + + audit_writer = AsyncMock() + broker = AsyncMock() + broker.is_asset_tradable.return_value = True + broker.get_latest_price.return_value = Decimal("100") + broker.get_account.return_value = MagicMock( + equity=Decimal("100000"), cash=Decimal("100000") + ) + broker.get_positions.return_value = [] + + bridge = KevinBridge( + config=config, + cursor=cursor, + publisher=publisher, + aggregator=aggregator, + strategy=strategy, + audit_writer=audit_writer, + broker=broker, + ) + await bridge.process_one_pass() + + publisher.publish.assert_not_called() + audit_writer.write.assert_awaited() + args = audit_writer.write.call_args + assert "dry_run" in str(args).lower() + + +async def test_bridge_kill_switch_on_publishes_to_stream(): + config = MagicMock( + kevin_enable_trading=True, + kevin_stop_loss_pct=0.08, + kevin_take_profit_pct=0.20, + kevin_avoid_blocks_days=7, + ) + cursor = AsyncMock() + cursor.last_seen_id.return_value = 0 + publisher = AsyncMock() + publisher.publish.return_value = "1234-0" + aggregator = AsyncMock() + aggregator.fetch_pending.return_value = [ + MagicMock( + id=1, + symbol="NVDA", + action=MagicMock(value="buy"), + conviction=Decimal("0.8"), + effective_conviction=Decimal("0.8"), + time_horizon=MagicMock(value="weeks"), + ), + ] + strategy = AsyncMock() + from shared.schemas.kevin import KevinDecisionType + strategy.evaluate_mention.return_value = MagicMock( + decision=KevinDecisionType.OPEN_LONG, + symbol="NVDA", + target_dollars=Decimal("3000"), + holding_days=10, + effective_conviction=Decimal("0.8"), + rationale="ok", + ) + audit_writer = AsyncMock() + broker = AsyncMock() + broker.is_asset_tradable.return_value = True + broker.get_latest_price.return_value = Decimal("100") + broker.get_account.return_value = MagicMock( + equity=Decimal("100000"), cash=Decimal("100000") + ) + broker.get_positions.return_value = [] + + bridge = KevinBridge( + config=config, + cursor=cursor, + publisher=publisher, + aggregator=aggregator, + strategy=strategy, + audit_writer=audit_writer, + broker=broker, + ) + await bridge.process_one_pass() + + publisher.publish.assert_awaited_once() + cursor.advance.assert_awaited_with(1) + + +async def test_bridge_advances_cursor_only_after_publish(): + """Race-condition guard: cursor must NOT advance if publish raises.""" + config = MagicMock( + kevin_enable_trading=True, + kevin_stop_loss_pct=0.08, + kevin_take_profit_pct=0.20, + kevin_avoid_blocks_days=7, + ) + cursor = AsyncMock() + cursor.last_seen_id.return_value = 0 + publisher = AsyncMock() + publisher.publish.side_effect = RuntimeError("XADD failed") + aggregator = AsyncMock() + aggregator.fetch_pending.return_value = [ + MagicMock( + id=1, + symbol="NVDA", + action=MagicMock(value="buy"), + conviction=Decimal("0.8"), + effective_conviction=Decimal("0.8"), + time_horizon=MagicMock(value="weeks"), + ), + ] + strategy = AsyncMock() + from shared.schemas.kevin import KevinDecisionType + strategy.evaluate_mention.return_value = MagicMock( + decision=KevinDecisionType.OPEN_LONG, + symbol="NVDA", + target_dollars=Decimal("3000"), + holding_days=10, + effective_conviction=Decimal("0.8"), + rationale="ok", + ) + audit_writer = AsyncMock() + broker = AsyncMock() + broker.is_asset_tradable.return_value = True + broker.get_latest_price.return_value = Decimal("100") + broker.get_account.return_value = MagicMock( + equity=Decimal("100000"), cash=Decimal("100000") + ) + broker.get_positions.return_value = [] + + bridge = KevinBridge( + config=config, + cursor=cursor, + publisher=publisher, + aggregator=aggregator, + strategy=strategy, + audit_writer=audit_writer, + broker=broker, + ) + await bridge.process_one_pass() + + # cursor.advance must NOT have been called for mention 1 + cursor.advance.assert_not_called()