From 9bf0523ea957a5d943e2c4002afc94a88ac86a0a Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Mon, 23 Feb 2026 23:41:16 +0000 Subject: [PATCH] [ci skip] f1-stream: add stream health checker and HLS proxy (Phases 4-5) Phase 4 - Stream Health and Fallback: - StreamHealthChecker with partial GET validation of m3u8 content - Bitrate extraction from BANDWIDTH tags - Response time measurement for quality ranking - Fallback ordering: live first, fastest response time first - GET /streams now only returns health-verified streams Phase 5 - HLS Proxy Core: - GET /proxy?url= - m3u8 playlist fetch with full URI rewriting - GET /relay?url= - chunked segment relay (never buffers full segment) - m3u8 rewriter handles master, variant, and segment URIs - Base64url encoding for URL parameters - CORS middleware for browser playback - Range header forwarding for seeking support --- .../files/backend/extractors/models.py | 6 + .../files/backend/extractors/service.py | 102 +++++-- stacks/f1-stream/files/backend/health.py | 233 ++++++++++++++++ .../f1-stream/files/backend/m3u8_rewriter.py | 264 ++++++++++++++++++ stacks/f1-stream/files/backend/main.py | 112 +++++++- stacks/f1-stream/files/backend/proxy.py | 229 +++++++++++++++ 6 files changed, 926 insertions(+), 20 deletions(-) create mode 100644 stacks/f1-stream/files/backend/health.py create mode 100644 stacks/f1-stream/files/backend/m3u8_rewriter.py create mode 100644 stacks/f1-stream/files/backend/proxy.py diff --git a/stacks/f1-stream/files/backend/extractors/models.py b/stacks/f1-stream/files/backend/extractors/models.py index a66723c0..5d665544 100644 --- a/stacks/f1-stream/files/backend/extractors/models.py +++ b/stacks/f1-stream/files/backend/extractors/models.py @@ -15,6 +15,9 @@ class ExtractedStream: title: str = "" # e.g., "F1 Race Live" extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) is_live: bool = False # Whether it passed health check + response_time_ms: int = 0 # Health check response time (lower = better) + checked_at: str = "" # ISO timestamp of last health check + bitrate: int = 0 # Bitrate in bps if detectable from m3u8 playlist def to_dict(self) -> dict: """Serialize to a plain dictionary for JSON responses.""" @@ -26,4 +29,7 @@ class ExtractedStream: "title": self.title, "extracted_at": self.extracted_at, "is_live": self.is_live, + "response_time_ms": self.response_time_ms, + "checked_at": self.checked_at, + "bitrate": self.bitrate, } diff --git a/stacks/f1-stream/files/backend/extractors/service.py b/stacks/f1-stream/files/backend/extractors/service.py index 6942c615..30b9b390 100644 --- a/stacks/f1-stream/files/backend/extractors/service.py +++ b/stacks/f1-stream/files/backend/extractors/service.py @@ -1,19 +1,25 @@ -"""Extraction service - manages extraction lifecycle: polling, caching, serving.""" +"""Extraction service - manages extraction lifecycle: polling, caching, health checking, serving.""" import logging from datetime import datetime, timezone from backend.extractors.models import ExtractedStream from backend.extractors.registry import ExtractorRegistry +from backend.health import StreamHealthChecker logger = logging.getLogger(__name__) class ExtractionService: - """Manages the extraction lifecycle: polling, caching, and serving results. + """Manages the extraction lifecycle: polling, caching, health checking, and serving. Extraction runs on a background schedule (via APScheduler), never on - client request path. Results are cached in memory, keyed by site_key. + client request path. After extraction, health checks verify each stream + is live. Results are cached in memory, keyed by site_key. + + GET /streams only returns streams that passed health checks, sorted by: + 1. is_live (live streams first) + 2. response_time_ms (fastest first) """ def __init__(self, registry: ExtractorRegistry) -> None: @@ -22,18 +28,36 @@ class ExtractionService: self._cache: dict[str, list[ExtractedStream]] = {} self._last_run: str | None = None self._last_run_stream_count: int = 0 + self._health_checker = StreamHealthChecker() async def run_extraction(self) -> None: - """Run all extractors and cache their results. + """Run all extractors, health-check results, and cache them. This is called by the background scheduler. Each extractor's - results replace its previous cache entry entirely. + results replace its previous cache entry entirely. After extraction, + health checks are run to verify streams are live and measure + response times. """ logger.info("Starting extraction run...") start = datetime.now(timezone.utc) streams = await self._registry.extract_all() + # Run health checks on all extracted streams + if streams: + stream_dicts = [s.to_dict() for s in streams] + health_map = await self._health_checker.check_all(stream_dicts) + + # Update stream objects with health check results + for stream in streams: + health = health_map.get(stream.url) + if health: + stream.is_live = health.is_live + stream.response_time_ms = health.response_time_ms + stream.checked_at = health.checked_at + if health.bitrate > 0: + stream.bitrate = health.bitrate + # Group streams by site_key and update cache new_cache: dict[str, list[ExtractedStream]] = {} for stream in streams: @@ -52,29 +76,68 @@ class ExtractionService: self._last_run = start.isoformat() self._last_run_stream_count = len(streams) + live_count = sum( + 1 for streams_list in self._cache.values() + for s in streams_list if s.is_live + ) elapsed = (datetime.now(timezone.utc) - start).total_seconds() logger.info( - "Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs", + "Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs (%d live)", len(streams), len(new_cache), elapsed, + live_count, ) def get_streams(self) -> list[dict]: - """Return all cached streams as a flat list of dicts. + """Return all cached streams as a sorted list of dicts. + + Only returns streams that passed health checks (is_live=True). + Sorted by fallback priority: + 1. is_live (live streams first) - filters to live only + 2. response_time_ms (fastest first) Returns: - List of serialized ExtractedStream dicts from all extractors. + List of serialized ExtractedStream dicts from all extractors, + filtered to live-only and sorted by response time. """ - all_streams: list[dict] = [] + all_streams: list[ExtractedStream] = [] for streams in self._cache.values(): - all_streams.extend(s.to_dict() for s in streams) - return all_streams + all_streams.extend(streams) + + # Sort by fallback priority: live first, then fastest response + all_streams.sort( + key=lambda s: (not s.is_live, s.response_time_ms) + ) + + # Only return live streams to clients + live_streams = [s for s in all_streams if s.is_live] + return [s.to_dict() for s in live_streams] + + def get_all_streams_unfiltered(self) -> list[dict]: + """Return ALL cached streams including unhealthy ones. + + Used for debugging and status endpoints. Sorted by fallback priority + but includes streams that failed health checks. + + Returns: + List of all serialized ExtractedStream dicts. + """ + all_streams: list[ExtractedStream] = [] + for streams in self._cache.values(): + all_streams.extend(streams) + + # Sort by fallback priority: live first, then fastest response + all_streams.sort( + key=lambda s: (not s.is_live, s.response_time_ms) + ) + + return [s.to_dict() for s in all_streams] def get_streams_for_session(self, session_type: str) -> list[dict]: """Return cached streams filtered/annotated for a specific session type. - Currently returns all streams (extractors don't yet differentiate by + Currently returns all live streams (extractors don't yet differentiate by session type). This method exists as a hook for future filtering, e.g., some extractors might only have race streams but not FP streams. @@ -82,7 +145,7 @@ class ExtractionService: session_type: The F1 session type (e.g., "race", "qualifying", "fp1"). Returns: - List of serialized ExtractedStream dicts. + List of serialized ExtractedStream dicts (live only, sorted). """ # For now, all streams are potentially relevant to any session. # Future extractors may tag streams with session types, at which @@ -103,19 +166,26 @@ class ExtractionService: for info in extractor_list: key = info["site_key"] cached = self._cache.get(key, []) + live_count = sum(1 for s in cached if s.is_live) extractor_statuses.append( { "site_key": key, "site_name": info["site_name"], "cached_streams": len(cached), + "live_streams": live_count, } ) + total_cached = sum(len(streams) for streams in self._cache.values()) + total_live = sum( + 1 for streams in self._cache.values() + for s in streams if s.is_live + ) + return { "extractors": extractor_statuses, - "total_cached_streams": sum( - len(streams) for streams in self._cache.values() - ), + "total_cached_streams": total_cached, + "total_live_streams": total_live, "last_run": self._last_run, "last_run_stream_count": self._last_run_stream_count, } diff --git a/stacks/f1-stream/files/backend/health.py b/stacks/f1-stream/files/backend/health.py new file mode 100644 index 00000000..63f92fdb --- /dev/null +++ b/stacks/f1-stream/files/backend/health.py @@ -0,0 +1,233 @@ +"""Stream health checker - verifies extracted streams are live and responsive. + +Performs GET requests against m3u8 URLs to verify they contain valid HLS +playlists (#EXTM3U header), measures response times for quality ranking, +and supports concurrent checking of multiple streams. +""" + +import asyncio +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone + +import httpx + +logger = logging.getLogger(__name__) + +# How long to wait for a single health check (seconds) +HEALTH_CHECK_TIMEOUT = 10.0 + +# Maximum bytes to read when verifying m3u8 content +# We only need to see the #EXTM3U header and a few lines +MAX_CONTENT_BYTES = 8192 + +# User-Agent to send with health check requests +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" +) + + +@dataclass +class StreamHealth: + """Result of a single stream health check.""" + + url: str + is_live: bool + response_time_ms: int # Lower = better quality indicator + checked_at: str = field( + default_factory=lambda: datetime.now(timezone.utc).isoformat() + ) + error: str = "" # Error message if not live + bitrate: int = 0 # Bitrate in bps if detectable from playlist + + def to_dict(self) -> dict: + """Serialize to a plain dictionary for JSON responses.""" + return { + "url": self.url, + "is_live": self.is_live, + "response_time_ms": self.response_time_ms, + "checked_at": self.checked_at, + "error": self.error, + "bitrate": self.bitrate, + } + + +def _extract_bitrate(content: str) -> int: + """Try to extract bitrate from m3u8 playlist content. + + Looks for BANDWIDTH= in #EXT-X-STREAM-INF tags. Returns the highest + bitrate found, or 0 if none detected. + """ + max_bitrate = 0 + for line in content.splitlines(): + if "BANDWIDTH=" in line: + try: + # Parse BANDWIDTH= from the tag + for part in line.split(","): + part = part.strip() + if part.startswith("BANDWIDTH="): + bw = int(part.split("=", 1)[1]) + max_bitrate = max(max_bitrate, bw) + except (ValueError, IndexError): + continue + return max_bitrate + + +class StreamHealthChecker: + """Background health checker for extracted streams. + + Verifies streams are live by performing a partial GET on the m3u8 URL, + checking for valid HLS content (#EXTM3U header), and measuring response + time as a quality indicator. + """ + + def __init__(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> None: + self._timeout = timeout + + async def check_stream(self, url: str) -> StreamHealth: + """Check if a stream URL is live by doing a partial GET on the m3u8. + + Verification steps: + 1. GET the m3u8 URL (not just HEAD - need to verify playlist content) + 2. Check if response contains #EXTM3U header + 3. Measure response time as a quality indicator + 4. Extract bitrate info if available + + Args: + url: The m3u8 stream URL to check. + + Returns: + StreamHealth with is_live, response_time_ms, checked_at, and + optional bitrate and error information. + """ + start_time = time.monotonic() + checked_at = datetime.now(timezone.utc).isoformat() + + try: + async with httpx.AsyncClient( + timeout=self._timeout, + follow_redirects=True, + headers={ + "User-Agent": USER_AGENT, + "Accept": "*/*", + }, + ) as client: + # Use a partial GET with Range header to limit download + # but fall back to reading limited bytes if Range not supported + response = await client.get( + url, + headers={"Range": f"bytes=0-{MAX_CONTENT_BYTES - 1}"}, + ) + + elapsed_ms = int((time.monotonic() - start_time) * 1000) + + # Accept 200 (full content) or 206 (partial content) + if response.status_code not in (200, 206): + return StreamHealth( + url=url, + is_live=False, + response_time_ms=elapsed_ms, + checked_at=checked_at, + error=f"HTTP {response.status_code}", + ) + + content = response.text[:MAX_CONTENT_BYTES] + + # Verify it's a valid HLS playlist + if "#EXTM3U" not in content: + return StreamHealth( + url=url, + is_live=False, + response_time_ms=elapsed_ms, + checked_at=checked_at, + error="Response does not contain #EXTM3U header", + ) + + # Extract bitrate info if available + bitrate = _extract_bitrate(content) + + return StreamHealth( + url=url, + is_live=True, + response_time_ms=elapsed_ms, + checked_at=checked_at, + bitrate=bitrate, + ) + + except httpx.TimeoutException: + elapsed_ms = int((time.monotonic() - start_time) * 1000) + logger.debug("Health check timed out for %s", url) + return StreamHealth( + url=url, + is_live=False, + response_time_ms=elapsed_ms, + checked_at=checked_at, + error="Timeout", + ) + except httpx.HTTPError as e: + elapsed_ms = int((time.monotonic() - start_time) * 1000) + logger.debug("Health check HTTP error for %s: %s", url, e) + return StreamHealth( + url=url, + is_live=False, + response_time_ms=elapsed_ms, + checked_at=checked_at, + error=f"HTTP error: {e}", + ) + except Exception as e: + elapsed_ms = int((time.monotonic() - start_time) * 1000) + logger.exception("Unexpected error during health check for %s", url) + return StreamHealth( + url=url, + is_live=False, + response_time_ms=elapsed_ms, + checked_at=checked_at, + error=f"Unexpected error: {e}", + ) + + async def check_all( + self, streams: list[dict], + ) -> dict[str, StreamHealth]: + """Check all streams concurrently, return health map keyed by URL. + + Args: + streams: List of stream dicts (must have a "url" key). + + Returns: + Dictionary mapping stream URL to its StreamHealth result. + """ + urls = [s["url"] for s in streams if "url" in s] + + if not urls: + return {} + + logger.info("Running health checks on %d stream(s)...", len(urls)) + + # Run all checks concurrently + tasks = [self.check_stream(url) for url in urls] + results = await asyncio.gather(*tasks, return_exceptions=True) + + health_map: dict[str, StreamHealth] = {} + for url, result in zip(urls, results): + if isinstance(result, Exception): + logger.error("Health check task failed for %s: %s", url, result) + health_map[url] = StreamHealth( + url=url, + is_live=False, + response_time_ms=0, + error=f"Task error: {result}", + ) + else: + health_map[url] = result + + live_count = sum(1 for h in health_map.values() if h.is_live) + logger.info( + "Health checks complete: %d/%d streams are live", + live_count, + len(health_map), + ) + + return health_map diff --git a/stacks/f1-stream/files/backend/m3u8_rewriter.py b/stacks/f1-stream/files/backend/m3u8_rewriter.py new file mode 100644 index 00000000..a7b40451 --- /dev/null +++ b/stacks/f1-stream/files/backend/m3u8_rewriter.py @@ -0,0 +1,264 @@ +"""m3u8 playlist rewriter - rewrites URIs in HLS playlists to go through the proxy. + +Handles both master playlists (containing variant stream references) and +media playlists (containing segment URLs). Resolves relative URIs to +absolute before encoding, and routes .m3u8 references through /proxy +while routing segments (.ts, .m4s, etc.) through /relay. +""" + +import base64 +import logging +import re +from urllib.parse import urljoin + +logger = logging.getLogger(__name__) + + +def encode_url(url: str) -> str: + """Base64url-encode a URL for safe transport as a query parameter. + + Uses URL-safe base64 encoding with padding stripped to avoid + double-encoding issues when the URL contains special characters. + + Args: + url: The raw URL to encode. + + Returns: + Base64url-encoded string with padding removed. + """ + return base64.urlsafe_b64encode(url.encode()).decode().rstrip("=") + + +def decode_url(encoded: str) -> str: + """Decode a base64url-encoded URL. + + Re-adds padding that was stripped during encoding. + + Args: + encoded: Base64url-encoded string (padding may be stripped). + + Returns: + The original URL string. + + Raises: + ValueError: If the encoded string is not valid base64url. + """ + # Add padding back - base64 requires length to be multiple of 4 + padding = 4 - len(encoded) % 4 + if padding != 4: + encoded += "=" * padding + return base64.urlsafe_b64decode(encoded).decode() + + +def _resolve_uri(uri: str, base_url: str) -> str: + """Resolve a potentially relative URI against a base URL. + + Args: + uri: The URI from the m3u8 playlist (may be relative or absolute). + base_url: The URL of the playlist itself (used as base for relative URIs). + + Returns: + Absolute URL. + """ + if uri.startswith("http://") or uri.startswith("https://"): + return uri + return urljoin(base_url, uri) + + +def _is_playlist_uri(uri: str) -> bool: + """Determine if a URI likely points to another playlist (vs a segment). + + Playlist URIs end in .m3u8 or .m3u. Everything else is treated as a + segment (TS, fMP4, init segment, etc.). + + Args: + uri: The URI to classify. + + Returns: + True if the URI appears to be a playlist reference. + """ + # Strip query string for extension check + path = uri.split("?")[0].split("#")[0].lower() + return path.endswith(".m3u8") or path.endswith(".m3u") + + +def _build_proxy_url(absolute_uri: str, proxy_base: str) -> str: + """Build a /proxy URL for a playlist reference. + + Args: + absolute_uri: The absolute URL of the upstream playlist. + proxy_base: The base URL of our proxy service. + + Returns: + Rewritten URL pointing to our /proxy endpoint. + """ + encoded = encode_url(absolute_uri) + return f"{proxy_base}/proxy?url={encoded}" + + +def _build_relay_url(absolute_uri: str, proxy_base: str) -> str: + """Build a /relay URL for a segment reference. + + Args: + absolute_uri: The absolute URL of the upstream segment. + proxy_base: The base URL of our proxy service. + + Returns: + Rewritten URL pointing to our /relay endpoint. + """ + encoded = encode_url(absolute_uri) + return f"{proxy_base}/relay?url={encoded}" + + +def _rewrite_uri(uri: str, base_url: str, proxy_base: str) -> str: + """Rewrite a single URI from an m3u8 playlist. + + Resolves relative URIs, then routes playlists through /proxy and + segments through /relay. + + Args: + uri: The raw URI from the playlist. + base_url: The URL of the playlist containing this URI. + proxy_base: The base URL of our proxy service. + + Returns: + Rewritten URI pointing to our proxy. + """ + absolute = _resolve_uri(uri, base_url) + if _is_playlist_uri(uri): + return _build_proxy_url(absolute, proxy_base) + return _build_relay_url(absolute, proxy_base) + + +def rewrite_playlist(content: str, base_url: str, proxy_base: str) -> str: + """Rewrite all URIs in an m3u8 playlist to go through the proxy. + + Handles both master playlists (with #EXT-X-STREAM-INF variant + references) and media playlists (with segment URIs). Also handles + #EXT-X-MAP:URI= init segment references. + + Args: + content: The raw m3u8 playlist text. + base_url: The original URL of this playlist (for resolving relative URIs). + proxy_base: The base URL of our proxy (e.g., "https://f1.viktorbarzin.me"). + + Returns: + The rewritten m3u8 playlist text with all URIs proxied. + """ + proxy_base = proxy_base.rstrip("/") + lines = content.splitlines() + output_lines: list[str] = [] + + # Track if the previous line was #EXT-X-STREAM-INF (next line is a variant URI) + next_is_variant = False + + for line in lines: + stripped = line.strip() + + # Handle #EXT-X-MAP:URI="..." (init segment) + if stripped.startswith("#EXT-X-MAP:"): + output_lines.append(_rewrite_ext_x_map(stripped, base_url, proxy_base)) + continue + + # Handle #EXT-X-STREAM-INF (marks next line as variant playlist URI) + if stripped.startswith("#EXT-X-STREAM-INF:"): + output_lines.append(line) + next_is_variant = True + continue + + # Handle #EXT-X-MEDIA with URI= attribute + if stripped.startswith("#EXT-X-MEDIA:") and "URI=" in stripped: + output_lines.append(_rewrite_ext_x_media(stripped, base_url, proxy_base)) + continue + + # Handle #EXT-X-I-FRAME-STREAM-INF with URI= attribute + if stripped.startswith("#EXT-X-I-FRAME-STREAM-INF:") and "URI=" in stripped: + output_lines.append( + _rewrite_tag_with_uri(stripped, base_url, proxy_base, is_playlist=True) + ) + continue + + # If previous line was #EXT-X-STREAM-INF, this line is a variant playlist URI + if next_is_variant and stripped and not stripped.startswith("#"): + absolute = _resolve_uri(stripped, base_url) + output_lines.append(_build_proxy_url(absolute, proxy_base)) + next_is_variant = False + continue + + # Regular URI line (non-comment, non-empty, not a tag) + if stripped and not stripped.startswith("#"): + # This is a segment URI (TS, fMP4, etc.) + absolute = _resolve_uri(stripped, base_url) + output_lines.append(_build_relay_url(absolute, proxy_base)) + continue + + # Tags and comments pass through unchanged + output_lines.append(line) + # Reset variant flag if we hit another tag + if stripped.startswith("#") and not stripped.startswith("#EXT-X-STREAM-INF:"): + next_is_variant = False + + return "\n".join(output_lines) + + +def _rewrite_ext_x_map(line: str, base_url: str, proxy_base: str) -> str: + """Rewrite the URI in an #EXT-X-MAP tag. + + #EXT-X-MAP:URI="init.mp4" -> #EXT-X-MAP:URI="" + The init segment goes through /relay since it's binary data. + """ + # Match URI="..." or URI=... (with or without quotes) + match = re.search(r'URI="([^"]+)"', line) + if not match: + match = re.search(r"URI=([^,\s]+)", line) + + if not match: + return line + + original_uri = match.group(1) + absolute = _resolve_uri(original_uri, base_url) + relay_url = _build_relay_url(absolute, proxy_base) + + return line[:match.start(1)] + relay_url + line[match.end(1):] + + +def _rewrite_ext_x_media(line: str, base_url: str, proxy_base: str) -> str: + """Rewrite the URI in an #EXT-X-MEDIA tag. + + #EXT-X-MEDIA:TYPE=AUDIO,...,URI="audio.m3u8" -> rewrite URI to /proxy + """ + return _rewrite_tag_with_uri(line, base_url, proxy_base, is_playlist=True) + + +def _rewrite_tag_with_uri( + line: str, base_url: str, proxy_base: str, is_playlist: bool = False, +) -> str: + """Rewrite the URI attribute within an HLS tag line. + + Generic handler for any tag that contains a URI="..." attribute. + + Args: + line: The full tag line. + base_url: Base URL for resolving relative URIs. + proxy_base: Our proxy base URL. + is_playlist: If True, route through /proxy; otherwise /relay. + + Returns: + The tag line with the URI rewritten. + """ + match = re.search(r'URI="([^"]+)"', line) + if not match: + match = re.search(r"URI=([^,\s]+)", line) + + if not match: + return line + + original_uri = match.group(1) + absolute = _resolve_uri(original_uri, base_url) + + if is_playlist: + new_url = _build_proxy_url(absolute, proxy_base) + else: + new_url = _build_relay_url(absolute, proxy_base) + + return line[:match.start(1)] + new_url + line[match.end(1):] diff --git a/stacks/f1-stream/files/backend/main.py b/stacks/f1-stream/files/backend/main.py index ef639a77..a3d3e661 100644 --- a/stacks/f1-stream/files/backend/main.py +++ b/stacks/f1-stream/files/backend/main.py @@ -1,4 +1,4 @@ -"""F1 Streams - FastAPI backend with schedule and stream extraction services.""" +"""F1 Streams - FastAPI backend with schedule, stream extraction, health checking, and HLS proxy.""" import logging from contextlib import asynccontextmanager @@ -6,9 +6,12 @@ from contextlib import asynccontextmanager from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger -from fastapi import FastAPI +from fastapi import FastAPI, Query, Request +from fastapi.middleware.cors import CORSMiddleware +from starlette.responses import Response, StreamingResponse from backend.extractors import create_extraction_service +from backend.proxy import proxy_playlist, relay_stream from backend.schedule import ScheduleService logging.basicConfig( @@ -108,6 +111,16 @@ async def lifespan(app: FastAPI): app = FastAPI(title="F1 Streams", lifespan=lifespan) +# --- CORS Middleware --- +# Required for browser-based HLS players to access proxy/relay endpoints +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["GET", "OPTIONS"], + allow_headers=["Range"], + expose_headers=["Content-Range", "Content-Length", "Content-Type"], +) + # --- Health & Info --- @@ -119,7 +132,7 @@ async def health(): @app.get("/") async def root(): - return {"service": "f1-streams", "version": "3.0.0"} + return {"service": "f1-streams", "version": "4.0.0"} # --- Schedule --- @@ -143,7 +156,12 @@ async def refresh_schedule(): @app.get("/streams") async def get_streams(): - """Return all currently cached streams from all extractors.""" + """Return all currently cached streams that passed health checks. + + Streams are sorted by fallback priority: + 1. Live streams only (is_live=True) + 2. Fastest response time first (lowest response_time_ms) + """ streams = extraction_service.get_streams() return { "streams": streams, @@ -151,6 +169,20 @@ async def get_streams(): } +@app.get("/streams/all") +async def get_all_streams(): + """Return ALL cached streams including unhealthy ones (for debugging). + + Unlike GET /streams, this endpoint includes streams that failed health + checks. Useful for diagnosing extraction or health check issues. + """ + streams = extraction_service.get_all_streams_unfiltered() + return { + "streams": streams, + "count": len(streams), + } + + @app.get("/extractors") async def get_extractors(): """List registered extractors and their current status.""" @@ -165,10 +197,82 @@ async def trigger_extraction(): return { "status": "extraction_complete", "streams_found": status["total_cached_streams"], + "live_streams": status["total_live_streams"], "extractors_run": len(status["extractors"]), } +# --- HLS Proxy --- + + +def _get_proxy_base(request: Request) -> str: + """Derive the proxy base URL from the incoming request. + + Uses X-Forwarded-Proto and X-Forwarded-Host headers if present + (behind a reverse proxy), otherwise falls back to request URL. + """ + proto = request.headers.get("x-forwarded-proto", request.url.scheme) + host = request.headers.get("x-forwarded-host", request.url.netloc) + return f"{proto}://{host}" + + +@app.get("/proxy") +async def proxy_endpoint( + request: Request, + url: str = Query(..., description="Base64url-encoded m3u8 playlist URL"), +): + """Proxy an upstream m3u8 playlist with URI rewriting. + + Fetches the upstream m3u8 playlist, rewrites all URIs to route through + our /proxy (for sub-playlists) and /relay (for segments) endpoints, + and returns the rewritten playlist. + + The `url` parameter must be base64url-encoded to avoid URL encoding issues. + + Example: + GET /proxy?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zdHJlYW0ubTN1OA + """ + proxy_base = _get_proxy_base(request) + rewritten = await proxy_playlist(url, proxy_base) + + return Response( + content=rewritten, + media_type="application/vnd.apple.mpegurl", + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + }, + ) + + +@app.get("/relay") +async def relay_endpoint( + request: Request, + url: str = Query(..., description="Base64url-encoded segment URL"), +): + """Relay an upstream media segment as a chunked byte stream. + + Fetches the upstream segment (TS, fMP4, init segment, etc.) and streams + it to the client using chunked transfer encoding. Never buffers the + full segment in memory. + + The `url` parameter must be base64url-encoded to avoid URL encoding issues. + + Supports HTTP Range requests for seeking. + + Example: + GET /relay?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zZWdtZW50LnRz + """ + range_header = request.headers.get("range") + + stream_gen, headers, status_code = await relay_stream(url, range_header) + + return StreamingResponse( + stream_gen, + status_code=status_code, + headers=headers, + ) + + if __name__ == "__main__": import uvicorn diff --git a/stacks/f1-stream/files/backend/proxy.py b/stacks/f1-stream/files/backend/proxy.py new file mode 100644 index 00000000..ade8283b --- /dev/null +++ b/stacks/f1-stream/files/backend/proxy.py @@ -0,0 +1,229 @@ +"""HLS proxy - fetches upstream m3u8 playlists and relays media segments. + +Two core functions: +1. Playlist proxy: fetches an upstream m3u8 playlist, rewrites all URIs + to route through our /proxy and /relay endpoints, returns the rewritten + playlist to the client. +2. Segment relay: fetches an upstream media segment (TS, fMP4, init) and + streams it to the client using chunked transfer encoding, never buffering + the full segment in memory. + +All responses include CORS headers for browser playback. +""" + +import logging +from typing import AsyncGenerator + +import httpx +from fastapi import HTTPException + +from backend.m3u8_rewriter import decode_url, rewrite_playlist + +logger = logging.getLogger(__name__) + +# Chunk size for relay streaming (64 KB) +RELAY_CHUNK_SIZE = 65536 + +# Timeout for upstream playlist fetches (seconds) +PLAYLIST_TIMEOUT = 15.0 + +# Timeout for upstream segment relay - longer because segments are bigger +RELAY_TIMEOUT = 30.0 + +# User-Agent for upstream requests +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" +) + + +async def proxy_playlist(encoded_url: str, proxy_base: str) -> str: + """Fetch an upstream m3u8 playlist and rewrite all URIs through our proxy. + + Args: + encoded_url: Base64url-encoded URL of the upstream m3u8 playlist. + proxy_base: The base URL of our proxy service for rewriting URIs + (e.g., "https://f1.viktorbarzin.me"). + + Returns: + The rewritten m3u8 playlist text. + + Raises: + HTTPException: If the URL can't be decoded, upstream fails, or + content is not a valid HLS playlist. + """ + # Decode the URL + try: + url = decode_url(encoded_url) + except Exception as e: + logger.error("Failed to decode proxy URL: %s", e) + raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}") + + logger.info("Proxying playlist: %s", url) + + # Fetch the upstream playlist + try: + async with httpx.AsyncClient( + timeout=PLAYLIST_TIMEOUT, + follow_redirects=True, + headers={ + "User-Agent": USER_AGENT, + "Accept": "*/*", + }, + ) as client: + response = await client.get(url) + + if response.status_code != 200: + logger.warning( + "Upstream playlist returned HTTP %d for %s", + response.status_code, + url, + ) + raise HTTPException( + status_code=502, + detail=f"Upstream returned HTTP {response.status_code}", + ) + + content = response.text + + except httpx.TimeoutException: + logger.error("Timeout fetching upstream playlist: %s", url) + raise HTTPException(status_code=504, detail="Upstream playlist timeout") + except httpx.HTTPError as e: + logger.error("HTTP error fetching upstream playlist: %s - %s", url, e) + raise HTTPException(status_code=502, detail=f"Upstream error: {e}") + except HTTPException: + raise + except Exception as e: + logger.exception("Unexpected error fetching playlist: %s", url) + raise HTTPException(status_code=500, detail=f"Internal error: {e}") + + # Validate it looks like an m3u8 playlist + if "#EXTM3U" not in content: + logger.warning("Upstream response is not a valid m3u8 playlist: %s", url) + raise HTTPException( + status_code=502, + detail="Upstream response is not a valid HLS playlist", + ) + + # Rewrite all URIs to go through our proxy + rewritten = rewrite_playlist(content, url, proxy_base) + + logger.debug( + "Proxied playlist from %s: %d bytes -> %d bytes", + url, + len(content), + len(rewritten), + ) + + return rewritten + + +async def relay_stream( + encoded_url: str, + range_header: str | None = None, +) -> tuple[AsyncGenerator[bytes, None], dict[str, str], int]: + """Relay an upstream media segment as a chunked byte stream. + + Never buffers the full segment in memory. Streams chunks as they + arrive from the upstream server. + + Args: + encoded_url: Base64url-encoded URL of the upstream segment. + range_header: Optional HTTP Range header from the client to + forward to upstream. + + Returns: + A tuple of (async_generator, headers_dict, status_code) where: + - async_generator yields bytes chunks + - headers_dict contains content-type and other relevant headers + - status_code is the HTTP status (200 or 206) + + Raises: + HTTPException: If the URL can't be decoded or upstream fails. + """ + # Decode the URL + try: + url = decode_url(encoded_url) + except Exception as e: + logger.error("Failed to decode relay URL: %s", e) + raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}") + + logger.debug("Relaying segment: %s", url) + + # Build upstream request headers + headers = { + "User-Agent": USER_AGENT, + "Accept": "*/*", + } + if range_header: + headers["Range"] = range_header + + # Create the client and stream - caller is responsible for cleanup + # via the async generator protocol + client = httpx.AsyncClient( + timeout=RELAY_TIMEOUT, + follow_redirects=True, + ) + + try: + response = await client.send( + client.build_request("GET", url, headers=headers), + stream=True, + ) + + if response.status_code not in (200, 206): + await response.aclose() + await client.aclose() + logger.warning( + "Upstream segment returned HTTP %d for %s", + response.status_code, + url, + ) + raise HTTPException( + status_code=502, + detail=f"Upstream returned HTTP {response.status_code}", + ) + + # Collect relevant response headers to forward + response_headers: dict[str, str] = {} + + content_type = response.headers.get("content-type", "video/mp2t") + response_headers["Content-Type"] = content_type + + if "content-length" in response.headers: + response_headers["Content-Length"] = response.headers["content-length"] + + if "content-range" in response.headers: + response_headers["Content-Range"] = response.headers["content-range"] + + status_code = response.status_code + + async def _stream_chunks() -> AsyncGenerator[bytes, None]: + """Yield chunks from the upstream response, then clean up.""" + try: + async for chunk in response.aiter_bytes(chunk_size=RELAY_CHUNK_SIZE): + yield chunk + except Exception as e: + logger.error("Error streaming segment from %s: %s", url, e) + finally: + await response.aclose() + await client.aclose() + + return _stream_chunks(), response_headers, status_code + + except HTTPException: + raise + except httpx.TimeoutException: + await client.aclose() + logger.error("Timeout relaying segment: %s", url) + raise HTTPException(status_code=504, detail="Upstream segment timeout") + except httpx.HTTPError as e: + await client.aclose() + logger.error("HTTP error relaying segment: %s - %s", url, e) + raise HTTPException(status_code=502, detail=f"Upstream error: {e}") + except Exception as e: + await client.aclose() + logger.exception("Unexpected error relaying segment: %s", url) + raise HTTPException(status_code=500, detail=f"Internal error: {e}")