feat(kevin_bridge): main orchestrator with dependency injection
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled

Composable: cursor/aggregator/strategy/publisher/audit_writer/broker
all injected. Master kill-switch (kevin_enable_trading=false) routes to
audit-only path. Cursor advances ONLY after XADD succeeds (race fix).
Concrete collaborators wired in subsequent tasks.

Also extends TradeSignal + SignalDirection.EXIT with the optional
fields Kevin paths need (strategy_id, target_dollars, stop_loss_pct,
take_profit_pct).
This commit is contained in:
Viktor Barzin 2026-05-24 00:59:56 +00:00
parent cd75c4ab7e
commit adbd7f3c65
6 changed files with 461 additions and 2 deletions

View file

View file

@ -0,0 +1,63 @@
"""KevinBridgeConfig — env-var settings for the Kevin signal bridge service.
All env vars use the TRADING_ prefix consumed by shared.config.BaseConfig.
"""
from __future__ import annotations
import json
from typing import Any
from pydantic import field_validator
from shared.config import BaseConfig
class KevinBridgeConfig(BaseConfig):
"""Env-var driven settings for the bridge service."""
# Signal translation
kevin_min_conviction: float = 0.60
kevin_max_mention_age_hours: int = 48
kevin_hold_days: dict[str, int] = {
"days": 3,
"weeks": 10,
"months": 45,
"long_term": 90,
"unspecified": 10,
}
# Sizing
kevin_base_position_pct: float = 0.04
kevin_min_trade_usd: float = 500.0
kevin_max_trade_usd: float = 5000.0
kevin_max_per_ticker_usd: float = 7500.0
# Exits
kevin_stop_loss_pct: float = 0.08
kevin_take_profit_pct: float = 0.20
kevin_avoid_closes_longs: bool = True
kevin_avoid_blocks_days: int = 7
# Aggregation
kevin_mention_boost_per_repeat: float = 0.05
kevin_max_mention_boost: float = 0.20
# Risk
kevin_max_position_pct: float = 0.075
kevin_daily_trade_cap: int = 5
kevin_daily_alloc_cap_usd: float = 15000.0
kevin_daily_loss_circuit_pct: float = 0.03
kevin_equity_drawdown_halt_pct: float = 0.80
# Plumbing
kevin_bridge_poll_interval_seconds: int = 60
kevin_bridge_exit_scan_cron: str = "35 9 * * 1-5"
kevin_enable_trading: bool = False # master kill-switch
@field_validator("kevin_hold_days", mode="before")
@classmethod
def _parse_hold_days(cls, v: Any) -> Any:
if isinstance(v, str):
return json.loads(v)
return v

View file

