# Extracted from patch:
# diff --git a/services/meet_kevin_watcher/main.py b/services/meet_kevin_watcher/main.py
# new file mode 100644
# index 0000000..40f151e
# --- /dev/null
# +++ b/services/meet_kevin_watcher/main.py
# @@ -0,0 +1,250 @@
"""Meet Kevin watcher service entry point.

Polls YouTube RSS feeds for new Meet Kevin videos, deduplicates via
``INSERT … ON CONFLICT DO NOTHING``, and processes each video through
the caption-extraction + LLM-analysis pipeline.

Usage:
    python -m services.meet_kevin_watcher.main
    # or via Docker ENTRYPOINT
"""

import asyncio
import logging
import signal
from datetime import timezone
from decimal import Decimal

import httpx
from anthropic import AsyncAnthropic
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert

from shared.db import create_db
from shared.telemetry import setup_telemetry
from services.meet_kevin_watcher.config import MeetKevinWatcherConfig
from services.meet_kevin_watcher.caption_extractor import extract_captions
from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer
from services.meet_kevin_watcher.pipeline import PipelineDeps, daily_cost_used, process_one_video
from services.meet_kevin_watcher.rss_poller import fetch_feed, parse_feed
from shared.models.meet_kevin import KevinAnalysis, KevinChannel, KevinVideo

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# RSS polling helper
# ---------------------------------------------------------------------------


async def _poll_channels(
    session_factory,
    http_client: httpx.AsyncClient,
    videos_discovered_counter,
) -> None:
    """Fetch RSS for every poll_enabled channel and insert unseen videos.

    Each channel is handled independently: a failure while fetching, parsing
    or inserting one channel's feed is logged and the loop moves on to the
    next channel.

    Args:
        session_factory: Callable returning an async-context-manager session.
        http_client: Shared HTTP client passed through to ``fetch_feed``.
        videos_discovered_counter: Counter incremented once per newly
            inserted video, after the insert has been committed.
    """
    async with session_factory() as session:
        result = await session.execute(
            select(KevinChannel).where(KevinChannel.poll_enabled.is_(True))
        )
        channels = result.scalars().all()

    for channel in channels:
        # Read the plain values once so the error path below never has to
        # touch the ORM instance again.
        channel_pk = channel.id
        youtube_channel_id = channel.youtube_channel_id
        try:
            xml_bytes = await fetch_feed(youtube_channel_id, http_client)
            discovered = parse_feed(xml_bytes)
            if not discovered:
                continue

            new_video_ids = []
            async with session_factory() as session:
                for video in discovered:
                    stmt = (
                        pg_insert(KevinVideo)
                        .values(
                            channel_id=channel_pk,
                            youtube_video_id=video.youtube_video_id,
                            title=video.title,
                            description=video.description,
                            published_at=video.published_at,
                            thumbnail_url=video.thumbnail_url,
                            status="discovered",
                            retry_count=0,
                        )
                        .on_conflict_do_nothing(index_elements=["youtube_video_id"])
                        .returning(KevinVideo.id)
                    )
                    row = await session.execute(stmt)
                    # RETURNING yields no row when the conflict clause
                    # skipped the insert; compare against None rather than
                    # relying on truthiness of the primary key.
                    if row.scalar() is not None:
                        new_video_ids.append(video.youtube_video_id)

                await session.commit()

            # Count and log only after the commit succeeded, so a failed
            # transaction does not inflate the metric.
            for new_id in new_video_ids:
                videos_discovered_counter.add(1)
                logger.info("Discovered new video: %s", new_id)
        except Exception:
            logger.exception("RSS poll failed for channel %s", youtube_channel_id)


# ---------------------------------------------------------------------------
# Pipeline processing helper
# ---------------------------------------------------------------------------


def _status_str(status) -> str:
    """Return *status* as a plain string, unwrapping an enum's ``.value``."""
    return str(status.value) if hasattr(status, "value") else str(status)


