"""Extract captions from YouTube videos via yt-dlp.""" import asyncio import logging import re from dataclasses import dataclass from pathlib import Path logger = logging.getLogger(__name__) @dataclass(frozen=True) class CaptionResult: """Result of caption extraction from a video.""" source: str """Source (e.g., 'youtube').""" language: str """Language code (e.g., 'en').""" raw_text: str """Raw concatenated caption text.""" segments: tuple[dict[str, float | str], ...] """Tuple of segments with {start: float, end: float, text: str}.""" word_count: int """Total word count across all segments.""" def _ts_to_seconds(hours: int, minutes: int, seconds: int, millis: int) -> float: """Convert timestamp components to seconds.""" return hours * 3600 + minutes * 60 + seconds + millis / 1000.0 def _lang_from_filename(filename: str) -> str: """Extract language code from SRT filename. Examples: 'video_id.en.srt' -> 'en' 'video_id.en.auto.srt' -> 'en' """ # Split by dots and get the second component (first is video_id) parts = filename.replace(".srt", "").split(".") if len(parts) >= 2: return parts[1] return "unknown" def parse_srt(text: str) -> list[dict[str, float | str]]: """Parse SRT caption text into structured segments. Args: text: Raw SRT content (SubRip format) Returns: List of dicts with {start: float, end: float, text: str}. Returns empty list on empty/malformed input. """ if not text or not text.strip(): return [] segments: list[dict[str, float | str]] = [] # Split by double newline to get blocks blocks = re.split(r"\n\s*\n", text.strip()) # Timestamp regex: handles both comma and period as decimal separator ts_pattern = r"(\d+):(\d+):(\d+)[,.](\d+)" for block in blocks: lines = block.strip().split("\n") if len(lines) < 3: # Need at least: sequence number, timestamp line, and text continue # Second line should have timestamps ts_line = lines[1].strip() # Extract start and end timestamps timestamps = re.findall(ts_pattern, ts_line) if len(timestamps) < 2: continue try: start_ts = timestamps[0] end_ts = timestamps[1] start = _ts_to_seconds( int(start_ts[0]), int(start_ts[1]), int(start_ts[2]), int(start_ts[3]) ) end = _ts_to_seconds( int(end_ts[0]), int(end_ts[1]), int(end_ts[2]), int(end_ts[3]) ) # Text is everything from line 2 onwards, joined with newlines text_content = "\n".join(lines[2:]).strip() if text_content: segments.append( { "start": start, "end": end, "text": text_content, } ) except (ValueError, IndexError): logger.warning("Failed to parse SRT block: %s", block) continue return segments async def _run_yt_dlp(cmd: list[str], cwd: str) -> int: """Run yt-dlp as subprocess. Args: cmd: Command as list (e.g., ['yt-dlp', '--write-sub', ...]) cwd: Working directory Returns: Exit code (0 on success) """ try: process = await asyncio.create_subprocess_exec( *cmd, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) _stdout, stderr = await process.communicate() exit_code = process.returncode if exit_code is None: exit_code = 1 if exit_code != 0: if stderr: logger.error( "yt-dlp failed with code %d: %s", exit_code, stderr.decode("utf-8", errors="replace"), ) return exit_code except Exception as e: logger.error("Failed to run yt-dlp: %s", e) return 1 async def extract_captions( video_id: str, workdir: str, sub_lang_glob: str = "en.*" ) -> CaptionResult | None: """Extract captions from a YouTube video via yt-dlp. Args: video_id: YouTube video ID (11 characters) workdir: Working directory where SRT files are written sub_lang_glob: Language pattern for yt-dlp --sub-lang (default: 'en.*') Returns: CaptionResult if captions were found and parsed, None otherwise. """ workdir_path = Path(workdir) workdir_path.mkdir(parents=True, exist_ok=True) # Build yt-dlp command url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ "yt-dlp", "--write-auto-sub", "--write-sub", "--sub-lang", sub_lang_glob, "--skip-download", "--convert-subs", "srt", "-o", f"{video_id}.%(ext)s", url, ] # Run yt-dlp exit_code = await _run_yt_dlp(cmd, workdir) if exit_code != 0: logger.warning("yt-dlp failed for video %s", video_id) return None # Look for SRT files, preferring manual subs over auto srt_files = list(workdir_path.glob(f"{video_id}*.srt")) if not srt_files: logger.info("No captions found for video %s", video_id) return None # Sort: manual subs (without .auto.) first srt_files.sort( key=lambda f: ( ".auto." in f.name, # False (manual) sorts before True (auto) f.name, ) ) selected_srt = srt_files[0] logger.info("Using caption file: %s", selected_srt.name) # Parse the SRT raw_text = selected_srt.read_text() segments = parse_srt(raw_text) if not segments: logger.warning("Failed to parse captions from %s", selected_srt.name) # Clean up for srt_file in srt_files: try: srt_file.unlink() except OSError: pass return None # Extract language from filename language = _lang_from_filename(selected_srt.name) # Calculate word count from segment text only all_text = " ".join(str(seg["text"]) for seg in segments) word_count = len(all_text.split()) # Clean up all SRT files for srt_file in srt_files: try: srt_file.unlink() except OSError: pass result = CaptionResult( source="youtube", language=language, raw_text=raw_text, segments=tuple(segments), word_count=word_count, ) return result