From d15337e838245f7a0c36a459c3973e084656d452 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Mon, 23 Feb 2026 23:02:56 +0000 Subject: [PATCH] [ci skip] f1-stream: add extractor framework with demo streams (Phase 3) - BaseExtractor ABC with health_check method - ExtractorRegistry with concurrent fan-out extraction - ExtractionService with in-memory cache and background polling - DemoExtractor with 3 public HLS test streams - Adaptive polling: 5min during live sessions, 30min otherwise - GET /streams, GET /extractors, POST /extract endpoints --- .../files/backend/extractors/__init__.py | 49 +++++++ .../files/backend/extractors/base.py | 118 +++++++++++++++++ .../files/backend/extractors/demo.py | 75 +++++++++++ .../files/backend/extractors/models.py | 29 +++++ .../files/backend/extractors/registry.py | 116 +++++++++++++++++ .../files/backend/extractors/service.py | 121 ++++++++++++++++++ stacks/f1-stream/files/backend/main.py | 103 ++++++++++++++- stacks/f1-stream/main.tf | 2 +- 8 files changed, 608 insertions(+), 5 deletions(-) create mode 100644 stacks/f1-stream/files/backend/extractors/__init__.py create mode 100644 stacks/f1-stream/files/backend/extractors/base.py create mode 100644 stacks/f1-stream/files/backend/extractors/demo.py create mode 100644 stacks/f1-stream/files/backend/extractors/models.py create mode 100644 stacks/f1-stream/files/backend/extractors/registry.py create mode 100644 stacks/f1-stream/files/backend/extractors/service.py diff --git a/stacks/f1-stream/files/backend/extractors/__init__.py b/stacks/f1-stream/files/backend/extractors/__init__.py new file mode 100644 index 00000000..2b5fffdf --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/__init__.py @@ -0,0 +1,49 @@ +"""Stream extraction framework. + +To add a new extractor: +1. Create a new file in this package (e.g., my_site.py) +2. Subclass BaseExtractor from backend.extractors.base +3. Implement site_key, site_name, and extract() +4. Import and register it in this file's create_registry() function + +Example: + from backend.extractors.my_site import MySiteExtractor + registry.register(MySiteExtractor()) +""" + +from backend.extractors.demo import DemoExtractor +from backend.extractors.models import ExtractedStream +from backend.extractors.registry import ExtractorRegistry +from backend.extractors.service import ExtractionService + +__all__ = [ + "ExtractedStream", + "ExtractorRegistry", + "ExtractionService", + "create_registry", + "create_extraction_service", +] + + +def create_registry() -> ExtractorRegistry: + """Create and populate the extractor registry with all known extractors. + + Add new extractors here by importing and registering them. + """ + registry = ExtractorRegistry() + + # --- Register extractors below --- + registry.register(DemoExtractor()) + # registry.register(MySiteExtractor()) # Add new extractors here + + return registry + + +def create_extraction_service() -> ExtractionService: + """Create an ExtractionService with all extractors registered. + + This is the main entry point for the extraction framework. + Call this once during app startup. 
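+
+    Illustrative usage (all names as defined in this package):
+
+        service = create_extraction_service()
+        await service.run_extraction()   # populate the in-memory cache
+        streams = service.get_streams()  # read the cached results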
+ """ + registry = create_registry() + return ExtractionService(registry) diff --git a/stacks/f1-stream/files/backend/extractors/base.py b/stacks/f1-stream/files/backend/extractors/base.py new file mode 100644 index 00000000..b6fae383 --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/base.py @@ -0,0 +1,118 @@ +"""Base class for all site-specific stream extractors.""" + +import logging +from abc import ABC, abstractmethod + +import httpx + +from backend.extractors.models import ExtractedStream + +logger = logging.getLogger(__name__) + + +class BaseExtractor(ABC): + """Abstract base class for site-specific stream extractors. + + To create a new extractor: + 1. Create a new file in backend/extractors/ + 2. Subclass BaseExtractor + 3. Implement site_key, site_name, and extract() + 4. Register it in backend/extractors/__init__.py + """ + + @property + @abstractmethod + def site_key(self) -> str: + """Unique identifier for this site (e.g., 'sportsurge'). + + Must be lowercase, alphanumeric with hyphens/underscores only. + Used as the cache key and in API responses. + """ + + @property + @abstractmethod + def site_name(self) -> str: + """Human-readable name (e.g., 'SportSurge'). + + Displayed in the UI and API responses. + """ + + @abstractmethod + async def extract(self) -> list[ExtractedStream]: + """Extract stream URLs from this site. + + Returns a list of ExtractedStream objects. Each represents a + discovered stream URL. The extractor should set url, quality, + and title fields; site_key, site_name, and extracted_at are + auto-populated if left empty. + + Implementations should: + - Use httpx for HTTP requests + - Handle their own errors gracefully (log and return empty list) + - Set quality when detectable from the source + - Set title to something descriptive + """ + + async def health_check(self, url: str) -> bool: + """Verify a URL is live (HEAD request, check for m3u8 content). + + Sends a HEAD request and checks: + 1. HTTP 200 response + 2. Content-Type suggests HLS/media content (if available) + + Returns True if the URL appears to be a live stream. + """ + try: + async with httpx.AsyncClient( + timeout=10.0, + follow_redirects=True, + headers={"User-Agent": "Mozilla/5.0"}, + ) as client: + response = await client.head(url) + + if response.status_code != 200: + logger.debug( + "[%s] Health check failed for %s: HTTP %d", + self.site_key, + url, + response.status_code, + ) + return False + + content_type = response.headers.get("content-type", "").lower() + # m3u8 streams typically have these content types + live_indicators = [ + "application/vnd.apple.mpegurl", + "application/x-mpegurl", + "video/", + "audio/", + "octet-stream", + ] + + # If content-type is present and doesn't look like media, + # the URL might not be a stream. But some servers don't set + # content-type properly for HEAD, so we still return True + # if content-type is missing or generic. + if content_type and not any(ind in content_type for ind in live_indicators): + # Content type present but doesn't look like media. + # Could still be valid (some servers return text/plain for m3u8). 
+ if "text/" in content_type or "html" in content_type: + logger.debug( + "[%s] Health check suspect for %s: content-type=%s", + self.site_key, + url, + content_type, + ) + return False + + return True + + except httpx.TimeoutException: + logger.debug("[%s] Health check timed out for %s", self.site_key, url) + return False + except httpx.HTTPError as e: + logger.debug("[%s] Health check error for %s: %s", self.site_key, url, e) + return False + except Exception: + logger.exception("[%s] Unexpected error during health check for %s", self.site_key, url) + return False diff --git a/stacks/f1-stream/files/backend/extractors/demo.py b/stacks/f1-stream/files/backend/extractors/demo.py new file mode 100644 index 00000000..d1cb2785 --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/demo.py @@ -0,0 +1,75 @@ +"""Demo extractor - returns hardcoded test streams for framework testing. + +This extractor exists purely for testing the extraction pipeline end-to-end. +It does NOT connect to any real streaming site. Disable it in production by +removing its registration from __init__.py or setting DEMO_EXTRACTOR_ENABLED=false. +""" + +import logging +import os + +from backend.extractors.base import BaseExtractor +from backend.extractors.models import ExtractedStream + +logger = logging.getLogger(__name__) + +# Set DEMO_EXTRACTOR_ENABLED=false to disable this extractor +DEMO_ENABLED = os.getenv("DEMO_EXTRACTOR_ENABLED", "true").lower() in ("true", "1", "yes") + + +class DemoExtractor(BaseExtractor): + """Demo extractor that returns hardcoded test streams. + + Use this to verify the extraction framework works end-to-end without + needing a real streaming site. The streams are publicly available HLS + test streams from Apple and others. + """ + + @property + def site_key(self) -> str: + return "demo" + + @property + def site_name(self) -> str: + return "Demo (Test Streams)" + + async def extract(self) -> list[ExtractedStream]: + """Return hardcoded test streams for framework testing.""" + if not DEMO_ENABLED: + logger.info("[demo] Demo extractor is disabled via DEMO_EXTRACTOR_ENABLED") + return [] + + logger.info("[demo] Returning demo test streams") + + streams = [ + ExtractedStream( + url="https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8", + site_key=self.site_key, + site_name=self.site_name, + quality="720p", + title="Big Buck Bunny (Test Stream)", + is_live=False, + ), + ExtractedStream( + url="https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8", + site_key=self.site_key, + site_name=self.site_name, + quality="1080p", + title="Apple Bipbop (Test Stream)", + is_live=False, + ), + ExtractedStream( + url="https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8", + site_key=self.site_key, + site_name=self.site_name, + quality="", + title="Akamai Live Test Stream", + is_live=False, + ), + ] + + # Optionally run health checks on the demo streams + for stream in streams: + stream.is_live = await self.health_check(stream.url) + + return streams diff --git a/stacks/f1-stream/files/backend/extractors/models.py b/stacks/f1-stream/files/backend/extractors/models.py new file mode 100644 index 00000000..a66723c0 --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/models.py @@ -0,0 +1,29 @@ +"""Data models for the stream extraction framework.""" + +from dataclasses import dataclass, field +from datetime import datetime, timezone + + +@dataclass +class ExtractedStream: + """Represents a single stream URL discovered by an extractor.""" + + 
url: str # The HLS/m3u8 URL + site_key: str # Which extractor found it + site_name: str # Human-readable name + quality: str = "" # e.g., "720p", "1080p", or empty + title: str = "" # e.g., "F1 Race Live" + extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + is_live: bool = False # Whether it passed health check + + def to_dict(self) -> dict: + """Serialize to a plain dictionary for JSON responses.""" + return { + "url": self.url, + "site_key": self.site_key, + "site_name": self.site_name, + "quality": self.quality, + "title": self.title, + "extracted_at": self.extracted_at, + "is_live": self.is_live, + } diff --git a/stacks/f1-stream/files/backend/extractors/registry.py b/stacks/f1-stream/files/backend/extractors/registry.py new file mode 100644 index 00000000..93378cda --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/registry.py @@ -0,0 +1,116 @@ +"""Central registry for stream extractors.""" + +import asyncio +import logging +from datetime import datetime, timezone + +from backend.extractors.base import BaseExtractor +from backend.extractors.models import ExtractedStream + +logger = logging.getLogger(__name__) + + +class ExtractorRegistry: + """Central registry for all site extractors. + + Manages extractor instances and provides fan-out extraction across + all registered extractors with independent error handling. + """ + + def __init__(self) -> None: + self._extractors: dict[str, BaseExtractor] = {} + + def register(self, extractor: BaseExtractor) -> None: + """Register an extractor instance. + + Args: + extractor: A BaseExtractor subclass instance. + + Raises: + ValueError: If an extractor with the same site_key is already registered. + """ + key = extractor.site_key + if key in self._extractors: + raise ValueError( + f"Extractor with site_key '{key}' is already registered " + f"(existing: {self._extractors[key].site_name}, " + f"new: {extractor.site_name})" + ) + self._extractors[key] = extractor + logger.info("Registered extractor: %s (%s)", extractor.site_name, key) + + def get(self, site_key: str) -> BaseExtractor | None: + """Get an extractor by its site_key. + + Args: + site_key: The unique identifier of the extractor. + + Returns: + The extractor instance, or None if not found. + """ + return self._extractors.get(site_key) + + def list_extractors(self) -> list[dict]: + """List all registered extractors. + + Returns: + A list of dicts with site_key and site_name for each extractor. + """ + return [ + {"site_key": ext.site_key, "site_name": ext.site_name} + for ext in self._extractors.values() + ] + + async def extract_all(self) -> list[ExtractedStream]: + """Fan-out extraction to all registered extractors concurrently. + + Each extractor runs independently. If one fails, the others + continue and their results are still collected. + + Returns: + Combined list of ExtractedStream from all extractors. 
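+
+        Illustrative call (DemoExtractor lives in backend.extractors.demo):
+
+            registry = ExtractorRegistry()
+            registry.register(DemoExtractor())
+            streams = await registry.extract_all()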
+ """ + if not self._extractors: + logger.warning("No extractors registered, nothing to extract") + return [] + + logger.info( + "Running extraction across %d extractor(s): %s", + len(self._extractors), + ", ".join(self._extractors.keys()), + ) + + async def _safe_extract(extractor: BaseExtractor) -> list[ExtractedStream]: + """Run a single extractor with error isolation.""" + try: + streams = await extractor.extract() + # Fill in site_key/site_name if the extractor didn't set them + now = datetime.now(timezone.utc).isoformat() + for stream in streams: + if not stream.site_key: + stream.site_key = extractor.site_key + if not stream.site_name: + stream.site_name = extractor.site_name + if not stream.extracted_at: + stream.extracted_at = now + logger.info( + "[%s] Extracted %d stream(s)", extractor.site_key, len(streams) + ) + return streams + except Exception: + logger.exception( + "[%s] Extractor failed during extraction", extractor.site_key + ) + return [] + + # Run all extractors concurrently + tasks = [_safe_extract(ext) for ext in self._extractors.values()] + results = await asyncio.gather(*tasks) + + # Flatten results + all_streams: list[ExtractedStream] = [] + for stream_list in results: + all_streams.extend(stream_list) + + logger.info("Extraction complete: %d total stream(s) found", len(all_streams)) + return all_streams diff --git a/stacks/f1-stream/files/backend/extractors/service.py b/stacks/f1-stream/files/backend/extractors/service.py new file mode 100644 index 00000000..6942c615 --- /dev/null +++ b/stacks/f1-stream/files/backend/extractors/service.py @@ -0,0 +1,121 @@ +"""Extraction service - manages extraction lifecycle: polling, caching, serving.""" + +import logging +from datetime import datetime, timezone + +from backend.extractors.models import ExtractedStream +from backend.extractors.registry import ExtractorRegistry + +logger = logging.getLogger(__name__) + + +class ExtractionService: + """Manages the extraction lifecycle: polling, caching, and serving results. + + Extraction runs on a background schedule (via APScheduler), never on + client request path. Results are cached in memory, keyed by site_key. + """ + + def __init__(self, registry: ExtractorRegistry) -> None: + self._registry = registry + # Cache: site_key -> list of ExtractedStream + self._cache: dict[str, list[ExtractedStream]] = {} + self._last_run: str | None = None + self._last_run_stream_count: int = 0 + + async def run_extraction(self) -> None: + """Run all extractors and cache their results. + + This is called by the background scheduler. Each extractor's + results replace its previous cache entry entirely. + """ + logger.info("Starting extraction run...") + start = datetime.now(timezone.utc) + + streams = await self._registry.extract_all() + + # Group streams by site_key and update cache + new_cache: dict[str, list[ExtractedStream]] = {} + for stream in streams: + new_cache.setdefault(stream.site_key, []).append(stream) + + # Replace cache for extractors that returned results. + # Clear cache for extractors that returned nothing (site went down, etc.) 
+ for extractor_info in self._registry.list_extractors(): + key = extractor_info["site_key"] + if key in new_cache: + self._cache[key] = new_cache[key] + else: + # Extractor returned nothing - clear its cache + self._cache.pop(key, None) + + self._last_run = start.isoformat() + self._last_run_stream_count = len(streams) + + elapsed = (datetime.now(timezone.utc) - start).total_seconds() + logger.info( + "Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs", + len(streams), + len(new_cache), + elapsed, + ) + + def get_streams(self) -> list[dict]: + """Return all cached streams as a flat list of dicts. + + Returns: + List of serialized ExtractedStream dicts from all extractors. + """ + all_streams: list[dict] = [] + for streams in self._cache.values(): + all_streams.extend(s.to_dict() for s in streams) + return all_streams + + def get_streams_for_session(self, session_type: str) -> list[dict]: + """Return cached streams filtered/annotated for a specific session type. + + Currently returns all streams (extractors don't yet differentiate by + session type). This method exists as a hook for future filtering, + e.g., some extractors might only have race streams but not FP streams. + + Args: + session_type: The F1 session type (e.g., "race", "qualifying", "fp1"). + + Returns: + List of serialized ExtractedStream dicts. + """ + # For now, all streams are potentially relevant to any session. + # Future extractors may tag streams with session types, at which + # point this method will filter accordingly. + streams = self.get_streams() + logger.debug( + "Returning %d stream(s) for session type '%s'", + len(streams), + session_type, + ) + return streams + + def get_status(self) -> dict: + """Return extraction service status for the /extractors endpoint.""" + extractor_list = self._registry.list_extractors() + extractor_statuses = [] + + for info in extractor_list: + key = info["site_key"] + cached = self._cache.get(key, []) + extractor_statuses.append( + { + "site_key": key, + "site_name": info["site_name"], + "cached_streams": len(cached), + } + ) + + return { + "extractors": extractor_statuses, + "total_cached_streams": sum( + len(streams) for streams in self._cache.values() + ), + "last_run": self._last_run, + "last_run_stream_count": self._last_run_stream_count, + } diff --git a/stacks/f1-stream/files/backend/main.py b/stacks/f1-stream/files/backend/main.py index fe830354..ef639a77 100644 --- a/stacks/f1-stream/files/backend/main.py +++ b/stacks/f1-stream/files/backend/main.py @@ -1,12 +1,14 @@ -"""F1 Streams - FastAPI backend with schedule service.""" +"""F1 Streams - FastAPI backend with schedule and stream extraction services.""" import logging from contextlib import asynccontextmanager from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger from fastapi import FastAPI +from backend.extractors import create_extraction_service from backend.schedule import ScheduleService logging.basicConfig( @@ -16,21 +18,67 @@ logging.basicConfig( logger = logging.getLogger(__name__) schedule_service = ScheduleService() +extraction_service = create_extraction_service() scheduler = AsyncIOScheduler() async def _scheduled_refresh() -> None: - """Callback for APScheduler daily refresh.""" + """Callback for APScheduler daily schedule refresh.""" logger.info("Running scheduled schedule refresh...") await schedule_service.refresh() +async def _scheduled_extraction() -> None: + """Callback 
for APScheduler stream extraction. + + Adjusts its own interval based on whether a session is currently live: + - During a live session: reschedule to every 5 minutes + - Otherwise: reschedule to every 30 minutes + """ + logger.info("Running scheduled extraction...") + await extraction_service.run_extraction() + + # Check if any session is currently live and adjust polling interval + schedule_data = schedule_service.get_schedule() + is_live = False + for race in schedule_data.get("races", []): + for session in race.get("sessions", []): + if session.get("status") == "live": + is_live = True + break + if is_live: + break + + # Update the extraction job interval based on live status + job = scheduler.get_job("stream_extraction") + if job: + current_interval = getattr(job.trigger, "interval_length", None) + desired_interval = 300 if is_live else 1800 # 5 min or 30 min + + if current_interval != desired_interval: + interval_minutes = 5 if is_live else 30 + scheduler.reschedule_job( + "stream_extraction", + trigger=IntervalTrigger(minutes=interval_minutes), + ) + logger.info( + "Extraction interval adjusted to %d minutes (live=%s)", + interval_minutes, + is_live, + ) + + @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown lifecycle handler.""" # Startup: load schedule and start background scheduler await schedule_service.initialize() + # Run initial extraction + logger.info("Running initial stream extraction...") + await extraction_service.run_extraction() + + # Schedule daily schedule refresh scheduler.add_job( _scheduled_refresh, trigger=CronTrigger(hour=3, minute=0, timezone="UTC"), @@ -38,8 +86,18 @@ async def lifespan(app: FastAPI): name="Refresh F1 schedule daily at 03:00 UTC", replace_existing=True, ) + + # Schedule periodic stream extraction (default: every 30 minutes) + scheduler.add_job( + _scheduled_extraction, + trigger=IntervalTrigger(minutes=30), + id="stream_extraction", + name="Extract streams from all registered sites", + replace_existing=True, + ) + scheduler.start() - logger.info("APScheduler started - daily refresh at 03:00 UTC") + logger.info("APScheduler started - schedule refresh at 03:00 UTC, extraction every 30m") yield @@ -51,6 +109,9 @@ async def lifespan(app: FastAPI): app = FastAPI(title="F1 Streams", lifespan=lifespan) +# --- Health & Info --- + + @app.get("/health") async def health(): return {"status": "ok"} @@ -58,7 +119,10 @@ async def health(): @app.get("/") async def root(): - return {"service": "f1-streams", "version": "2.0.1"} + return {"service": "f1-streams", "version": "3.0.0"} + + +# --- Schedule --- @app.get("/schedule") @@ -74,6 +138,37 @@ async def refresh_schedule(): return {"status": "refreshed"} +# --- Streams & Extraction --- + + +@app.get("/streams") +async def get_streams(): + """Return all currently cached streams from all extractors.""" + streams = extraction_service.get_streams() + return { + "streams": streams, + "count": len(streams), + } + + +@app.get("/extractors") +async def get_extractors(): + """List registered extractors and their current status.""" + return extraction_service.get_status() + + +@app.post("/extract") +async def trigger_extraction(): + """Manually trigger an extraction run across all registered extractors.""" + await extraction_service.run_extraction() + status = extraction_service.get_status() + return { + "status": "extraction_complete", + "streams_found": status["total_cached_streams"], + "extractors_run": len(status["extractors"]), + } + + if __name__ == "__main__": import uvicorn diff 
--git a/stacks/f1-stream/main.tf b/stacks/f1-stream/main.tf index 889ccc07..7d84fa2c 100644 --- a/stacks/f1-stream/main.tf +++ b/stacks/f1-stream/main.tf @@ -36,7 +36,7 @@ resource "kubernetes_deployment" "f1-stream" { } spec { container { - image = "viktorbarzin/f1-stream:v2.0.3" + image = "viktorbarzin/f1-stream:v3.0.0" name = "f1-stream" resources { limits = {
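-- 
For reference, a minimal sketch of a real extractor, following the steps
documented in backend/extractors/__init__.py. MySiteExtractor, its module
path, and the JSON endpoint below are hypothetical.

    # backend/extractors/my_site.py
    import logging

    import httpx

    from backend.extractors.base import BaseExtractor
    from backend.extractors.models import ExtractedStream

    logger = logging.getLogger(__name__)


    class MySiteExtractor(BaseExtractor):
        @property
        def site_key(self) -> str:
            return "my-site"

        @property
        def site_name(self) -> str:
            return "My Site"

        async def extract(self) -> list[ExtractedStream]:
            # Fetch a (hypothetical) JSON listing and map it to streams;
            # errors are logged and swallowed so other extractors continue.
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    resp = await client.get("https://example.com/streams.json")
                    resp.raise_for_status()
                    items = resp.json()
            except (httpx.HTTPError, ValueError):
                logger.exception("[my-site] extraction failed")
                return []
            return [
                ExtractedStream(
                    url=item["url"],
                    site_key=self.site_key,
                    site_name=self.site_name,
                    quality=item.get("quality", ""),
                    title=item.get("title", ""),
                )
                for item in items
            ]

Register it in create_registry() in backend/extractors/__init__.py:

    registry.register(MySiteExtractor())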