async def _record_transition_metrics(
    session_factory,
    video_id,
    prev_status: str,
    new_status: str,
    captions_extracted_counter,
    llm_calls_counter,
    llm_cost_counter,
) -> None:
    """Update OTEL counters for one committed status transition.

    ``process_one_video`` may carry a video from 'discovered' straight to
    'analyzed' in a single call, so caption extraction and LLM analysis are
    evaluated independently instead of as mutually exclusive branches.

    A 'discovered' video that ends up 'failed' is not counted as a caption
    extraction: the status alone does not say whether captions were missing
    or the later LLM step failed.
    """
    if prev_status == "discovered" and new_status in ("captioned", "analyzed"):
        captions_extracted_counter.add(1)

    if new_status != "analyzed":
        return

    llm_calls_counter.add(1)
    # Retrieve cost from the most-recently-inserted analysis for this video.
    async with session_factory() as cost_session:
        cost_row = await cost_session.execute(
            select(KevinAnalysis.cost_usd)
            .where(KevinAnalysis.video_id == video_id)
            .order_by(KevinAnalysis.id.desc())
            .limit(1)
        )
        cost = cost_row.scalar_one_or_none()
    if cost is not None:
        llm_cost_counter.add(float(cost))


async def _process_pending_videos(
    session_factory,
    deps: PipelineDeps,
    captions_extracted_counter,
    llm_calls_counter,
    llm_cost_counter,
) -> None:
    """Walk all videos with status discovered/captioned and advance each one.

    Videos are processed oldest-first (by ``published_at``), each in its own
    session so that every video is committed or rolled back on its own.
    Errors for one video are logged and do not stop the remaining videos.

    Args:
        session_factory: Callable returning an async-context-manager session.
        deps: Injected pipeline callables and configuration.
        captions_extracted_counter: Counter for successful caption extraction.
        llm_calls_counter: Counter for completed LLM analyses.
        llm_cost_counter: Counter accumulating the analysis cost in USD.
    """
    # Only the primary keys are needed here; each row is loaded again inside
    # its own session below.
    async with session_factory() as session:
        result = await session.execute(
            select(KevinVideo.id)
            .where(KevinVideo.status.in_(["discovered", "captioned"]))
            .order_by(KevinVideo.published_at.asc())
        )
        video_ids = result.scalars().all()

    for video_id in video_ids:
        async with session_factory() as session:
            result = await session.execute(
                select(KevinVideo).where(KevinVideo.id == video_id)
            )
            db_video = result.scalar_one_or_none()
            if db_video is None:
                # Row vanished between the listing query and this re-fetch.
                logger.warning("Video %s no longer exists — skipping", video_id)
                continue

            # Capture plain values before commit: depending on how the
            # session factory is configured, ORM attributes may be expired by
            # commit(), and reading them afterwards would trigger implicit IO
            # (which AsyncSession does not allow).
            youtube_video_id = db_video.youtube_video_id
            prev_status = _status_str(db_video.status)

            try:
                new_status = await process_one_video(db_video, session, deps)
                await session.commit()
            except Exception:
                logger.exception("Error processing video %s", youtube_video_id)
                try:
                    await session.rollback()
                except Exception:
                    # Best effort: the session is closed right after, but a
                    # failed rollback should still be visible in the logs.
                    logger.exception("Rollback failed for video %s", youtube_video_id)
                continue

        # Metrics are recorded after the work is committed; a metrics failure
        # must not be reported as a processing failure.
        try:
            await _record_transition_metrics(
                session_factory,
                video_id,
                prev_status,
                new_status,
                captions_extracted_counter,
                llm_calls_counter,
                llm_cost_counter,
            )
        except Exception:
            logger.exception("Failed to record metrics for video %s", youtube_video_id)


# ---------------------------------------------------------------------------
# Service entry point
# ---------------------------------------------------------------------------


