diff --git a/services/meet_kevin_watcher/caption_extractor.py b/services/meet_kevin_watcher/caption_extractor.py new file mode 100644 index 0000000..32f02b8 --- /dev/null +++ b/services/meet_kevin_watcher/caption_extractor.py @@ -0,0 +1,249 @@ +"""Extract captions from YouTube videos via yt-dlp.""" + +import asyncio +import logging +import re +from dataclasses import dataclass +from pathlib import Path + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class CaptionResult: + """Result of caption extraction from a video.""" + + source: str + """Source (e.g., 'youtube').""" + + language: str + """Language code (e.g., 'en').""" + + raw_text: str + """Raw concatenated caption text.""" + + segments: tuple[dict[str, float | str], ...] + """Tuple of segments with {start: float, end: float, text: str}.""" + + word_count: int + """Total word count across all segments.""" + + +def _ts_to_seconds(hours: int, minutes: int, seconds: int, millis: int) -> float: + """Convert timestamp components to seconds.""" + return hours * 3600 + minutes * 60 + seconds + millis / 1000.0 + + +def _lang_from_filename(filename: str) -> str: + """Extract language code from SRT filename. + + Examples: + 'video_id.en.srt' -> 'en' + 'video_id.en.auto.srt' -> 'en' + """ + # Split by dots and get the second component (first is video_id) + parts = filename.replace(".srt", "").split(".") + if len(parts) >= 2: + return parts[1] + return "unknown" + + +def parse_srt(text: str) -> list[dict[str, float | str]]: + """Parse SRT caption text into structured segments. + + Args: + text: Raw SRT content (SubRip format) + + Returns: + List of dicts with {start: float, end: float, text: str}. + Returns empty list on empty/malformed input. 
+ """ + if not text or not text.strip(): + return [] + + segments: list[dict[str, float | str]] = [] + + # Split by double newline to get blocks + blocks = re.split(r"\n\s*\n", text.strip()) + + # Timestamp regex: handles both comma and period as decimal separator + ts_pattern = r"(\d+):(\d+):(\d+)[,.](\d+)" + + for block in blocks: + lines = block.strip().split("\n") + + if len(lines) < 3: + # Need at least: sequence number, timestamp line, and text + continue + + # Second line should have timestamps + ts_line = lines[1].strip() + + # Extract start and end timestamps + timestamps = re.findall(ts_pattern, ts_line) + if len(timestamps) < 2: + continue + + try: + start_ts = timestamps[0] + end_ts = timestamps[1] + + start = _ts_to_seconds( + int(start_ts[0]), int(start_ts[1]), int(start_ts[2]), int(start_ts[3]) + ) + end = _ts_to_seconds( + int(end_ts[0]), int(end_ts[1]), int(end_ts[2]), int(end_ts[3]) + ) + + # Text is everything from line 2 onwards, joined with newlines + text_content = "\n".join(lines[2:]).strip() + + if text_content: + segments.append( + { + "start": start, + "end": end, + "text": text_content, + } + ) + + except (ValueError, IndexError): + logger.warning("Failed to parse SRT block: %s", block) + continue + + return segments + + +async def _run_yt_dlp(cmd: list[str], cwd: str) -> int: + """Run yt-dlp as subprocess. 
def _remove_files(paths: list[Path]) -> None:
    """Best-effort removal of temporary caption files.

    A file that is already gone is not an error. Any other failure is
    logged rather than raised, so cleanup never masks the caller's result.
    """
    for path in paths:
        try:
            path.unlink(missing_ok=True)
        except OSError as e:
            logger.warning("Could not remove caption file %s: %s", path, e)


