feat(kevin_bridge): multi-mention aggregator with capped conviction boost
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
This commit is contained in:
parent
adbd7f3c65
commit
3347847e38
2 changed files with 279 additions and 0 deletions
97
services/kevin_signal_bridge/aggregator.py
Normal file
97
services/kevin_signal_bridge/aggregator.py
Normal file
|
|
@ -0,0 +1,97 @@
|
||||||
|
"""Multi-mention windowed aggregation.
|
||||||
|
|
||||||
|
Reads kevin_stock_mentions since the cursor, groups by symbol within a
|
||||||
|
48h trailing window, applies a conviction boost = boost_per_repeat * extras
|
||||||
|
capped at max_boost.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from decimal import Decimal
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
from sqlalchemy import and_, select
|
||||||
|
|
||||||
|
from shared.models.meet_kevin import KevinStockMention
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AggregatedMention:
|
||||||
|
"""Mention proxy with effective_conviction set after aggregation."""
|
||||||
|
|
||||||
|
id: int
|
||||||
|
symbol: str
|
||||||
|
action: Any
|
||||||
|
conviction: Decimal
|
||||||
|
time_horizon: Any
|
||||||
|
created_at: datetime
|
||||||
|
rationale_quote: str
|
||||||
|
effective_conviction: Decimal
|
||||||
|
|
||||||
|
|
||||||
|
class MentionAggregator:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
session_factory: Callable[..., Any],
|
||||||
|
window_hours: int = 48,
|
||||||
|
boost_per_repeat: Decimal = Decimal("0.05"),
|
||||||
|
max_boost: Decimal = Decimal("0.20"),
|
||||||
|
) -> None:
|
||||||
|
self.session_factory = session_factory
|
||||||
|
self.window_hours = window_hours
|
||||||
|
self.boost_per_repeat = boost_per_repeat
|
||||||
|
self.max_boost = max_boost
|
||||||
|
|
||||||
|
async def fetch_pending(self, since_id: int) -> list[AggregatedMention]:
|
||||||
|
async with self.session_factory() as session:
|
||||||
|
unprocessed = (
|
||||||
|
(
|
||||||
|
await session.execute(
|
||||||
|
select(KevinStockMention)
|
||||||
|
.where(KevinStockMention.id > since_id)
|
||||||
|
.order_by(KevinStockMention.created_at.asc())
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.scalars()
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
|
||||||
|
if not unprocessed:
|
||||||
|
return []
|
||||||
|
|
||||||
|
out: list[AggregatedMention] = []
|
||||||
|
for m in unprocessed:
|
||||||
|
window_start = m.created_at - timedelta(hours=self.window_hours)
|
||||||
|
same_symbol_in_window = (
|
||||||
|
(
|
||||||
|
await session.execute(
|
||||||
|
select(KevinStockMention).where(
|
||||||
|
and_(
|
||||||
|
KevinStockMention.symbol == m.symbol,
|
||||||
|
KevinStockMention.created_at >= window_start,
|
||||||
|
KevinStockMention.created_at <= m.created_at,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.scalars()
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
extras = max(0, len(same_symbol_in_window) - 1)
|
||||||
|
boost = min(self.max_boost, self.boost_per_repeat * extras)
|
||||||
|
effective = min(Decimal("1.0"), m.conviction + boost)
|
||||||
|
out.append(
|
||||||
|
AggregatedMention(
|
||||||
|
id=m.id,
|
||||||
|
symbol=m.symbol,
|
||||||
|
action=m.action,
|
||||||
|
conviction=m.conviction,
|
||||||
|
time_horizon=m.time_horizon,
|
||||||
|
created_at=m.created_at,
|
||||||
|
rationale_quote=m.rationale_quote,
|
||||||
|
effective_conviction=effective,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return out
|
||||||
182
tests/services/kevin_signal_bridge/test_aggregator.py
Normal file
182
tests/services/kevin_signal_bridge/test_aggregator.py
Normal file
|
|
@ -0,0 +1,182 @@
|
||||||
|
"""Tests for the MentionAggregator — multi-mention window + conviction boost."""
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from services.kevin_signal_bridge.aggregator import MentionAggregator
|
||||||
|
from shared.models.meet_kevin import (
|
||||||
|
KevinAnalysis,
|
||||||
|
KevinChannel,
|
||||||
|
KevinMarketOutlook,
|
||||||
|
KevinStockMention,
|
||||||
|
KevinTickerAction,
|
||||||
|
KevinTimeHorizon,
|
||||||
|
KevinVideo,
|
||||||
|
KevinVideoStatus,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _seed_channel_video(
|
||||||
|
session: AsyncSession, suffix: str = "1"
|
||||||
|
) -> tuple[int, int]:
|
||||||
|
channel = KevinChannel(youtube_channel_id=f"UC{suffix}", title="t")
|
||||||
|
session.add(channel)
|
||||||
|
await session.flush()
|
||||||
|
video = KevinVideo(
|
||||||
|
channel_id=channel.id,
|
||||||
|
youtube_video_id=f"v{suffix}",
|
||||||
|
title="t",
|
||||||
|
published_at=datetime.now(timezone.utc),
|
||||||
|
status=KevinVideoStatus.ANALYZED,
|
||||||
|
)
|
||||||
|
session.add(video)
|
||||||
|
await session.flush()
|
||||||
|
analysis = KevinAnalysis(
|
||||||
|
video_id=video.id,
|
||||||
|
model="m",
|
||||||
|
prompt_version="v1",
|
||||||
|
market_outlook_direction=KevinMarketOutlook.NEUTRAL,
|
||||||
|
market_outlook_reasoning="x",
|
||||||
|
summary="x",
|
||||||
|
prompt_tokens=10,
|
||||||
|
completion_tokens=10,
|
||||||
|
cost_usd=Decimal("0.01"),
|
||||||
|
)
|
||||||
|
session.add(analysis)
|
||||||
|
await session.flush()
|
||||||
|
return video.id, analysis.id
|
||||||
|
|
||||||
|
|
||||||
|
def _factory(session: AsyncSession):
|
||||||
|
"""Return a session_factory that returns the same session."""
|
||||||
|
|
||||||
|
class _StaticSessionFactory:
|
||||||
|
async def __aenter__(self):
|
||||||
|
return session
|
||||||
|
|
||||||
|
async def __aexit__(self, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def factory():
|
||||||
|
return _StaticSessionFactory()
|
||||||
|
|
||||||
|
return factory
|
||||||
|
|
||||||
|
|
||||||
|
async def _insert_mention(
|
||||||
|
session: AsyncSession,
|
||||||
|
video_id: int,
|
||||||
|
analysis_id: int,
|
||||||
|
symbol: str,
|
||||||
|
conviction: str,
|
||||||
|
when: datetime,
|
||||||
|
action: KevinTickerAction = KevinTickerAction.BUY,
|
||||||
|
) -> KevinStockMention:
|
||||||
|
m = KevinStockMention(
|
||||||
|
video_id=video_id,
|
||||||
|
analysis_id=analysis_id,
|
||||||
|
symbol=symbol,
|
||||||
|
action=action,
|
||||||
|
conviction=Decimal(conviction),
|
||||||
|
time_horizon=KevinTimeHorizon.WEEKS,
|
||||||
|
rationale_quote="x",
|
||||||
|
)
|
||||||
|
session.add(m)
|
||||||
|
await session.flush()
|
||||||
|
# Override created_at
|
||||||
|
m.created_at = when
|
||||||
|
session.add(m)
|
||||||
|
await session.flush()
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_aggregator_returns_all_unseen_mentions(db_session: AsyncSession):
|
||||||
|
video_id, analysis_id = await _seed_channel_video(db_session)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
m1 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10)
|
||||||
|
)
|
||||||
|
m2 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=5)
|
||||||
|
)
|
||||||
|
m3 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "INTC", "0.7", now - timedelta(hours=2)
|
||||||
|
)
|
||||||
|
|
||||||
|
agg = MentionAggregator(session_factory=_factory(db_session))
|
||||||
|
pending = await agg.fetch_pending(since_id=0)
|
||||||
|
ids = {p.id for p in pending}
|
||||||
|
assert ids == {m1.id, m2.id, m3.id}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_aggregator_applies_conviction_boost(db_session: AsyncSession):
|
||||||
|
video_id, analysis_id = await _seed_channel_video(db_session, "boost")
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10)
|
||||||
|
)
|
||||||
|
m2 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=5)
|
||||||
|
)
|
||||||
|
|
||||||
|
agg = MentionAggregator(
|
||||||
|
session_factory=_factory(db_session),
|
||||||
|
window_hours=48,
|
||||||
|
boost_per_repeat=Decimal("0.05"),
|
||||||
|
)
|
||||||
|
pending = await agg.fetch_pending(since_id=0)
|
||||||
|
# The second NVDA mention should have effective_conviction = 0.7 + 0.05 = 0.75
|
||||||
|
by_id = {p.id: p for p in pending}
|
||||||
|
assert by_id[m2.id].effective_conviction == Decimal("0.75")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_aggregator_caps_boost_at_max(db_session: AsyncSession):
|
||||||
|
video_id, analysis_id = await _seed_channel_video(db_session, "cap")
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
# 6 mentions -> 5 extras -> boost 5*0.05 = 0.25 -> capped at 0.20
|
||||||
|
last_m = None
|
||||||
|
for i in range(6):
|
||||||
|
last_m = await _insert_mention(
|
||||||
|
db_session,
|
||||||
|
video_id,
|
||||||
|
analysis_id,
|
||||||
|
"NVDA",
|
||||||
|
"0.7",
|
||||||
|
now - timedelta(hours=20 - i),
|
||||||
|
)
|
||||||
|
|
||||||
|
agg = MentionAggregator(
|
||||||
|
session_factory=_factory(db_session),
|
||||||
|
window_hours=48,
|
||||||
|
boost_per_repeat=Decimal("0.05"),
|
||||||
|
max_boost=Decimal("0.20"),
|
||||||
|
)
|
||||||
|
pending = await agg.fetch_pending(since_id=0)
|
||||||
|
by_id = {p.id: p for p in pending}
|
||||||
|
assert last_m is not None
|
||||||
|
# capped at 0.7 + 0.20 = 0.90
|
||||||
|
assert by_id[last_m.id].effective_conviction == Decimal("0.90")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_aggregator_excludes_already_processed(db_session: AsyncSession):
|
||||||
|
video_id, analysis_id = await _seed_channel_video(db_session, "excl")
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
m1 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "NVDA", "0.7", now - timedelta(hours=10)
|
||||||
|
)
|
||||||
|
m2 = await _insert_mention(
|
||||||
|
db_session, video_id, analysis_id, "INTC", "0.7", now - timedelta(hours=5)
|
||||||
|
)
|
||||||
|
|
||||||
|
agg = MentionAggregator(session_factory=_factory(db_session))
|
||||||
|
pending = await agg.fetch_pending(since_id=m1.id)
|
||||||
|
ids = {p.id for p in pending}
|
||||||
|
assert m1.id not in ids
|
||||||
|
assert m2.id in ids
|
||||||
Loading…
Add table
Add a link
Reference in a new issue