feat(meet-kevin): pipeline orchestrator + service main loop

Implements Task 8 of the Meet Kevin revival plan.

- pipeline.py: PipelineDeps dataclass (frozen, DI-friendly), process_one_video
  state machine (discovered→captioned→analyzed with retry/cost-cap logic),
  and daily_cost_used() SQL helper.
- main.py: async run() entry point with RSS poll loop, per-video pipeline
  processing, OTEL counters, SIGTERM/SIGINT shutdown, httpx client lifecycle,
  and clean Anthropic/DB teardown.
- tests: 5 pipeline unit tests (happy path, no captions, cost cap, retry
  increment, failed-after-3-retries) all passing; full watcher suite 56/56.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-05-21 19:48:43 +00:00
parent 8309556c00
commit 8f5ee8f1c3
3 changed files with 644 additions and 0 deletions

View file

@ -0,0 +1,250 @@
"""Meet Kevin watcher service entry point.
Polls YouTube RSS feeds for new Meet Kevin videos, deduplicates via
``INSERT ON CONFLICT DO NOTHING``, and processes each video through
the caption-extraction + LLM-analysis pipeline.
Usage:
python -m services.meet_kevin_watcher.main
# or via Docker ENTRYPOINT
"""
import asyncio
import logging
import signal
from datetime import timezone
from decimal import Decimal
import httpx
from anthropic import AsyncAnthropic
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from shared.db import create_db
from shared.telemetry import setup_telemetry
from services.meet_kevin_watcher.config import MeetKevinWatcherConfig
from services.meet_kevin_watcher.caption_extractor import extract_captions
from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer
from services.meet_kevin_watcher.pipeline import PipelineDeps, daily_cost_used, process_one_video
from services.meet_kevin_watcher.rss_poller import fetch_feed, parse_feed
from shared.models.meet_kevin import KevinChannel, KevinVideo
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# RSS polling helper
# ---------------------------------------------------------------------------
async def _poll_channels(
    session_factory,
    http_client: httpx.AsyncClient,
    videos_discovered_counter,
) -> None:
    """Fetch RSS for every poll_enabled channel and insert newly seen videos.

    Each channel is handled in its own session and transaction. Any exception
    raised while polling one channel is logged and swallowed so the remaining
    channels are still polled.

    Args:
        session_factory: Callable returning an async-context-manager session.
        http_client: Shared HTTP client handed to ``fetch_feed``.
        videos_discovered_counter: Counter whose ``add()`` receives the number
            of videos actually inserted for a channel.
    """
    async with session_factory() as session:
        result = await session.execute(
            select(KevinChannel).where(KevinChannel.poll_enabled.is_(True))
        )
        channels = result.scalars().all()

    for channel in channels:
        try:
            xml_bytes = await fetch_feed(channel.youtube_channel_id, http_client)
            discovered = parse_feed(xml_bytes)
            if not discovered:
                continue

            new_video_ids: list[str] = []
            async with session_factory() as session:
                for video in discovered:
                    stmt = (
                        pg_insert(KevinVideo)
                        .values(
                            channel_id=channel.id,
                            youtube_video_id=video.youtube_video_id,
                            title=video.title,
                            description=video.description,
                            published_at=video.published_at,
                            thumbnail_url=video.thumbnail_url,
                            status="discovered",
                            retry_count=0,
                        )
                        .on_conflict_do_nothing(index_elements=["youtube_video_id"])
                        .returning(KevinVideo.id)
                    )
                    row = await session.execute(stmt)
                    # RETURNING yields an id only when the INSERT was not
                    # skipped by ON CONFLICT DO NOTHING.
                    if row.scalar() is not None:
                        new_video_ids.append(video.youtube_video_id)
                await session.commit()

            # Report only after the commit succeeded; otherwise a failed commit
            # would count videos that get rediscovered on the next poll.
            for youtube_video_id in new_video_ids:
                logger.info("Discovered new video: %s", youtube_video_id)
            if new_video_ids:
                videos_discovered_counter.add(len(new_video_ids))
        except Exception:
            logger.exception("RSS poll failed for channel %s", channel.youtube_channel_id)
