feat(kevin_bridge): exit-scan daily job + cursor + audit writer
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
This commit is contained in:
parent
a417cae77b
commit
cff2564428
6 changed files with 480 additions and 0 deletions
50
services/kevin_signal_bridge/audit.py
Normal file
50
services/kevin_signal_bridge/audit.py
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
"""Audit writer: upserts a row in kevin_signal_bridge_state per processed mention."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
from decimal import Decimal
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||||
|
|
||||||
|
from shared.models.meet_kevin_trading import KevinSignalBridgeState
|
||||||
|
|
||||||
|
|
||||||
|
class AuditWriter:
|
||||||
|
def __init__(self, session_factory: Callable[..., Any]) -> None:
|
||||||
|
self.session_factory = session_factory
|
||||||
|
|
||||||
|
async def write(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
mention_id: int,
|
||||||
|
bridge_status: str,
|
||||||
|
effective_conviction: Decimal | None = None,
|
||||||
|
signal_id: uuid.UUID | None = None,
|
||||||
|
trade_id: uuid.UUID | None = None,
|
||||||
|
notes: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Upsert one audit row (mention_id is unique)."""
|
||||||
|
async with self.session_factory() as session:
|
||||||
|
stmt = pg_insert(KevinSignalBridgeState).values(
|
||||||
|
id=uuid.uuid4(),
|
||||||
|
mention_id=mention_id,
|
||||||
|
bridge_status=bridge_status,
|
||||||
|
effective_conviction=effective_conviction,
|
||||||
|
signal_id=signal_id,
|
||||||
|
trade_id=trade_id,
|
||||||
|
notes=notes,
|
||||||
|
)
|
||||||
|
stmt = stmt.on_conflict_do_update(
|
||||||
|
index_elements=["mention_id"],
|
||||||
|
set_={
|
||||||
|
"bridge_status": stmt.excluded.bridge_status,
|
||||||
|
"effective_conviction": stmt.excluded.effective_conviction,
|
||||||
|
"signal_id": stmt.excluded.signal_id,
|
||||||
|
"trade_id": stmt.excluded.trade_id,
|
||||||
|
"notes": stmt.excluded.notes,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await session.execute(stmt)
|
||||||
|
await session.commit()
|
||||||
28
services/kevin_signal_bridge/cursor.py
Normal file
28
services/kevin_signal_bridge/cursor.py
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
"""Redis-backed cursor for the bridge.
|
||||||
|
|
||||||
|
Tracks the highest `kevin_stock_mentions.id` we've already attempted to
|
||||||
|
process. Stored at key `kevin:bridge:last_mention_id`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class RedisCursor:
|
||||||
|
_KEY = "kevin:bridge:last_mention_id"
|
||||||
|
|
||||||
|
def __init__(self, redis: Any) -> None:
|
||||||
|
self.redis = redis
|
||||||
|
|
||||||
|
async def last_seen_id(self) -> int:
|
||||||
|
v = await self.redis.get(self._KEY)
|
||||||
|
if v is None:
|
||||||
|
return 0
|
||||||
|
return int(v)
|
||||||
|
|
||||||
|
async def advance(self, mention_id: int) -> None:
|
||||||
|
# Conservative: only set if mention_id > current
|
||||||
|
current = await self.last_seen_id()
|
||||||
|
if mention_id > current:
|
||||||
|
await self.redis.set(self._KEY, str(mention_id))
|
||||||
104
services/kevin_signal_bridge/exit_scanner.py
Normal file
104
services/kevin_signal_bridge/exit_scanner.py
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
"""Daily exit-scan job for the Kevin bridge.
|
||||||
|
|
||||||
|
Walks open Kevin trades (Trade.strategy_id == KEVIN_STRATEGY_UUID, status
|
||||||
|
FILLED, no offsetting SELL) and publishes EXIT TradeSignals for any whose
|
||||||
|
holding period has elapsed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
from sqlalchemy import and_, select
|
||||||
|
|
||||||
|
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
||||||
|
from shared.models.meet_kevin_trading import KevinSignalBridgeState
|
||||||
|
from shared.models.trading import Trade, TradeStatus
|
||||||
|
from shared.schemas.trading import SignalDirection, TradeSignal
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ExitScanner:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
session_factory: Callable[..., Any],
|
||||||
|
publisher: Any,
|
||||||
|
config: Any,
|
||||||
|
) -> None:
|
||||||
|
self.session_factory = session_factory
|
||||||
|
self.publisher = publisher
|
||||||
|
self.config = config
|
||||||
|
|
||||||
|
async def scan_and_emit_exits(self) -> int:
|
||||||
|
"""Returns the number of EXIT signals emitted."""
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
emitted = 0
|
||||||
|
async with self.session_factory() as session:
|
||||||
|
# Find open Kevin trades (FILLED, no closing trade yet on same ticker)
|
||||||
|
open_trades = (
|
||||||
|
(
|
||||||
|
await session.execute(
|
||||||
|
select(Trade).where(
|
||||||
|
and_(
|
||||||
|
Trade.strategy_id == KEVIN_STRATEGY_UUID,
|
||||||
|
Trade.status == TradeStatus.FILLED,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.scalars()
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
|
||||||
|
for trade in open_trades:
|
||||||
|
# Find the source audit row to learn the original holding_days target
|
||||||
|
async with self.session_factory() as session:
|
||||||
|
audit = (
|
||||||
|
(
|
||||||
|
await session.execute(
|
||||||
|
select(KevinSignalBridgeState).where(
|
||||||
|
KevinSignalBridgeState.trade_id == trade.id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.scalars()
|
||||||
|
.one_or_none()
|
||||||
|
)
|
||||||
|
if audit is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
hold_days = self._holding_days(audit)
|
||||||
|
target_exit_at = audit.decided_at + timedelta(days=hold_days)
|
||||||
|
if now < target_exit_at:
|
||||||
|
continue
|
||||||
|
|
||||||
|
signal = TradeSignal(
|
||||||
|
ticker=trade.ticker,
|
||||||
|
direction=SignalDirection.EXIT,
|
||||||
|
strength=1.0,
|
||||||
|
strategy_id=KEVIN_STRATEGY_UUID,
|
||||||
|
strategy_sources=[f"kevin:exit_scan:hold_expired:{hold_days}d"],
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await self.publisher.publish(signal)
|
||||||
|
emitted += 1
|
||||||
|
except Exception:
|
||||||
|
logger.exception("exit-scan publish failed for trade %s", trade.id)
|
||||||
|
|
||||||
|
return emitted
|
||||||
|
|
||||||
|
def _holding_days(self, audit: KevinSignalBridgeState) -> int:
|
||||||
|
"""Best-effort holding days from notes; fallback to config default."""
|
||||||
|
notes = audit.notes or ""
|
||||||
|
# Try to find ' hold=Nd' in the audit notes
|
||||||
|
for token in notes.split():
|
||||||
|
if token.startswith("hold=") and token.endswith("d"):
|
||||||
|
try:
|
||||||
|
return int(token.removeprefix("hold=").removesuffix("d"))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
default_map = getattr(self.config, "kevin_hold_days", {})
|
||||||
|
return int(default_map.get("unspecified", 10))
|
||||||
117
tests/services/kevin_signal_bridge/test_audit.py
Normal file
117
tests/services/kevin_signal_bridge/test_audit.py
Normal file
|
|
@ -0,0 +1,117 @@
|
||||||
|
"""Tests for the audit writer (upsert into kevin_signal_bridge_state)."""
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from services.kevin_signal_bridge.audit import AuditWriter
|
||||||
|
from shared.models.meet_kevin import (
|
||||||
|
KevinAnalysis,
|
||||||
|
KevinChannel,
|
||||||
|
KevinMarketOutlook,
|
||||||
|
KevinStockMention,
|
||||||
|
KevinTickerAction,
|
||||||
|
KevinTimeHorizon,
|
||||||
|
KevinVideo,
|
||||||
|
KevinVideoStatus,
|
||||||
|
)
|
||||||
|
from shared.models.meet_kevin_trading import KevinSignalBridgeState
|
||||||
|
|
||||||
|
|
||||||
|
def _factory(session: AsyncSession):
|
||||||
|
class _StaticSessionFactory:
|
||||||
|
async def __aenter__(self):
|
||||||
|
return session
|
||||||
|
|
||||||
|
async def __aexit__(self, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def factory():
|
||||||
|
return _StaticSessionFactory()
|
||||||
|
|
||||||
|
return factory
|
||||||
|
|
||||||
|
|
||||||
|
async def _seed_mention(session: AsyncSession) -> int:
|
||||||
|
channel = KevinChannel(youtube_channel_id="UCa", title="t")
|
||||||
|
session.add(channel)
|
||||||
|
await session.flush()
|
||||||
|
video = KevinVideo(
|
||||||
|
channel_id=channel.id,
|
||||||
|
youtube_video_id="va",
|
||||||
|
title="t",
|
||||||
|
published_at=datetime.now(timezone.utc),
|
||||||
|
status=KevinVideoStatus.ANALYZED,
|
||||||
|
)
|
||||||
|
session.add(video)
|
||||||
|
await session.flush()
|
||||||
|
analysis = KevinAnalysis(
|
||||||
|
video_id=video.id,
|
||||||
|
model="m",
|
||||||
|
prompt_version="v1",
|
||||||
|
market_outlook_direction=KevinMarketOutlook.NEUTRAL,
|
||||||
|
market_outlook_reasoning="x",
|
||||||
|
summary="x",
|
||||||
|
prompt_tokens=10,
|
||||||
|
completion_tokens=10,
|
||||||
|
cost_usd=Decimal("0.01"),
|
||||||
|
)
|
||||||
|
session.add(analysis)
|
||||||
|
await session.flush()
|
||||||
|
mention = KevinStockMention(
|
||||||
|
video_id=video.id,
|
||||||
|
analysis_id=analysis.id,
|
||||||
|
symbol="NVDA",
|
||||||
|
action=KevinTickerAction.BUY,
|
||||||
|
conviction=Decimal("0.7"),
|
||||||
|
time_horizon=KevinTimeHorizon.WEEKS,
|
||||||
|
rationale_quote="x",
|
||||||
|
)
|
||||||
|
session.add(mention)
|
||||||
|
await session.flush()
|
||||||
|
return mention.id
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_audit_writer_inserts_new_row(db_session: AsyncSession):
|
||||||
|
mention_id = await _seed_mention(db_session)
|
||||||
|
writer = AuditWriter(session_factory=_factory(db_session))
|
||||||
|
await writer.write(
|
||||||
|
mention_id=mention_id,
|
||||||
|
bridge_status="emitted",
|
||||||
|
effective_conviction=Decimal("0.75"),
|
||||||
|
notes="hello",
|
||||||
|
)
|
||||||
|
row = (
|
||||||
|
await db_session.execute(
|
||||||
|
select(KevinSignalBridgeState).where(
|
||||||
|
KevinSignalBridgeState.mention_id == mention_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).scalar_one()
|
||||||
|
assert row.bridge_status.value == "emitted"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_audit_writer_upserts_on_duplicate(db_session: AsyncSession):
|
||||||
|
mention_id = await _seed_mention(db_session)
|
||||||
|
writer = AuditWriter(session_factory=_factory(db_session))
|
||||||
|
await writer.write(mention_id=mention_id, bridge_status="dry_run", notes="first")
|
||||||
|
await writer.write(mention_id=mention_id, bridge_status="emitted", notes="second")
|
||||||
|
rows = (
|
||||||
|
(
|
||||||
|
await db_session.execute(
|
||||||
|
select(KevinSignalBridgeState).where(
|
||||||
|
KevinSignalBridgeState.mention_id == mention_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.scalars()
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
assert len(rows) == 1
|
||||||
|
assert rows[0].bridge_status.value == "emitted"
|
||||||
|
assert rows[0].notes == "second"
|
||||||
27
tests/services/kevin_signal_bridge/test_cursor.py
Normal file
27
tests/services/kevin_signal_bridge/test_cursor.py
Normal file
|
|
@ -0,0 +1,27 @@
|
||||||
|
"""Tests for the Redis cursor."""
|
||||||
|
|
||||||
|
import fakeredis.aioredis
|
||||||
|
|
||||||
|
from services.kevin_signal_bridge.cursor import RedisCursor
|
||||||
|
|
||||||
|
|
||||||
|
async def _redis():
|
||||||
|
return fakeredis.aioredis.FakeRedis()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cursor_defaults_to_zero():
|
||||||
|
cursor = RedisCursor(await _redis())
|
||||||
|
assert await cursor.last_seen_id() == 0
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cursor_advance_persists_value():
|
||||||
|
cursor = RedisCursor(await _redis())
|
||||||
|
await cursor.advance(42)
|
||||||
|
assert await cursor.last_seen_id() == 42
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cursor_advance_never_goes_backwards():
|
||||||
|
cursor = RedisCursor(await _redis())
|
||||||
|
await cursor.advance(42)
|
||||||
|
await cursor.advance(10)
|
||||||
|
assert await cursor.last_seen_id() == 42
|
||||||
154
tests/services/kevin_signal_bridge/test_exit_scanner.py
Normal file
154
tests/services/kevin_signal_bridge/test_exit_scanner.py
Normal file
|
|
@ -0,0 +1,154 @@
|
||||||
|
"""Tests for ExitScanner — emits EXIT signal when holding period elapsed."""
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from decimal import Decimal
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from services.kevin_signal_bridge.exit_scanner import ExitScanner
|
||||||
|
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
||||||
|
from shared.models.meet_kevin import (
|
||||||
|
KevinAnalysis,
|
||||||
|
KevinChannel,
|
||||||
|
KevinMarketOutlook,
|
||||||
|
KevinStockMention,
|
||||||
|
KevinTickerAction,
|
||||||
|
KevinTimeHorizon,
|
||||||
|
KevinVideo,
|
||||||
|
KevinVideoStatus,
|
||||||
|
)
|
||||||
|
from shared.models.meet_kevin_trading import BridgeStatus, KevinSignalBridgeState
|
||||||
|
from shared.models.trading import Trade, TradeSide, TradeStatus
|
||||||
|
|
||||||
|
|
||||||
|
def _factory(session: AsyncSession):
|
||||||
|
class _StaticSessionFactory:
|
||||||
|
async def __aenter__(self):
|
||||||
|
return session
|
||||||
|
|
||||||
|
async def __aexit__(self, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def factory():
|
||||||
|
return _StaticSessionFactory()
|
||||||
|
|
||||||
|
return factory
|
||||||
|
|
||||||
|
|
||||||
|
async def _seed_video_and_mention(session: AsyncSession) -> int:
|
||||||
|
channel = KevinChannel(youtube_channel_id="UCex", title="t")
|
||||||
|
session.add(channel)
|
||||||
|
await session.flush()
|
||||||
|
video = KevinVideo(
|
||||||
|
channel_id=channel.id,
|
||||||
|
youtube_video_id="vex",
|
||||||
|
title="t",
|
||||||
|
published_at=datetime.now(timezone.utc),
|
||||||
|
status=KevinVideoStatus.ANALYZED,
|
||||||
|
)
|
||||||
|
session.add(video)
|
||||||
|
await session.flush()
|
||||||
|
analysis = KevinAnalysis(
|
||||||
|
video_id=video.id,
|
||||||
|
model="m",
|
||||||
|
prompt_version="v1",
|
||||||
|
market_outlook_direction=KevinMarketOutlook.NEUTRAL,
|
||||||
|
market_outlook_reasoning="x",
|
||||||
|
summary="x",
|
||||||
|
prompt_tokens=10,
|
||||||
|
completion_tokens=10,
|
||||||
|
cost_usd=Decimal("0.01"),
|
||||||
|
)
|
||||||
|
session.add(analysis)
|
||||||
|
await session.flush()
|
||||||
|
mention = KevinStockMention(
|
||||||
|
video_id=video.id,
|
||||||
|
analysis_id=analysis.id,
|
||||||
|
symbol="NVDA",
|
||||||
|
action=KevinTickerAction.BUY,
|
||||||
|
conviction=Decimal("0.7"),
|
||||||
|
time_horizon=KevinTimeHorizon.WEEKS,
|
||||||
|
rationale_quote="x",
|
||||||
|
)
|
||||||
|
session.add(mention)
|
||||||
|
await session.flush()
|
||||||
|
return mention.id
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_exit_scanner_emits_on_elapsed_hold(db_session: AsyncSession):
|
||||||
|
mention_id = await _seed_video_and_mention(db_session)
|
||||||
|
|
||||||
|
# Create a filled Kevin trade 30 days ago
|
||||||
|
trade = Trade(
|
||||||
|
ticker="NVDA",
|
||||||
|
side=TradeSide.BUY,
|
||||||
|
qty=10.0,
|
||||||
|
price=100.0,
|
||||||
|
strategy_id=KEVIN_STRATEGY_UUID,
|
||||||
|
status=TradeStatus.FILLED,
|
||||||
|
)
|
||||||
|
db_session.add(trade)
|
||||||
|
await db_session.flush()
|
||||||
|
|
||||||
|
# Audit row with decided_at 30 days ago + hold=10d in notes
|
||||||
|
audit = KevinSignalBridgeState(
|
||||||
|
mention_id=mention_id,
|
||||||
|
bridge_status=BridgeStatus.EMITTED,
|
||||||
|
trade_id=trade.id,
|
||||||
|
notes="BUY conv=0.7 -> 2% target=$2000 hold=10d",
|
||||||
|
decided_at=datetime.now(timezone.utc) - timedelta(days=30),
|
||||||
|
)
|
||||||
|
db_session.add(audit)
|
||||||
|
await db_session.flush()
|
||||||
|
|
||||||
|
publisher = AsyncMock()
|
||||||
|
publisher.publish.return_value = "1234-0"
|
||||||
|
config = MagicMock(kevin_hold_days={"unspecified": 10})
|
||||||
|
|
||||||
|
scanner = ExitScanner(
|
||||||
|
session_factory=_factory(db_session), publisher=publisher, config=config
|
||||||
|
)
|
||||||
|
emitted = await scanner.scan_and_emit_exits()
|
||||||
|
assert emitted == 1
|
||||||
|
publisher.publish.assert_awaited_once()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_exit_scanner_does_not_emit_when_hold_not_elapsed(
|
||||||
|
db_session: AsyncSession,
|
||||||
|
):
|
||||||
|
mention_id = await _seed_video_and_mention(db_session)
|
||||||
|
trade = Trade(
|
||||||
|
ticker="NVDA",
|
||||||
|
side=TradeSide.BUY,
|
||||||
|
qty=10.0,
|
||||||
|
price=100.0,
|
||||||
|
strategy_id=KEVIN_STRATEGY_UUID,
|
||||||
|
status=TradeStatus.FILLED,
|
||||||
|
)
|
||||||
|
db_session.add(trade)
|
||||||
|
await db_session.flush()
|
||||||
|
|
||||||
|
# decided_at 2 days ago + hold=10d -> not elapsed
|
||||||
|
audit = KevinSignalBridgeState(
|
||||||
|
mention_id=mention_id,
|
||||||
|
bridge_status=BridgeStatus.EMITTED,
|
||||||
|
trade_id=trade.id,
|
||||||
|
notes="BUY hold=10d",
|
||||||
|
decided_at=datetime.now(timezone.utc) - timedelta(days=2),
|
||||||
|
)
|
||||||
|
db_session.add(audit)
|
||||||
|
await db_session.flush()
|
||||||
|
|
||||||
|
publisher = AsyncMock()
|
||||||
|
config = MagicMock(kevin_hold_days={"unspecified": 10})
|
||||||
|
|
||||||
|
scanner = ExitScanner(
|
||||||
|
session_factory=_factory(db_session), publisher=publisher, config=config
|
||||||
|
)
|
||||||
|
emitted = await scanner.scan_and_emit_exits()
|
||||||
|
assert emitted == 0
|
||||||
|
publisher.publish.assert_not_called()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue