diff --git a/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile b/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile new file mode 100644 index 00000000..95e1b56f --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile @@ -0,0 +1,38 @@ +FROM nvidia/cuda:12.6.3-cudnn-runtime-ubuntu22.04 + +ENV DEBIAN_FRONTEND=noninteractive +ENV PYTHONUNBUFFERED=1 +ENV PATH="/root/.local/bin:$PATH" + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + python3 \ + python3-pip \ + python3-venv \ + ffmpeg \ + git \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Create app directory +WORKDIR /app + +# Install Python dependencies +COPY requirements.txt . +RUN pip3 install --no-cache-dir -r requirements.txt + +# Copy application code +COPY app/ ./app/ + +# Create data directories +RUN mkdir -p /data/audio /data/transcripts /data/highlights /data/config /data/state + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run the application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py b/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py new file mode 100644 index 00000000..c34bcade --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py @@ -0,0 +1 @@ +# YouTube Highlights Extraction Service diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/main.py b/modules/kubernetes/youtube_dl/yt-highlights/app/main.py new file mode 100644 index 00000000..57526978 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/main.py @@ -0,0 +1,1359 @@ +""" +YouTube Highlights Extraction Service + +Downloads YouTube videos, transcribes them using Faster-Whisper, +and extracts highlights using OpenRouter LLM. +""" +import os +import json +import uuid +import asyncio +import logging +import threading +import queue +from datetime import datetime +from pathlib import Path +from typing import Optional +from contextlib import asynccontextmanager + +import feedparser +import httpx +import redis +import yt_dlp +from faster_whisper import WhisperModel +from fastapi import FastAPI, HTTPException +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from pydantic import BaseModel + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration from environment +DATA_PATH = Path(os.getenv("DATA_PATH", "/data")) +ASR_MODEL = os.getenv("ASR_MODEL", "large-v3") +ASR_DEVICE = os.getenv("ASR_DEVICE", "cuda") +OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") +OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-r1-0528:free") +OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" + +# Ollama fallback configuration (used as last resort if set) +OLLAMA_URL = os.getenv("OLLAMA_URL", "") # e.g., "http://ollama.ollama.svc.cluster.local:11434" +OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:3b") # Small but capable model + +# Dynamic model pool - fetched from OpenRouter API +_cached_models: list[str] = [] +_models_fetched_at: float = 0 +MODEL_CACHE_TTL = 3600 # Refresh model list every hour + + +def _fetch_free_models() -> list[str]: + """Fetch list of free models from OpenRouter API. + + Returns models sorted by preference (primary env model first if available). + """ + import requests + import time + + global _cached_models, _models_fetched_at + + # Return cached list if still valid + if _cached_models and (time.time() - _models_fetched_at) < MODEL_CACHE_TTL: + return _cached_models + + try: + logger.info("Fetching available models from OpenRouter API...") + response = requests.get( + "https://openrouter.ai/api/v1/models", + headers={"Authorization": f"Bearer {OPENROUTER_API_KEY}"}, + timeout=30.0 + ) + + if response.status_code != 200: + logger.warning(f"Failed to fetch models: {response.status_code}") + # Return fallback list if API fails + return _get_fallback_models() + + data = response.json() + models = data.get("data", []) + + # Filter for free models (pricing is 0 or model ID ends with :free) + free_models = [] + for model in models: + model_id = model.get("id", "") + pricing = model.get("pricing", {}) + + # Check if model is free (prompt and completion are 0 or "0") + prompt_price = pricing.get("prompt", "1") + completion_price = pricing.get("completion", "1") + + is_free = ( + str(prompt_price) == "0" and str(completion_price) == "0" + ) or model_id.endswith(":free") + + if is_free: + free_models.append(model_id) + + logger.info(f"Found {len(free_models)} free models from OpenRouter") + + # Sort models - put preferred/primary model first if in list + sorted_models = [] + if OPENROUTER_MODEL in free_models: + sorted_models.append(OPENROUTER_MODEL) + free_models.remove(OPENROUTER_MODEL) + + # Add remaining models (could add more sophisticated ordering here) + sorted_models.extend(free_models) + + # Cache the result + _cached_models = sorted_models + _models_fetched_at = time.time() + + return sorted_models + + except Exception as e: + logger.warning(f"Error fetching models from OpenRouter: {e}") + return _get_fallback_models() + + +def _get_fallback_models() -> list[str]: + """Fallback model list if API fetch fails - only models known to work.""" + return [ + OPENROUTER_MODEL, + "deepseek/deepseek-r1-0528:free", + "google/gemini-2.0-flash-exp:free", + "meta-llama/llama-3.3-70b-instruct:free", + "mistralai/mistral-small-3.1-24b-instruct:free", + "google/gemma-3-27b-it:free", + ] + +# Slack configuration +SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN", "") +SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "automation") + +# Redis configuration +REDIS_URL = os.getenv("REDIS_URL", "redis://redis.redis.svc.cluster.local:6379/0") +REDIS_PREFIX = "yt-highlights:" + +# Paths +AUDIO_PATH = DATA_PATH / "audio" +TRANSCRIPTS_PATH = DATA_PATH / "transcripts" +HIGHLIGHTS_PATH = DATA_PATH / "highlights" +CONFIG_PATH = DATA_PATH / "config" +STATE_PATH = DATA_PATH / "state" + +# Ensure directories exist +for path in [AUDIO_PATH, TRANSCRIPTS_PATH, HIGHLIGHTS_PATH, CONFIG_PATH, STATE_PATH]: + path.mkdir(parents=True, exist_ok=True) + +# Global state +whisper_model: Optional[WhisperModel] = None +redis_client: Optional[redis.Redis] = None + +# Worker thread state +job_queue: queue.Queue = queue.Queue() +worker_thread: Optional[threading.Thread] = None +worker_running: bool = False + + +class JobStore: + """Redis-backed job storage.""" + + # Jobs older than this are auto-expired + JOB_EXPIRY_HOURS = 24 + + def __init__(self, client: redis.Redis, prefix: str = REDIS_PREFIX): + self.client = client + self.prefix = prefix + + def _key(self, job_id: str) -> str: + return f"{self.prefix}job:{job_id}" + + def set(self, job_id: str, job_data: dict): + """Store job data in Redis.""" + self.client.set(self._key(job_id), json.dumps(job_data)) + # Add to job index + self.client.sadd(f"{self.prefix}jobs", job_id) + + def get(self, job_id: str) -> Optional[dict]: + """Get job data from Redis.""" + data = self.client.get(self._key(job_id)) + if data: + return json.loads(data) + return None + + def update(self, job_id: str, **kwargs): + """Update specific fields in a job.""" + job = self.get(job_id) + if job: + job.update(kwargs) + self.set(job_id, job) + + def delete(self, job_id: str): + """Delete a job from Redis.""" + self.client.delete(self._key(job_id)) + self.client.srem(f"{self.prefix}jobs", job_id) + + def all(self) -> list[dict]: + """Get all jobs.""" + job_ids = self.client.smembers(f"{self.prefix}jobs") + jobs = [] + for job_id in job_ids: + job = self.get(job_id.decode() if isinstance(job_id, bytes) else job_id) + if job: + jobs.append(job) + return jobs + + def get_pending(self) -> list[dict]: + """Get jobs that need to be resumed (queued or processing).""" + pending = [] + for job in self.all(): + if job.get("status") in ("queued", "downloading", "transcribing", "analyzing"): + pending.append(job) + return pending + + def expire_old_jobs(self) -> int: + """Expire jobs older than JOB_EXPIRY_HOURS. + + Returns the number of jobs expired. + """ + from datetime import datetime, timedelta + + cutoff = datetime.utcnow() - timedelta(hours=self.JOB_EXPIRY_HOURS) + expired_count = 0 + + for job in self.all(): + # Skip already completed or failed jobs + if job.get("status") in ("completed", "failed", "expired"): + continue + + # Check job age + created_at = job.get("created_at") + if not created_at: + continue + + try: + # Parse ISO format datetime + job_time = datetime.fromisoformat(created_at.replace("Z", "+00:00").replace("+00:00", "")) + if job_time < cutoff: + job_id = job.get("job_id") + self.update( + job_id, + status="expired", + error=f"Job expired after {self.JOB_EXPIRY_HOURS} hours" + ) + expired_count += 1 + logger.info(f"Expired old job: {job_id}") + except (ValueError, TypeError) as e: + logger.warning(f"Could not parse job date: {created_at}: {e}") + + return expired_count + + +# Global job store (initialized on startup) +job_store: Optional[JobStore] = None + + +# Pydantic models +class ProcessRequest(BaseModel): + video_url: str + whisper_model: Optional[str] = None + language: Optional[str] = "en" + num_highlights: Optional[int] = 5 + + +class ChannelRequest(BaseModel): + channel_id: str + name: Optional[str] = None + + +class JobStatus(BaseModel): + job_id: str + status: str + video_url: str + video_title: Optional[str] = None + progress: Optional[str] = None + error: Optional[str] = None + created_at: str + + +class Highlight(BaseModel): + timestamp: str + timestamp_seconds: int + title: str + description: str + + +class JobResult(BaseModel): + job_id: str + status: str + video_url: str + video_title: str + duration_seconds: int + highlights: list[Highlight] + summary: str + transcript_path: str + + +def load_json(path: Path, default: dict) -> dict: + """Load JSON file or return default.""" + if path.exists(): + return json.loads(path.read_text()) + return default + + +def save_json(path: Path, data: dict): + """Save data to JSON file.""" + path.write_text(json.dumps(data, indent=2)) + + +def send_notification_sync(title: str, message: str, url: str = None): + """Send notification via Slack (synchronous).""" + import requests + + if not SLACK_BOT_TOKEN: + logger.warning("Slack bot token not configured, skipping notification") + return + + try: + # Build Slack message blocks + blocks = [ + { + "type": "header", + "text": {"type": "plain_text", "text": title[:150], "emoji": True} + }, + { + "type": "section", + "text": {"type": "mrkdwn", "text": message[:2900]} + } + ] + + if url: + blocks.append({ + "type": "section", + "text": {"type": "mrkdwn", "text": f"<{url}|Watch Video>"} + }) + + response = requests.post( + "https://slack.com/api/chat.postMessage", + headers={ + "Authorization": f"Bearer {SLACK_BOT_TOKEN}", + "Content-Type": "application/json", + }, + json={ + "channel": SLACK_CHANNEL, + "text": f"{title}: {message}", # Fallback text + "blocks": blocks + }, + timeout=10.0 + ) + + result = response.json() + if not result.get("ok"): + logger.warning(f"Slack API error: {result.get('error', 'unknown')}") + else: + logger.info(f"Slack notification sent: {title}") + + except Exception as e: + logger.warning(f"Failed to send Slack notification: {e}") + + +async def send_notification(title: str, message: str, url: str = None): + """Send notification via Slack (async wrapper).""" + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, send_notification_sync, title, message, url) + + +def get_channels() -> dict: + """Get subscribed channels.""" + return load_json(CONFIG_PATH / "channels.json", {"channels": []}) + + +def save_channels(data: dict): + """Save channels.""" + save_json(CONFIG_PATH / "channels.json", data) + + +def get_processed() -> dict: + """Get processed videos.""" + return load_json(STATE_PATH / "processed.json", {"processed_videos": {}}) + + +def save_processed(data: dict): + """Save processed videos.""" + save_json(STATE_PATH / "processed.json", data) + + +def cleanup_old_processed(hours: int = 24) -> int: + """Remove processed videos older than specified hours. + + Deletes both the state entry and the highlights JSON file. + Returns the number of videos cleaned up. + """ + from datetime import datetime, timedelta + + cutoff = datetime.utcnow() - timedelta(hours=hours) + processed = get_processed() + videos = processed.get("processed_videos", {}) + cleaned = 0 + + to_remove = [] + for video_id, info in videos.items(): + processed_at = info.get("processed_at") + if not processed_at: + continue + + try: + # Parse ISO format datetime + video_time = datetime.fromisoformat(processed_at.replace("Z", "+00:00").replace("+00:00", "")) + if video_time < cutoff: + to_remove.append(video_id) + + # Delete highlights file if exists + highlights_path = info.get("highlights_path") + if highlights_path: + path = Path(highlights_path) + if path.exists(): + path.unlink() + logger.info(f"Deleted old highlights file: {path}") + + # Also delete transcript if exists + transcript_path = TRANSCRIPTS_PATH / f"{video_id}.json" + if transcript_path.exists(): + transcript_path.unlink() + logger.info(f"Deleted old transcript file: {transcript_path}") + + cleaned += 1 + except (ValueError, TypeError) as e: + logger.warning(f"Could not parse processed date: {processed_at}: {e}") + + # Remove from state + for video_id in to_remove: + del videos[video_id] + logger.info(f"Removed old processed video: {video_id}") + + if to_remove: + save_processed(processed) + + return cleaned + + +def extract_video_id(url: str) -> str: + """Extract video ID from YouTube URL.""" + import re + patterns = [ + r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})', + ] + for pattern in patterns: + match = re.search(pattern, url) + if match: + return match.group(1) + return url + + +async def download_audio(video_url: str, output_path: Path) -> dict: + """Download audio from YouTube video (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, download_audio_sync, video_url, output_path) + + +def download_audio_sync(video_url: str, output_path: Path) -> dict: + """Download audio from YouTube video (synchronous).""" + ydl_opts = { + # Accept any format - FFmpeg will extract audio + 'format': 'best', + 'postprocessors': [{ + 'key': 'FFmpegExtractAudio', + 'preferredcodec': 'mp3', + 'preferredquality': '128', # Lower quality is fine for transcription + }], + 'outtmpl': str(output_path.with_suffix('')), + 'quiet': True, + 'no_warnings': True, + # Avoid 403 errors from YouTube + 'http_headers': { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-us,en;q=0.5', + }, + 'extractor_args': {'youtube': {'player_client': ['ios', 'android', 'web']}}, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(video_url, download=True) + return { + 'title': info.get('title', 'Unknown'), + 'duration': info.get('duration', 0), + 'channel': info.get('channel', 'Unknown'), + 'upload_date': info.get('upload_date', ''), + } + + +async def transcribe_audio(audio_path: Path, language: str = "en") -> list[dict]: + """Transcribe audio using Faster-Whisper (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, transcribe_audio_sync, audio_path, language) + + +def transcribe_audio_sync(audio_path: Path, language: str = "en") -> list[dict]: + """Transcribe audio using Faster-Whisper (synchronous).""" + global whisper_model + + if whisper_model is None: + logger.info(f"Loading Whisper model: {ASR_MODEL} on {ASR_DEVICE}") + whisper_model = WhisperModel( + ASR_MODEL, + device=ASR_DEVICE, + compute_type="float16" if ASR_DEVICE == "cuda" else "int8" + ) + + segments, info = whisper_model.transcribe( + str(audio_path), + language=language, + word_timestamps=True + ) + return [ + { + "start": segment.start, + "end": segment.end, + "text": segment.text.strip(), + } + for segment in segments + ] + + +def format_timestamp(seconds: float) -> str: + """Format seconds as MM:SS or HH:MM:SS.""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + if hours > 0: + return f"{hours}:{minutes:02d}:{secs:02d}" + return f"{minutes}:{secs:02d}" + + +async def extract_highlights( + transcript: list[dict], + video_title: str, + num_highlights: int = 5 +) -> dict: + """Extract highlights using OpenRouter LLM (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, extract_highlights_sync, transcript, video_title, num_highlights + ) + + +def _call_llm_with_retry(prompt: str) -> dict: + """Call OpenRouter LLM with limited retries, then Ollama fallback. + + Tries up to 5 OpenRouter models once each, then falls back to Ollama. + Designed to fail fast - Ollama is the reliable fallback. + """ + import requests + import time + + # Configuration - keep it fast, Ollama is reliable fallback + MAX_MODELS_TO_TRY = 5 + + # Get available free models (cached, refreshed hourly) + model_pool = _fetch_free_models()[:MAX_MODELS_TO_TRY] + + last_error = None + + for i, model in enumerate(model_pool): + try: + logger.info(f"Trying model: {model} ({i + 1}/{len(model_pool)})") + + response = requests.post( + OPENROUTER_URL, + headers={ + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + }, + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.3, + }, + timeout=60.0 # Shorter timeout + ) + + # Non-200 responses - log and try next model + if response.status_code != 200: + logger.warning(f"Model {model} returned {response.status_code}: {response.text[:200]}") + last_error = f"Model {model} error: {response.status_code}" + continue + + result = response.json() + + # Check for API-level errors + if "error" in result: + error_msg = result.get('error', {}) + if isinstance(error_msg, dict): + error_msg = error_msg.get('message', 'unknown') + logger.warning(f"Model {model} API error: {error_msg}") + last_error = f"Model {model} API error: {error_msg}" + continue + + content = result.get("choices", [{}])[0].get("message", {}).get("content", "") + + if not content or not content.strip(): + logger.warning(f"Model {model} returned empty response") + last_error = f"Model {model} returned empty response" + continue + + # Parse and return if successful + parsed = _parse_llm_response(content, model) + if parsed: + logger.info(f"Successfully used model: {model}") + return parsed + else: + last_error = f"Model {model} returned unparseable response" + continue + + except requests.exceptions.Timeout: + logger.warning(f"Model {model} timed out") + last_error = f"Model {model} timed out" + continue + except requests.exceptions.RequestException as e: + logger.warning(f"Model {model} request failed: {e}") + last_error = f"Model {model} request error: {e}" + continue + except Exception as e: + logger.warning(f"Model {model} unexpected error: {e}") + last_error = f"Model {model} error: {e}" + continue + + # Try Ollama as last resort if configured + if OLLAMA_URL: + logger.info(f"All OpenRouter models failed, trying Ollama fallback: {OLLAMA_MODEL}") + try: + response = requests.post( + f"{OLLAMA_URL}/api/generate", + json={ + "model": OLLAMA_MODEL, + "prompt": prompt, + "stream": False, + "options": {"temperature": 0.3} + }, + timeout=300.0 # Ollama can be slow on first load + ) + + if response.status_code == 200: + result = response.json() + content = result.get("response", "") + if content: + parsed = _parse_llm_response(content, f"ollama:{OLLAMA_MODEL}") + if parsed: + logger.info(f"Successfully used Ollama fallback: {OLLAMA_MODEL}") + return parsed + else: + last_error = f"Ollama {OLLAMA_MODEL} returned unparseable response" + else: + last_error = f"Ollama {OLLAMA_MODEL} returned empty response" + else: + last_error = f"Ollama {OLLAMA_MODEL} error: {response.status_code}" + logger.warning(f"Ollama fallback failed: {response.status_code} - {response.text[:200]}") + + except Exception as e: + logger.warning(f"Ollama fallback failed: {e}") + last_error = f"Ollama error: {e}" + + # All models failed + raise ValueError(f"All models failed. Last error: {last_error}") + + +def _parse_llm_response(content: str, model_name: str) -> Optional[dict]: + """Parse LLM response content into JSON dict. Returns None if parsing fails.""" + import re + + # Strip DeepSeek R1 thinking blocks (e.g., ...) + content = re.sub(r'.*?', '', content, flags=re.DOTALL) + + # Parse JSON from response (handle markdown code blocks) + if "```json" in content: + content = content.split("```json")[1].split("```")[0] + elif "```" in content: + content = content.split("```")[1].split("```")[0] + + content = content.strip() + if not content: + logger.warning(f"Model {model_name} returned no JSON content after stripping") + return None + + try: + return json.loads(content) + except json.JSONDecodeError as e: + logger.warning(f"Model {model_name} returned invalid JSON: {e}") + return None + + +def extract_highlights_sync( + transcript: list[dict], + video_title: str, + num_highlights: int = 5 +) -> dict: + """Extract highlights using OpenRouter LLM (synchronous). + + For long transcripts, splits into chunks and processes each separately, + then combines results. Tries multiple models with exponential backoff. + """ + # Chunk configuration - conservative limit for free tier models + MAX_CHUNK_CHARS = 6000 # ~1500 tokens, safe for most free models + HIGHLIGHTS_PER_CHUNK = max(2, num_highlights // 2) + + # Format transcript with timestamps + formatted_segments = [ + f"[{format_timestamp(seg['start'])}] {seg['text']}" + for seg in transcript + ] + formatted_transcript = "\n".join(formatted_segments) + + # If transcript is small enough, process in one go + if len(formatted_transcript) <= MAX_CHUNK_CHARS: + logger.info(f"Processing transcript in single chunk ({len(formatted_transcript)} chars)") + return _process_single_chunk(formatted_transcript, video_title, num_highlights) + + # Split into chunks for long transcripts + chunks = [] + current_chunk = [] + current_length = 0 + + for segment in formatted_segments: + seg_len = len(segment) + 1 # +1 for newline + if current_length + seg_len > MAX_CHUNK_CHARS and current_chunk: + chunks.append("\n".join(current_chunk)) + current_chunk = [segment] + current_length = seg_len + else: + current_chunk.append(segment) + current_length += seg_len + + if current_chunk: + chunks.append("\n".join(current_chunk)) + + logger.info(f"Processing transcript in {len(chunks)} chunks ({len(formatted_transcript)} total chars)") + + # Process each chunk + all_highlights = [] + summaries = [] + + for i, chunk in enumerate(chunks): + logger.info(f"Processing chunk {i + 1}/{len(chunks)} ({len(chunk)} chars)") + try: + result = _process_single_chunk(chunk, video_title, HIGHLIGHTS_PER_CHUNK, is_partial=True, chunk_num=i+1, total_chunks=len(chunks)) + all_highlights.extend(result.get("highlights", [])) + if result.get("summary"): + summaries.append(result["summary"]) + except Exception as e: + logger.warning(f"Chunk {i + 1} failed: {e}") + # Continue with other chunks + + if not all_highlights and not summaries: + raise ValueError("All chunks failed to process") + + # Sort highlights by timestamp and take top N + all_highlights.sort(key=lambda h: h.get("timestamp_seconds", 0)) + top_highlights = all_highlights[:num_highlights] + + # Combine summaries + if len(summaries) > 1: + combined_summary = " ".join(summaries) + elif summaries: + combined_summary = summaries[0] + else: + combined_summary = "Video processed in chunks." + + return { + "highlights": top_highlights, + "summary": combined_summary + } + + +def _process_single_chunk( + formatted_transcript: str, + video_title: str, + num_highlights: int, + is_partial: bool = False, + chunk_num: int = 1, + total_chunks: int = 1 +) -> dict: + """Process a single transcript chunk to extract highlights.""" + chunk_context = "" + summary_instruction = "Provide a brief summary (2-3 sentences MAX, under 200 characters) of the main takeaway." + if is_partial: + chunk_context = f" (Part {chunk_num} of {total_chunks})" + summary_instruction = "Provide a one-sentence summary (under 100 characters) of this section's main point." + + prompt = f"""Analyze this video transcript and extract key moments. + +Video: "{video_title}"{chunk_context} + +TASK: +1. Identify exactly {num_highlights} most important/interesting moments +2. {summary_instruction} + +OUTPUT FORMAT (valid JSON only, no other text): +{{ + "highlights": [ + {{"timestamp": "MM:SS", "timestamp_seconds": , "title": "", "description": "<1 sentence>"}} + ], + "summary": "" +}} + +RULES: +- Timestamps MUST match exactly from transcript (format: MM:SS or H:MM:SS) +- Keep titles punchy and specific (not generic like "Important point") +- Summary must be SHORT - this is critical + +Transcript: +{formatted_transcript}""" + + return _call_llm_with_retry(prompt) + + +def process_video_sync(job_id: str, video_url: str, language: str, num_highlights: int): + """Process a video: download, transcribe, extract highlights (synchronous). + + This runs entirely in the worker thread, keeping the main event loop free. + """ + video_id = extract_video_id(video_url) + + try: + job_store.update(job_id, status="downloading", progress="Downloading audio...") + + audio_path = AUDIO_PATH / f"{video_id}.mp3" + video_info = download_audio_sync(video_url, audio_path) + + job_store.update( + job_id, + video_title=video_info["title"], + status="transcribing", + progress="Transcribing audio..." + ) + + transcript = transcribe_audio_sync(audio_path, language) + + # Save transcript + transcript_path = TRANSCRIPTS_PATH / f"{video_id}.json" + save_json(transcript_path, { + "video_id": video_id, + "video_url": video_url, + "title": video_info["title"], + "duration": video_info["duration"], + "segments": transcript + }) + + job_store.update(job_id, status="analyzing", progress="Extracting highlights...") + + highlights = extract_highlights_sync( + transcript, + video_info["title"], + num_highlights + ) + + # Save highlights + result = { + "job_id": job_id, + "video_id": video_id, + "video_url": video_url, + "video_title": video_info["title"], + "duration_seconds": video_info["duration"], + "highlights": highlights.get("highlights", []), + "summary": highlights.get("summary", ""), + "transcript_path": str(transcript_path), + "processed_at": datetime.utcnow().isoformat() + } + + highlights_path = HIGHLIGHTS_PATH / f"{video_id}.json" + save_json(highlights_path, result) + + # Update processed state + processed = get_processed() + processed["processed_videos"][video_id] = { + "processed_at": datetime.utcnow().isoformat(), + "status": "completed", + "highlights_path": str(highlights_path) + } + save_processed(processed) + + job_store.update(job_id, status="completed", progress=None, result=result) + + # Cleanup audio file + if audio_path.exists(): + audio_path.unlink() + + logger.info(f"Job {job_id} completed: {video_info['title']}") + + # Build notification message with summary and highlights + summary_text = highlights.get('summary', 'No summary') + highlight_list = highlights.get('highlights', []) + + message_parts = [f"*Summary:* {summary_text}"] + + if highlight_list: + message_parts.append("\n*Key Moments:*") + for h in highlight_list[:5]: # Limit to 5 highlights + ts = h.get('timestamp', '0:00') + title = h.get('title', 'Untitled') + message_parts.append(f"- `{ts}` {title}") + + notification_message = "\n".join(message_parts) + + # Send notification (sync version) + send_notification_sync( + title=f"Video Processed: {video_info['title'][:50]}", + message=notification_message, + url=video_url + ) + + except Exception as e: + logger.exception(f"Job {job_id} failed: {e}") + job_store.update(job_id, status="failed", error=str(e)) + + +def worker_loop(): + """Worker thread main loop - processes jobs from the queue one at a time.""" + global worker_running + logger.info("Worker thread started") + + while worker_running: + try: + # Block for up to 1 second waiting for a job + job = job_queue.get(timeout=1.0) + except queue.Empty: + continue + + try: + job_id = job["job_id"] + video_url = job["video_url"] + language = job.get("language", "en") + num_highlights = job.get("num_highlights", 5) + + logger.info(f"Worker processing job {job_id}: {video_url}") + process_video_sync(job_id, video_url, language, num_highlights) + + except Exception as e: + logger.exception(f"Worker error processing job: {e}") + finally: + job_queue.task_done() + + logger.info("Worker thread stopped") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan handler.""" + global redis_client, job_store, worker_thread, worker_running + + logger.info("Starting yt-highlights service...") + + # Initialize Redis connection + try: + redis_client = redis.from_url(REDIS_URL, decode_responses=False) + redis_client.ping() + job_store = JobStore(redis_client) + logger.info(f"Connected to Redis at {REDIS_URL}") + + # Expire old jobs on startup + expired = job_store.expire_old_jobs() + if expired: + logger.info(f"Expired {expired} old jobs on startup") + + # Cleanup old processed videos on startup + cleaned = cleanup_old_processed(hours=24) + if cleaned: + logger.info(f"Cleaned up {cleaned} old processed videos on startup") + + # Check for pending jobs that need to be resumed + pending = job_store.get_pending() + if pending: + logger.info(f"Found {len(pending)} pending jobs to resume") + for job in pending: + # Mark as failed with resume note - they need to be resubmitted + job_store.update( + job["job_id"], + status="failed", + error="Service restarted - please resubmit" + ) + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise + + # Start worker thread + worker_running = True + worker_thread = threading.Thread(target=worker_loop, daemon=True, name="video-worker") + worker_thread.start() + logger.info("Worker thread started") + + yield + + logger.info("Shutting down yt-highlights service...") + + # Stop worker thread + worker_running = False + if worker_thread and worker_thread.is_alive(): + worker_thread.join(timeout=5.0) + logger.info("Worker thread stopped") + + if redis_client: + redis_client.close() + + +app = FastAPI( + title="YouTube Highlights Extractor", + description="Extract key moments and summaries from YouTube videos", + version="1.0.0", + lifespan=lifespan +) + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "model": ASR_MODEL, "device": ASR_DEVICE} + + +@app.post("/process") +async def process(request: ProcessRequest): + """Queue a video for processing.""" + job_id = str(uuid.uuid4())[:8] + + job_data = { + "job_id": job_id, + "status": "queued", + "video_url": request.video_url, + "video_title": None, + "progress": None, + "error": None, + "created_at": datetime.utcnow().isoformat(), + } + + job_store.set(job_id, job_data) + + # Add to worker queue instead of background task + job_queue.put({ + "job_id": job_id, + "video_url": request.video_url, + "language": request.language or "en", + "num_highlights": request.num_highlights or 5, + }) + + return JobStatus(**job_data) + + +@app.get("/status/{job_id}") +async def status(job_id: str): + """Get job status.""" + job = job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + return JobStatus(**job) + + +@app.get("/results/{job_id}") +async def results(job_id: str): + """Get job results.""" + job = job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + + if job["status"] != "completed": + raise HTTPException(400, f"Job {job_id} not completed: {job['status']}") + + return job.get("result", {}) + + +@app.delete("/jobs/{job_id}") +async def delete_job(job_id: str): + """Delete a job from the queue.""" + job = job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + + job_store.delete(job_id) + return {"status": "deleted", "job_id": job_id} + + +def resolve_channel_id(channel_input: str) -> tuple[str, str]: + """Resolve a YouTube channel handle/URL to a channel ID. + + Args: + channel_input: Can be a handle (@username), channel ID (UC...), or URL + + Returns: + Tuple of (channel_id, channel_name) + """ + # If it's already a channel ID (starts with UC and is 24 chars), return as-is + if channel_input.startswith("UC") and len(channel_input) == 24: + return channel_input, channel_input + + # Build URL from handle or use as-is if it's a URL + if channel_input.startswith("@"): + url = f"https://www.youtube.com/{channel_input}" + elif channel_input.startswith("http"): + url = channel_input + else: + url = f"https://www.youtube.com/@{channel_input}" + + try: + ydl_opts = { + 'quiet': True, + 'extract_flat': True, + 'playlist_items': '1', + 'no_warnings': True, + } + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(url, download=False) + channel_id = info.get('channel_id') + channel_name = info.get('channel') or info.get('uploader') or channel_input + if channel_id: + return channel_id, channel_name + raise ValueError(f"Could not resolve channel ID for {channel_input}") + except Exception as e: + logger.error(f"Failed to resolve channel: {channel_input}: {e}") + raise ValueError(f"Could not resolve channel: {channel_input}") + + +@app.get("/channels") +async def list_channels(): + """List subscribed channels.""" + return get_channels() + + +@app.post("/channels") +async def add_channel(request: ChannelRequest): + """Add a channel subscription. + + Accepts handles (@username), channel IDs (UC...), or URLs. + Resolves to the actual channel ID for RSS feed compatibility. + """ + channels = get_channels() + + # Resolve to actual channel ID + try: + channel_id, channel_name = resolve_channel_id(request.channel_id) + except ValueError as e: + raise HTTPException(400, str(e)) + + # Check if already subscribed (check both input and resolved ID) + for ch in channels["channels"]: + if ch["id"] == channel_id: + raise HTTPException(400, f"Channel already subscribed (ID: {channel_id})") + + channels["channels"].append({ + "id": channel_id, + "name": request.name or channel_name, + "handle": request.channel_id if request.channel_id.startswith("@") else None, + "added_at": datetime.utcnow().isoformat(), + "last_checked": None, + "enabled": True + }) + + save_channels(channels) + logger.info(f"Added channel: {channel_name} (ID: {channel_id})") + return {"status": "added", "channel_id": channel_id, "name": channel_name} + + +@app.delete("/channels/{channel_id}") +async def remove_channel(channel_id: str): + """Remove a channel subscription.""" + channels = get_channels() + channels["channels"] = [ + ch for ch in channels["channels"] + if ch["id"] != channel_id + ] + save_channels(channels) + return {"status": "removed", "channel_id": channel_id} + + +@app.post("/channels/migrate") +async def migrate_channels(): + """Migrate existing channels from handles to proper channel IDs. + + Fixes channels that were added with handles (@username) instead of IDs. + """ + channels = get_channels() + migrated = [] + failed = [] + + for channel in channels["channels"]: + old_id = channel["id"] + # Skip if already a proper channel ID + if old_id.startswith("UC") and len(old_id) == 24: + continue + + try: + new_id, new_name = resolve_channel_id(old_id) + channel["id"] = new_id + channel["handle"] = old_id if old_id.startswith("@") else None + channel["name"] = new_name + migrated.append({"old": old_id, "new": new_id, "name": new_name}) + logger.info(f"Migrated channel: {old_id} -> {new_id}") + except Exception as e: + failed.append({"id": old_id, "error": str(e)}) + logger.error(f"Failed to migrate channel {old_id}: {e}") + + if migrated: + save_channels(channels) + + return {"migrated": migrated, "failed": failed} + + +@app.post("/check-new") +async def check_new_videos(): + """Check all subscribed channels for new videos.""" + channels = get_channels() + processed = get_processed() + new_videos = [] + + for channel in channels["channels"]: + if not channel.get("enabled", True): + continue + + feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel['id']}" + + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:5]: # Check last 5 videos + video_id = entry.yt_videoid + + if video_id not in processed.get("processed_videos", {}): + new_videos.append({ + "video_id": video_id, + "video_url": entry.link, + "title": entry.title, + "channel": channel["name"], + "published": entry.published + }) + + # Update last checked + channel["last_checked"] = datetime.utcnow().isoformat() + + except Exception as e: + logger.error(f"Error checking channel {channel['id']}: {e}") + + save_channels(channels) + + return { + "channels_checked": len(channels["channels"]), + "new_videos": new_videos + } + + +@app.get("/jobs") +async def list_jobs(): + """List all jobs. Auto-expires jobs older than 24 hours.""" + # Expire old jobs before listing + job_store.expire_old_jobs() + return {"jobs": job_store.all()} + + +@app.get("/processed") +async def list_processed(): + """List all processed videos with their results. Auto-cleans videos older than 24 hours.""" + # Cleanup old processed videos before listing + cleanup_old_processed(hours=24) + + results = [] + for video_id, info in get_processed().get("processed_videos", {}).items(): + highlights_path = Path(info.get("highlights_path", "")) + if highlights_path.exists(): + try: + data = json.loads(highlights_path.read_text()) + results.append(data) + except Exception: + pass + # Sort by processed_at descending + results.sort(key=lambda x: x.get("processed_at", ""), reverse=True) + return {"videos": results} + + +@app.post("/auto-process") +async def auto_process(): + """Check for new videos and auto-queue them for processing. + + Designed to be called by n8n or other schedulers. + """ + # First check for new videos + channels = get_channels() + processed = get_processed() + new_videos = [] + + for channel in channels["channels"]: + if not channel.get("enabled", True): + continue + + feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel['id']}" + + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:3]: # Check last 3 videos + video_id = entry.yt_videoid + + if video_id not in processed.get("processed_videos", {}): + new_videos.append({ + "video_id": video_id, + "video_url": entry.link, + "title": entry.title, + "channel": channel["name"], + }) + + channel["last_checked"] = datetime.utcnow().isoformat() + + except Exception as e: + logger.error(f"Error checking channel {channel['id']}: {e}") + + save_channels(channels) + + # Queue new videos for processing + queued_jobs = [] + for video in new_videos: + job_id = str(uuid.uuid4())[:8] + + job_data = { + "job_id": job_id, + "status": "queued", + "video_url": video["video_url"], + "video_title": video["title"], + "progress": None, + "error": None, + "created_at": datetime.utcnow().isoformat(), + } + + job_store.set(job_id, job_data) + + # Add to worker queue instead of background task + job_queue.put({ + "job_id": job_id, + "video_url": video["video_url"], + "language": "en", + "num_highlights": 5, + }) + + queued_jobs.append({"job_id": job_id, "title": video["title"]}) + + return { + "channels_checked": len(channels["channels"]), + "new_videos_found": len(new_videos), + "queued": queued_jobs + } + + +# Serve static files for web UI +STATIC_PATH = Path(__file__).parent / "static" +if STATIC_PATH.exists(): + app.mount("/static", StaticFiles(directory=str(STATIC_PATH)), name="static") + + +@app.get("/") +async def root(): + """Serve the web UI.""" + index_path = STATIC_PATH / "index.html" + if index_path.exists(): + return FileResponse(index_path) + return {"message": "YouTube Highlights API", "docs": "/docs"} diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html b/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html new file mode 100644 index 00000000..f954c218 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html @@ -0,0 +1,667 @@ + + + + + + YouTube Highlights + + + + + YouTube Highlights Checking... + + + + + + Process a Video + + + Process + + + + + + Subscribed Channels + + + Add Channel + Check for New Videos + + + No channels subscribed yet + + + + + + Processing Queue + + No active jobs + + + + + + Processed Videos + + No videos processed yet + + + + + + + diff --git a/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt b/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt new file mode 100644 index 00000000..debfa714 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt @@ -0,0 +1,10 @@ +fastapi>=0.104.0 +uvicorn>=0.24.0 +yt-dlp==2025.12.8 +faster-whisper>=1.0.0 +httpx>=0.25.0 +requests>=2.31.0 +pydantic>=2.0.0 +python-multipart>=0.0.6 +feedparser>=6.0.0 +redis>=5.0.0