# ---------------------------------------------------------------------------
# Pipeline processing helper
# ---------------------------------------------------------------------------
async def _process_pending_videos(
    session_factory,
    deps: PipelineDeps,
    captions_extracted_counter,
    llm_calls_counter,
    llm_cost_counter,
) -> None:
    """Walk all videos with status discovered/captioned and advance each one.

    Every video is processed and committed in its own session, so one failure
    (logged, then rolled back) does not affect the others.

    ``process_one_video`` may carry a video from ``discovered`` all the way to
    ``analyzed`` in a single call, so the metrics are derived from the start
    and end status rather than from single-step transitions.

    Args:
        session_factory: Callable returning an async-context-manager session.
        deps: Injected pipeline dependencies passed to ``process_one_video``.
        captions_extracted_counter: Incremented when a ``discovered`` video
            ends the call as ``captioned`` or ``analyzed``.
        llm_calls_counter: Incremented when a video ends the call ``analyzed``.
        llm_cost_counter: Receives the ``cost_usd`` of the newest analysis row
            of a video that ended the call ``analyzed``.
    """
    # Function-scope import: the model is only needed for the cost lookup.
    from shared.models.meet_kevin import KevinAnalysis

    async with session_factory() as session:
        result = await session.execute(
            select(KevinVideo.id)
            .where(KevinVideo.status.in_(["discovered", "captioned"]))
            .order_by(KevinVideo.published_at.asc())
        )
        pending_ids = result.scalars().all()

    for video_pk in pending_ids:
        async with session_factory() as session:
            # Re-fetch inside its own session so we can commit per-video
            result = await session.execute(
                select(KevinVideo).where(KevinVideo.id == video_pk)
            )
            db_video = result.scalar_one_or_none()
            if db_video is None:
                logger.warning(
                    "Video id %s no longer exists; skipping", video_pk
                )
                continue

            # Capture plain values up front so logging and metrics never touch
            # ORM attributes after a commit or a failed flush.
            youtube_id = db_video.youtube_video_id
            raw_status = db_video.status
            prev_status = (
                str(raw_status.value) if hasattr(raw_status, "value") else str(raw_status)
            )

            try:
                new_status = await process_one_video(db_video, session, deps)
                await session.commit()
            except Exception:
                logger.exception("Error processing video %s", youtube_id)
                try:
                    await session.rollback()
                except Exception:
                    # Best effort: the session is discarded right after anyway.
                    logger.warning(
                        "Rollback failed for video %s", youtube_id, exc_info=True
                    )
                continue

        # Telemetry must never abort the batch, hence its own guard.
        try:
            if prev_status == "discovered" and new_status in ("captioned", "analyzed"):
                captions_extracted_counter.add(1)
            if new_status == "analyzed":
                llm_calls_counter.add(1)
                # Retrieve cost from the most-recently-inserted analysis
                async with session_factory() as cost_session:
                    cost_row = await cost_session.execute(
                        select(KevinAnalysis.cost_usd)
                        .where(KevinAnalysis.video_id == video_pk)
                        .order_by(KevinAnalysis.id.desc())
                        .limit(1)
                    )
                    cost = cost_row.scalar_one_or_none()
                if cost is not None:
                    llm_cost_counter.add(float(cost))
        except Exception:
            logger.exception("Failed to record metrics for video %s", youtube_id)