@ -0,0 +1,213 @@
"""Kevin signal bridge — polls kevin_stock_mentions, calls KevinStrategy,
publishes TradeSignal to signals:generated.
Kill-switch (kevin_enable_trading=false) writes audit rows but skips
publish.
"""
from __future__ import annotations
import asyncio
import logging
from decimal import Decimal
from typing import Any
from shared.constants.kevin import KEVIN_STRATEGY_UUID
from shared.schemas.kevin import KevinAccountState, KevinDecisionType
from shared.schemas.trading import SignalDirection, TradeSignal
logger = logging.getLogger(__name__)
class KevinBridge:
"""End-to-end orchestrator. Composed from injected collaborators
so it's unit-testable.
"""
def __init__(
self,
config: Any,
cursor: Any,
publisher: Any,
aggregator: Any,
strategy: Any,
audit_writer: Any,
broker: Any,
blocklist: Any = None,
risk_counters: Any = None,
) -> None:
self.config = config
self.cursor = cursor
self.publisher = publisher
self.aggregator = aggregator
self.strategy = strategy
self.audit_writer = audit_writer
self.broker = broker
self.blocklist = blocklist
self.risk_counters = risk_counters
async def process_one_pass(self) -> int:
last_seen = await self.cursor.last_seen_id()
pending = await self.aggregator.fetch_pending(since_id=last_seen)
n_processed = 0
for mention in pending:
try:
published_ok = await self._process_mention(mention)
# Race-fix: only advance cursor when the side-effect actually
# succeeded — for dry-run / no-op flows we also advance, for
# publish failures we do NOT.
if published_ok:
await self.cursor.advance(mention.id)
n_processed += 1
except Exception:
logger.exception("bridge error on mention %s", mention.id)
return n_processed
async def _process_mention(self, mention: Any) -> bool:
"""Process one mention. Returns True if cursor should advance,
False if publish failed (so we retry next pass)."""
effective_conviction = getattr(
mention, "effective_conviction", mention.conviction
)
account_state = await self._snapshot_account()
is_tradable = await self.broker.is_asset_tradable(mention.symbol)
current_price = await self.broker.get_latest_price(mention.symbol)
decision = await self.strategy.evaluate_mention(
mention,
account_state,
effective_conviction=effective_conviction,
current_price=current_price,
is_tradable=is_tradable,
)
if decision.decision == KevinDecisionType.NO_OP:
status = self._classify_no_op(decision.rationale)
await self.audit_writer.write(
mention_id=mention.id,
bridge_status=status,
effective_conviction=effective_conviction,
signal_id=None,
trade_id=None,
notes=decision.rationale,
)
return True # cursor advances; nothing to publish
# Apply blocklist side-effect on AVOID
action_value = getattr(mention.action, "value", mention.action)
if self.blocklist and action_value == "avoid":
await self.blocklist.add(
mention.symbol, ttl_days=self.config.kevin_avoid_blocks_days
)
if not self.config.kevin_enable_trading:
await self.audit_writer.write(
mention_id=mention.id,
bridge_status="dry_run",
effective_conviction=effective_conviction,
signal_id=None,
trade_id=None,
notes=f"kill-switch off; would: {decision.rationale}",
)
return True
# Publish TradeSignal to Redis Stream — cursor only advances if XADD ok
signal = TradeSignal(
ticker=decision.symbol,
direction=(
SignalDirection.LONG
if decision.decision == KevinDecisionType.OPEN_LONG
else SignalDirection.EXIT
),
strength=float(decision.effective_conviction or 1.0),
strategy_id=KEVIN_STRATEGY_UUID,
strategy_sources=[
f"kevin:{action_value}:{effective_conviction}",
],
target_dollars=decision.target_dollars,
stop_loss_pct=Decimal(str(self.config.kevin_stop_loss_pct)),
take_profit_pct=Decimal(str(self.config.kevin_take_profit_pct)),
)
try:
stream_id = await self.publisher.publish(signal)
except Exception:
# Record broker_rejected audit, do NOT advance cursor
await self.audit_writer.write(
mention_id=mention.id,
bridge_status="broker_rejected",
effective_conviction=effective_conviction,
signal_id=signal.signal_id,
trade_id=None,
notes="publish failed; will retry next pass",
)
return False
await self.audit_writer.write(
mention_id=mention.id,
bridge_status="emitted",
effective_conviction=effective_conviction,
signal_id=signal.signal_id,
trade_id=None,
notes=f"published to stream as {stream_id}",
)
return True
async def _snapshot_account(self) -> KevinAccountState:
acct = await self.broker.get_account()
positions = await self.broker.get_positions()
held = {p.symbol: Decimal(str(getattr(p, "cost_basis", 0))) for p in positions}
blocklist = (
await self.blocklist.active_set() if self.blocklist else set()
)
daily_trades = (
await self.risk_counters.get_daily_trades() if self.risk_counters else 0
)
daily_alloc = (
await self.risk_counters.get_daily_alloc()
if self.risk_counters
else Decimal("0")
)
return KevinAccountState(
equity_usd=Decimal(str(acct.equity)),
cash_usd=Decimal(str(acct.cash)),
held_positions=held,
blocklisted_symbols=blocklist,
daily_trade_count=daily_trades,
daily_alloc_usd=daily_alloc,
paused=await self._is_paused(),
)
async def _is_paused(self) -> bool:
if self.risk_counters:
return bool(await self.risk_counters.is_trading_paused())
return False
@staticmethod
def _classify_no_op(rationale: str) -> str:
r = rationale.lower()
if "tradable" in r:
return "skipped_non_tradable"
if "blocklist" in r:
return "skipped_blocklist"
if "cap" in r or "paused" in r or "halt" in r:
return "skipped_caps"
return "deferred"
# --- service entry point (Task 11 will fill this in) ---
async def run() -> None:
"""Boot the bridge with concrete collaborators + main loop.
Filled in by Task 11 wiring concrete cursor / aggregator / blocklist /
risk_counters / audit_writer.
"""
raise NotImplementedError("Task 11 will wire concrete collaborators")
if __name__ == "__main__":
asyncio.run(run())

View file

@ -1,6 +1,7 @@
"""Trading-related Pydantic schemas for Redis Streams messages and API payloads."""
from datetime import datetime
from datetime import UTC, datetime
from decimal import Decimal
from enum import Enum
from typing import Any
from uuid import UUID, uuid4
@ -30,6 +31,7 @@ class SignalDirection(str, Enum):
LONG = "LONG"
SHORT = "SHORT"
NEUTRAL = "NEUTRAL"
EXIT = "EXIT"
# ---------------------------------------------------------------------------
@ -102,7 +104,13 @@ class TradeSignal(BaseModel):
strength: float = Field(ge=0.0, le=1.0)
strategy_sources: list[str]
sentiment_context: dict[str, Any] | None = None
timestamp: datetime
timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
# --- Kevin v2 extensions (optional) ---
strategy_id: UUID | None = None
target_dollars: Decimal | None = None
stop_loss_pct: Decimal | None = None
take_profit_pct: Decimal | None = None
model_config = {"from_attributes": True}

View file