async def _run_yt_dlp(cmd: list[str], cwd: str, timeout: float | None = None) -> int:
    """Run yt-dlp as subprocess.

    Args:
        cmd: Command as list (e.g., ['yt-dlp', '--write-sub', ...])
        cwd: Working directory
        timeout: Maximum number of seconds to wait for the process;
            ``None`` (the default) waits indefinitely.

    Returns:
        Exit code (0 on success). Returns 1 when the process could not be
        started, timed out, or reported no exit code.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            # stdout is never inspected, so do not buffer it in memory.
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
    except Exception as e:
        # Typically the executable is missing or cwd is invalid.
        logger.exception("Failed to run yt-dlp: %s", e)
        return 1

    try:
        _stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
    except asyncio.TimeoutError:
        logger.error("yt-dlp timed out after %s seconds", timeout)
        return 1
    except Exception as e:
        logger.exception("Failed to run yt-dlp: %s", e)
        return 1
    finally:
        # Reached with a live process on timeout, on error, or when the
        # awaiting task is cancelled: never leave yt-dlp running behind us.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await process.wait()

    exit_code = process.returncode
    if exit_code is None:
        exit_code = 1

    if exit_code != 0:
        logger.error(
            "yt-dlp failed with code %d: %s",
            exit_code,
            stderr.decode("utf-8", errors="replace") if stderr else "<no stderr output>",
        )

    return exit_code


async def extract_captions(
    video_id: str,
    workdir: str,
    sub_lang_glob: str = "en.*",
    *,
    timeout: float | None = None,
) -> CaptionResult | None:
    """Extract captions from a YouTube video via yt-dlp.

    Runs yt-dlp in ``workdir`` to fetch subtitles as SRT without downloading
    the video, picks one of the resulting ``{video_id}*.srt`` files (names
    without ``.auto.`` are preferred), parses it, and removes every matching
    SRT file before returning.

    Args:
        video_id: YouTube video ID (11 characters)
        workdir: Working directory where SRT files are written; created if
            it does not exist.
        sub_lang_glob: Language pattern for yt-dlp --sub-lang (default: 'en.*')
        timeout: Optional limit, in seconds, on the yt-dlp run. ``None``
            (the default) waits indefinitely.

    Returns:
        CaptionResult if captions were found and parsed, None otherwise
        (yt-dlp failed, no SRT file appeared, or the file could not be read
        or parsed).
    """
    workdir_path = Path(workdir)
    workdir_path.mkdir(parents=True, exist_ok=True)

    # Build yt-dlp command
    url = f"https://www.youtube.com/watch?v={video_id}"
    cmd = [
        "yt-dlp",
        "--write-auto-sub",
        "--write-sub",
        "--sub-lang",
        sub_lang_glob,
        "--skip-download",
        "--convert-subs",
        "srt",
        "-o",
        f"{video_id}.%(ext)s",
        url,
    ]

    exit_code = await _run_yt_dlp(cmd, workdir, timeout=timeout)
    if exit_code != 0:
        logger.warning("yt-dlp failed for video %s", video_id)
        return None

    # Manual subs (no ".auto." in the name) sort before auto-generated ones,
    # because False orders before True; ties are broken by file name.
    srt_files = sorted(
        workdir_path.glob(f"{video_id}*.srt"),
        key=lambda f: (".auto." in f.name, f.name),
    )
    if not srt_files:
        logger.info("No captions found for video %s", video_id)
        return None

    selected_srt = srt_files[0]
    logger.info("Using caption file: %s", selected_srt.name)

    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # process locale; undecodable bytes are replaced, not fatal.
        raw_text = selected_srt.read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        logger.warning("Failed to read captions from %s: %s", selected_srt.name, e)
        return None
    finally:
        # The files are only needed for this one read, so they are removed
        # on every path, including a failed read.
        _remove_files(srt_files)

    segments = parse_srt(raw_text)
    if not segments:
        logger.warning("Failed to parse captions from %s", selected_srt.name)
        return None

    # Word count is computed from segment text only (no indices/timestamps).
    word_count = sum(len(str(seg["text"]).split()) for seg in segments)

    return CaptionResult(
        source="youtube",
        language=_lang_from_filename(selected_srt.name),
        raw_text=raw_text,
        segments=tuple(segments),
        word_count=word_count,
    )
# Dotted path of the subprocess wrapper that the extract_captions tests replace.
_RUN_YT_DLP = "services.meet_kevin_watcher.caption_extractor._run_yt_dlp"

# Video id shared by every extract_captions test.
_VIDEO_ID = "dQw4w9WgXcQ"


def _fake_yt_dlp(exit_code: int):
    """Return a patcher that swaps ``_run_yt_dlp`` for a mock resolving to *exit_code*."""
    return patch(_RUN_YT_DLP, return_value=exit_code)


def _write_srt(directory: Path, suffix: str, content: str) -> Path:
    """Create ``<video id>.<suffix>`` in *directory*, as yt-dlp would have."""
    target = directory / f"{_VIDEO_ID}.{suffix}"
    target.write_text(content)
    return target


@pytest.fixture
def sample_srt() -> str:
    """Return the text of the shared ``fixtures/sample.srt`` file."""
    tests_root = Path(__file__).parents[2]
    return (tests_root / "fixtures" / "sample.srt").read_text()


class TestParseSrt:
    """Behaviour of parse_srt across valid, empty and malformed input."""

    def test_parse_srt_valid_fixture(self, sample_srt: str):
        """The sample fixture yields three segments with the right timing."""
        segments = parse_srt(sample_srt)

        assert isinstance(segments, list)
        assert len(segments) == 3
        welcome, earnings, outro = segments

        # Segment 0: Welcome back
        assert (welcome["start"], welcome["end"]) == (1.0, 4.5)
        assert "Welcome back to Meet Kevin" in welcome["text"]

        # Segment 1: NVDA mention
        assert (earnings["start"], earnings["end"]) == (4.5, 9.0)
        assert "NVDA" in earnings["text"]
        assert "AMD earnings" in earnings["text"]

        # Segment 2: End
        assert (outro["start"], outro["end"]) == (9.0, 14.25)
        assert "watch this until the end" in outro["text"]

    def test_parse_srt_empty_input(self):
        """An empty string parses to an empty list."""
        assert parse_srt("") == []

    def test_parse_srt_whitespace_only(self):
        """Whitespace-only input parses to an empty list."""
        assert parse_srt(" \n\n \t ") == []

    def test_parse_srt_invalid_format(self):
        """Text that is not SRT parses to an empty list."""
        assert parse_srt("not valid srt content") == []

    def test_parse_srt_timestamp_with_period(self):
        """A period is accepted as the decimal separator in timestamps."""
        srt = """1
