feat(scripts): reanalyze_kevin_videos.py — backfill v1 -> v2 prompt
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Viktor Barzin 2026-05-28 21:41:56 +00:00
parent 41ab95ec4d
commit dee3f2b0a1

View file

@ -0,0 +1,215 @@
"""Reanalyze previously-analyzed Meet Kevin videos with a newer prompt version.
Used to back-fill the v1 -> v2 transition: re-run the current SYSTEM_PROMPT
against every ANALYZED video and replace the old kevin_analyses +
kevin_stock_mentions rows. Existing kevin_signal_bridge_state rows are
left alone (they reference mention_id FKs which become orphaned, but
the bridge cursor is past them and never reads them again).
Usage:
python scripts/reanalyze_kevin_videos.py --all
python scripts/reanalyze_kevin_videos.py --video 5VXbBLaZTD4
python scripts/reanalyze_kevin_videos.py --since 2026-05-22 --dry-run
Env: TRADING_ANTHROPIC_OAUTH_TOKEN required (matches pod's env).
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import tempfile
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from anthropic import AsyncAnthropic
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from services.meet_kevin_watcher.caption_extractor import extract_captions
from services.meet_kevin_watcher.config import MeetKevinWatcherConfig
from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer
from shared.db import create_db
from shared.models.meet_kevin import (
KevinAnalysis,
KevinStockMention,
KevinVideo,
KevinVideoStatus,
)
# Configure root logging once for the whole script: INFO and above, each
# record prefixed with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)

# Script-wide logger; every message from this file is emitted as "reanalyze".
logger = logging.getLogger("reanalyze")
async def _reanalyze_one(
    session: AsyncSession,
    video: KevinVideo,
    analyzer: LlmAnalyzer,
    workdir: str,
    dry_run: bool,
) -> tuple[bool, Decimal, str]:
    """Re-run the analyzer's current prompt on a single video.

    Extracts captions for ``video`` into ``workdir``, sends them through
    ``analyzer`` and, unless ``dry_run`` is set, replaces the video's stored
    ``KevinAnalysis`` row and ``KevinStockMention`` rows with the fresh
    result and commits ``session``.

    Returns:
        ``(success, cost_usd, reason)`` where ``reason`` is one of:

        * ``"no_captions"`` - no captions, or only whitespace (cost 0);
        * ``"llm_failed: <error>"`` - the analyzer raised (cost 0);
        * ``"dry_run_skipped_write"`` - analysis succeeded, nothing written;
        * ``"ok"`` - analysis succeeded and the new rows were committed.
    """
    no_cost = Decimal("0")
    video_key = video.youtube_video_id

    captions = await extract_captions(video_key, workdir)
    has_text = captions is not None and bool(captions.raw_text.strip())
    if not has_text:
        return False, no_cost, "no_captions"

    try:
        outcome = await analyzer.analyze(
            # Fall back to the video ID / current time when metadata is missing.
            title=video.title or video_key,
            description="",
            published_at=video.published_at or datetime.now(timezone.utc),
            transcript_text=captions.raw_text,
            transcript_segments=[dict(segment) for segment in captions.segments],
        )
    except Exception as exc:
        # Best effort: one failing video is reported to the caller, not raised.
        logger.warning("LLM analysis failed for %s: %s", video_key, exc)
        return False, no_cost, f"llm_failed: {exc}"

    analysis = outcome.analysis
    spent = outcome.cost_usd
    logger.info(
        "%s — outlook=%s tickers=%d cost=$%.4f",
        video_key,
        analysis.market_outlook_direction.value,
        len(analysis.tickers),
        float(spent),
    )

    if dry_run:
        # The LLM call was still paid for, so the cost is reported.
        return True, spent, "dry_run_skipped_write"

    # Drop the previous rows (mentions first, then the analysis) inside the
    # same transaction that inserts the replacements; nothing is visible
    # until the single commit at the end.
    for model in (KevinStockMention, KevinAnalysis):
        await session.execute(delete(model).where(model.video_id == video.id))

    analysis_row = KevinAnalysis(
        video_id=video.id,
        model=analyzer._model,
        prompt_version=analyzer._prompt_version,
        market_outlook_direction=analysis.market_outlook_direction.value,
        market_outlook_reasoning=analysis.market_outlook_reasoning,
        macro_themes_json=analysis.macro_themes,
        key_risks_json=analysis.key_risks,
        summary=analysis.summary,
        raw_response_json=outcome.raw_response,
        prompt_tokens=outcome.prompt_tokens,
        completion_tokens=outcome.completion_tokens,
        cost_usd=spent,
    )
    session.add(analysis_row)
    # Flush so analysis_row.id is populated before the mentions reference it.
    await session.flush()

    session.add_all(
        [
            KevinStockMention(
                video_id=video.id,
                analysis_id=analysis_row.id,
                symbol=mention.symbol,
                action=mention.action.value,
                # Go through str() so the Decimal carries the printed value.
                conviction=Decimal(str(mention.conviction)),
                time_horizon=mention.time_horizon.value,
                rationale_quote=mention.rationale_quote,
                video_timestamp_seconds=mention.video_timestamp_seconds,
                expected_move=mention.expected_move.value,
            )
            for mention in analysis.tickers
        ]
    )
    await session.commit()
    return True, spent, "ok"
async def main() -> None:
    """CLI entry point: select ANALYZED videos and reanalyze them one by one.

    Exactly one of ``--all``, ``--video`` or ``--since`` selects the videos.
    ``--dry-run`` still calls the LLM but skips the DB writes.
    ``--max-cost-usd`` bounds cumulative LLM spend; the cap is checked before
    each video, so a run can exceed it by at most the cost of the last video
    processed.

    Raises:
        SystemExit: on invalid arguments (including a malformed ``--since``
            date) or when no Anthropic OAuth token is configured.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    selection = parser.add_mutually_exclusive_group(required=True)
    selection.add_argument("--all", action="store_true", help="Reanalyze every ANALYZED video")
    selection.add_argument("--video", help="A single YouTube video ID")
    selection.add_argument(
        "--since", help="Reanalyze ANALYZED videos with published_at >= YYYY-MM-DD"
    )
    parser.add_argument("--dry-run", action="store_true", help="Run the LLM but skip DB writes")
    parser.add_argument(
        "--max-cost-usd",
        type=float,
        default=10.0,
        help="Hard cap on cumulative LLM spend (USD) for this run",
    )
    args = parser.parse_args()

    # Validate --since up front so a typo yields a usage error instead of a
    # traceback after config, client and DB have already been set up.
    since_dt = None
    if args.since:
        try:
            since_dt = datetime.strptime(args.since, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        except ValueError:
            parser.error(f"--since must be YYYY-MM-DD, got {args.since!r}")

    config = MeetKevinWatcherConfig()
    token = config.anthropic_oauth_token or os.environ.get("TRADING_ANTHROPIC_OAUTH_TOKEN", "")
    if not token:
        raise SystemExit("TRADING_ANTHROPIC_OAUTH_TOKEN is required")

    client = AsyncAnthropic(auth_token=token)
    analyzer = LlmAnalyzer(
        client=client,
        model=config.meet_kevin_llm_model,
        prompt_version=config.meet_kevin_prompt_version,
    )

    engine, session_factory = create_db(config)
    try:
        # Select the target videos, oldest first.
        async with session_factory() as session:
            stmt = select(KevinVideo).where(KevinVideo.status == KevinVideoStatus.ANALYZED)
            if args.video:
                stmt = stmt.where(KevinVideo.youtube_video_id == args.video)
            elif since_dt is not None:
                stmt = stmt.where(KevinVideo.published_at >= since_dt)
            stmt = stmt.order_by(KevinVideo.published_at.asc())
            videos = (await session.execute(stmt)).scalars().all()

        logger.info(
            "Reanalyzing %d videos with prompt_version=%s%s",
            len(videos),
            analyzer._prompt_version,
            " (dry run)" if args.dry_run else "",
        )

        # Convert the float cap once; go through str() so the Decimal matches
        # the value the user typed rather than its binary float expansion.
        cost_cap = Decimal(str(args.max_cost_usd))
        cumulative_cost = Decimal("0")
        ok = 0
        fail = 0

        with tempfile.TemporaryDirectory(prefix="kevin-reanalyze-") as workdir:
            for v in videos:
                if cumulative_cost >= cost_cap:
                    logger.warning("Cost cap $%.2f reached — stopping", args.max_cost_usd)
                    break

                # One short-lived session per video so each replacement is
                # committed independently of the others.
                async with session_factory() as session:
                    # Re-fetch video in this session
                    video = (
                        await session.execute(
                            select(KevinVideo).where(KevinVideo.id == v.id)
                        )
                    ).scalar_one()
                    success, cost, reason = await _reanalyze_one(
                        session, video, analyzer, workdir, args.dry_run
                    )

                cumulative_cost += cost
                if success:
                    ok += 1
                else:
                    fail += 1
                    logger.warning("Skipped %s: %s", v.youtube_video_id, reason)

                # Throttle to stay under Anthropic RPM
                await asyncio.sleep(2)

        logger.info(
            "Done. ok=%d fail=%d total_cost=$%.4f (dry_run=%s)",
            ok, fail, float(cumulative_cost), args.dry_run,
        )
    finally:
        # Close pooled connections while the event loop is still running;
        # otherwise they are torn down after asyncio.run() has closed the loop.
        # NOTE(review): assumes create_db returns an AsyncEngine - confirm.
        await engine.dispose()
if __name__ == "__main__":
asyncio.run(main())