async def run() -> None:
    """Boot the Meet Kevin watcher and enter the main polling loop.

    Sets up logging, telemetry counters, the database, the Anthropic client
    and the pipeline dependencies, then alternates between polling RSS feeds
    and processing pending videos until SIGTERM or SIGINT is received. The
    Anthropic client and the database engine are closed on the way out.
    """
    config = MeetKevinWatcherConfig()

    logging.basicConfig(level=config.log_level)
    logger.info("Starting meet-kevin-watcher service")

    # Telemetry
    meter = setup_telemetry(config.otel_service_name, config.otel_metrics_port)
    videos_discovered_counter = meter.create_counter(
        "meet_kevin.videos_discovered",
        description="Total new Meet Kevin videos discovered via RSS",
    )
    captions_extracted_counter = meter.create_counter(
        "meet_kevin.captions_extracted",
        description="Total videos with captions successfully extracted",
    )
    llm_calls_counter = meter.create_counter(
        "meet_kevin.llm_calls",
        description="Total LLM analyze() calls made",
    )
    llm_cost_counter = meter.create_counter(
        "meet_kevin.llm_cost_usd",
        description="Cumulative LLM spend in USD",
    )

    # Database
    engine, session_factory = create_db(config)

    # Anthropic client + LLM analyzer
    anthropic = AsyncAnthropic(api_key=config.anthropic_api_key)
    analyzer = LlmAnalyzer(
        client=anthropic,
        model=config.meet_kevin_llm_model,
        prompt_version=config.meet_kevin_prompt_version,
    )

    # Pipeline deps — the real callables are passed directly; they already
    # have the call shapes the pipeline uses.
    deps = PipelineDeps(
        extract_captions=extract_captions,
        analyze=analyzer.analyze,
        daily_cost_used=daily_cost_used,
        model=config.meet_kevin_llm_model,
        prompt_version=config.meet_kevin_prompt_version,
        daily_cost_cap_usd=Decimal(str(config.meet_kevin_daily_cost_cap_usd)),
        workdir=config.meet_kevin_workdir,
    )

    # Graceful shutdown
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)

    # Main loop
    try:
        async with httpx.AsyncClient() as http_client:
            while not shutdown_event.is_set():
                try:
                    await _poll_channels(
                        session_factory, http_client, videos_discovered_counter
                    )
                    await _process_pending_videos(
                        session_factory,
                        deps,
                        captions_extracted_counter,
                        llm_calls_counter,
                        llm_cost_counter,
                    )
                except Exception:
                    logger.exception("Main loop iteration failed")

                # Wait for next poll interval (or shutdown signal)
                try:
                    await asyncio.wait_for(
                        shutdown_event.wait(),
                        timeout=config.meet_kevin_poll_interval_seconds,
                    )
                    break  # Shutdown signaled
                except asyncio.TimeoutError:
                    pass  # Normal timeout — loop again
    finally:
        # Dispose the engine even if closing the Anthropic client fails.
        try:
            await anthropic.close()
        finally:
            await engine.dispose()
            logger.info("meet-kevin-watcher stopped gracefully")


if __name__ == "__main__":
    asyncio.run(run())

# diff --git a/services/meet_kevin_watcher/pipeline.py b/services/meet_kevin_watcher/pipeline.py new file mode 100644 index
# 0000000..0f114ef --- /dev/null +++ b/services/meet_kevin_watcher/pipeline.py @@ -0,0 +1,263 @@
"""Meet Kevin pipeline orchestrator.

Contains the per-video state-machine (process_one_video) and the daily
cost accounting helper (daily_cost_used). Both are designed for
dependency injection so they are fully unit-testable without a real DB
or LLM backend.

Public exports:
    PipelineDeps       — frozen dataclass carrying all injected callables + config
    process_one_video  — advance one KevinVideo through the pipeline
    daily_cost_used    — sum today's LLM spend from kevin_analyses
"""

import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Callable, Coroutine

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from shared.models.meet_kevin import (
    KevinAnalysis,
    KevinStockMention,
    KevinTranscript,
    KevinVideo,
)
from services.meet_kevin_watcher.caption_extractor import CaptionResult
from services.meet_kevin_watcher.llm_analyzer import LlmCallResult

logger = logging.getLogger(__name__)

# Number of failed analyze() attempts after which a video is marked 'failed'.
_MAX_LLM_RETRIES = 3


# ---------------------------------------------------------------------------
# Dependency-injection container
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class PipelineDeps:
    """Injected dependencies for the pipeline, making it unit-testable.

    All async callables match the signatures of the real implementations
    but can be replaced with AsyncMock in tests.
    """

    extract_captions: Callable[..., Coroutine[Any, Any, CaptionResult | None]]
    """Async callable: (video_id: str, workdir: str) -> CaptionResult | None"""

    analyze: Callable[..., Coroutine[Any, Any, LlmCallResult]]
    """Async callable: (**kwargs) -> LlmCallResult"""

    daily_cost_used: Callable[..., Coroutine[Any, Any, Decimal]]
    """Async callable: (session: AsyncSession) -> Decimal"""

    model: str
    """LLM model identifier stored in kevin_analyses.model."""

    prompt_version: str
    """Prompt version string stored in kevin_analyses.prompt_version."""

    daily_cost_cap_usd: Decimal
    """Hard ceiling for total LLM spend per calendar day."""

    workdir: str
    """Filesystem directory for yt-dlp caption downloads."""


# ---------------------------------------------------------------------------
# Daily cost accounting
# ---------------------------------------------------------------------------


