trading/services/meet_kevin_watcher/main.py
Viktor Barzin 8a1d03a967 refactor(meet-kevin): switch LLM back to native Anthropic SDK with OAuth bearer
Previous refactor (89f01ad) moved to OpenRouter because no sk-ant-api-* key
was found in Vault. Turns out claude-agent-service-spare-{1,2} hold
sk-ant-oat01-* OAuth tokens (108 chars, scope user:inference, 1-year TTL,
minted via 'claude setup-token' — see memory id=832).

These tokens work with the Anthropic SDK via the auth_token= constructor
argument (routes to Authorization: Bearer ... instead of x-api-key: ...).
They consume the Enterprise Claude subscription quota rather than
per-call billing, so the OpenRouter zero-credit problem goes away.

- llm_analyzer.py: revert OpenAI client to AsyncAnthropic; tool-use API
  + cache_control restored
- config.py: openrouter_api_key -> anthropic_oauth_token; model slug
  reverted from anthropic/claude-sonnet-4.5 -> claude-sonnet-4-5
- main.py: AsyncOpenAI -> AsyncAnthropic(auth_token=...), drop OpenRouter
  attribution headers
- pyproject: openai>=1.50 -> anthropic>=0.40 in meet_kevin extras
- tests: mocks ported back to messages.create + tool_use blocks
2026-05-22 19:24:40 +00:00

252 lines
9.2 KiB
Python

"""Meet Kevin watcher service entry point.
Polls YouTube RSS feeds for new Meet Kevin videos, deduplicates via
``INSERT … ON CONFLICT DO NOTHING``, and processes each video through
the caption-extraction + LLM-analysis pipeline.
Usage:
python -m services.meet_kevin_watcher.main
# or via Docker ENTRYPOINT
"""
import asyncio
import logging
import signal
from datetime import timezone
from decimal import Decimal
import httpx
from anthropic import AsyncAnthropic
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from shared.db import create_db
from shared.telemetry import setup_telemetry
from services.meet_kevin_watcher.config import MeetKevinWatcherConfig
from services.meet_kevin_watcher.caption_extractor import extract_captions
from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer
from services.meet_kevin_watcher.pipeline import PipelineDeps, daily_cost_used, process_one_video
from services.meet_kevin_watcher.rss_poller import fetch_feed, parse_feed
from shared.models.meet_kevin import KevinChannel, KevinVideo
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# RSS polling helper
# ---------------------------------------------------------------------------
async def _poll_channels(
    session_factory,
    http_client: httpx.AsyncClient,
    videos_discovered_counter,
) -> None:
    """Fetch RSS for every poll_enabled channel and insert newly seen videos.

    Channels are handled independently: any exception raised while fetching,
    parsing or inserting for one channel is logged and the remaining channels
    are still polled.

    Args:
        session_factory: Callable returning an async-context-manager session.
        http_client: Shared HTTP client handed to ``fetch_feed``.
        videos_discovered_counter: Counter incremented once per video row
            that was actually inserted (conflicting rows are not counted).
    """
    async with session_factory() as session:
        result = await session.execute(
            select(KevinChannel).where(KevinChannel.poll_enabled.is_(True))
        )
        # Copy out plain values while the session is still open so the loop
        # below never touches ORM attributes on detached instances.
        channels = [
            (channel.id, channel.youtube_channel_id)
            for channel in result.scalars().all()
        ]

    for channel_pk, youtube_channel_id in channels:
        try:
            xml_bytes = await fetch_feed(youtube_channel_id, http_client)
            discovered = parse_feed(xml_bytes)
            if not discovered:
                continue

            inserted_video_ids = []
            async with session_factory() as session:
                for video in discovered:
                    stmt = (
                        pg_insert(KevinVideo)
                        .values(
                            channel_id=channel_pk,
                            youtube_video_id=video.youtube_video_id,
                            title=video.title,
                            description=video.description,
                            published_at=video.published_at,
                            thumbnail_url=video.thumbnail_url,
                            status="discovered",
                            retry_count=0,
                        )
                        .on_conflict_do_nothing(index_elements=["youtube_video_id"])
                        .returning(KevinVideo.id)
                    )
                    row = await session.execute(stmt)
                    # RETURNING yields no row when the conflict clause skipped
                    # the insert; compare against None so a falsy id still
                    # counts as a new row.
                    if row.scalar() is not None:
                        inserted_video_ids.append(video.youtube_video_id)
                await session.commit()

            # Report only after the commit succeeded: a failed commit would
            # otherwise leave the metric ahead of the database, and the same
            # videos would be counted again on the next poll.
            for youtube_video_id in inserted_video_ids:
                videos_discovered_counter.add(1)
                logger.info("Discovered new video: %s", youtube_video_id)
        except Exception:
            logger.exception("RSS poll failed for channel %s", youtube_channel_id)