# ---------------------------------------------------------------------------
# Service entry point
# ---------------------------------------------------------------------------
async def run() -> None:
    """Boot the Meet Kevin watcher and enter the main polling loop.

    Builds config, telemetry counters, the DB engine, the Anthropic client and
    the pipeline dependencies, then alternates between polling RSS and
    processing pending videos until SIGTERM/SIGINT sets the shutdown event.
    The Anthropic client and DB engine are always released on exit, including
    when setup after their creation fails.
    """
    config = MeetKevinWatcherConfig()
    logging.basicConfig(level=config.log_level)
    logger.info("Starting meet-kevin-watcher service")

    # Telemetry
    meter = setup_telemetry(config.otel_service_name, config.otel_metrics_port)
    videos_discovered_counter = meter.create_counter(
        "meet_kevin.videos_discovered",
        description="Total new Meet Kevin videos discovered via RSS",
    )
    captions_extracted_counter = meter.create_counter(
        "meet_kevin.captions_extracted",
        description="Total videos with captions successfully extracted",
    )
    llm_calls_counter = meter.create_counter(
        "meet_kevin.llm_calls",
        description="Total LLM analyze() calls made",
    )
    llm_cost_counter = meter.create_counter(
        "meet_kevin.llm_cost_usd",
        description="Cumulative LLM spend in USD",
    )

    # Database
    engine, session_factory = create_db(config)

    # Anthropic client (named so it cannot be mistaken for the package)
    anthropic_client = AsyncAnthropic(api_key=config.anthropic_api_key)

    try:
        analyzer = LlmAnalyzer(
            client=anthropic_client,
            model=config.meet_kevin_llm_model,
            prompt_version=config.meet_kevin_prompt_version,
        )

        # Pipeline deps — wire real callables
        async def _extract(video_id: str, workdir: str):
            return await extract_captions(video_id, workdir)

        async def _analyze(**kwargs):
            return await analyzer.analyze(**kwargs)

        async def _daily_cost(session):
            return await daily_cost_used(session)

        deps = PipelineDeps(
            extract_captions=_extract,
            analyze=_analyze,
            daily_cost_used=_daily_cost,
            model=config.meet_kevin_llm_model,
            prompt_version=config.meet_kevin_prompt_version,
            # Go through str() so a float config value does not carry binary
            # floating-point noise into the Decimal cap.
            daily_cost_cap_usd=Decimal(str(config.meet_kevin_daily_cost_cap_usd)),
            workdir=config.meet_kevin_workdir,
        )

        # Graceful shutdown
        shutdown_event = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, shutdown_event.set)

        # Main loop
        async with httpx.AsyncClient() as http_client:
            while not shutdown_event.is_set():
                try:
                    await _poll_channels(
                        session_factory, http_client, videos_discovered_counter
                    )
                    await _process_pending_videos(
                        session_factory,
                        deps,
                        captions_extracted_counter,
                        llm_calls_counter,
                        llm_cost_counter,
                    )
                except Exception:
                    logger.exception("Main loop iteration failed")

                # Wait for next poll interval (or shutdown signal)
                try:
                    await asyncio.wait_for(
                        shutdown_event.wait(),
                        timeout=config.meet_kevin_poll_interval_seconds,
                    )
                    break  # Shutdown signaled
                except asyncio.TimeoutError:
                    pass  # Normal timeout — loop again
    finally:
        # Nested so the engine is disposed even when closing the client fails.
        try:
            await anthropic_client.close()
        finally:
            await engine.dispose()
        logger.info("meet-kevin-watcher stopped gracefully")
# Script entry point: start the event loop and run the service coroutine.
if __name__ == "__main__":
    asyncio.run(run())

View file

@ -0,0 +1,263 @@
"""Meet Kevin pipeline orchestrator.
Contains the per-video state-machine (process_one_video) and the daily
cost accounting helper (daily_cost_used). Both are designed for
dependency injection so they are fully unit-testable without a real DB
or LLM backend.
Public exports:
    PipelineDeps       - frozen dataclass carrying all injected callables + config
    process_one_video  - advance one KevinVideo through its pending pipeline stage(s)
    daily_cost_used    - sum today's LLM spend from kevin_analyses
"""
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Callable, Coroutine
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.meet_kevin import (
KevinAnalysis,
KevinStockMention,
KevinTranscript,
KevinVideo,
)
from services.meet_kevin_watcher.caption_extractor import CaptionResult
from services.meet_kevin_watcher.llm_analyzer import LlmCallResult
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Dependency-injection container
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class PipelineDeps:
    """Immutable bundle of the callables and settings the pipeline consumes.

    The three async callables mirror the signatures of the real
    implementations, so tests can substitute ``AsyncMock`` objects for them.
    """

    # Async callable: (video_id: str, workdir: str) -> CaptionResult | None
    extract_captions: Callable[..., Coroutine[Any, Any, CaptionResult | None]]

    # Async callable: (**kwargs) -> LlmCallResult
    analyze: Callable[..., Coroutine[Any, Any, LlmCallResult]]

    # Async callable: (session: AsyncSession) -> Decimal
    daily_cost_used: Callable[..., Coroutine[Any, Any, Decimal]]

    # LLM model identifier stored in kevin_analyses.model.
    model: str

    # Prompt version string stored in kevin_analyses.prompt_version.
    prompt_version: str

    # Hard ceiling for total LLM spend per calendar day (UTC).
    daily_cost_cap_usd: Decimal

    # Filesystem directory for yt-dlp caption downloads.
    workdir: str