async def daily_cost_used(session: AsyncSession) -> Decimal:
    """Return total LLM cost incurred today from kevin_analyses.

    Uses a single SUM query over rows whose ``created_at`` is at or after
    ``date_trunc('day', now())`` as evaluated by the database.

    NOTE(review): the day boundary is computed by the database, so it is
    midnight UTC only if the database session's time zone is UTC — confirm
    the connection settings if the cap must follow the UTC calendar day.

    Args:
        session: Async SQLAlchemy session.

    Returns:
        Sum of cost_usd for all analyses created since the start of the
        current day, as a Decimal. Returns Decimal("0") when no rows match.
    """
    stmt = select(
        func.coalesce(func.sum(KevinAnalysis.cost_usd), 0)
    ).where(
        KevinAnalysis.created_at >= func.date_trunc("day", func.now())
    )
    result = await session.execute(stmt)
    scalar = result.scalar_one()
    # Go through str() so a float coming back from the driver does not drag
    # binary rounding noise into the Decimal.
    return Decimal(str(scalar or 0))


# ---------------------------------------------------------------------------
# Per-video pipeline stage runner
# ---------------------------------------------------------------------------


def _status_str(status) -> str:
    """Return *status* as a plain string, unwrapping an enum's ``.value``."""
    return str(status.value) if hasattr(status, "value") else str(status)


async def _run_caption_stage(
    video: KevinVideo,
    session: AsyncSession,
    deps: PipelineDeps,
) -> str:
    """Stage 1: extract captions and store them as a KevinTranscript.

    Returns "failed" when no captions are available, otherwise "captioned".
    """
    caption_result: CaptionResult | None = await deps.extract_captions(
        video.youtube_video_id, deps.workdir
    )

    if caption_result is None:
        logger.warning("No captions for video %s — marking failed", video.youtube_video_id)
        video.status = "failed"
        video.failure_reason = "no_captions"
        return "failed"

    # Map CaptionResult.source onto the stored transcript source: anything
    # that is not explicitly manual is recorded as auto-generated.
    if "manual" in caption_result.source:
        transcript_source = "captions_manual"
    else:
        transcript_source = "captions_auto"

    transcript = KevinTranscript(
        video_id=video.id,
        source=transcript_source,
        language=caption_result.language,
        raw_text=caption_result.raw_text,
        segments_json=list(caption_result.segments),
        word_count=caption_result.word_count,
    )
    session.add(transcript)
    await session.flush()

    video.status = "captioned"
    logger.info(
        "Captions extracted for video %s (%d words)",
        video.youtube_video_id, caption_result.word_count,
    )
    return "captioned"


async def _run_analysis_stage(
    video: KevinVideo,
    session: AsyncSession,
    deps: PipelineDeps,
) -> str:
    """Stage 2: run the LLM analysis for a captioned video and persist it.

    Returns "captioned" when the cost cap blocks the call or the call failed
    below the retry limit, "failed" once the retry limit is reached, and
    "analyzed" on success.
    """
    # Check daily cost cap before calling the LLM
    cost_so_far: Decimal = await deps.daily_cost_used(session)
    if cost_so_far >= deps.daily_cost_cap_usd:
        logger.info(
            "Daily cost cap $%.4f reached ($%.4f used) — skipping LLM for %s",
            deps.daily_cost_cap_usd, cost_so_far, video.youtube_video_id,
        )
        return "captioned"

    # Fetch the transcript for this video to pass to the LLM.
    # scalar_one() raises if there is not exactly one transcript row; that
    # error propagates to the caller.
    result = await session.execute(
        select(KevinTranscript).where(KevinTranscript.video_id == video.id)
    )
    transcript = result.scalar_one()

    segments: list[dict] = transcript.segments_json or []

    try:
        llm_result: LlmCallResult = await deps.analyze(
            title=getattr(video, "title", ""),
            description=getattr(video, "description", "") or "",
            published_at=getattr(video, "published_at", None) or datetime.now(tz=timezone.utc),
            transcript_text=transcript.raw_text,
            transcript_segments=segments,
        )
    except Exception as exc:
        video.retry_count = (video.retry_count or 0) + 1
        if video.retry_count >= _MAX_LLM_RETRIES:
            video.status = "failed"
            video.failure_reason = f"llm_error: {type(exc).__name__}"
            logger.error(
                "Video %s failed after %d retries: %s",
                video.youtube_video_id, video.retry_count, exc,
            )
            return "failed"

        logger.warning(
            "LLM error for video %s (retry %d/%d): %s",
            video.youtube_video_id, video.retry_count, _MAX_LLM_RETRIES, exc,
        )
        return "captioned"

    analysis = llm_result.analysis

    # Persist KevinAnalysis row
    db_analysis = KevinAnalysis(
        video_id=video.id,
        model=deps.model,
        prompt_version=deps.prompt_version,
        market_outlook_direction=analysis.market_outlook_direction.value,
        market_outlook_reasoning=analysis.market_outlook_reasoning,
        macro_themes_json=analysis.macro_themes,
        key_risks_json=analysis.key_risks,
        summary=analysis.summary,
        raw_response_json=llm_result.raw_response,
        prompt_tokens=llm_result.prompt_tokens,
        completion_tokens=llm_result.completion_tokens,
        cost_usd=llm_result.cost_usd,
    )
    session.add(db_analysis)
    await session.flush()  # get db_analysis.id

    # Persist KevinStockMention rows
    for ticker in analysis.tickers:
        session.add(
            KevinStockMention(
                video_id=video.id,
                analysis_id=db_analysis.id,
                symbol=ticker.symbol,
                action=ticker.action.value,
                conviction=Decimal(str(ticker.conviction)),
                time_horizon=ticker.time_horizon.value,
                rationale_quote=ticker.rationale_quote,
                video_timestamp_seconds=ticker.video_timestamp_seconds,
            )
        )

    video.status = "analyzed"
    video.processed_at = datetime.now(tz=timezone.utc)
    logger.info(
        "Analysis complete for video %s: %s, %d tickers, cost=$%.4f",
        video.youtube_video_id,
        analysis.market_outlook_direction.value,
        len(analysis.tickers),
        llm_result.cost_usd,
    )
    return "analyzed"


