trading/scripts/analyze_kevin_video.py
Viktor Barzin 41ab95ec4d
Some checks failed
ci/woodpecker/push/woodpecker Pipeline was canceled
feat(meet-kevin): prompt v2 — forward-looking action + expected_move field
User reported that the old prompt could emit 'sell' on backward-looking
capitulation ('Kevin sold after a 20% drop') — exactly the false signal
to avoid. v2 reframes every per-ticker field as forward-looking and
adds an explicit expected_move enum for the trading bot to weight.

Changes:
- New ExpectedMove enum (up_strong/up_mild/sideways/down_mild/
  down_strong/unknown) in shared/schemas + shared/models, with
  matching kevin_expected_move Postgres enum + column on
  kevin_stock_mentions (migration e5f6a7b8c9d0). NOT NULL with
  server_default 'unknown' so existing rows backfill cleanly.
- SYSTEM_PROMPT rewritten: action semantics now require a FORWARD
  view; reactive sells get downgraded to 'watch' or skipped; the
  rationale_quote must contain forward reasoning. Quality
  checklist updated.
- _ANALYSIS_TOOL JSON schema gains expected_move (required).
- prompt_version v1 → v2 in config + infra + ad-hoc CLI default.
- pipeline.py persists ticker.expected_move into the new column.

Migration safety: the column is NOT NULL DEFAULT 'unknown' so 96
existing mentions auto-fill with 'unknown' (no forward call known
for backward analyses) without breaking any reads.

Cost to backfill the 27 existing analyses with v2 prompt: ~$3 LLM
spend. A follow-up reanalyze script will replay them after this
ships.
2026-05-28 21:40:07 +00:00

179 lines
5.9 KiB
Python