00:00:01.000 --> 00:00:04.500
With period separator

2
00:00:05.000 --> 00:00:10.000
Second segment"""
        first, second = parse_srt(srt)  # unpacking doubles as the length check
        assert first["start"] == 1.0
        assert first["end"] == 4.5
        assert second["start"] == 5.0

    def test_parse_srt_multiline_text(self):
        """All text lines of a block end up in one newline-joined segment."""
        srt = """1
00:00:01,000 --> 00:00:05,000
First line
Second line
Third line"""
        parsed = parse_srt(srt)
        assert len(parsed) == 1
        assert "First line\nSecond line\nThird line" in parsed[0]["text"]

    def test_parse_srt_various_durations(self):
        """Sub-second and multi-hour timestamps convert to seconds correctly."""
        srt = """1
00:00:00,000 --> 00:00:00,500
Short

2
01:23:45,678 --> 01:23:46,789
Long"""
        parsed = parse_srt(srt)
        assert len(parsed) == 2
        brief, late = parsed

        # First: very short (0.5 seconds)
        assert brief["start"] == 0.0
        assert brief["end"] == 0.5

        # Second: 1 hour 23 min 45+ sec
        base = 1 * 3600 + 23 * 60
        assert late["start"] == pytest.approx(base + 45.678)
        assert late["end"] == pytest.approx(base + 46.789)


class TestExtractCaptions:
    """Behaviour of extract_captions with the yt-dlp subprocess mocked out."""

    @pytest.mark.asyncio
    async def test_extract_captions_success(self, tmp_path):
        """A successful run returns a populated CaptionResult."""
        # Pre-populate the SRT file that yt-dlp would create
        _write_srt(
            tmp_path,
            "en.srt",
            """1
00:00:01,000 --> 00:00:04,500
Welcome