# ---------------------------------------------------------------------------
# Daily cost accounting
# ---------------------------------------------------------------------------
async def daily_cost_used(session: AsyncSession) -> Decimal:
    """Sum ``cost_usd`` over today's rows in kevin_analyses.

    A single aggregate query is issued; "today" starts at the database's
    ``date_trunc('day', now())``.

    NOTE(review): PostgreSQL truncates ``now()`` in the session time zone, so
    the UTC day boundary holds only if the DB session runs in UTC — confirm.

    Args:
        session: Async SQLAlchemy session used to run the query.

    Returns:
        The summed cost as a Decimal; ``Decimal("0")`` when nothing matches.
    """
    day_start = func.date_trunc("day", func.now())
    summed_cost = func.coalesce(func.sum(KevinAnalysis.cost_usd), 0)
    query = select(summed_cost).where(KevinAnalysis.created_at >= day_start)

    total = (await session.execute(query)).scalar_one()
    if not total:
        # Covers both a NULL-ish result and any zero-valued sum.
        return Decimal(str(0))
    return Decimal(str(total))
# ---------------------------------------------------------------------------
# Per-video pipeline stage runner
# ---------------------------------------------------------------------------
async def process_one_video(
    video: KevinVideo,
    session: AsyncSession,
    deps: PipelineDeps,
) -> str:
    """Advance *video* as far as it can go in one call and return its status.

    Stage transitions:
        discovered -> extract captions
            None result   -> status='failed', failure_reason='no_captions'
            CaptionResult -> insert KevinTranscript, advance to 'captioned',
                             then continue straight into the analysis stage
        captioned  -> check daily cost cap
            at/over cap -> leave as 'captioned' (picked up again later)
            under cap   -> call analyze()
                - success   -> insert KevinAnalysis + KevinStockMention rows,
                               advance to 'analyzed', set processed_at
                - exception -> increment retry_count;
                               if retry_count >= 3: status='failed'
                               otherwise leave as 'captioned'

    Args:
        video: ORM instance (mutated in-place; caller is responsible for commit).
        session: Async SQLAlchemy session (add/flush only; the caller commits).
        deps: Injected callables and config.

    Returns:
        The new status string (e.g. "analyzed", "captioned", "failed"). Any
        other starting status is logged and returned unchanged.
    """
    raw_status = video.status
    current_status: str = (
        str(raw_status.value) if hasattr(raw_status, "value") else str(raw_status)
    )

    transcript = None
    if current_status == "discovered":
        transcript = await _run_caption_stage(video, session, deps)
        if transcript is None:
            return "failed"
        current_status = "captioned"

    if current_status == "captioned":
        return await _run_analysis_stage(video, session, deps, transcript)

    # Unknown status — log and return unchanged
    logger.warning("process_one_video: unexpected status %r for video %s", current_status, video.youtube_video_id)
    return current_status


async def _run_caption_stage(video, session, deps):
    """Extract captions and stage a KevinTranscript; return it, or None on failure."""
    caption_result = await deps.extract_captions(video.youtube_video_id, deps.workdir)
    if caption_result is None:
        logger.warning("No captions for video %s — marking failed", video.youtube_video_id)
        video.status = "failed"
        video.failure_reason = "no_captions"
        return None

    # Only an explicit "manual" marker selects the manual source; every other
    # source value (e.g. "youtube", "captions_auto") is treated as automatic.
    if "manual" in caption_result.source:
        transcript_source = "captions_manual"
    else:
        transcript_source = "captions_auto"

    transcript = KevinTranscript(
        video_id=video.id,
        source=transcript_source,
        language=caption_result.language,
        raw_text=caption_result.raw_text,
        segments_json=list(caption_result.segments),
        word_count=caption_result.word_count,
    )
    session.add(transcript)
    await session.flush()
    video.status = "captioned"
    logger.info("Captions extracted for video %s (%d words)", video.youtube_video_id, caption_result.word_count)
    return transcript