"""Ad-hoc analysis of a single Meet Kevin video.
Usage:
python scripts/analyze_kevin_video.py <video-id-or-url>
Pulls captions via yt-dlp, runs the same Claude prompt the watcher uses
in production, and prints the resulting tickers + actions. No DB writes,
no Redis publish — strictly observational.
Env vars (model and prompt version can also be set via --model / --prompt-version):
TRADING_ANTHROPIC_OAUTH_TOKEN — required (matches what the pod uses)
TRADING_MEET_KEVIN_LLM_MODEL — default: claude-haiku-4-5-20251001
TRADING_MEET_KEVIN_PROMPT_VERSION — default: v2
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from decimal import Decimal
from anthropic import AsyncAnthropic
# repo root on sys.path so `services.*` and `shared.*` import cleanly
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from services.meet_kevin_watcher.caption_extractor import extract_captions
from services.meet_kevin_watcher.llm_analyzer import LlmAnalyzer
# A bare YouTube video ID: exactly 11 characters from [A-Za-z0-9_-].
_YOUTUBE_ID = re.compile(r"^[A-Za-z0-9_-]{11}$")
# URL forms that carry the ID: `v=` query parameter or a known path prefix.
_YOUTUBE_URL_ID = re.compile(
    r"(?:v=|youtu\.be/|/shorts/|/embed/|/live/)([A-Za-z0-9_-]{11})"
)


def parse_video_id(s: str) -> str:
    """Extract the 11-character YouTube video ID from a bare ID or a URL.

    Accepts a bare ID, or any string carrying the ID after ``v=``,
    ``youtu.be/``, ``/shorts/``, ``/embed/`` or ``/live/``. Leading and
    trailing whitespace (e.g. a newline picked up by copy-paste) is ignored.

    Args:
        s: Video ID or URL as typed by the user.

    Returns:
        The 11-character video ID.

    Raises:
        SystemExit: If no video ID can be found in ``s``.
    """
    candidate = s.strip()
    # fullmatch rather than match: `$` also matches just before a trailing
    # newline, which would let "ID\n" through with the newline attached.
    if _YOUTUBE_ID.fullmatch(candidate):
        return candidate
    found = _YOUTUBE_URL_ID.search(candidate)
    if found:
        return found.group(1)
    raise SystemExit(f"could not parse YouTube video ID from: {s!r}")
def fetch_video_metadata(video_id: str) -> dict:
    """Return yt-dlp's metadata JSON for a video, or ``{}`` if unavailable.

    Runs ``yt-dlp --skip-download --dump-json`` against the video's watch URL
    and parses its stdout. The lookup is best-effort: if yt-dlp cannot be
    run (missing or non-executable binary), exits non-zero, exceeds the
    30-second timeout, or prints something that is not a UTF-8 JSON object,
    an empty dict is returned instead of raising.

    Args:
        video_id: YouTube video ID, interpolated into the watch URL.

    Returns:
        The decoded JSON object, or ``{}`` on any failure.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        out = subprocess.check_output(
            ["yt-dlp", "--skip-download", "--dump-json", url],
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError plus e.g. a non-executable binary.
        return {}
    try:
        parsed = json.loads(out.decode("utf-8"))
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return {}
    # Callers use .get() on the result; a non-object document would break them.
    return parsed if isinstance(parsed, dict) else {}
def fmt_pct(d: Decimal | float) -> str:
    """Render a fraction (e.g. ``0.5``) as a percentage with one decimal.

    The numeric part is right-aligned to five characters and followed by
    a ``%`` sign, e.g. ``" 50.0%"``.
    """
    percent = float(d) * 100
    return format(percent, "5.1f") + "%"
def print_analysis(video_id: str, captions, result) -> None:
    """Print a human-readable report of one analysis result to stdout.

    The report lists token usage and cost, transcript size, the market
    outlook, optional macro themes and key risks, the summary, and a ticker
    table. Tickers are ordered by action (buy, sell, hold, watch, avoid,
    then anything else) and, within the same action, by descending
    conviction. When no tickers were extracted a single placeholder line is
    printed instead of the table.
    """
    analysis = result.analysis

    print(f"\n=== Meet Kevin analysis — {video_id} ===")
    # NOTE(review): labelled "Model" but prints raw_response['tool_name'] —
    # confirm this is the intended field.
    print(f" Model: {result.raw_response.get('tool_name', '?')}")
    usage_part = f" Tokens: in={result.prompt_tokens} out={result.completion_tokens}"
    cost_part = f" Cost: ${float(result.cost_usd):.4f}"
    print(usage_part + cost_part)
    char_count = len(captions.raw_text)
    segment_count = len(captions.segments)
    print(f" Transcript: {char_count} chars, {segment_count} segments")

    print(f"\n Market outlook: {analysis.market_outlook_direction.value}")
    print(f" Reasoning: {analysis.market_outlook_reasoning[:200]}")
    if analysis.macro_themes:
        print(" Macro themes: " + ", ".join(analysis.macro_themes))
    if analysis.key_risks:
        print(" Key risks: " + ", ".join(analysis.key_risks))

    print("\n Summary:")
    for summary_line in analysis.summary.split("\n"):
        print(f" {summary_line}")

    if not analysis.tickers:
        print("\n TICKERS: (none extracted)")
        return

    print(f"\n TICKERS ({len(analysis.tickers)}):")
    # (header label, format spec, divider width) for each table column
    columns = (
        ("SYMBOL", "<8", 8),
        ("ACTION", "<7", 7),
        ("EXPECTED", "<12", 12),
        ("CONV", ">6", 6),
        ("HORIZON", "<11", 11),
        ("RATIONALE", "", 60),
    )
    print(" " + " ".join(format(label, spec) for label, spec, _ in columns))
    print(" " + " ".join("-" * width for _, _, width in columns))

    # Actionable calls (buy/sell) come first; unknown actions sort last.
    action_rank = {"buy": 0, "sell": 1, "hold": 2, "watch": 3, "avoid": 4}

    def _ordering(ticker):
        # Negated conviction gives descending order within one action.
        return (action_rank.get(ticker.action.value, 99), -float(ticker.conviction))

    for ticker in sorted(analysis.tickers, key=_ordering):
        quote = (ticker.rationale_quote or "").replace("\n", " ")[:60]
        # expected_move may be absent on the ticker object; show it as blank.
        move = getattr(ticker, "expected_move", None)
        if move is not None:
            move_label = move.value
        else:
            move_label = ""
        cells = [
            f"{ticker.symbol:<8}",
            f"{ticker.action.value:<7}",
            f"{move_label:<12}",
            f"{fmt_pct(ticker.conviction):>6}",
            f"{ticker.time_horizon.value:<11}",
            quote,
        ]
        print(" " + " ".join(cells))
async def main() -> None:
    """Parse CLI arguments, analyze one video, and print the report.

    Steps: resolve the video ID, fetch metadata through
    ``fetch_video_metadata`` (falling back to the ID as title, an empty
    description and the current UTC time when fields are missing), extract
    captions into a temporary directory, run ``LlmAnalyzer.analyze`` and pass
    the result to ``print_analysis``.

    Raises:
        SystemExit: If ``TRADING_ANTHROPIC_OAUTH_TOKEN`` is unset or empty,
            the video ID cannot be parsed, or no captions are available.
    """
    env = os.environ

    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("video", help="YouTube video ID or URL")
    default_model = env.get("TRADING_MEET_KEVIN_LLM_MODEL", "claude-haiku-4-5-20251001")
    cli.add_argument("--model", default=default_model)
    default_prompt = env.get("TRADING_MEET_KEVIN_PROMPT_VERSION", "v2")
    cli.add_argument("--prompt-version", default=default_prompt)
    opts = cli.parse_args()

    oauth_token = env.get("TRADING_ANTHROPIC_OAUTH_TOKEN")
    if not oauth_token:
        raise SystemExit("TRADING_ANTHROPIC_OAUTH_TOKEN is required")

    video_id = parse_video_id(opts.video)
    print(f"Analyzing video {video_id} with model {opts.model}...")

    info = fetch_video_metadata(video_id)
    title = info.get("title") or video_id
    description = info.get("description") or ""
    raw_date = info.get("upload_date")  # YYYYMMDD
    # Without an upload date, treat the video as published right now (UTC).
    published_at = (
        datetime.strptime(raw_date, "%Y%m%d").replace(tzinfo=timezone.utc)
        if raw_date
        else datetime.now(timezone.utc)
    )

    # Everything that touches `captions` stays inside the temp-dir context.
    with tempfile.TemporaryDirectory(prefix=f"kevin-{video_id}-") as workdir:
        captions = await extract_captions(video_id, workdir)
        has_text = captions is not None and bool(captions.raw_text.strip())
        if not has_text:
            raise SystemExit(f"no captions available for {video_id}")

        analyzer = LlmAnalyzer(
            client=AsyncAnthropic(auth_token=oauth_token),
            model=opts.model,
            prompt_version=opts.prompt_version,
        )
        segments = [dict(segment) for segment in captions.segments]
        result = await analyzer.analyze(
            title=title,
            description=description,
            published_at=published_at,
            transcript_text=captions.raw_text,
            transcript_segments=segments,
        )
        print_analysis(video_id, captions, result)


if __name__ == "__main__":
    asyncio.run(main())