2
00:00:04,500 --> 00:00:09,000
NVDA discussion""",
        )

        with _fake_yt_dlp(0) as mock_run:
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is not None
        assert isinstance(result, CaptionResult)
        assert result.source == "youtube"
        assert result.language == "en"
        assert result.raw_text is not None
        assert len(result.segments) == 2
        assert result.word_count > 0

        # Verify yt-dlp was called with correct arguments
        mock_run.assert_called_once()
        cmd = mock_run.call_args[0][0]  # first positional arg is the cmd list
        assert "yt-dlp" in cmd[0]
        expected_args = (
            "--write-auto-sub",
            "--write-sub",
            "--sub-lang",
            "en.*",
            "--skip-download",
            "--convert-subs",
            "srt",
        )
        for arg in expected_args:
            assert arg in cmd

    @pytest.mark.asyncio
    async def test_extract_captions_no_srt_produced(self, tmp_path):
        """None is returned when yt-dlp succeeds but writes no SRT file."""
        with _fake_yt_dlp(0):
            # Deliberately no SRT file in tmp_path
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is None

    @pytest.mark.asyncio
    async def test_extract_captions_yt_dlp_failure(self, tmp_path):
        """None is returned when yt-dlp exits with a non-zero code."""
        with _fake_yt_dlp(1):
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is None

    @pytest.mark.asyncio
    async def test_extract_captions_prefers_manual_subs(self, tmp_path):
        """Manual subs win over auto-generated ones when both exist."""
        _write_srt(
            tmp_path,
            "en.srt",
            """1
00:00:01,000 --> 00:00:05,000
Manual subtitle""",
        )
        _write_srt(
            tmp_path,
            "en.auto.srt",
            """1
00:00:01,000 --> 00:00:05,000
Auto subtitle""",
        )

        with _fake_yt_dlp(0):
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        # Should use manual (without .auto.)
        assert result is not None
        assert "Manual subtitle" in result.raw_text

    @pytest.mark.asyncio
    async def test_extract_captions_fallback_to_auto(self, tmp_path):
        """Auto-generated subs are used when no manual subs exist."""
        _write_srt(
            tmp_path,
            "en.auto.srt",
            """1
00:00:01,000 --> 00:00:05,000
Auto subtitle""",
        )

        with _fake_yt_dlp(0):
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is not None
        assert "Auto subtitle" in result.raw_text

    @pytest.mark.asyncio
    async def test_extract_captions_cleans_up_srt(self, tmp_path):
        """The SRT file is deleted once it has been parsed."""
        srt_file = _write_srt(
            tmp_path,
            "en.srt",
            """1
00:00:01,000 --> 00:00:05,000
Test""",
        )

        with _fake_yt_dlp(0):
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is not None
        assert not srt_file.exists()

    @pytest.mark.asyncio
    async def test_extract_captions_custom_lang_glob(self, tmp_path):
        """A custom language glob is accepted and the language is reported."""
        # Create a German subtitle file
        _write_srt(
            tmp_path,
            "de.srt",
            """1
00:00:01,000 --> 00:00:05,000
Deutscher Untertitel""",
        )

        with _fake_yt_dlp(0):
            result = await extract_captions(
                _VIDEO_ID, str(tmp_path), sub_lang_glob="de.*"
            )

        assert result is not None
        assert result.language == "de"
        assert "Deutscher Untertitel" in result.raw_text

    @pytest.mark.asyncio
    async def test_extract_captions_word_count(self, tmp_path):
        """word_count counts the words of the segment text only."""
        _write_srt(
            tmp_path,
            "en.srt",
            """1
00:00:01,000 --> 00:00:05,000
One two three

2
00:00:05,000 --> 00:00:10,000
Four five""",
        )

        with _fake_yt_dlp(0):
            result = await extract_captions(_VIDEO_ID, str(tmp_path))

        assert result is not None
        # 5 words total: "One two three Four five"
        assert result.word_count == 5

    def test_caption_result_is_frozen(self):
        """Assigning to a CaptionResult field raises AttributeError."""
        frozen = CaptionResult(
            source="youtube",
            language="en",
            raw_text="Test subtitle",
            segments=({"start": 0.0, "end": 1.0, "text": "Test"},),
            word_count=1,
        )

        with pytest.raises(AttributeError):
            frozen.source = "other"  # type: ignore