- Implement CaptionResult frozen dataclass for structured caption data - Add parse_srt() to parse SubRip format with flexible timestamp handling - Add extract_captions() async function using yt-dlp subprocess wrapper - Prefer manual captions over auto-generated; clean up SRT files after parsing - Add 16 comprehensive tests covering edge cases (empty input, malformed SRT, timestamp variations, language extraction, manual vs auto selection) - Type-safe implementation with full mypy --strict compliance - Add sample.srt fixture with 3 segments mentioning NVDA for test reference
249 lines
6.5 KiB
Python
249 lines
6.5 KiB
Python
"""Extract captions from YouTube videos via yt-dlp."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CaptionResult:
|
|
"""Result of caption extraction from a video."""
|
|
|
|
source: str
|
|
"""Source (e.g., 'youtube')."""
|
|
|
|
language: str
|
|
"""Language code (e.g., 'en')."""
|
|
|
|
raw_text: str
|
|
"""Raw concatenated caption text."""
|
|
|
|
segments: tuple[dict[str, float | str], ...]
|
|
"""Tuple of segments with {start: float, end: float, text: str}."""
|
|
|
|
word_count: int
|
|
"""Total word count across all segments."""
|
|
|
|
|
|
def _ts_to_seconds(hours: int, minutes: int, seconds: int, millis: int) -> float:
|
|
"""Convert timestamp components to seconds."""
|
|
return hours * 3600 + minutes * 60 + seconds + millis / 1000.0
|
|
|
|
|
|
def _lang_from_filename(filename: str) -> str:
|
|
"""Extract language code from SRT filename.
|
|
|
|
Examples:
|
|
'video_id.en.srt' -> 'en'
|
|
'video_id.en.auto.srt' -> 'en'
|
|
"""
|
|
# Split by dots and get the second component (first is video_id)
|
|
parts = filename.replace(".srt", "").split(".")
|
|
if len(parts) >= 2:
|
|
return parts[1]
|
|
return "unknown"
|
|
|
|
|
|
def parse_srt(text: str) -> list[dict[str, float | str]]:
    """Parse SRT caption text into structured segments.

    Each SRT block is expected to be: a sequence-number line, a timestamp
    line containing a start and an end timestamp, then one or more text
    lines. Blocks that do not match this shape are skipped.

    Args:
        text: Raw SRT content (SubRip format). LF, CRLF and lone-CR line
            endings are all accepted.

    Returns:
        List of dicts with {start: float, end: float, text: str}.
        Returns empty list on empty/malformed input.
    """
    if not text or not text.strip():
        return []

    # Normalize CRLF / lone CR to LF first. Without this, splitting on "\n"
    # leaves a stray "\r" inside multi-line segment text for files with
    # Windows line endings.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")

    # Timestamp regex: handles both comma and period as decimal separator.
    # Compiled once, outside the per-block loop.
    ts_pattern = re.compile(r"(\d+):(\d+):(\d+)[,.](\d+)")

    segments: list[dict[str, float | str]] = []

    # Blocks are separated by one or more blank (or whitespace-only) lines.
    for block in re.split(r"\n\s*\n", normalized.strip()):
        lines = block.strip().split("\n")

        if len(lines) < 3:
            # Need at least: sequence number, timestamp line, and text
            continue

        # Second line should carry both the start and the end timestamp.
        timestamps = ts_pattern.findall(lines[1].strip())
        if len(timestamps) < 2:
            continue

        try:
            start_h, start_m, start_s, start_ms = (int(p) for p in timestamps[0])
            end_h, end_m, end_s, end_ms = (int(p) for p in timestamps[1])
        except (ValueError, IndexError):
            # The regex only captures digits, so this is reached only for
            # pathological input (e.g. digit runs too long for int()).
            logger.warning("Failed to parse SRT block: %s", block)
            continue

        start = _ts_to_seconds(start_h, start_m, start_s, start_ms)
        end = _ts_to_seconds(end_h, end_m, end_s, end_ms)

        # Text is everything after the timestamp line, joined with newlines.
        text_content = "\n".join(lines[2:]).strip()
        if text_content:
            segments.append(
                {
                    "start": start,
                    "end": end,
                    "text": text_content,
                }
            )

    return segments
|
|
|
|
|
|
async def _run_yt_dlp(cmd: list[str], cwd: str) -> int:
|
|
"""Run yt-dlp as subprocess.
|
|
|
|
Args:
|
|
cmd: Command as list (e.g., ['yt-dlp', '--write-sub', ...])
|
|
cwd: Working directory
|
|
|
|
Returns:
|
|
Exit code (0 on success)
|
|
"""
|
|
try:
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
cwd=cwd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
_stdout, stderr = await process.communicate()
|
|
|
|
exit_code = process.returncode
|
|
if exit_code is None:
|
|
exit_code = 1
|
|
|
|
if exit_code != 0:
|
|
if stderr:
|
|
logger.error(
|
|
"yt-dlp failed with code %d: %s",
|
|
exit_code,
|
|
stderr.decode("utf-8", errors="replace"),
|
|
)
|
|
|
|
return exit_code
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to run yt-dlp: %s", e)
|
|
return 1
|
|
|
|
|
|
async def extract_captions(
    video_id: str, workdir: str, sub_lang_glob: str = "en.*"
) -> CaptionResult | None:
    """Extract captions from a YouTube video via yt-dlp.

    Runs yt-dlp in ``workdir`` to write subtitle files converted to SRT,
    picks one of the resulting files, parses it, and removes every SRT file
    that matched this video id from ``workdir`` afterwards.

    Args:
        video_id: YouTube video ID (11 characters)
        workdir: Working directory where SRT files are written (created if
            it does not exist)
        sub_lang_glob: Language pattern for yt-dlp --sub-lang (default: 'en.*')

    Returns:
        CaptionResult if captions were found and parsed, None otherwise
        (yt-dlp failed, no SRT file was produced, or nothing could be parsed).
    """
    workdir_path = Path(workdir)
    workdir_path.mkdir(parents=True, exist_ok=True)

    # Build yt-dlp command
    url = f"https://www.youtube.com/watch?v={video_id}"
    cmd = [
        "yt-dlp",
        "--write-auto-sub",
        "--write-sub",
        "--sub-lang",
        sub_lang_glob,
        "--skip-download",
        "--convert-subs",
        "srt",
        "-o",
        f"{video_id}.%(ext)s",
        url,
    ]

    exit_code = await _run_yt_dlp(cmd, workdir)
    if exit_code != 0:
        logger.warning("yt-dlp failed for video %s", video_id)
        return None

    # Candidates sorted so that files without '.auto.' in their name come
    # first (False sorts before True), then alphabetically by name.
    # NOTE(review): assumes auto-generated caption files carry '.auto.' in
    # their file name - confirm against yt-dlp's actual output naming.
    srt_files = sorted(
        workdir_path.glob(f"{video_id}*.srt"),
        key=lambda f: (".auto." in f.name, f.name),
    )

    if not srt_files:
        logger.info("No captions found for video %s", video_id)
        return None

    selected_srt = srt_files[0]
    logger.info("Using caption file: %s", selected_srt.name)

    try:
        # Decode explicitly as UTF-8 instead of the locale default so the
        # result does not depend on the host's locale settings.
        raw_text = selected_srt.read_text(encoding="utf-8")
    finally:
        # Clean up all SRT files, whether or not reading succeeded.
        # Removal is best-effort: a file we cannot delete must not fail
        # the extraction.
        for srt_file in srt_files:
            try:
                srt_file.unlink()
            except OSError as exc:
                logger.debug("Could not remove %s: %s", srt_file, exc)

    segments = parse_srt(raw_text)
    if not segments:
        logger.warning("Failed to parse captions from %s", selected_srt.name)
        return None

    # Extract language from filename
    language = _lang_from_filename(selected_srt.name)

    # Word count is computed from segment text only (not timestamps/indices).
    all_text = " ".join(str(seg["text"]) for seg in segments)
    word_count = len(all_text.split())

    return CaptionResult(
        source="youtube",
        language=language,
        raw_text=raw_text,
        segments=tuple(segments),
        word_count=word_count,
    )
|