All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
End-to-end Phase 2 verification surfaced a FK violation: the bridge publishes a TradeSignal to the Redis stream and writes kevin_signal_bridge_state with signal_id, but signal_id has a FK to the signals table — which was never populated for Kevin-emitted signals (only the news+sentiment path wrote there). AuditWriter.persist_signal() inserts the TradeSignal into the signals table idempotently (on_conflict_do_nothing on the UUID PK) before the bridge publishes to Redis. Bridge calls it as a new step right before the XADD, so: 1. Signal row exists in signals table 2. XADD to signals:generated 3. Audit row with signal_id FK now resolves Verified live: mention #84 (synthetic NVDA buy, conviction 0.85) emitted a signal, trade-executor consumed and correctly rejected with outside_market_hours (market was closed at the time).
358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""Kevin signal bridge — polls kevin_stock_mentions, calls KevinStrategy,
|
|
publishes TradeSignal to signals:generated.
|
|
|
|
Kill-switch (kevin_enable_trading=false) writes audit rows but skips
|
|
publish.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
|
|
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
|
from shared.schemas.kevin import KevinAccountState, KevinDecisionType
|
|
from shared.schemas.trading import SignalDirection, TradeSignal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KevinBridge:
|
|
"""End-to-end orchestrator. Composed from injected collaborators
|
|
so it's unit-testable.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Any,
|
|
cursor: Any,
|
|
publisher: Any,
|
|
aggregator: Any,
|
|
strategy: Any,
|
|
audit_writer: Any,
|
|
broker: Any,
|
|
blocklist: Any = None,
|
|
risk_counters: Any = None,
|
|
) -> None:
|
|
self.config = config
|
|
self.cursor = cursor
|
|
self.publisher = publisher
|
|
self.aggregator = aggregator
|
|
self.strategy = strategy
|
|
self.audit_writer = audit_writer
|
|
self.broker = broker
|
|
self.blocklist = blocklist
|
|
self.risk_counters = risk_counters
|
|
|
|
async def process_one_pass(self) -> int:
|
|
last_seen = await self.cursor.last_seen_id()
|
|
pending = await self.aggregator.fetch_pending(since_id=last_seen)
|
|
n_processed = 0
|
|
for mention in pending:
|
|
try:
|
|
published_ok = await self._process_mention(mention)
|
|
# Race-fix: only advance cursor when the side-effect actually
|
|
# succeeded — for dry-run / no-op flows we also advance, for
|
|
# publish failures we do NOT.
|
|
if published_ok:
|
|
await self.cursor.advance(mention.id)
|
|
n_processed += 1
|
|
except Exception:
|
|
logger.exception("bridge error on mention %s", mention.id)
|
|
return n_processed
|
|
|
|
async def _process_mention(self, mention: Any) -> bool:
|
|
"""Process one mention. Returns True if cursor should advance,
|
|
False if publish failed (so we retry next pass)."""
|
|
|
|
effective_conviction = getattr(
|
|
mention, "effective_conviction", mention.conviction
|
|
)
|
|
|
|
account_state = await self._snapshot_account()
|
|
is_tradable = await self.broker.is_asset_tradable(mention.symbol)
|
|
current_price = await self.broker.get_latest_price(mention.symbol)
|
|
|
|
decision = await self.strategy.evaluate_mention(
|
|
mention,
|
|
account_state,
|
|
effective_conviction=effective_conviction,
|
|
current_price=current_price,
|
|
is_tradable=is_tradable,
|
|
)
|
|
|
|
if decision.decision == KevinDecisionType.NO_OP:
|
|
status = self._classify_no_op(decision.rationale)
|
|
await self.audit_writer.write(
|
|
mention_id=mention.id,
|
|
bridge_status=status,
|
|
effective_conviction=effective_conviction,
|
|
signal_id=None,
|
|
trade_id=None,
|
|
notes=decision.rationale,
|
|
)
|
|
return True # cursor advances; nothing to publish
|
|
|
|
# Apply blocklist side-effect on AVOID
|
|
action_value = getattr(mention.action, "value", mention.action)
|
|
if self.blocklist and action_value == "avoid":
|
|
await self.blocklist.add(
|
|
mention.symbol, ttl_days=self.config.kevin_avoid_blocks_days
|
|
)
|
|
|
|
if not self.config.kevin_enable_trading:
|
|
await self.audit_writer.write(
|
|
mention_id=mention.id,
|
|
bridge_status="dry_run",
|
|
effective_conviction=effective_conviction,
|
|
signal_id=None,
|
|
trade_id=None,
|
|
notes=f"kill-switch off; would: {decision.rationale}",
|
|
)
|
|
return True
|
|
|
|
# Publish TradeSignal to Redis Stream — cursor only advances if XADD ok
|
|
signal = TradeSignal(
|
|
ticker=decision.symbol,
|
|
direction=(
|
|
SignalDirection.LONG
|
|
if decision.decision == KevinDecisionType.OPEN_LONG
|
|
else SignalDirection.EXIT
|
|
),
|
|
strength=float(decision.effective_conviction or 1.0),
|
|
strategy_id=KEVIN_STRATEGY_UUID,
|
|
strategy_sources=[
|
|
f"kevin:{action_value}:{effective_conviction}",
|
|
],
|
|
target_dollars=decision.target_dollars,
|
|
stop_loss_pct=Decimal(str(self.config.kevin_stop_loss_pct)),
|
|
take_profit_pct=Decimal(str(self.config.kevin_take_profit_pct)),
|
|
)
|
|
|
|
# Persist to signals table FIRST so downstream FK
|
|
# (kevin_signal_bridge_state.signal_id → signals.id) resolves.
|
|
# On retry the on_conflict_do_nothing makes this idempotent.
|
|
await self.audit_writer.persist_signal(signal)
|
|
|
|
try:
|
|
stream_id = await self.publisher.publish(signal)
|
|
except Exception:
|
|
# Record broker_rejected audit, do NOT advance cursor
|
|
await self.audit_writer.write(
|
|
mention_id=mention.id,
|
|
bridge_status="broker_rejected",
|
|
effective_conviction=effective_conviction,
|
|
signal_id=signal.signal_id,
|
|
trade_id=None,
|
|
notes="publish failed; will retry next pass",
|
|
)
|
|
return False
|
|
|
|
await self.audit_writer.write(
|
|
mention_id=mention.id,
|
|
bridge_status="emitted",
|
|
effective_conviction=effective_conviction,
|
|
signal_id=signal.signal_id,
|
|
trade_id=None,
|
|
notes=f"published to stream as {stream_id}",
|
|
)
|
|
return True
|
|
|
|
async def _snapshot_account(self) -> KevinAccountState:
|
|
acct = await self.broker.get_account()
|
|
positions = await self.broker.get_positions()
|
|
# PositionInfo schema uses `ticker` + `market_value` (no `cost_basis`).
|
|
# Cost basis ≈ qty * avg_entry; this is what the strategy's per-ticker cap compares against.
|
|
held = {
|
|
p.ticker: Decimal(str(p.qty)) * Decimal(str(p.avg_entry))
|
|
for p in positions
|
|
if p.qty != 0
|
|
}
|
|
blocklist = (
|
|
await self.blocklist.active_set() if self.blocklist else set()
|
|
)
|
|
daily_trades = (
|
|
await self.risk_counters.get_daily_trades() if self.risk_counters else 0
|
|
)
|
|
daily_alloc = (
|
|
await self.risk_counters.get_daily_alloc()
|
|
if self.risk_counters
|
|
else Decimal("0")
|
|
)
|
|
return KevinAccountState(
|
|
equity_usd=Decimal(str(acct.equity)),
|
|
cash_usd=Decimal(str(acct.cash)),
|
|
held_positions=held,
|
|
blocklisted_symbols=blocklist,
|
|
daily_trade_count=daily_trades,
|
|
daily_alloc_usd=daily_alloc,
|
|
paused=await self._is_paused(),
|
|
)
|
|
|
|
async def _is_paused(self) -> bool:
|
|
if self.risk_counters:
|
|
return bool(await self.risk_counters.is_trading_paused())
|
|
return False
|
|
|
|
@staticmethod
|
|
def _classify_no_op(rationale: str) -> str:
|
|
r = rationale.lower()
|
|
if "tradable" in r:
|
|
return "skipped_non_tradable"
|
|
if "blocklist" in r:
|
|
return "skipped_blocklist"
|
|
if "cap" in r or "paused" in r or "halt" in r:
|
|
return "skipped_caps"
|
|
return "deferred"
|
|
|
|
|
|
# --- concrete publisher (Pydantic -> stream) ---
|
|
|
|
|
|
class TradeSignalPublisher:
|
|
"""Wraps a StreamPublisher to accept Pydantic TradeSignal objects."""
|
|
|
|
def __init__(self, stream_publisher: Any) -> None:
|
|
self.stream_publisher = stream_publisher
|
|
|
|
async def publish(self, signal: TradeSignal) -> str:
|
|
payload = signal.model_dump(mode="json")
|
|
return str(await self.stream_publisher.publish(payload))
|
|
|
|
|
|
# --- service entry point ---
|
|
|
|
|
|
async def run() -> None:
|
|
"""Boot the bridge with concrete collaborators + main poll loop."""
|
|
import os
|
|
import signal as _signal
|
|
from decimal import Decimal as _D
|
|
|
|
from redis.asyncio import Redis
|
|
|
|
from services.kevin_signal_bridge.aggregator import MentionAggregator
|
|
from services.kevin_signal_bridge.audit import AuditWriter
|
|
from services.kevin_signal_bridge.blocklist import KevinBlocklist
|
|
from services.kevin_signal_bridge.config import KevinBridgeConfig
|
|
from services.kevin_signal_bridge.cursor import RedisCursor
|
|
from services.kevin_signal_bridge.risk_counters import KevinRiskCounters
|
|
from shared.broker.alpaca_broker import AlpacaBroker
|
|
from shared.db import create_db
|
|
from shared.redis_streams import StreamPublisher
|
|
from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig
|
|
|
|
logging.basicConfig(
|
|
level=os.environ.get("TRADING_LOG_LEVEL", "INFO"),
|
|
format="%(asctime)s %(name)s %(levelname)s %(message)s",
|
|
)
|
|
logger.info("Booting Kevin signal bridge")
|
|
|
|
config = KevinBridgeConfig()
|
|
_engine, session_factory = create_db(config)
|
|
|
|
redis = Redis.from_url(config.redis_url)
|
|
cursor = RedisCursor(redis)
|
|
blocklist = KevinBlocklist(redis)
|
|
risk_counters = KevinRiskCounters(redis)
|
|
|
|
aggregator = MentionAggregator(
|
|
session_factory=session_factory,
|
|
window_hours=config.kevin_max_mention_age_hours,
|
|
boost_per_repeat=_D(str(config.kevin_mention_boost_per_repeat)),
|
|
max_boost=_D(str(config.kevin_max_mention_boost)),
|
|
)
|
|
|
|
strategy_cfg = KevinStrategyConfig(
|
|
min_conviction=_D(str(config.kevin_min_conviction)),
|
|
max_mention_age_hours=config.kevin_max_mention_age_hours,
|
|
base_position_pct=_D(str(config.kevin_base_position_pct)),
|
|
min_trade_usd=_D(str(config.kevin_min_trade_usd)),
|
|
max_trade_usd=_D(str(config.kevin_max_trade_usd)),
|
|
max_per_ticker_usd=_D(str(config.kevin_max_per_ticker_usd)),
|
|
hold_days_by_horizon=config.kevin_hold_days,
|
|
avoid_closes_longs=config.kevin_avoid_closes_longs,
|
|
avoid_blocks_days=config.kevin_avoid_blocks_days,
|
|
)
|
|
strategy = KevinStrategy(strategy_cfg)
|
|
|
|
audit_writer = AuditWriter(session_factory=session_factory)
|
|
stream_pub = StreamPublisher(redis, "signals:generated")
|
|
publisher = TradeSignalPublisher(stream_pub)
|
|
|
|
api_key = os.environ.get("TRADING_ALPACA_API_KEY", "")
|
|
secret_key = os.environ.get("TRADING_ALPACA_SECRET_KEY", "")
|
|
broker: Any
|
|
if api_key and secret_key:
|
|
broker = AlpacaBroker(api_key=api_key, secret_key=secret_key, paper=True)
|
|
else:
|
|
# Run without a broker — useful for the dry-run smoke test
|
|
from unittest.mock import AsyncMock as _AsyncMock
|
|
from unittest.mock import MagicMock as _MagicMock
|
|
|
|
logger.warning(
|
|
"Alpaca creds missing — running with stub broker (dry-run only)"
|
|
)
|
|
broker = _AsyncMock()
|
|
broker.is_asset_tradable.return_value = True
|
|
broker.get_latest_price.return_value = _D("0")
|
|
broker.get_account.return_value = _MagicMock(
|
|
equity=_D("100000"), cash=_D("100000")
|
|
)
|
|
broker.get_positions.return_value = []
|
|
|
|
bridge = KevinBridge(
|
|
config=config,
|
|
cursor=cursor,
|
|
publisher=publisher,
|
|
aggregator=aggregator,
|
|
strategy=strategy,
|
|
audit_writer=audit_writer,
|
|
broker=broker,
|
|
blocklist=blocklist,
|
|
risk_counters=risk_counters,
|
|
)
|
|
|
|
stop = asyncio.Event()
|
|
|
|
def _on_signal(*_: Any) -> None:
|
|
logger.info("Received shutdown signal")
|
|
stop.set()
|
|
|
|
loop = asyncio.get_running_loop()
|
|
for sig in (_signal.SIGINT, _signal.SIGTERM):
|
|
try:
|
|
loop.add_signal_handler(sig, _on_signal)
|
|
except NotImplementedError:
|
|
pass
|
|
|
|
logger.info(
|
|
"Bridge running: poll_interval=%ss kill_switch_on=%s",
|
|
config.kevin_bridge_poll_interval_seconds,
|
|
config.kevin_enable_trading,
|
|
)
|
|
|
|
while not stop.is_set():
|
|
try:
|
|
n = await bridge.process_one_pass()
|
|
if n:
|
|
logger.info("Processed %d mentions", n)
|
|
except Exception:
|
|
logger.exception("Bridge poll iteration failed")
|
|
|
|
try:
|
|
await asyncio.wait_for(
|
|
stop.wait(),
|
|
timeout=config.kevin_bridge_poll_interval_seconds,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
|
|
logger.info("Bridge shutting down")
|
|
await redis.aclose()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run())
|