# ---------------------------------------------------------------------------
# Pipeline processing helper
# ---------------------------------------------------------------------------
async def _process_pending_videos(
    session_factory,
    deps: PipelineDeps,
    captions_extracted_counter,
    llm_calls_counter,
    llm_cost_counter,
) -> None:
    """Walk all videos with status discovered/captioned and advance each one.

    Videos are processed oldest-first (by ``published_at``), each in its own
    session so one video's outcome is committed or rolled back independently.
    A failure on one video is logged and does not stop the remaining videos.

    Args:
        session_factory: Callable returning an async-context-manager session.
        deps: Pipeline dependencies forwarded to ``process_one_video``.
        captions_extracted_counter: Incremented on a discovered -> captioned
            transition.
        llm_calls_counter: Incremented on a captioned -> analyzed transition.
        llm_cost_counter: Increased by the ``cost_usd`` of the newest analysis
            row for the video on a captioned -> analyzed transition.
    """
    # Function-scope import (the file-level import does not bring in
    # KevinAnalysis); done once here instead of on every loop iteration.
    from shared.models.meet_kevin import KevinAnalysis

    async with session_factory() as session:
        result = await session.execute(
            select(KevinVideo)
            .where(KevinVideo.status.in_(["discovered", "captioned"]))
            .order_by(KevinVideo.published_at.asc())
        )
        # Keep only plain identifiers: they stay usable after this session
        # closes and after later commits/rollbacks, unlike ORM attributes.
        pending = [
            (video.id, video.youtube_video_id)
            for video in result.scalars().all()
        ]

    for video_pk, youtube_video_id in pending:
        async with session_factory() as session:
            try:
                # Re-fetch inside its own session so we can commit per-video.
                result = await session.execute(
                    select(KevinVideo).where(KevinVideo.id == video_pk)
                )
                db_video = result.scalar_one_or_none()
                if db_video is None:
                    # Row vanished between listing and processing; skip it
                    # rather than aborting the whole batch.
                    logger.warning(
                        "Video %s no longer exists; skipping", youtube_video_id
                    )
                    continue
                # Status may be an enum member or a plain string.
                prev_status = str(getattr(db_video.status, "value", db_video.status))
                new_status = await process_one_video(db_video, session, deps)
                await session.commit()
            except Exception:
                logger.exception("Error processing video %s", youtube_video_id)
                try:
                    await session.rollback()
                except Exception:
                    # Best-effort rollback: record the failure, never raise.
                    logger.warning(
                        "Rollback failed for video %s",
                        youtube_video_id,
                        exc_info=True,
                    )
                continue

        # Update OTEL counters based on the transition. Kept apart from the
        # processing try-block so a metrics failure is not reported as a
        # processing failure of already-committed work.
        try:
            if prev_status == "discovered" and new_status == "captioned":
                captions_extracted_counter.add(1)
            elif prev_status == "captioned" and new_status == "analyzed":
                llm_calls_counter.add(1)
                # Retrieve cost from the most-recently-inserted analysis.
                async with session_factory() as cost_session:
                    cost_row = await cost_session.execute(
                        select(KevinAnalysis.cost_usd)
                        .where(KevinAnalysis.video_id == video_pk)
                        .order_by(KevinAnalysis.id.desc())
                        .limit(1)
                    )
                    cost = cost_row.scalar_one_or_none()
                if cost is not None:
                    llm_cost_counter.add(float(cost))
        except Exception:
            logger.exception(
                "Failed to record metrics for video %s", youtube_video_id
            )