async def _run_analysis_stage(video, session, deps, transcript=None):
    """Run the cost-capped LLM analysis for a captioned video; return the new status."""
    # Check daily cost cap before calling the LLM
    cost_so_far: Decimal = await deps.daily_cost_used(session)
    if cost_so_far >= deps.daily_cost_cap_usd:
        logger.info(
            "Daily cost cap $%.4f reached ($%.4f used) — skipping LLM for %s",
            deps.daily_cost_cap_usd, cost_so_far, video.youtube_video_id,
        )
        return "captioned"

    # Reuse the transcript created earlier in this call; only query when the
    # video was already 'captioned' on entry.
    if transcript is None:
        result = await session.execute(
            select(KevinTranscript).where(KevinTranscript.video_id == video.id)
        )
        transcript = result.scalar_one()
    segments: list[dict] = transcript.segments_json or []

    try:
        llm_result = await deps.analyze(
            title=getattr(video, "title", ""),
            description=getattr(video, "description", "") or "",
            published_at=getattr(video, "published_at", None) or datetime.now(tz=timezone.utc),
            transcript_text=transcript.raw_text,
            transcript_segments=segments,
        )
    except Exception as exc:
        video.retry_count = (video.retry_count or 0) + 1
        if video.retry_count >= 3:
            video.status = "failed"
            video.failure_reason = f"llm_error: {type(exc).__name__}"
            logger.error(
                "Video %s failed after %d retries: %s",
                video.youtube_video_id, video.retry_count, exc,
            )
            return "failed"
        logger.warning(
            "LLM error for video %s (retry %d/3): %s",
            video.youtube_video_id, video.retry_count, exc,
        )
        return "captioned"

    analysis = llm_result.analysis

    # Persist KevinAnalysis row
    db_analysis = KevinAnalysis(
        video_id=video.id,
        model=deps.model,
        prompt_version=deps.prompt_version,
        market_outlook_direction=analysis.market_outlook_direction.value,
        market_outlook_reasoning=analysis.market_outlook_reasoning,
        macro_themes_json=analysis.macro_themes,
        key_risks_json=analysis.key_risks,
        summary=analysis.summary,
        raw_response_json=llm_result.raw_response,
        prompt_tokens=llm_result.prompt_tokens,
        completion_tokens=llm_result.completion_tokens,
        cost_usd=llm_result.cost_usd,
    )
    session.add(db_analysis)
    await session.flush()  # get db_analysis.id

    # Persist KevinStockMention rows
    for ticker in analysis.tickers:
        session.add(
            KevinStockMention(
                video_id=video.id,
                analysis_id=db_analysis.id,
                symbol=ticker.symbol,
                action=ticker.action.value,
                conviction=Decimal(str(ticker.conviction)),
                time_horizon=ticker.time_horizon.value,
                rationale_quote=ticker.rationale_quote,
                video_timestamp_seconds=ticker.video_timestamp_seconds,
            )
        )

    video.status = "analyzed"
    video.processed_at = datetime.now(tz=timezone.utc)
    logger.info(
        "Analysis complete for video %s: %s, %d tickers, cost=$%.4f",
        video.youtube_video_id,
        analysis.market_outlook_direction.value,
        len(analysis.tickers),
        llm_result.cost_usd,
    )
    return "analyzed"

View file

@ -0,0 +1,131 @@
"""Tests for the Meet Kevin pipeline orchestrator (Task 8).
Tests use AsyncMock/MagicMock to avoid any real DB or LLM calls.
"""
import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
from services.meet_kevin_watcher.pipeline import process_one_video, PipelineDeps
from services.meet_kevin_watcher.caption_extractor import CaptionResult
from services.meet_kevin_watcher.llm_analyzer import LlmCallResult
from shared.schemas.meet_kevin import (
MeetKevinAnalysis, MeetKevinTickerMention,
TickerAction, TimeHorizon, MarketOutlook,
)
def _make_analysis():
    """Build a minimal bearish MeetKevinAnalysis with a single NVDA sell mention."""
    nvda_mention = MeetKevinTickerMention(
        symbol="NVDA",
        action=TickerAction.SELL,
        conviction=0.8,
        time_horizon=TimeHorizon.WEEKS,
        rationale_quote="x",
        video_timestamp_seconds=10,
    )
    fields = {
        "market_outlook_direction": MarketOutlook.BEARISH,
        "market_outlook_reasoning": "x",
        "macro_themes": ["x"],
        "key_risks": ["x"],
        "summary": "x",
        "tickers": [nvda_mention],
    }
    return MeetKevinAnalysis(**fields)
