feat(meet-kevin): caption extractor via yt-dlp
- Implement CaptionResult frozen dataclass for structured caption data - Add parse_srt() to parse SubRip format with flexible timestamp handling - Add extract_captions() async function using yt-dlp subprocess wrapper - Prefer manual captions over auto-generated; clean up SRT files after parsing - Add 16 comprehensive tests covering edge cases (empty input, malformed SRT, timestamp variations, language extraction, manual vs auto selection) - Type-safe implementation with full mypy --strict compliance - Add sample.srt fixture with 3 segments mentioning NVDA for test reference
This commit is contained in:
parent
8ce3ede09c
commit
145f7dbec5
3 changed files with 589 additions and 0 deletions
249
services/meet_kevin_watcher/caption_extractor.py
Normal file
249
services/meet_kevin_watcher/caption_extractor.py
Normal file
|
|
@ -0,0 +1,249 @@
|
|||
"""Extract captions from YouTube videos via yt-dlp."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CaptionResult:
|
||||
"""Result of caption extraction from a video."""
|
||||
|
||||
source: str
|
||||
"""Source (e.g., 'youtube')."""
|
||||
|
||||
language: str
|
||||
"""Language code (e.g., 'en')."""
|
||||
|
||||
raw_text: str
|
||||
"""Raw concatenated caption text."""
|
||||
|
||||
segments: tuple[dict[str, float | str], ...]
|
||||
"""Tuple of segments with {start: float, end: float, text: str}."""
|
||||
|
||||
word_count: int
|
||||
"""Total word count across all segments."""
|
||||
|
||||
|
||||
def _ts_to_seconds(hours: int, minutes: int, seconds: int, millis: int) -> float:
|
||||
"""Convert timestamp components to seconds."""
|
||||
return hours * 3600 + minutes * 60 + seconds + millis / 1000.0
|
||||
|
||||
|
||||
def _lang_from_filename(filename: str) -> str:
|
||||
"""Extract language code from SRT filename.
|
||||
|
||||
Examples:
|
||||
'video_id.en.srt' -> 'en'
|
||||
'video_id.en.auto.srt' -> 'en'
|
||||
"""
|
||||
# Split by dots and get the second component (first is video_id)
|
||||
parts = filename.replace(".srt", "").split(".")
|
||||
if len(parts) >= 2:
|
||||
return parts[1]
|
||||
return "unknown"
|
||||
|
||||
|
||||
def parse_srt(text: str) -> list[dict[str, float | str]]:
|
||||
"""Parse SRT caption text into structured segments.
|
||||
|
||||
Args:
|
||||
text: Raw SRT content (SubRip format)
|
||||
|
||||
Returns:
|
||||
List of dicts with {start: float, end: float, text: str}.
|
||||
Returns empty list on empty/malformed input.
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return []
|
||||
|
||||
segments: list[dict[str, float | str]] = []
|
||||
|
||||
# Split by double newline to get blocks
|
||||
blocks = re.split(r"\n\s*\n", text.strip())
|
||||
|
||||
# Timestamp regex: handles both comma and period as decimal separator
|
||||
ts_pattern = r"(\d+):(\d+):(\d+)[,.](\d+)"
|
||||
|
||||
for block in blocks:
|
||||
lines = block.strip().split("\n")
|
||||
|
||||
if len(lines) < 3:
|
||||
# Need at least: sequence number, timestamp line, and text
|
||||
continue
|
||||
|
||||
# Second line should have timestamps
|
||||
ts_line = lines[1].strip()
|
||||
|
||||
# Extract start and end timestamps
|
||||
timestamps = re.findall(ts_pattern, ts_line)
|
||||
if len(timestamps) < 2:
|
||||
continue
|
||||
|
||||
try:
|
||||
start_ts = timestamps[0]
|
||||
end_ts = timestamps[1]
|
||||
|
||||
start = _ts_to_seconds(
|
||||
int(start_ts[0]), int(start_ts[1]), int(start_ts[2]), int(start_ts[3])
|
||||
)
|
||||
end = _ts_to_seconds(
|
||||
int(end_ts[0]), int(end_ts[1]), int(end_ts[2]), int(end_ts[3])
|
||||
)
|
||||
|
||||
# Text is everything from line 2 onwards, joined with newlines
|
||||
text_content = "\n".join(lines[2:]).strip()
|
||||
|
||||
if text_content:
|
||||
segments.append(
|
||||
{
|
||||
"start": start,
|
||||
"end": end,
|
||||
"text": text_content,
|
||||
}
|
||||
)
|
||||
|
||||
except (ValueError, IndexError):
|
||||
logger.warning("Failed to parse SRT block: %s", block)
|
||||
continue
|
||||
|
||||
return segments
|
||||
|
||||
|
||||
async def _run_yt_dlp(cmd: list[str], cwd: str) -> int:
|
||||
"""Run yt-dlp as subprocess.
|
||||
|
||||
Args:
|
||||
cmd: Command as list (e.g., ['yt-dlp', '--write-sub', ...])
|
||||
cwd: Working directory
|
||||
|
||||
Returns:
|
||||
Exit code (0 on success)
|
||||
"""
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
cwd=cwd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
_stdout, stderr = await process.communicate()
|
||||
|
||||
exit_code = process.returncode
|
||||
if exit_code is None:
|
||||
exit_code = 1
|
||||
|
||||
if exit_code != 0:
|
||||
if stderr:
|
||||
logger.error(
|
||||
"yt-dlp failed with code %d: %s",
|
||||
exit_code,
|
||||
stderr.decode("utf-8", errors="replace"),
|
||||
)
|
||||
|
||||
return exit_code
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to run yt-dlp: %s", e)
|
||||
return 1
|
||||
|
||||
|
||||
async def extract_captions(
|
||||
video_id: str, workdir: str, sub_lang_glob: str = "en.*"
|
||||
) -> CaptionResult | None:
|
||||
"""Extract captions from a YouTube video via yt-dlp.
|
||||
|
||||
Args:
|
||||
video_id: YouTube video ID (11 characters)
|
||||
workdir: Working directory where SRT files are written
|
||||
sub_lang_glob: Language pattern for yt-dlp --sub-lang (default: 'en.*')
|
||||
|
||||
Returns:
|
||||
CaptionResult if captions were found and parsed, None otherwise.
|
||||
"""
|
||||
workdir_path = Path(workdir)
|
||||
workdir_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Build yt-dlp command
|
||||
url = f"https://www.youtube.com/watch?v={video_id}"
|
||||
cmd = [
|
||||
"yt-dlp",
|
||||
"--write-auto-sub",
|
||||
"--write-sub",
|
||||
"--sub-lang",
|
||||
sub_lang_glob,
|
||||
"--skip-download",
|
||||
"--convert-subs",
|
||||
"srt",
|
||||
"-o",
|
||||
f"{video_id}.%(ext)s",
|
||||
url,
|
||||
]
|
||||
|
||||
# Run yt-dlp
|
||||
exit_code = await _run_yt_dlp(cmd, workdir)
|
||||
|
||||
if exit_code != 0:
|
||||
logger.warning("yt-dlp failed for video %s", video_id)
|
||||
return None
|
||||
|
||||
# Look for SRT files, preferring manual subs over auto
|
||||
srt_files = list(workdir_path.glob(f"{video_id}*.srt"))
|
||||
|
||||
if not srt_files:
|
||||
logger.info("No captions found for video %s", video_id)
|
||||
return None
|
||||
|
||||
# Sort: manual subs (without .auto.) first
|
||||
srt_files.sort(
|
||||
key=lambda f: (
|
||||
".auto." in f.name, # False (manual) sorts before True (auto)
|
||||
f.name,
|
||||
)
|
||||
)
|
||||
|
||||
selected_srt = srt_files[0]
|
||||
logger.info("Using caption file: %s", selected_srt.name)
|
||||
|
||||
# Parse the SRT
|
||||
raw_text = selected_srt.read_text()
|
||||
segments = parse_srt(raw_text)
|
||||
|
||||
if not segments:
|
||||
logger.warning("Failed to parse captions from %s", selected_srt.name)
|
||||
# Clean up
|
||||
for srt_file in srt_files:
|
||||
try:
|
||||
srt_file.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
# Extract language from filename
|
||||
language = _lang_from_filename(selected_srt.name)
|
||||
|
||||
# Calculate word count from segment text only
|
||||
all_text = " ".join(str(seg["text"]) for seg in segments)
|
||||
word_count = len(all_text.split())
|
||||
|
||||
# Clean up all SRT files
|
||||
for srt_file in srt_files:
|
||||
try:
|
||||
srt_file.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
result = CaptionResult(
|
||||
source="youtube",
|
||||
language=language,
|
||||
raw_text=raw_text,
|
||||
segments=tuple(segments),
|
||||
word_count=word_count,
|
||||
)
|
||||
|
||||
return result
|
||||
Loading…
Add table
Add a link
Reference in a new issue