# ---------------------------------------------------------------------------
# Service entry point
# ---------------------------------------------------------------------------
async def run() -> None:
    """Boot the Meet Kevin watcher and enter the main polling loop.

    Builds the config, telemetry counters, database engine/session factory,
    Anthropic client and pipeline dependencies, then repeatedly polls the RSS
    feeds and advances pending videos. Between iterations it waits for the
    configured poll interval, or returns early once SIGTERM or SIGINT has been
    received. The Anthropic client and the database engine are released on
    the way out.
    """
    config = MeetKevinWatcherConfig()
    logging.basicConfig(level=config.log_level)
    logger.info("Starting meet-kevin-watcher service")

    # Telemetry
    meter = setup_telemetry(config.otel_service_name, config.otel_metrics_port)
    videos_discovered_counter = meter.create_counter(
        "meet_kevin.videos_discovered",
        description="Total new Meet Kevin videos discovered via RSS",
    )
    captions_extracted_counter = meter.create_counter(
        "meet_kevin.captions_extracted",
        description="Total videos with captions successfully extracted",
    )
    llm_calls_counter = meter.create_counter(
        "meet_kevin.llm_calls",
        description="Total LLM analyze() calls made",
    )
    llm_cost_counter = meter.create_counter(
        "meet_kevin.llm_cost_usd",
        description="Cumulative LLM spend in USD",
    )

    # Database
    engine, session_factory = create_db(config)

    # Anthropic client + LLM analyzer (OAuth bearer token)
    client = AsyncAnthropic(
        auth_token=config.anthropic_oauth_token,
    )
    analyzer = LlmAnalyzer(
        client=client,
        model=config.meet_kevin_llm_model,
        prompt_version=config.meet_kevin_prompt_version,
    )

    # Pipeline deps — wire real callables
    async def _extract(video_id: str, workdir: str):
        return await extract_captions(video_id, workdir)

    async def _analyze(**kwargs):
        return await analyzer.analyze(**kwargs)

    async def _daily_cost(session):
        return await daily_cost_used(session)

    deps = PipelineDeps(
        extract_captions=_extract,
        analyze=_analyze,
        daily_cost_used=_daily_cost,
        model=config.meet_kevin_llm_model,
        prompt_version=config.meet_kevin_prompt_version,
        # Go through str() so the Decimal is built from the value's text form.
        daily_cost_cap_usd=Decimal(str(config.meet_kevin_daily_cost_cap_usd)),
        workdir=config.meet_kevin_workdir,
    )

    # Graceful shutdown
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)

    # Main loop
    try:
        async with httpx.AsyncClient() as http_client:
            while not shutdown_event.is_set():
                try:
                    await _poll_channels(
                        session_factory, http_client, videos_discovered_counter
                    )
                    await _process_pending_videos(
                        session_factory,
                        deps,
                        captions_extracted_counter,
                        llm_calls_counter,
                        llm_cost_counter,
                    )
                except Exception:
                    # Keep the service alive; the next iteration retries.
                    logger.exception("Main loop iteration failed")

                # Wait for next poll interval (or shutdown signal)
                try:
                    await asyncio.wait_for(
                        shutdown_event.wait(),
                        timeout=config.meet_kevin_poll_interval_seconds,
                    )
                    break  # Shutdown signaled
                except asyncio.TimeoutError:
                    pass  # Normal timeout — loop again
    finally:
        # Nested try/finally: the engine must be disposed even when closing
        # the Anthropic client raises.
        try:
            await client.close()
        finally:
            await engine.dispose()
        logger.info("meet-kevin-watcher stopped gracefully")
# Script entry point: start the service only when this file is executed
# directly (e.g. ``python -m ...``), not when it is imported.
if __name__ == "__main__":
    asyncio.run(run())