async def test_process_one_video_happy_path():
    """discovered -> captioned -> analyzed; both deps awaited once."""
    video = MagicMock(id=1, youtube_video_id="vid", status="discovered", retry_count=0)
    session = AsyncMock()
    # session.add() is called without await by the pipeline; a plain MagicMock
    # keeps the AsyncMock session from handing back never-awaited coroutines.
    session.add = MagicMock()
    # Wire session.execute to return something for the transcript fetch
    transcript = MagicMock(raw_text="hello NVDA", segments_json=[{"start": 0.0, "end": 1.0, "text": "hello"}])
    session.execute = AsyncMock(return_value=MagicMock(scalar_one=lambda: transcript))
    deps = PipelineDeps(
        extract_captions=AsyncMock(return_value=CaptionResult(
            source="captions_auto", language="en", raw_text="hello NVDA",
            segments=[{"start": 0.0, "end": 1.0, "text": "hello"}], word_count=2,
        )),
        analyze=AsyncMock(return_value=LlmCallResult(
            analysis=_make_analysis(),
            raw_response={"stop_reason": "tool_use"},
            prompt_tokens=100, completion_tokens=50, cost_usd=Decimal("0.05"),
        )),
        daily_cost_used=AsyncMock(return_value=Decimal("0")),
        model="claude-sonnet-4-6", prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"), workdir="/tmp",
    )

    new_status = await process_one_video(video, session, deps)

    assert new_status == "analyzed"
    assert video.status == "analyzed"
    deps.extract_captions.assert_awaited_once()
    deps.analyze.assert_awaited_once()
    # One transcript, one analysis and one row per ticker must be staged.
    staged = [type(call.args[0]).__name__ for call in session.add.call_args_list]
    assert staged == ["KevinTranscript", "KevinAnalysis", "KevinStockMention"]
async def test_process_one_video_no_captions_marks_failed():
    """A None caption result fails the video and never reaches the LLM."""
    analyze_mock = AsyncMock()
    pipeline_deps = PipelineDeps(
        extract_captions=AsyncMock(return_value=None),
        analyze=analyze_mock,
        daily_cost_used=AsyncMock(return_value=Decimal("0")),
        model="x",
        prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"),
        workdir="/tmp",
    )
    db_session = AsyncMock()
    video = MagicMock(id=1, youtube_video_id="vid", status="discovered", retry_count=0)

    outcome = await process_one_video(video, db_session, pipeline_deps)

    assert outcome == "failed"
    assert video.failure_reason == "no_captions"
    analyze_mock.assert_not_awaited()
async def test_process_one_video_cost_cap_skips_llm():
    """Spend above the daily cap keeps the video captioned and skips analyze."""
    analyze_mock = AsyncMock()
    pipeline_deps = PipelineDeps(
        extract_captions=AsyncMock(),
        analyze=analyze_mock,
        daily_cost_used=AsyncMock(return_value=Decimal("5.01")),
        model="x",
        prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"),
        workdir="/tmp",
    )
    db_session = AsyncMock()
    video = MagicMock(id=1, youtube_video_id="vid", status="captioned", retry_count=0)

    outcome = await process_one_video(video, db_session, pipeline_deps)

    assert outcome == "captioned"
    analyze_mock.assert_not_awaited()
async def test_process_one_video_llm_error_increments_retry():
    """An LLM failure below the retry limit bumps retry_count and stays captioned."""
    stored_transcript = MagicMock(raw_text="hello", segments_json=[])
    query_result = MagicMock(scalar_one=lambda: stored_transcript)
    db_session = AsyncMock()
    db_session.execute = AsyncMock(return_value=query_result)

    pipeline_deps = PipelineDeps(
        extract_captions=AsyncMock(),
        analyze=AsyncMock(side_effect=ValueError("LLM exploded")),
        daily_cost_used=AsyncMock(return_value=Decimal("0")),
        model="x",
        prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"),
        workdir="/tmp",
    )
    video = MagicMock(id=1, youtube_video_id="vid", status="captioned", retry_count=0)

    outcome = await process_one_video(video, db_session, pipeline_deps)

    assert outcome == "captioned"
    assert video.retry_count == 1
async def test_process_one_video_llm_error_marks_failed_after_3_retries():
    """The third consecutive LLM failure marks the video failed."""
    stored_transcript = MagicMock(raw_text="hello", segments_json=[])
    query_result = MagicMock(scalar_one=lambda: stored_transcript)
    db_session = AsyncMock()
    db_session.execute = AsyncMock(return_value=query_result)

    pipeline_deps = PipelineDeps(
        extract_captions=AsyncMock(),
        analyze=AsyncMock(side_effect=RuntimeError("API down")),
        daily_cost_used=AsyncMock(return_value=Decimal("0")),
        model="x",
        prompt_version="v1",
        daily_cost_cap_usd=Decimal("5"),
        workdir="/tmp",
    )
    # Two earlier failures already recorded, so this one reaches the limit.
    video = MagicMock(id=1, youtube_video_id="vid", status="captioned", retry_count=2)

    outcome = await process_one_video(video, db_session, pipeline_deps)

    assert outcome == "failed"
    assert "RuntimeError" in video.failure_reason