async def process_one_video(
    video: KevinVideo,
    session: AsyncSession,
    deps: PipelineDeps,
) -> str:
    """Advance *video* through the pipeline and return the new status string.

    A 'discovered' video whose captions are extracted successfully continues
    into the analysis stage within the same call, so a single call can move
    a video from 'discovered' all the way to 'analyzed'.

    Stage transitions:
        discovered → extract captions
            • None result → status='failed', failure_reason='no_captions'
            • CaptionResult → insert KevinTranscript, advance to 'captioned',
              then continue with the 'captioned' stage below
        captioned → check daily cost cap
            • cap reached → leave as 'captioned' (picked up again later)
            • under cap → call analyze()
                - success → insert KevinAnalysis + KevinStockMention rows,
                            advance to 'analyzed', set processed_at
                - exception → increment retry_count;
                              if retry_count >= 3: status='failed'
                              otherwise leave as 'captioned'

    Args:
        video: ORM instance (mutated in-place; caller is responsible for commit).
        session: Async SQLAlchemy session (add/flush, NOT commit — caller commits).
        deps: Injected callables and config.

    Returns:
        The new status string (e.g. "analyzed", "captioned", "failed"). For a
        status other than 'discovered' or 'captioned' a warning is logged and
        the status is returned unchanged.
    """
    current_status = _status_str(video.status)

    if current_status == "discovered":
        current_status = await _run_caption_stage(video, session, deps)
        if current_status != "captioned":
            return current_status

    if current_status == "captioned":
        return await _run_analysis_stage(video, session, deps)

    # Unknown status — log and return unchanged
    logger.warning(
        "process_one_video: unexpected status %r for video %s",
        current_status, video.youtube_video_id,
    )
    return current_status

# diff --git a/tests/services/meet_kevin_watcher/test_pipeline.py
# b/tests/services/meet_kevin_watcher/test_pipeline.py new file mode 100644 index 0000000..5fad771 --- /dev/null +++ b/tests/services/meet_kevin_watcher/test_pipeline.py @@ -0,0 +1,131 @@
"""Tests for the Meet Kevin pipeline orchestrator (Task 8).

Tests use AsyncMock/MagicMock to avoid any real DB or LLM calls.
"""

import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock

from services.meet_kevin_watcher.pipeline import process_one_video, PipelineDeps
from services.meet_kevin_watcher.caption_extractor import CaptionResult
from services.meet_kevin_watcher.llm_analyzer import LlmCallResult
from shared.schemas.meet_kevin import (
    MeetKevinAnalysis, MeetKevinTickerMention,
    TickerAction, TimeHorizon, MarketOutlook,
)

