From 3347847e38c827c60ad07c9bfdd839d733116feb Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 24 May 2026 01:01:02 +0000 Subject: [PATCH] feat(kevin_bridge): multi-mention aggregator with capped conviction boost --- services/kevin_signal_bridge/aggregator.py | 97 ++++++++++ .../kevin_signal_bridge/test_aggregator.py | 182 ++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 services/kevin_signal_bridge/aggregator.py create mode 100644 tests/services/kevin_signal_bridge/test_aggregator.py diff --git a/services/kevin_signal_bridge/aggregator.py b/services/kevin_signal_bridge/aggregator.py new file mode 100644 index 0000000..0d55ad3 --- /dev/null +++ b/services/kevin_signal_bridge/aggregator.py @@ -0,0 +1,97 @@ +"""Multi-mention windowed aggregation. + +Reads kevin_stock_mentions since the cursor, groups by symbol within a +48h trailing window, applies a conviction boost = boost_per_repeat * extras +capped at max_boost. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta +from decimal import Decimal +from typing import Any, Callable + +from sqlalchemy import and_, select + +from shared.models.meet_kevin import KevinStockMention + + +@dataclass +class AggregatedMention: + """Mention proxy with effective_conviction set after aggregation.""" + + id: int + symbol: str + action: Any + conviction: Decimal + time_horizon: Any + created_at: datetime + rationale_quote: str + effective_conviction: Decimal + + +class MentionAggregator: + def __init__( + self, + session_factory: Callable[..., Any], + window_hours: int = 48, + boost_per_repeat: Decimal = Decimal("0.05"), + max_boost: Decimal = Decimal("0.20"), + ) -> None: + self.session_factory = session_factory + self.window_hours = window_hours + self.boost_per_repeat = boost_per_repeat + self.max_boost = max_boost + + async def fetch_pending(self, since_id: int) -> list[AggregatedMention]: + async with self.session_factory() as session: + unprocessed = ( + ( + await session.execute( + select(KevinStockMention) + .where(KevinStockMention.id > since_id) + .order_by(KevinStockMention.created_at.asc()) + ) + ) + .scalars() + .all() + ) + + if not unprocessed: + return [] + + out: list[AggregatedMention] = [] + for m in unprocessed: + window_start = m.created_at - timedelta(hours=self.window_hours) + same_symbol_in_window = ( + ( + await session.execute( + select(KevinStockMention).where( + and_( + KevinStockMention.symbol == m.symbol, + KevinStockMention.created_at >= window_start, + KevinStockMention.created_at <= m.created_at, + ) + ) + ) + ) + .scalars() + .all() + ) + extras = max(0, len(same_symbol_in_window) - 1) + boost = min(self.max_boost, self.boost_per_repeat * extras) + effective = min(Decimal("1.0"), m.conviction + boost) + out.append( + AggregatedMention( + id=m.id, + symbol=m.symbol, + action=m.action, + conviction=m.conviction, + time_horizon=m.time_horizon, + created_at=m.created_at, + rationale_quote=m.rationale_quote, + effective_conviction=effective, + ) + ) + return out diff --git a/tests/services/kevin_signal_bridge/test_aggregator.py b/tests/services/kevin_signal_bridge/test_aggregator.py new file mode 100644 index 0000000..f217e93 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_aggregator.py @@ -0,0 +1,182 @@ +"""Tests for the MentionAggregator — multi-mention window + conviction boost.""" + +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from services.kevin_signal_bridge.aggregator import MentionAggregator +from shared.models.meet_kevin import ( + KevinAnalysis, + KevinChannel, + KevinMarketOutlook, + KevinStockMention, + KevinTickerAction, + KevinTimeHorizon, + KevinVideo, + KevinVideoStatus, +) + + +async def _seed_channel_video( + session: AsyncSession, suffix: str = "1" +) -> tuple[int, int]: + channel = KevinChannel(youtube_channel_id=f"UC{suffix}", title="t") + session.add(channel) + await session.flush() + video = KevinVideo( + channel_id=channel.id, + youtube_video_id=f"v{suffix}", + title="t", + published_at=datetime.now(timezone.utc), + status=KevinVideoStatus.ANALYZED, + ) + session.add(video) + await session.flush() + analysis = KevinAnalysis( + video_id=video.id, + model="m", + prompt_version="v1", + market_outlook_direction=KevinMarketOutlook.NEUTRAL, + market_outlook_reasoning="x", + summary="x", + prompt_tokens=10, + completion_tokens=10, + cost_usd=Decimal("0.01"), + ) + session.add(analysis) + await session.flush() + return video.id, analysis.id + + +def _factory(session: AsyncSession): + """Return a session_factory that returns the same session.""" + + class _StaticSessionFactory: + async def __aenter__(self): + return session + + async def __aexit__(self, *args): + pass + + def factory(): + return _StaticSessionFactory() + + return factory + + +async def _insert_mention( + session: AsyncSession, + video_id: int, + analysis_id: int, + symbol: str, + conviction: str, + when: datetime, + action: KevinTickerAction = KevinTickerAction.BUY, +) -> KevinStockMention: + m = KevinStockMention( + video_id=video_id, + analysis_id=analysis_id, + symbol=symbol, + action=action, + conviction=Decimal(conviction), + time_horizon=KevinTimeHorizon.WEEKS, + rationale_quote="x", + ) + session.add(m) + await session.flush() + # Override created_at + m.created_at = when + session.add(m) + await session.flush() + return m + + +@pytest.mark.asyncio +async def test_aggregator_returns_all_unseen_mentions(db_session: AsyncSession): + video_id, analysis_id = await _seed_channel_video(db_session) + now = datetime.now(timezone.utc) + m1 = await _insert_mention( + db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10) + ) + m2 = await _insert_mention( + db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=5) + ) + m3 = await _insert_mention( + db_session, video_id, analysis_id, "INTC", "0.7", now - timedelta(hours=2) + ) + + agg = MentionAggregator(session_factory=_factory(db_session)) + pending = await agg.fetch_pending(since_id=0) + ids = {p.id for p in pending} + assert ids == {m1.id, m2.id, m3.id} + + +@pytest.mark.asyncio +async def test_aggregator_applies_conviction_boost(db_session: AsyncSession): + video_id, analysis_id = await _seed_channel_video(db_session, "boost") + now = datetime.now(timezone.utc) + await _insert_mention( + db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10) + ) + m2 = await _insert_mention( + db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=5) + ) + + agg = MentionAggregator( + session_factory=_factory(db_session), + window_hours=48, + boost_per_repeat=Decimal("0.05"), + ) + pending = await agg.fetch_pending(since_id=0) + # The second NVDA mention should have effective_conviction = 0.7 + 0.05 = 0.75 + by_id = {p.id: p for p in pending} + assert by_id[m2.id].effective_conviction == Decimal("0.75") + + +@pytest.mark.asyncio +async def test_aggregator_caps_boost_at_max(db_session: AsyncSession): + video_id, analysis_id = await _seed_channel_video(db_session, "cap") + now = datetime.now(timezone.utc) + # 6 mentions -> 5 extras -> boost 5*0.05 = 0.25 -> capped at 0.20 + last_m = None + for i in range(6): + last_m = await _insert_mention( + db_session, + video_id, + analysis_id, + "NVDA", + "0.7", + now - timedelta(hours=20 - i), + ) + + agg = MentionAggregator( + session_factory=_factory(db_session), + window_hours=48, + boost_per_repeat=Decimal("0.05"), + max_boost=Decimal("0.20"), + ) + pending = await agg.fetch_pending(since_id=0) + by_id = {p.id: p for p in pending} + assert last_m is not None + # capped at 0.7 + 0.20 = 0.90 + assert by_id[last_m.id].effective_conviction == Decimal("0.90") + + +@pytest.mark.asyncio +async def test_aggregator_excludes_already_processed(db_session: AsyncSession): + video_id, analysis_id = await _seed_channel_video(db_session, "excl") + now = datetime.now(timezone.utc) + m1 = await _insert_mention( + db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10) + ) + m2 = await _insert_mention( + db_session, video_id, analysis_id, "INTC", "0.7", now - timedelta(hours=5) + ) + + agg = MentionAggregator(session_factory=_factory(db_session)) + pending = await agg.fetch_pending(since_id=m1.id) + ids = {p.id for p in pending} + assert m1.id not in ids + assert m2.id in ids