From cff256442847a1cffee3aedf838086e18f69f543 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 24 May 2026 01:03:53 +0000 Subject: [PATCH] feat(kevin_bridge): exit-scan daily job + cursor + audit writer --- services/kevin_signal_bridge/audit.py | 50 ++++++ services/kevin_signal_bridge/cursor.py | 28 ++++ services/kevin_signal_bridge/exit_scanner.py | 104 ++++++++++++ .../kevin_signal_bridge/test_audit.py | 117 +++++++++++++ .../kevin_signal_bridge/test_cursor.py | 27 +++ .../kevin_signal_bridge/test_exit_scanner.py | 154 ++++++++++++++++++ 6 files changed, 480 insertions(+) create mode 100644 services/kevin_signal_bridge/audit.py create mode 100644 services/kevin_signal_bridge/cursor.py create mode 100644 services/kevin_signal_bridge/exit_scanner.py create mode 100644 tests/services/kevin_signal_bridge/test_audit.py create mode 100644 tests/services/kevin_signal_bridge/test_cursor.py create mode 100644 tests/services/kevin_signal_bridge/test_exit_scanner.py diff --git a/services/kevin_signal_bridge/audit.py b/services/kevin_signal_bridge/audit.py new file mode 100644 index 0000000..5c7b1c0 --- /dev/null +++ b/services/kevin_signal_bridge/audit.py @@ -0,0 +1,50 @@ +"""Audit writer: upserts a row in kevin_signal_bridge_state per processed mention.""" + +from __future__ import annotations + +import uuid +from decimal import Decimal +from typing import Any, Callable + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from shared.models.meet_kevin_trading import KevinSignalBridgeState + + +class AuditWriter: + def __init__(self, session_factory: Callable[..., Any]) -> None: + self.session_factory = session_factory + + async def write( + self, + *, + mention_id: int, + bridge_status: str, + effective_conviction: Decimal | None = None, + signal_id: uuid.UUID | None = None, + trade_id: uuid.UUID | None = None, + notes: str | None = None, + ) -> None: + """Upsert one audit row (mention_id is unique).""" + async with self.session_factory() as session: + stmt = pg_insert(KevinSignalBridgeState).values( + id=uuid.uuid4(), + mention_id=mention_id, + bridge_status=bridge_status, + effective_conviction=effective_conviction, + signal_id=signal_id, + trade_id=trade_id, + notes=notes, + ) + stmt = stmt.on_conflict_do_update( + index_elements=["mention_id"], + set_={ + "bridge_status": stmt.excluded.bridge_status, + "effective_conviction": stmt.excluded.effective_conviction, + "signal_id": stmt.excluded.signal_id, + "trade_id": stmt.excluded.trade_id, + "notes": stmt.excluded.notes, + }, + ) + await session.execute(stmt) + await session.commit() diff --git a/services/kevin_signal_bridge/cursor.py b/services/kevin_signal_bridge/cursor.py new file mode 100644 index 0000000..e77a763 --- /dev/null +++ b/services/kevin_signal_bridge/cursor.py @@ -0,0 +1,28 @@ +"""Redis-backed cursor for the bridge. + +Tracks the highest `kevin_stock_mentions.id` we've already attempted to +process. Stored at key `kevin:bridge:last_mention_id`. +""" + +from __future__ import annotations + +from typing import Any + + +class RedisCursor: + _KEY = "kevin:bridge:last_mention_id" + + def __init__(self, redis: Any) -> None: + self.redis = redis + + async def last_seen_id(self) -> int: + v = await self.redis.get(self._KEY) + if v is None: + return 0 + return int(v) + + async def advance(self, mention_id: int) -> None: + # Conservative: only set if mention_id > current + current = await self.last_seen_id() + if mention_id > current: + await self.redis.set(self._KEY, str(mention_id)) diff --git a/services/kevin_signal_bridge/exit_scanner.py b/services/kevin_signal_bridge/exit_scanner.py new file mode 100644 index 0000000..80b5aa0 --- /dev/null +++ b/services/kevin_signal_bridge/exit_scanner.py @@ -0,0 +1,104 @@ +"""Daily exit-scan job for the Kevin bridge. + +Walks open Kevin trades (Trade.strategy_id == KEVIN_STRATEGY_UUID, status +FILLED, no offsetting SELL) and publishes EXIT TradeSignals for any whose +holding period has elapsed. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta, timezone +from typing import Any, Callable + +from sqlalchemy import and_, select + +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.models.meet_kevin_trading import KevinSignalBridgeState +from shared.models.trading import Trade, TradeStatus +from shared.schemas.trading import SignalDirection, TradeSignal + +logger = logging.getLogger(__name__) + + +class ExitScanner: + def __init__( + self, + session_factory: Callable[..., Any], + publisher: Any, + config: Any, + ) -> None: + self.session_factory = session_factory + self.publisher = publisher + self.config = config + + async def scan_and_emit_exits(self) -> int: + """Returns the number of EXIT signals emitted.""" + now = datetime.now(timezone.utc) + emitted = 0 + async with self.session_factory() as session: + # Find open Kevin trades (FILLED, no closing trade yet on same ticker) + open_trades = ( + ( + await session.execute( + select(Trade).where( + and_( + Trade.strategy_id == KEVIN_STRATEGY_UUID, + Trade.status == TradeStatus.FILLED, + ) + ) + ) + ) + .scalars() + .all() + ) + + for trade in open_trades: + # Find the source audit row to learn the original holding_days target + async with self.session_factory() as session: + audit = ( + ( + await session.execute( + select(KevinSignalBridgeState).where( + KevinSignalBridgeState.trade_id == trade.id + ) + ) + ) + .scalars() + .one_or_none() + ) + if audit is None: + continue + + hold_days = self._holding_days(audit) + target_exit_at = audit.decided_at + timedelta(days=hold_days) + if now < target_exit_at: + continue + + signal = TradeSignal( + ticker=trade.ticker, + direction=SignalDirection.EXIT, + strength=1.0, + strategy_id=KEVIN_STRATEGY_UUID, + strategy_sources=[f"kevin:exit_scan:hold_expired:{hold_days}d"], + ) + try: + await self.publisher.publish(signal) + emitted += 1 + except Exception: + logger.exception("exit-scan publish failed for trade %s", trade.id) + + return emitted + + def _holding_days(self, audit: KevinSignalBridgeState) -> int: + """Best-effort holding days from notes; fallback to config default.""" + notes = audit.notes or "" + # Try to find ' hold=Nd' in the audit notes + for token in notes.split(): + if token.startswith("hold=") and token.endswith("d"): + try: + return int(token.removeprefix("hold=").removesuffix("d")) + except ValueError: + pass + default_map = getattr(self.config, "kevin_hold_days", {}) + return int(default_map.get("unspecified", 10)) diff --git a/tests/services/kevin_signal_bridge/test_audit.py b/tests/services/kevin_signal_bridge/test_audit.py new file mode 100644 index 0000000..320b343 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_audit.py @@ -0,0 +1,117 @@ +"""Tests for the audit writer (upsert into kevin_signal_bridge_state).""" + +from datetime import datetime, timezone +from decimal import Decimal + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from services.kevin_signal_bridge.audit import AuditWriter +from shared.models.meet_kevin import ( + KevinAnalysis, + KevinChannel, + KevinMarketOutlook, + KevinStockMention, + KevinTickerAction, + KevinTimeHorizon, + KevinVideo, + KevinVideoStatus, +) +from shared.models.meet_kevin_trading import KevinSignalBridgeState + + +def _factory(session: AsyncSession): + class _StaticSessionFactory: + async def __aenter__(self): + return session + + async def __aexit__(self, *args): + pass + + def factory(): + return _StaticSessionFactory() + + return factory + + +async def _seed_mention(session: AsyncSession) -> int: + channel = KevinChannel(youtube_channel_id="UCa", title="t") + session.add(channel) + await session.flush() + video = KevinVideo( + channel_id=channel.id, + youtube_video_id="va", + title="t", + published_at=datetime.now(timezone.utc), + status=KevinVideoStatus.ANALYZED, + ) + session.add(video) + await session.flush() + analysis = KevinAnalysis( + video_id=video.id, + model="m", + prompt_version="v1", + market_outlook_direction=KevinMarketOutlook.NEUTRAL, + market_outlook_reasoning="x", + summary="x", + prompt_tokens=10, + completion_tokens=10, + cost_usd=Decimal("0.01"), + ) + session.add(analysis) + await session.flush() + mention = KevinStockMention( + video_id=video.id, + analysis_id=analysis.id, + symbol="NVDA", + action=KevinTickerAction.BUY, + conviction=Decimal("0.7"), + time_horizon=KevinTimeHorizon.WEEKS, + rationale_quote="x", + ) + session.add(mention) + await session.flush() + return mention.id + + +@pytest.mark.asyncio +async def test_audit_writer_inserts_new_row(db_session: AsyncSession): + mention_id = await _seed_mention(db_session) + writer = AuditWriter(session_factory=_factory(db_session)) + await writer.write( + mention_id=mention_id, + bridge_status="emitted", + effective_conviction=Decimal("0.75"), + notes="hello", + ) + row = ( + await db_session.execute( + select(KevinSignalBridgeState).where( + KevinSignalBridgeState.mention_id == mention_id + ) + ) + ).scalar_one() + assert row.bridge_status.value == "emitted" + + +@pytest.mark.asyncio +async def test_audit_writer_upserts_on_duplicate(db_session: AsyncSession): + mention_id = await _seed_mention(db_session) + writer = AuditWriter(session_factory=_factory(db_session)) + await writer.write(mention_id=mention_id, bridge_status="dry_run", notes="first") + await writer.write(mention_id=mention_id, bridge_status="emitted", notes="second") + rows = ( + ( + await db_session.execute( + select(KevinSignalBridgeState).where( + KevinSignalBridgeState.mention_id == mention_id + ) + ) + ) + .scalars() + .all() + ) + assert len(rows) == 1 + assert rows[0].bridge_status.value == "emitted" + assert rows[0].notes == "second" diff --git a/tests/services/kevin_signal_bridge/test_cursor.py b/tests/services/kevin_signal_bridge/test_cursor.py new file mode 100644 index 0000000..90d90a3 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_cursor.py @@ -0,0 +1,27 @@ +"""Tests for the Redis cursor.""" + +import fakeredis.aioredis + +from services.kevin_signal_bridge.cursor import RedisCursor + + +async def _redis(): + return fakeredis.aioredis.FakeRedis() + + +async def test_cursor_defaults_to_zero(): + cursor = RedisCursor(await _redis()) + assert await cursor.last_seen_id() == 0 + + +async def test_cursor_advance_persists_value(): + cursor = RedisCursor(await _redis()) + await cursor.advance(42) + assert await cursor.last_seen_id() == 42 + + +async def test_cursor_advance_never_goes_backwards(): + cursor = RedisCursor(await _redis()) + await cursor.advance(42) + await cursor.advance(10) + assert await cursor.last_seen_id() == 42 diff --git a/tests/services/kevin_signal_bridge/test_exit_scanner.py b/tests/services/kevin_signal_bridge/test_exit_scanner.py new file mode 100644 index 0000000..a12c833 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_exit_scanner.py @@ -0,0 +1,154 @@ +"""Tests for ExitScanner — emits EXIT signal when holding period elapsed.""" + +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from services.kevin_signal_bridge.exit_scanner import ExitScanner +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.models.meet_kevin import ( + KevinAnalysis, + KevinChannel, + KevinMarketOutlook, + KevinStockMention, + KevinTickerAction, + KevinTimeHorizon, + KevinVideo, + KevinVideoStatus, +) +from shared.models.meet_kevin_trading import BridgeStatus, KevinSignalBridgeState +from shared.models.trading import Trade, TradeSide, TradeStatus + + +def _factory(session: AsyncSession): + class _StaticSessionFactory: + async def __aenter__(self): + return session + + async def __aexit__(self, *args): + pass + + def factory(): + return _StaticSessionFactory() + + return factory + + +async def _seed_video_and_mention(session: AsyncSession) -> int: + channel = KevinChannel(youtube_channel_id="UCex", title="t") + session.add(channel) + await session.flush() + video = KevinVideo( + channel_id=channel.id, + youtube_video_id="vex", + title="t", + published_at=datetime.now(timezone.utc), + status=KevinVideoStatus.ANALYZED, + ) + session.add(video) + await session.flush() + analysis = KevinAnalysis( + video_id=video.id, + model="m", + prompt_version="v1", + market_outlook_direction=KevinMarketOutlook.NEUTRAL, + market_outlook_reasoning="x", + summary="x", + prompt_tokens=10, + completion_tokens=10, + cost_usd=Decimal("0.01"), + ) + session.add(analysis) + await session.flush() + mention = KevinStockMention( + video_id=video.id, + analysis_id=analysis.id, + symbol="NVDA", + action=KevinTickerAction.BUY, + conviction=Decimal("0.7"), + time_horizon=KevinTimeHorizon.WEEKS, + rationale_quote="x", + ) + session.add(mention) + await session.flush() + return mention.id + + +@pytest.mark.asyncio +async def test_exit_scanner_emits_on_elapsed_hold(db_session: AsyncSession): + mention_id = await _seed_video_and_mention(db_session) + + # Create a filled Kevin trade 30 days ago + trade = Trade( + ticker="NVDA", + side=TradeSide.BUY, + qty=10.0, + price=100.0, + strategy_id=KEVIN_STRATEGY_UUID, + status=TradeStatus.FILLED, + ) + db_session.add(trade) + await db_session.flush() + + # Audit row with decided_at 30 days ago + hold=10d in notes + audit = KevinSignalBridgeState( + mention_id=mention_id, + bridge_status=BridgeStatus.EMITTED, + trade_id=trade.id, + notes="BUY conv=0.7 -> 2% target=$2000 hold=10d", + decided_at=datetime.now(timezone.utc) - timedelta(days=30), + ) + db_session.add(audit) + await db_session.flush() + + publisher = AsyncMock() + publisher.publish.return_value = "1234-0" + config = MagicMock(kevin_hold_days={"unspecified": 10}) + + scanner = ExitScanner( + session_factory=_factory(db_session), publisher=publisher, config=config + ) + emitted = await scanner.scan_and_emit_exits() + assert emitted == 1 + publisher.publish.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_exit_scanner_does_not_emit_when_hold_not_elapsed( + db_session: AsyncSession, +): + mention_id = await _seed_video_and_mention(db_session) + trade = Trade( + ticker="NVDA", + side=TradeSide.BUY, + qty=10.0, + price=100.0, + strategy_id=KEVIN_STRATEGY_UUID, + status=TradeStatus.FILLED, + ) + db_session.add(trade) + await db_session.flush() + + # decided_at 2 days ago + hold=10d -> not elapsed + audit = KevinSignalBridgeState( + mention_id=mention_id, + bridge_status=BridgeStatus.EMITTED, + trade_id=trade.id, + notes="BUY hold=10d", + decided_at=datetime.now(timezone.utc) - timedelta(days=2), + ) + db_session.add(audit) + await db_session.flush() + + publisher = AsyncMock() + config = MagicMock(kevin_hold_days={"unspecified": 10}) + + scanner = ExitScanner( + session_factory=_factory(db_session), publisher=publisher, config=config + ) + emitted = await scanner.scan_and_emit_exits() + assert emitted == 0 + publisher.publish.assert_not_called()