# NOTE(review): the tests are bare ``async def`` functions with no marker;
# presumably the project's pytest configuration runs them in an async mode —
# confirm.


def _make_analysis():
    """Build a minimal bearish analysis with a single NVDA sell mention."""
    nvda = MeetKevinTickerMention(
        symbol="NVDA", action=TickerAction.SELL, conviction=0.8,
        time_horizon=TimeHorizon.WEEKS, rationale_quote="x",
        video_timestamp_seconds=10,
    )
    return MeetKevinAnalysis(
        market_outlook_direction=MarketOutlook.BEARISH,
        market_outlook_reasoning="x",
        macro_themes=["x"],
        key_risks=["x"],
        summary="x",
        tickers=[nvda],
    )


def _make_video(status, retry_count=0):
    """Return a mock video row in the given pipeline status."""
    return MagicMock(id=1, youtube_video_id="vid", status=status, retry_count=retry_count)


def _make_session(transcript=None):
    """Return a mock session; with *transcript*, execute() yields it via scalar_one()."""
    session = AsyncMock()
    if transcript is not None:
        session.execute = AsyncMock(return_value=MagicMock(scalar_one=lambda: transcript))
    return session


def _make_deps(extract_captions, analyze, cost_used="0", model="x"):
    """Assemble PipelineDeps around the given mocks with a $5 daily cap."""
    return PipelineDeps(
        extract_captions=extract_captions,
        analyze=analyze,
        daily_cost_used=AsyncMock(return_value=Decimal(cost_used)),
        model=model, prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"), workdir="/tmp",
    )


async def test_process_one_video_happy_path():
    """discovered -> captioned -> analyzed; both deps awaited once."""
    segment = {"start": 0.0, "end": 1.0, "text": "hello"}
    # The session hands this transcript back for the transcript fetch
    stored_transcript = MagicMock(raw_text="hello NVDA", segments_json=[segment])
    captions = CaptionResult(
        source="captions_auto", language="en", raw_text="hello NVDA",
        segments=[{"start": 0.0, "end": 1.0, "text": "hello"}], word_count=2,
    )
    llm_reply = LlmCallResult(
        analysis=_make_analysis(),
        raw_response={"stop_reason": "tool_use"},
        prompt_tokens=100, completion_tokens=50, cost_usd=Decimal("0.05"),
    )
    deps = _make_deps(
        AsyncMock(return_value=captions),
        AsyncMock(return_value=llm_reply),
        model="claude-sonnet-4-6",
    )

    outcome = await process_one_video(
        _make_video("discovered"), _make_session(stored_transcript), deps
    )

    assert outcome == "analyzed"
    deps.extract_captions.assert_awaited_once()
    deps.analyze.assert_awaited_once()


async def test_process_one_video_no_captions_marks_failed():
    """A None caption result marks the video failed and never reaches the LLM."""
    video = _make_video("discovered")
    deps = _make_deps(AsyncMock(return_value=None), AsyncMock())

    outcome = await process_one_video(video, _make_session(), deps)

    assert outcome == "failed"
    assert video.failure_reason == "no_captions"
    deps.analyze.assert_not_awaited()


async def test_process_one_video_cost_cap_skips_llm():
    """When daily cost cap reached, video stays captioned and analyze NOT called."""
    deps = _make_deps(AsyncMock(), AsyncMock(), cost_used="5.01")

    outcome = await process_one_video(_make_video("captioned"), _make_session(), deps)

    assert outcome == "captioned"
    deps.analyze.assert_not_awaited()


async def test_process_one_video_llm_error_increments_retry():
    """LLM exception increments retry_count; video stays captioned below threshold."""
    video = _make_video("captioned")
    session = _make_session(MagicMock(raw_text="hello", segments_json=[]))
    deps = _make_deps(AsyncMock(), AsyncMock(side_effect=ValueError("LLM exploded")))

    outcome = await process_one_video(video, session, deps)

    assert outcome == "captioned"
    assert video.retry_count == 1


async def test_process_one_video_llm_error_marks_failed_after_3_retries():
    """After 3 retries, the video is marked failed."""
    video = _make_video("captioned", retry_count=2)
    session = _make_session(MagicMock(raw_text="hello", segments_json=[]))
    deps = _make_deps(AsyncMock(), AsyncMock(side_effect=RuntimeError("API down")))

    outcome = await process_one_video(video, session, deps)

    assert outcome == "failed"
    assert "RuntimeError" in video.failure_reason