From dee3f2b0a1a6ea757ab872b664e6048648748f02 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Thu, 28 May 2026 21:41:56 +0000 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20reanalyze=5Fkevin=5Fvideos.py?= =?UTF-8?q?=20=E2=80=94=20backfill=20v1=20->=20v2=20prompt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/reanalyze_kevin_videos.py | 215 ++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 scripts/reanalyze_kevin_videos.py diff --git a/scripts/reanalyze_kevin_videos.py b/scripts/reanalyze_kevin_videos.py new file mode 100644 index 0000000..a9c08f4 --- /dev/null +++ b/scripts/reanalyze_kevin_videos.py @@ -0,0 +1,215 @@ +"""Reanalyze previously-analyzed Meet Kevin videos with a newer prompt version. + +Used to back-fill the v1 → v2 transition: re-run the current SYSTEM_PROMPT +against every ANALYZED video and replace the old kevin_analyses + +kevin_stock_mentions rows. Existing kevin_signal_bridge_state rows are +left alone (they reference mention_id FKs which become orphaned, but +the bridge cursor is past them and never reads them again). + +Usage: + python scripts/reanalyze_kevin_videos.py --all + python scripts/reanalyze_kevin_videos.py --video 5VXbBLaZTD4 + python scripts/reanalyze_kevin_videos.py --since 2026-05-22 --dry-run + +Env: TRADING_ANTHROPIC_OAUTH_TOKEN required (matches pod's env). +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +import tempfile +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +from anthropic import AsyncAnthropic + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession + +from services.meet_kevin_watcher.caption_extractor import extract_captions +from services.meet_kevin_watcher.config import MeetKevinWatcherConfig +from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer +from shared.db import create_db +from shared.models.meet_kevin import ( + KevinAnalysis, + KevinStockMention, + KevinVideo, + KevinVideoStatus, +) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") +logger = logging.getLogger("reanalyze") + + +async def _reanalyze_one( + session: AsyncSession, + video: KevinVideo, + analyzer: LlmAnalyzer, + workdir: str, + dry_run: bool, +) -> tuple[bool, Decimal, str]: + """Re-run the v2 prompt on one video. + + Returns: (success, cost_usd, reason). + """ + captions = await extract_captions(video.youtube_video_id, workdir) + if captions is None or not captions.raw_text.strip(): + return False, Decimal("0"), "no_captions" + + try: + result = await analyzer.analyze( + title=video.title or video.youtube_video_id, + description="", + published_at=video.published_at or datetime.now(timezone.utc), + transcript_text=captions.raw_text, + transcript_segments=[dict(s) for s in captions.segments], + ) + except Exception as exc: + logger.warning("LLM analysis failed for %s: %s", video.youtube_video_id, exc) + return False, Decimal("0"), f"llm_failed: {exc}" + + a = result.analysis + logger.info( + "%s — outlook=%s tickers=%d cost=$%.4f", + video.youtube_video_id, + a.market_outlook_direction.value, + len(a.tickers), + float(result.cost_usd), + ) + + if dry_run: + return True, result.cost_usd, "dry_run_skipped_write" + + # Replace old analysis + mentions for this video atomically. + await session.execute( + delete(KevinStockMention).where(KevinStockMention.video_id == video.id) + ) + await session.execute( + delete(KevinAnalysis).where(KevinAnalysis.video_id == video.id) + ) + + db_analysis = KevinAnalysis( + video_id=video.id, + model=analyzer._model, + prompt_version=analyzer._prompt_version, + market_outlook_direction=a.market_outlook_direction.value, + market_outlook_reasoning=a.market_outlook_reasoning, + macro_themes_json=a.macro_themes, + key_risks_json=a.key_risks, + summary=a.summary, + raw_response_json=result.raw_response, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens, + cost_usd=result.cost_usd, + ) + session.add(db_analysis) + await session.flush() + + for ticker in a.tickers: + session.add( + KevinStockMention( + video_id=video.id, + analysis_id=db_analysis.id, + symbol=ticker.symbol, + action=ticker.action.value, + conviction=Decimal(str(ticker.conviction)), + time_horizon=ticker.time_horizon.value, + rationale_quote=ticker.rationale_quote, + video_timestamp_seconds=ticker.video_timestamp_seconds, + expected_move=ticker.expected_move.value, + ) + ) + + await session.commit() + return True, result.cost_usd, "ok" + + +async def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + g = parser.add_mutually_exclusive_group(required=True) + g.add_argument("--all", action="store_true", help="Reanalyze every ANALYZED video") + g.add_argument("--video", help="A single YouTube video ID") + g.add_argument("--since", help="Reanalyze ANALYZED videos with published_at >= YYYY-MM-DD") + parser.add_argument("--dry-run", action="store_true", help="Run the LLM but skip DB writes") + parser.add_argument( + "--max-cost-usd", + type=float, + default=10.0, + help="Hard cap on cumulative LLM spend (USD) for this run", + ) + args = parser.parse_args() + + config = MeetKevinWatcherConfig() + token = config.anthropic_oauth_token or os.environ.get("TRADING_ANTHROPIC_OAUTH_TOKEN", "") + if not token: + raise SystemExit("TRADING_ANTHROPIC_OAUTH_TOKEN is required") + + client = AsyncAnthropic(auth_token=token) + analyzer = LlmAnalyzer( + client=client, + model=config.meet_kevin_llm_model, + prompt_version=config.meet_kevin_prompt_version, + ) + + _engine, session_factory = create_db(config) + + async with session_factory() as session: + stmt = select(KevinVideo).where(KevinVideo.status == KevinVideoStatus.ANALYZED) + if args.video: + stmt = stmt.where(KevinVideo.youtube_video_id == args.video) + elif args.since: + since_dt = datetime.strptime(args.since, "%Y-%m-%d").replace(tzinfo=timezone.utc) + stmt = stmt.where(KevinVideo.published_at >= since_dt) + stmt = stmt.order_by(KevinVideo.published_at.asc()) + videos = (await session.execute(stmt)).scalars().all() + + logger.info("Reanalyzing %d videos with prompt_version=%s%s", + len(videos), + analyzer._prompt_version, + " (dry run)" if args.dry_run else "") + + cumulative_cost = Decimal("0") + ok = 0 + fail = 0 + + with tempfile.TemporaryDirectory(prefix="kevin-reanalyze-") as workdir: + for v in videos: + if cumulative_cost >= Decimal(str(args.max_cost_usd)): + logger.warning("Cost cap $%.2f reached — stopping", args.max_cost_usd) + break + + async with session_factory() as session: + # Re-fetch video in this session + video = ( + await session.execute( + select(KevinVideo).where(KevinVideo.id == v.id) + ) + ).scalar_one() + success, cost, reason = await _reanalyze_one( + session, video, analyzer, workdir, args.dry_run + ) + + cumulative_cost += cost + if success: + ok += 1 + else: + fail += 1 + logger.warning("Skipped %s: %s", v.youtube_video_id, reason) + + # Throttle to stay under Anthropic RPM + await asyncio.sleep(2) + + logger.info( + "Done. ok=%d fail=%d total_cost=$%.4f (dry_run=%s)", + ok, fail, float(cumulative_cost), args.dry_run, + ) + + +if __name__ == "__main__": + asyncio.run(main())