@ -0,0 +1,175 @@
"""Smoke-level orchestrator tests for the Kevin signal bridge."""
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
from services.kevin_signal_bridge.main import KevinBridge
async def test_bridge_dry_run_writes_audit_does_not_publish():
"""When TRADING_KEVIN_ENABLE_TRADING=false the bridge writes an audit row
and never calls publisher.publish."""
config = MagicMock(kevin_enable_trading=False, kevin_stop_loss_pct=0.08, kevin_take_profit_pct=0.20, kevin_avoid_blocks_days=7)
cursor = AsyncMock()
cursor.last_seen_id.return_value = 0
publisher = AsyncMock()
aggregator = AsyncMock()
aggregator.fetch_pending.return_value = [
MagicMock(
id=1,
symbol="NVDA",
action=MagicMock(value="buy"),
conviction=Decimal("0.8"),
time_horizon=MagicMock(value="weeks"),
),
]
strategy = AsyncMock()
strategy.evaluate_mention.return_value = MagicMock(
decision=MagicMock(value="open_long", name="OPEN_LONG"),
symbol="NVDA",
target_dollars=Decimal("3000"),
holding_days=10,
effective_conviction=Decimal("0.8"),
rationale="ok",
)
# Make the .decision attribute compare against KevinDecisionType.OPEN_LONG
from shared.schemas.kevin import KevinDecisionType
strategy.evaluate_mention.return_value.decision = KevinDecisionType.OPEN_LONG
audit_writer = AsyncMock()
broker = AsyncMock()
broker.is_asset_tradable.return_value = True
broker.get_latest_price.return_value = Decimal("100")
broker.get_account.return_value = MagicMock(
equity=Decimal("100000"), cash=Decimal("100000")
)
broker.get_positions.return_value = []
bridge = KevinBridge(
config=config,
cursor=cursor,
publisher=publisher,
aggregator=aggregator,
strategy=strategy,
audit_writer=audit_writer,
broker=broker,
)
await bridge.process_one_pass()
publisher.publish.assert_not_called()
audit_writer.write.assert_awaited()
args = audit_writer.write.call_args
assert "dry_run" in str(args).lower()
async def test_bridge_kill_switch_on_publishes_to_stream():
config = MagicMock(
kevin_enable_trading=True,
kevin_stop_loss_pct=0.08,
kevin_take_profit_pct=0.20,
kevin_avoid_blocks_days=7,
)
cursor = AsyncMock()
cursor.last_seen_id.return_value = 0
publisher = AsyncMock()
publisher.publish.return_value = "1234-0"
aggregator = AsyncMock()
aggregator.fetch_pending.return_value = [
MagicMock(
id=1,
symbol="NVDA",
action=MagicMock(value="buy"),
conviction=Decimal("0.8"),
effective_conviction=Decimal("0.8"),
time_horizon=MagicMock(value="weeks"),
),
]
strategy = AsyncMock()
from shared.schemas.kevin import KevinDecisionType
strategy.evaluate_mention.return_value = MagicMock(
decision=KevinDecisionType.OPEN_LONG,
symbol="NVDA",
target_dollars=Decimal("3000"),
holding_days=10,
effective_conviction=Decimal("0.8"),
rationale="ok",
)
audit_writer = AsyncMock()
broker = AsyncMock()
broker.is_asset_tradable.return_value = True
broker.get_latest_price.return_value = Decimal("100")
broker.get_account.return_value = MagicMock(
equity=Decimal("100000"), cash=Decimal("100000")
)
broker.get_positions.return_value = []
bridge = KevinBridge(
config=config,
cursor=cursor,
publisher=publisher,
aggregator=aggregator,
strategy=strategy,
audit_writer=audit_writer,
broker=broker,
)
await bridge.process_one_pass()
publisher.publish.assert_awaited_once()
cursor.advance.assert_awaited_with(1)
async def test_bridge_advances_cursor_only_after_publish():
"""Race-condition guard: cursor must NOT advance if publish raises."""
config = MagicMock(
kevin_enable_trading=True,
kevin_stop_loss_pct=0.08,
kevin_take_profit_pct=0.20,
kevin_avoid_blocks_days=7,
)
cursor = AsyncMock()
cursor.last_seen_id.return_value = 0
publisher = AsyncMock()
publisher.publish.side_effect = RuntimeError("XADD failed")
aggregator = AsyncMock()
aggregator.fetch_pending.return_value = [
MagicMock(
id=1,
symbol="NVDA",
action=MagicMock(value="buy"),
conviction=Decimal("0.8"),
effective_conviction=Decimal("0.8"),
time_horizon=MagicMock(value="weeks"),
),
]
strategy = AsyncMock()
from shared.schemas.kevin import KevinDecisionType
strategy.evaluate_mention.return_value = MagicMock(
decision=KevinDecisionType.OPEN_LONG,
symbol="NVDA",
target_dollars=Decimal("3000"),
holding_days=10,
effective_conviction=Decimal("0.8"),
rationale="ok",
)
audit_writer = AsyncMock()
broker = AsyncMock()
broker.is_asset_tradable.return_value = True
broker.get_latest_price.return_value = Decimal("100")
broker.get_account.return_value = MagicMock(
equity=Decimal("100000"), cash=Decimal("100000")
)
broker.get_positions.return_value = []
bridge = KevinBridge(
config=config,
cursor=cursor,
publisher=publisher,
aggregator=aggregator,
strategy=strategy,
audit_writer=audit_writer,
broker=broker,
)
await bridge.process_one_pass()
# cursor.advance must NOT have been called for mention 1
cursor.advance.assert_not_called()