"""Kevin signal bridge — polls kevin_stock_mentions, calls KevinStrategy, publishes TradeSignal to signals:generated. Kill-switch (kevin_enable_trading=false) writes audit rows but skips publish. """ from __future__ import annotations import asyncio import logging from datetime import date, datetime from decimal import Decimal from typing import Any from zoneinfo import ZoneInfo from shared.constants.kevin import KEVIN_STRATEGY_UUID from shared.schemas.kevin import KevinAccountState, KevinDecisionType from shared.schemas.trading import SignalDirection, TradeSignal logger = logging.getLogger(__name__) _ET = ZoneInfo("America/New_York") def should_run_exit_scan( cron: str, now_et: datetime, last_run_date: date | None, ) -> bool: """Decide whether the daily exit-scan should run right now. ``croniter`` is not a project dependency, so we parse only the fields the Kevin schedule uses — ``minute hour * * dow`` — and apply a simple once-per-ET-weekday gate: * fire only on a weekday listed in the cron's day-of-week field, * only at/after the cron's HH:MM (ET), * at most once per ET calendar day (tracked via ``last_run_date``). The hour/minute and DOW are honoured; the day-of-month / month fields are treated as wildcards (the Kevin cron always sets them to ``*``). """ minute, hour, _dom, _month, dow = cron.split() target_minutes = int(hour) * 60 + int(minute) # cron DOW: 0/7 = Sunday … 6 = Saturday. Python weekday(): Mon=0 … Sun=6. allowed_dows = _parse_cron_dow(dow) py_to_cron = {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 0} if py_to_cron[now_et.weekday()] not in allowed_dows: return False if now_et.hour * 60 + now_et.minute < target_minutes: return False return last_run_date != now_et.date() def _parse_cron_dow(dow: str) -> set[int]: """Expand a cron day-of-week field (``*``, ``1-5``, ``1,3,5``) to a set of cron DOW integers (0/7 = Sunday … 6 = Saturday).""" if dow == "*": return set(range(7)) days: set[int] = set() for part in dow.split(","): if "-" in part: lo, hi = (int(x) for x in part.split("-")) days.update(range(lo, hi + 1)) else: days.add(int(part)) return days class KevinBridge: """End-to-end orchestrator. Composed from injected collaborators so it's unit-testable. """ def __init__( self, config: Any, cursor: Any, publisher: Any, aggregator: Any, strategy: Any, audit_writer: Any, broker: Any, blocklist: Any = None, risk_counters: Any = None, ) -> None: self.config = config self.cursor = cursor self.publisher = publisher self.aggregator = aggregator self.strategy = strategy self.audit_writer = audit_writer self.broker = broker self.blocklist = blocklist self.risk_counters = risk_counters async def process_one_pass(self) -> int: last_seen = await self.cursor.last_seen_id() pending = await self.aggregator.fetch_pending(since_id=last_seen) n_processed = 0 for mention in pending: try: published_ok = await self._process_mention(mention) # Race-fix: only advance cursor when the side-effect actually # succeeded — for dry-run / no-op flows we also advance, for # publish failures we do NOT. if published_ok: await self.cursor.advance(mention.id) n_processed += 1 except Exception: logger.exception("bridge error on mention %s", mention.id) return n_processed async def _process_mention(self, mention: Any) -> bool: """Process one mention. Returns True if cursor should advance, False if publish failed (so we retry next pass).""" effective_conviction = getattr( mention, "effective_conviction", mention.conviction ) account_state = await self._snapshot_account() is_tradable = await self.broker.is_asset_tradable(mention.symbol) current_price = await self.broker.get_latest_price(mention.symbol) decision = await self.strategy.evaluate_mention( mention, account_state, effective_conviction=effective_conviction, current_price=current_price, is_tradable=is_tradable, ) if decision.decision == KevinDecisionType.NO_OP: status = self._classify_no_op(decision.rationale) await self.audit_writer.write( mention_id=mention.id, bridge_status=status, effective_conviction=effective_conviction, signal_id=None, trade_id=None, notes=decision.rationale, ) return True # cursor advances; nothing to publish # Apply blocklist side-effect on AVOID action_value = getattr(mention.action, "value", mention.action) if self.blocklist and action_value == "avoid": await self.blocklist.add( mention.symbol, ttl_days=self.config.kevin_avoid_blocks_days ) if not self.config.kevin_enable_trading: await self.audit_writer.write( mention_id=mention.id, bridge_status="dry_run", effective_conviction=effective_conviction, signal_id=None, trade_id=None, notes=f"kill-switch off; would: {decision.rationale}", ) return True # Publish TradeSignal to Redis Stream — cursor only advances if XADD ok signal = TradeSignal( ticker=decision.symbol, direction=( SignalDirection.LONG if decision.decision == KevinDecisionType.OPEN_LONG else SignalDirection.EXIT ), strength=float(decision.effective_conviction or 1.0), strategy_id=KEVIN_STRATEGY_UUID, strategy_sources=[ f"kevin:{action_value}:{effective_conviction}", ], target_dollars=decision.target_dollars, stop_loss_pct=Decimal(str(self.config.kevin_stop_loss_pct)), take_profit_pct=Decimal(str(self.config.kevin_take_profit_pct)), current_price=current_price, ) # Persist to signals table FIRST so downstream FK # (kevin_signal_bridge_state.signal_id → signals.id) resolves. # On retry the on_conflict_do_nothing makes this idempotent. await self.audit_writer.persist_signal(signal) try: stream_id = await self.publisher.publish(signal) except Exception: # Record broker_rejected audit, do NOT advance cursor await self.audit_writer.write( mention_id=mention.id, bridge_status="broker_rejected", effective_conviction=effective_conviction, signal_id=signal.signal_id, trade_id=None, notes="publish failed; will retry next pass", ) return False await self.audit_writer.write( mention_id=mention.id, bridge_status="emitted", effective_conviction=effective_conviction, signal_id=signal.signal_id, trade_id=None, notes=f"published to stream as {stream_id}", ) return True async def _snapshot_account(self) -> KevinAccountState: acct = await self.broker.get_account() positions = await self.broker.get_positions() # PositionInfo schema uses `ticker` + `market_value` (no `cost_basis`). # Cost basis ≈ qty * avg_entry; this is what the strategy's per-ticker cap compares against. held = { p.ticker: Decimal(str(p.qty)) * Decimal(str(p.avg_entry)) for p in positions if p.qty != 0 } blocklist = ( await self.blocklist.active_set() if self.blocklist else set() ) daily_trades = ( await self.risk_counters.get_daily_trades() if self.risk_counters else 0 ) daily_alloc = ( await self.risk_counters.get_daily_alloc() if self.risk_counters else Decimal("0") ) return KevinAccountState( equity_usd=Decimal(str(acct.equity)), cash_usd=Decimal(str(acct.cash)), held_positions=held, blocklisted_symbols=blocklist, daily_trade_count=daily_trades, daily_alloc_usd=daily_alloc, paused=await self._is_paused(), ) async def _is_paused(self) -> bool: if self.risk_counters: return bool(await self.risk_counters.is_trading_paused()) return False @staticmethod def _classify_no_op(rationale: str) -> str: r = rationale.lower() if "tradable" in r: return "skipped_non_tradable" if "blocklist" in r: return "skipped_blocklist" if "cap" in r or "paused" in r or "halt" in r: return "skipped_caps" return "deferred" # --- concrete publisher (Pydantic -> stream) --- class TradeSignalPublisher: """Wraps a StreamPublisher to accept Pydantic TradeSignal objects.""" def __init__(self, stream_publisher: Any) -> None: self.stream_publisher = stream_publisher async def publish(self, signal: TradeSignal) -> str: payload = signal.model_dump(mode="json") return str(await self.stream_publisher.publish(payload)) # --- service entry point --- async def run() -> None: """Boot the bridge with concrete collaborators + main poll loop.""" import os import signal as _signal from decimal import Decimal as _D from redis.asyncio import Redis from services.kevin_signal_bridge.aggregator import MentionAggregator from services.kevin_signal_bridge.audit import AuditWriter from services.kevin_signal_bridge.blocklist import KevinBlocklist from services.kevin_signal_bridge.config import KevinBridgeConfig from services.kevin_signal_bridge.cursor import RedisCursor from services.kevin_signal_bridge.exit_scanner import ExitScanner from services.kevin_signal_bridge.risk_counters import KevinRiskCounters from shared.broker.alpaca_broker import AlpacaBroker from shared.db import create_db from shared.redis_streams import StreamPublisher from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig logging.basicConfig( level=os.environ.get("TRADING_LOG_LEVEL", "INFO"), format="%(asctime)s %(name)s %(levelname)s %(message)s", ) logger.info("Booting Kevin signal bridge") config = KevinBridgeConfig() _engine, session_factory = create_db(config) redis = Redis.from_url(config.redis_url) cursor = RedisCursor(redis) blocklist = KevinBlocklist(redis) risk_counters = KevinRiskCounters(redis) aggregator = MentionAggregator( session_factory=session_factory, window_hours=config.kevin_max_mention_age_hours, boost_per_repeat=_D(str(config.kevin_mention_boost_per_repeat)), max_boost=_D(str(config.kevin_max_mention_boost)), ) strategy_cfg = KevinStrategyConfig( min_conviction=_D(str(config.kevin_min_conviction)), max_mention_age_hours=config.kevin_max_mention_age_hours, base_position_pct=_D(str(config.kevin_base_position_pct)), min_trade_usd=_D(str(config.kevin_min_trade_usd)), max_trade_usd=_D(str(config.kevin_max_trade_usd)), max_per_ticker_usd=_D(str(config.kevin_max_per_ticker_usd)), hold_days_by_horizon=config.kevin_hold_days, avoid_closes_longs=config.kevin_avoid_closes_longs, avoid_blocks_days=config.kevin_avoid_blocks_days, ) strategy = KevinStrategy(strategy_cfg) audit_writer = AuditWriter(session_factory=session_factory) stream_pub = StreamPublisher(redis, "signals:generated") publisher = TradeSignalPublisher(stream_pub) api_key = os.environ.get("TRADING_ALPACA_API_KEY", "") secret_key = os.environ.get("TRADING_ALPACA_SECRET_KEY", "") broker: Any if api_key and secret_key: broker = AlpacaBroker(api_key=api_key, secret_key=secret_key, paper=True) else: # Run without a broker — useful for the dry-run smoke test from unittest.mock import AsyncMock as _AsyncMock from unittest.mock import MagicMock as _MagicMock logger.warning( "Alpaca creds missing — running with stub broker (dry-run only)" ) broker = _AsyncMock() broker.is_asset_tradable.return_value = True broker.get_latest_price.return_value = _D("0") broker.get_account.return_value = _MagicMock( equity=_D("100000"), cash=_D("100000") ) broker.get_positions.return_value = [] bridge = KevinBridge( config=config, cursor=cursor, publisher=publisher, aggregator=aggregator, strategy=strategy, audit_writer=audit_writer, broker=broker, blocklist=blocklist, risk_counters=risk_counters, ) # Daily exit scan — emits EXIT signals for Kevin positions whose hold has # elapsed and that are STILL held at the broker. Shares the bridge's # publisher + broker; gated to fire once per ET weekday (see # should_run_exit_scan; croniter is intentionally not a dependency). exit_scanner = ExitScanner( session_factory=session_factory, publisher=publisher, config=config, broker=broker, ) last_exit_scan_date: date | None = None stop = asyncio.Event() def _on_signal(*_: Any) -> None: logger.info("Received shutdown signal") stop.set() loop = asyncio.get_running_loop() for sig in (_signal.SIGINT, _signal.SIGTERM): try: loop.add_signal_handler(sig, _on_signal) except NotImplementedError: pass logger.info( "Bridge running: poll_interval=%ss kill_switch_on=%s", config.kevin_bridge_poll_interval_seconds, config.kevin_enable_trading, ) while not stop.is_set(): try: n = await bridge.process_one_pass() if n: logger.info("Processed %d mentions", n) except Exception: logger.exception("Bridge poll iteration failed") if should_run_exit_scan( config.kevin_bridge_exit_scan_cron, datetime.now(_ET), last_exit_scan_date, ): try: emitted = await exit_scanner.scan_and_emit_exits() last_exit_scan_date = datetime.now(_ET).date() logger.info("Exit scan emitted %d EXIT signal(s)", emitted) except Exception: logger.exception("Exit scan failed") try: await asyncio.wait_for( stop.wait(), timeout=config.kevin_bridge_poll_interval_seconds, ) except asyncio.TimeoutError: pass logger.info("Bridge shutting down") await redis.aclose() if __name__ == "__main__": asyncio.run(run())