[ci skip] f1-stream: add stream health checker and HLS proxy (Phases 4-5)

Phase 4 - Stream Health and Fallback:
- StreamHealthChecker with partial GET validation of m3u8 content
- Bitrate extraction from BANDWIDTH tags
- Response time measurement for quality ranking
- Fallback ordering: live first, fastest response time first
- GET /streams now only returns health-verified streams

Phase 5 - HLS Proxy Core:
- GET /proxy?url= - m3u8 playlist fetch with full URI rewriting
- GET /relay?url= - chunked segment relay (never buffers full segment)
- m3u8 rewriter handles master, variant, and segment URIs
- Base64url encoding for URL parameters
- CORS middleware for browser playback
- Range header forwarding for seeking support
This commit is contained in:
Viktor Barzin 2026-02-23 23:41:16 +00:00
parent 00d9458dec
commit 9bf0523ea9
No known key found for this signature in database
GPG key ID: 0EB088298288D958
6 changed files with 926 additions and 20 deletions

View file

@ -15,6 +15,9 @@ class ExtractedStream:
title: str = "" # e.g., "F1 Race Live"
extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
is_live: bool = False # Whether it passed health check
response_time_ms: int = 0 # Health check response time (lower = better)
checked_at: str = "" # ISO timestamp of last health check
bitrate: int = 0 # Bitrate in bps if detectable from m3u8 playlist
def to_dict(self) -> dict:
"""Serialize to a plain dictionary for JSON responses."""
@ -26,4 +29,7 @@ class ExtractedStream:
"title": self.title,
"extracted_at": self.extracted_at,
"is_live": self.is_live,
"response_time_ms": self.response_time_ms,
"checked_at": self.checked_at,
"bitrate": self.bitrate,
}

View file

@ -1,19 +1,25 @@
"""Extraction service - manages extraction lifecycle: polling, caching, serving."""
"""Extraction service - manages extraction lifecycle: polling, caching, health checking, serving."""
import logging
from datetime import datetime, timezone
from backend.extractors.models import ExtractedStream
from backend.extractors.registry import ExtractorRegistry
from backend.health import StreamHealthChecker
logger = logging.getLogger(__name__)
class ExtractionService:
"""Manages the extraction lifecycle: polling, caching, and serving results.
"""Manages the extraction lifecycle: polling, caching, health checking, and serving.
Extraction runs on a background schedule (via APScheduler), never on
client request path. Results are cached in memory, keyed by site_key.
client request path. After extraction, health checks verify each stream
is live. Results are cached in memory, keyed by site_key.
GET /streams only returns streams that passed health checks, sorted by:
1. is_live (live streams first)
2. response_time_ms (fastest first)
"""
def __init__(self, registry: ExtractorRegistry) -> None:
@ -22,18 +28,36 @@ class ExtractionService:
self._cache: dict[str, list[ExtractedStream]] = {}
self._last_run: str | None = None
self._last_run_stream_count: int = 0
self._health_checker = StreamHealthChecker()
async def run_extraction(self) -> None:
"""Run all extractors and cache their results.
"""Run all extractors, health-check results, and cache them.
This is called by the background scheduler. Each extractor's
results replace its previous cache entry entirely.
results replace its previous cache entry entirely. After extraction,
health checks are run to verify streams are live and measure
response times.
"""
logger.info("Starting extraction run...")
start = datetime.now(timezone.utc)
streams = await self._registry.extract_all()
# Run health checks on all extracted streams
if streams:
stream_dicts = [s.to_dict() for s in streams]
health_map = await self._health_checker.check_all(stream_dicts)
# Update stream objects with health check results
for stream in streams:
health = health_map.get(stream.url)
if health:
stream.is_live = health.is_live
stream.response_time_ms = health.response_time_ms
stream.checked_at = health.checked_at
if health.bitrate > 0:
stream.bitrate = health.bitrate
# Group streams by site_key and update cache
new_cache: dict[str, list[ExtractedStream]] = {}
for stream in streams:
@ -52,29 +76,68 @@ class ExtractionService:
self._last_run = start.isoformat()
self._last_run_stream_count = len(streams)
live_count = sum(
1 for streams_list in self._cache.values()
for s in streams_list if s.is_live
)
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
logger.info(
"Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs",
"Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs (%d live)",
len(streams),
len(new_cache),
elapsed,
live_count,
)
def get_streams(self) -> list[dict]:
"""Return all cached streams as a flat list of dicts.
"""Return all cached streams as a sorted list of dicts.
Only returns streams that passed health checks (is_live=True).
Sorted by fallback priority:
1. is_live (live streams first) - filters to live only
2. response_time_ms (fastest first)
Returns:
List of serialized ExtractedStream dicts from all extractors.
List of serialized ExtractedStream dicts from all extractors,
filtered to live-only and sorted by response time.
"""
all_streams: list[dict] = []
all_streams: list[ExtractedStream] = []
for streams in self._cache.values():
all_streams.extend(s.to_dict() for s in streams)
return all_streams
all_streams.extend(streams)
# Sort by fallback priority: live first, then fastest response
all_streams.sort(
key=lambda s: (not s.is_live, s.response_time_ms)
)
# Only return live streams to clients
live_streams = [s for s in all_streams if s.is_live]
return [s.to_dict() for s in live_streams]
def get_all_streams_unfiltered(self) -> list[dict]:
"""Return ALL cached streams including unhealthy ones.
Used for debugging and status endpoints. Sorted by fallback priority
but includes streams that failed health checks.
Returns:
List of all serialized ExtractedStream dicts.
"""
all_streams: list[ExtractedStream] = []
for streams in self._cache.values():
all_streams.extend(streams)
# Sort by fallback priority: live first, then fastest response
all_streams.sort(
key=lambda s: (not s.is_live, s.response_time_ms)
)
return [s.to_dict() for s in all_streams]
def get_streams_for_session(self, session_type: str) -> list[dict]:
"""Return cached streams filtered/annotated for a specific session type.
Currently returns all streams (extractors don't yet differentiate by
Currently returns all live streams (extractors don't yet differentiate by
session type). This method exists as a hook for future filtering,
e.g., some extractors might only have race streams but not FP streams.
@ -82,7 +145,7 @@ class ExtractionService:
session_type: The F1 session type (e.g., "race", "qualifying", "fp1").
Returns:
List of serialized ExtractedStream dicts.
List of serialized ExtractedStream dicts (live only, sorted).
"""
# For now, all streams are potentially relevant to any session.
# Future extractors may tag streams with session types, at which
@ -103,19 +166,26 @@ class ExtractionService:
for info in extractor_list:
key = info["site_key"]
cached = self._cache.get(key, [])
live_count = sum(1 for s in cached if s.is_live)
extractor_statuses.append(
{
"site_key": key,
"site_name": info["site_name"],
"cached_streams": len(cached),
"live_streams": live_count,
}
)
total_cached = sum(len(streams) for streams in self._cache.values())
total_live = sum(
1 for streams in self._cache.values()
for s in streams if s.is_live
)
return {
"extractors": extractor_statuses,
"total_cached_streams": sum(
len(streams) for streams in self._cache.values()
),
"total_cached_streams": total_cached,
"total_live_streams": total_live,
"last_run": self._last_run,
"last_run_stream_count": self._last_run_stream_count,
}

View file

@ -0,0 +1,233 @@
"""Stream health checker - verifies extracted streams are live and responsive.
Performs GET requests against m3u8 URLs to verify they contain valid HLS
playlists (#EXTM3U header), measures response times for quality ranking,
and supports concurrent checking of multiple streams.
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
import httpx
logger = logging.getLogger(__name__)
# How long to wait for a single health check (seconds)
HEALTH_CHECK_TIMEOUT = 10.0
# Maximum bytes to read when verifying m3u8 content
# We only need to see the #EXTM3U header and a few lines
MAX_CONTENT_BYTES = 8192
# User-Agent to send with health check requests
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
@dataclass
class StreamHealth:
"""Result of a single stream health check."""
url: str
is_live: bool
response_time_ms: int # Lower = better quality indicator
checked_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
error: str = "" # Error message if not live
bitrate: int = 0 # Bitrate in bps if detectable from playlist
def to_dict(self) -> dict:
"""Serialize to a plain dictionary for JSON responses."""
return {
"url": self.url,
"is_live": self.is_live,
"response_time_ms": self.response_time_ms,
"checked_at": self.checked_at,
"error": self.error,
"bitrate": self.bitrate,
}
def _extract_bitrate(content: str) -> int:
"""Try to extract bitrate from m3u8 playlist content.
Looks for BANDWIDTH= in #EXT-X-STREAM-INF tags. Returns the highest
bitrate found, or 0 if none detected.
"""
max_bitrate = 0
for line in content.splitlines():
if "BANDWIDTH=" in line:
try:
# Parse BANDWIDTH=<number> from the tag
for part in line.split(","):
part = part.strip()
if part.startswith("BANDWIDTH="):
bw = int(part.split("=", 1)[1])
max_bitrate = max(max_bitrate, bw)
except (ValueError, IndexError):
continue
return max_bitrate
class StreamHealthChecker:
"""Background health checker for extracted streams.
Verifies streams are live by performing a partial GET on the m3u8 URL,
checking for valid HLS content (#EXTM3U header), and measuring response
time as a quality indicator.
"""
def __init__(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> None:
self._timeout = timeout
async def check_stream(self, url: str) -> StreamHealth:
"""Check if a stream URL is live by doing a partial GET on the m3u8.
Verification steps:
1. GET the m3u8 URL (not just HEAD - need to verify playlist content)
2. Check if response contains #EXTM3U header
3. Measure response time as a quality indicator
4. Extract bitrate info if available
Args:
url: The m3u8 stream URL to check.
Returns:
StreamHealth with is_live, response_time_ms, checked_at, and
optional bitrate and error information.
"""
start_time = time.monotonic()
checked_at = datetime.now(timezone.utc).isoformat()
try:
async with httpx.AsyncClient(
timeout=self._timeout,
follow_redirects=True,
headers={
"User-Agent": USER_AGENT,
"Accept": "*/*",
},
) as client:
# Use a partial GET with Range header to limit download
# but fall back to reading limited bytes if Range not supported
response = await client.get(
url,
headers={"Range": f"bytes=0-{MAX_CONTENT_BYTES - 1}"},
)
elapsed_ms = int((time.monotonic() - start_time) * 1000)
# Accept 200 (full content) or 206 (partial content)
if response.status_code not in (200, 206):
return StreamHealth(
url=url,
is_live=False,
response_time_ms=elapsed_ms,
checked_at=checked_at,
error=f"HTTP {response.status_code}",
)
content = response.text[:MAX_CONTENT_BYTES]
# Verify it's a valid HLS playlist
if "#EXTM3U" not in content:
return StreamHealth(
url=url,
is_live=False,
response_time_ms=elapsed_ms,
checked_at=checked_at,
error="Response does not contain #EXTM3U header",
)
# Extract bitrate info if available
bitrate = _extract_bitrate(content)
return StreamHealth(
url=url,
is_live=True,
response_time_ms=elapsed_ms,
checked_at=checked_at,
bitrate=bitrate,
)
except httpx.TimeoutException:
elapsed_ms = int((time.monotonic() - start_time) * 1000)
logger.debug("Health check timed out for %s", url)
return StreamHealth(
url=url,
is_live=False,
response_time_ms=elapsed_ms,
checked_at=checked_at,
error="Timeout",
)
except httpx.HTTPError as e:
elapsed_ms = int((time.monotonic() - start_time) * 1000)
logger.debug("Health check HTTP error for %s: %s", url, e)
return StreamHealth(
url=url,
is_live=False,
response_time_ms=elapsed_ms,
checked_at=checked_at,
error=f"HTTP error: {e}",
)
except Exception as e:
elapsed_ms = int((time.monotonic() - start_time) * 1000)
logger.exception("Unexpected error during health check for %s", url)
return StreamHealth(
url=url,
is_live=False,
response_time_ms=elapsed_ms,
checked_at=checked_at,
error=f"Unexpected error: {e}",
)
async def check_all(
self, streams: list[dict],
) -> dict[str, StreamHealth]:
"""Check all streams concurrently, return health map keyed by URL.
Args:
streams: List of stream dicts (must have a "url" key).
Returns:
Dictionary mapping stream URL to its StreamHealth result.
"""
urls = [s["url"] for s in streams if "url" in s]
if not urls:
return {}
logger.info("Running health checks on %d stream(s)...", len(urls))
# Run all checks concurrently
tasks = [self.check_stream(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
health_map: dict[str, StreamHealth] = {}
for url, result in zip(urls, results):
if isinstance(result, Exception):
logger.error("Health check task failed for %s: %s", url, result)
health_map[url] = StreamHealth(
url=url,
is_live=False,
response_time_ms=0,
error=f"Task error: {result}",
)
else:
health_map[url] = result
live_count = sum(1 for h in health_map.values() if h.is_live)
logger.info(
"Health checks complete: %d/%d streams are live",
live_count,
len(health_map),
)
return health_map

View file

@ -0,0 +1,264 @@
"""m3u8 playlist rewriter - rewrites URIs in HLS playlists to go through the proxy.
Handles both master playlists (containing variant stream references) and
media playlists (containing segment URLs). Resolves relative URIs to
absolute before encoding, and routes .m3u8 references through /proxy
while routing segments (.ts, .m4s, etc.) through /relay.
"""
import base64
import logging
import re
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
def encode_url(url: str) -> str:
"""Base64url-encode a URL for safe transport as a query parameter.
Uses URL-safe base64 encoding with padding stripped to avoid
double-encoding issues when the URL contains special characters.
Args:
url: The raw URL to encode.
Returns:
Base64url-encoded string with padding removed.
"""
return base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
def decode_url(encoded: str) -> str:
"""Decode a base64url-encoded URL.
Re-adds padding that was stripped during encoding.
Args:
encoded: Base64url-encoded string (padding may be stripped).
Returns:
The original URL string.
Raises:
ValueError: If the encoded string is not valid base64url.
"""
# Add padding back - base64 requires length to be multiple of 4
padding = 4 - len(encoded) % 4
if padding != 4:
encoded += "=" * padding
return base64.urlsafe_b64decode(encoded).decode()
def _resolve_uri(uri: str, base_url: str) -> str:
"""Resolve a potentially relative URI against a base URL.
Args:
uri: The URI from the m3u8 playlist (may be relative or absolute).
base_url: The URL of the playlist itself (used as base for relative URIs).
Returns:
Absolute URL.
"""
if uri.startswith("http://") or uri.startswith("https://"):
return uri
return urljoin(base_url, uri)
def _is_playlist_uri(uri: str) -> bool:
"""Determine if a URI likely points to another playlist (vs a segment).
Playlist URIs end in .m3u8 or .m3u. Everything else is treated as a
segment (TS, fMP4, init segment, etc.).
Args:
uri: The URI to classify.
Returns:
True if the URI appears to be a playlist reference.
"""
# Strip query string for extension check
path = uri.split("?")[0].split("#")[0].lower()
return path.endswith(".m3u8") or path.endswith(".m3u")
def _build_proxy_url(absolute_uri: str, proxy_base: str) -> str:
"""Build a /proxy URL for a playlist reference.
Args:
absolute_uri: The absolute URL of the upstream playlist.
proxy_base: The base URL of our proxy service.
Returns:
Rewritten URL pointing to our /proxy endpoint.
"""
encoded = encode_url(absolute_uri)
return f"{proxy_base}/proxy?url={encoded}"
def _build_relay_url(absolute_uri: str, proxy_base: str) -> str:
"""Build a /relay URL for a segment reference.
Args:
absolute_uri: The absolute URL of the upstream segment.
proxy_base: The base URL of our proxy service.
Returns:
Rewritten URL pointing to our /relay endpoint.
"""
encoded = encode_url(absolute_uri)
return f"{proxy_base}/relay?url={encoded}"
def _rewrite_uri(uri: str, base_url: str, proxy_base: str) -> str:
"""Rewrite a single URI from an m3u8 playlist.
Resolves relative URIs, then routes playlists through /proxy and
segments through /relay.
Args:
uri: The raw URI from the playlist.
base_url: The URL of the playlist containing this URI.
proxy_base: The base URL of our proxy service.
Returns:
Rewritten URI pointing to our proxy.
"""
absolute = _resolve_uri(uri, base_url)
if _is_playlist_uri(uri):
return _build_proxy_url(absolute, proxy_base)
return _build_relay_url(absolute, proxy_base)
def rewrite_playlist(content: str, base_url: str, proxy_base: str) -> str:
"""Rewrite all URIs in an m3u8 playlist to go through the proxy.
Handles both master playlists (with #EXT-X-STREAM-INF variant
references) and media playlists (with segment URIs). Also handles
#EXT-X-MAP:URI= init segment references.
Args:
content: The raw m3u8 playlist text.
base_url: The original URL of this playlist (for resolving relative URIs).
proxy_base: The base URL of our proxy (e.g., "https://f1.viktorbarzin.me").
Returns:
The rewritten m3u8 playlist text with all URIs proxied.
"""
proxy_base = proxy_base.rstrip("/")
lines = content.splitlines()
output_lines: list[str] = []
# Track if the previous line was #EXT-X-STREAM-INF (next line is a variant URI)
next_is_variant = False
for line in lines:
stripped = line.strip()
# Handle #EXT-X-MAP:URI="..." (init segment)
if stripped.startswith("#EXT-X-MAP:"):
output_lines.append(_rewrite_ext_x_map(stripped, base_url, proxy_base))
continue
# Handle #EXT-X-STREAM-INF (marks next line as variant playlist URI)
if stripped.startswith("#EXT-X-STREAM-INF:"):
output_lines.append(line)
next_is_variant = True
continue
# Handle #EXT-X-MEDIA with URI= attribute
if stripped.startswith("#EXT-X-MEDIA:") and "URI=" in stripped:
output_lines.append(_rewrite_ext_x_media(stripped, base_url, proxy_base))
continue
# Handle #EXT-X-I-FRAME-STREAM-INF with URI= attribute
if stripped.startswith("#EXT-X-I-FRAME-STREAM-INF:") and "URI=" in stripped:
output_lines.append(
_rewrite_tag_with_uri(stripped, base_url, proxy_base, is_playlist=True)
)
continue
# If previous line was #EXT-X-STREAM-INF, this line is a variant playlist URI
if next_is_variant and stripped and not stripped.startswith("#"):
absolute = _resolve_uri(stripped, base_url)
output_lines.append(_build_proxy_url(absolute, proxy_base))
next_is_variant = False
continue
# Regular URI line (non-comment, non-empty, not a tag)
if stripped and not stripped.startswith("#"):
# This is a segment URI (TS, fMP4, etc.)
absolute = _resolve_uri(stripped, base_url)
output_lines.append(_build_relay_url(absolute, proxy_base))
continue
# Tags and comments pass through unchanged
output_lines.append(line)
# Reset variant flag if we hit another tag
if stripped.startswith("#") and not stripped.startswith("#EXT-X-STREAM-INF:"):
next_is_variant = False
return "\n".join(output_lines)
def _rewrite_ext_x_map(line: str, base_url: str, proxy_base: str) -> str:
"""Rewrite the URI in an #EXT-X-MAP tag.
#EXT-X-MAP:URI="init.mp4" -> #EXT-X-MAP:URI="<relay_url>"
The init segment goes through /relay since it's binary data.
"""
# Match URI="..." or URI=... (with or without quotes)
match = re.search(r'URI="([^"]+)"', line)
if not match:
match = re.search(r"URI=([^,\s]+)", line)
if not match:
return line
original_uri = match.group(1)
absolute = _resolve_uri(original_uri, base_url)
relay_url = _build_relay_url(absolute, proxy_base)
return line[:match.start(1)] + relay_url + line[match.end(1):]
def _rewrite_ext_x_media(line: str, base_url: str, proxy_base: str) -> str:
"""Rewrite the URI in an #EXT-X-MEDIA tag.
#EXT-X-MEDIA:TYPE=AUDIO,...,URI="audio.m3u8" -> rewrite URI to /proxy
"""
return _rewrite_tag_with_uri(line, base_url, proxy_base, is_playlist=True)
def _rewrite_tag_with_uri(
line: str, base_url: str, proxy_base: str, is_playlist: bool = False,
) -> str:
"""Rewrite the URI attribute within an HLS tag line.
Generic handler for any tag that contains a URI="..." attribute.
Args:
line: The full tag line.
base_url: Base URL for resolving relative URIs.
proxy_base: Our proxy base URL.
is_playlist: If True, route through /proxy; otherwise /relay.
Returns:
The tag line with the URI rewritten.
"""
match = re.search(r'URI="([^"]+)"', line)
if not match:
match = re.search(r"URI=([^,\s]+)", line)
if not match:
return line
original_uri = match.group(1)
absolute = _resolve_uri(original_uri, base_url)
if is_playlist:
new_url = _build_proxy_url(absolute, proxy_base)
else:
new_url = _build_relay_url(absolute, proxy_base)
return line[:match.start(1)] + new_url + line[match.end(1):]

View file

@ -1,4 +1,4 @@
"""F1 Streams - FastAPI backend with schedule and stream extraction services."""
"""F1 Streams - FastAPI backend with schedule, stream extraction, health checking, and HLS proxy."""
import logging
from contextlib import asynccontextmanager
@ -6,9 +6,12 @@ from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI
from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import Response, StreamingResponse
from backend.extractors import create_extraction_service
from backend.proxy import proxy_playlist, relay_stream
from backend.schedule import ScheduleService
logging.basicConfig(
@ -108,6 +111,16 @@ async def lifespan(app: FastAPI):
app = FastAPI(title="F1 Streams", lifespan=lifespan)
# --- CORS Middleware ---
# Required for browser-based HLS players to access proxy/relay endpoints
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "OPTIONS"],
allow_headers=["Range"],
expose_headers=["Content-Range", "Content-Length", "Content-Type"],
)
# --- Health & Info ---
@ -119,7 +132,7 @@ async def health():
@app.get("/")
async def root():
return {"service": "f1-streams", "version": "3.0.0"}
return {"service": "f1-streams", "version": "4.0.0"}
# --- Schedule ---
@ -143,7 +156,12 @@ async def refresh_schedule():
@app.get("/streams")
async def get_streams():
"""Return all currently cached streams from all extractors."""
"""Return all currently cached streams that passed health checks.
Streams are sorted by fallback priority:
1. Live streams only (is_live=True)
2. Fastest response time first (lowest response_time_ms)
"""
streams = extraction_service.get_streams()
return {
"streams": streams,
@ -151,6 +169,20 @@ async def get_streams():
}
@app.get("/streams/all")
async def get_all_streams():
"""Return ALL cached streams including unhealthy ones (for debugging).
Unlike GET /streams, this endpoint includes streams that failed health
checks. Useful for diagnosing extraction or health check issues.
"""
streams = extraction_service.get_all_streams_unfiltered()
return {
"streams": streams,
"count": len(streams),
}
@app.get("/extractors")
async def get_extractors():
"""List registered extractors and their current status."""
@ -165,10 +197,82 @@ async def trigger_extraction():
return {
"status": "extraction_complete",
"streams_found": status["total_cached_streams"],
"live_streams": status["total_live_streams"],
"extractors_run": len(status["extractors"]),
}
# --- HLS Proxy ---
def _get_proxy_base(request: Request) -> str:
"""Derive the proxy base URL from the incoming request.
Uses X-Forwarded-Proto and X-Forwarded-Host headers if present
(behind a reverse proxy), otherwise falls back to request URL.
"""
proto = request.headers.get("x-forwarded-proto", request.url.scheme)
host = request.headers.get("x-forwarded-host", request.url.netloc)
return f"{proto}://{host}"
@app.get("/proxy")
async def proxy_endpoint(
request: Request,
url: str = Query(..., description="Base64url-encoded m3u8 playlist URL"),
):
"""Proxy an upstream m3u8 playlist with URI rewriting.
Fetches the upstream m3u8 playlist, rewrites all URIs to route through
our /proxy (for sub-playlists) and /relay (for segments) endpoints,
and returns the rewritten playlist.
The `url` parameter must be base64url-encoded to avoid URL encoding issues.
Example:
GET /proxy?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zdHJlYW0ubTN1OA
"""
proxy_base = _get_proxy_base(request)
rewritten = await proxy_playlist(url, proxy_base)
return Response(
content=rewritten,
media_type="application/vnd.apple.mpegurl",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
},
)
@app.get("/relay")
async def relay_endpoint(
request: Request,
url: str = Query(..., description="Base64url-encoded segment URL"),
):
"""Relay an upstream media segment as a chunked byte stream.
Fetches the upstream segment (TS, fMP4, init segment, etc.) and streams
it to the client using chunked transfer encoding. Never buffers the
full segment in memory.
The `url` parameter must be base64url-encoded to avoid URL encoding issues.
Supports HTTP Range requests for seeking.
Example:
GET /relay?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zZWdtZW50LnRz
"""
range_header = request.headers.get("range")
stream_gen, headers, status_code = await relay_stream(url, range_header)
return StreamingResponse(
stream_gen,
status_code=status_code,
headers=headers,
)
if __name__ == "__main__":
import uvicorn

View file

@ -0,0 +1,229 @@
"""HLS proxy - fetches upstream m3u8 playlists and relays media segments.
Two core functions:
1. Playlist proxy: fetches an upstream m3u8 playlist, rewrites all URIs
to route through our /proxy and /relay endpoints, returns the rewritten
playlist to the client.
2. Segment relay: fetches an upstream media segment (TS, fMP4, init) and
streams it to the client using chunked transfer encoding, never buffering
the full segment in memory.
All responses include CORS headers for browser playback.
"""
import logging
from typing import AsyncGenerator
import httpx
from fastapi import HTTPException
from backend.m3u8_rewriter import decode_url, rewrite_playlist
logger = logging.getLogger(__name__)
# Chunk size for relay streaming (64 KB)
RELAY_CHUNK_SIZE = 65536
# Timeout for upstream playlist fetches (seconds)
PLAYLIST_TIMEOUT = 15.0
# Timeout for upstream segment relay - longer because segments are bigger
RELAY_TIMEOUT = 30.0
# User-Agent for upstream requests
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
async def proxy_playlist(encoded_url: str, proxy_base: str) -> str:
"""Fetch an upstream m3u8 playlist and rewrite all URIs through our proxy.
Args:
encoded_url: Base64url-encoded URL of the upstream m3u8 playlist.
proxy_base: The base URL of our proxy service for rewriting URIs
(e.g., "https://f1.viktorbarzin.me").
Returns:
The rewritten m3u8 playlist text.
Raises:
HTTPException: If the URL can't be decoded, upstream fails, or
content is not a valid HLS playlist.
"""
# Decode the URL
try:
url = decode_url(encoded_url)
except Exception as e:
logger.error("Failed to decode proxy URL: %s", e)
raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}")
logger.info("Proxying playlist: %s", url)
# Fetch the upstream playlist
try:
async with httpx.AsyncClient(
timeout=PLAYLIST_TIMEOUT,
follow_redirects=True,
headers={
"User-Agent": USER_AGENT,
"Accept": "*/*",
},
) as client:
response = await client.get(url)
if response.status_code != 200:
logger.warning(
"Upstream playlist returned HTTP %d for %s",
response.status_code,
url,
)
raise HTTPException(
status_code=502,
detail=f"Upstream returned HTTP {response.status_code}",
)
content = response.text
except httpx.TimeoutException:
logger.error("Timeout fetching upstream playlist: %s", url)
raise HTTPException(status_code=504, detail="Upstream playlist timeout")
except httpx.HTTPError as e:
logger.error("HTTP error fetching upstream playlist: %s - %s", url, e)
raise HTTPException(status_code=502, detail=f"Upstream error: {e}")
except HTTPException:
raise
except Exception as e:
logger.exception("Unexpected error fetching playlist: %s", url)
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
# Validate it looks like an m3u8 playlist
if "#EXTM3U" not in content:
logger.warning("Upstream response is not a valid m3u8 playlist: %s", url)
raise HTTPException(
status_code=502,
detail="Upstream response is not a valid HLS playlist",
)
# Rewrite all URIs to go through our proxy
rewritten = rewrite_playlist(content, url, proxy_base)
logger.debug(
"Proxied playlist from %s: %d bytes -> %d bytes",
url,
len(content),
len(rewritten),
)
return rewritten
async def relay_stream(
encoded_url: str,
range_header: str | None = None,
) -> tuple[AsyncGenerator[bytes, None], dict[str, str], int]:
"""Relay an upstream media segment as a chunked byte stream.
Never buffers the full segment in memory. Streams chunks as they
arrive from the upstream server.
Args:
encoded_url: Base64url-encoded URL of the upstream segment.
range_header: Optional HTTP Range header from the client to
forward to upstream.
Returns:
A tuple of (async_generator, headers_dict, status_code) where:
- async_generator yields bytes chunks
- headers_dict contains content-type and other relevant headers
- status_code is the HTTP status (200 or 206)
Raises:
HTTPException: If the URL can't be decoded or upstream fails.
"""
# Decode the URL
try:
url = decode_url(encoded_url)
except Exception as e:
logger.error("Failed to decode relay URL: %s", e)
raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}")
logger.debug("Relaying segment: %s", url)
# Build upstream request headers
headers = {
"User-Agent": USER_AGENT,
"Accept": "*/*",
}
if range_header:
headers["Range"] = range_header
# Create the client and stream - caller is responsible for cleanup
# via the async generator protocol
client = httpx.AsyncClient(
timeout=RELAY_TIMEOUT,
follow_redirects=True,
)
try:
response = await client.send(
client.build_request("GET", url, headers=headers),
stream=True,
)
if response.status_code not in (200, 206):
await response.aclose()
await client.aclose()
logger.warning(
"Upstream segment returned HTTP %d for %s",
response.status_code,
url,
)
raise HTTPException(
status_code=502,
detail=f"Upstream returned HTTP {response.status_code}",
)
# Collect relevant response headers to forward
response_headers: dict[str, str] = {}
content_type = response.headers.get("content-type", "video/mp2t")
response_headers["Content-Type"] = content_type
if "content-length" in response.headers:
response_headers["Content-Length"] = response.headers["content-length"]
if "content-range" in response.headers:
response_headers["Content-Range"] = response.headers["content-range"]
status_code = response.status_code
async def _stream_chunks() -> AsyncGenerator[bytes, None]:
"""Yield chunks from the upstream response, then clean up."""
try:
async for chunk in response.aiter_bytes(chunk_size=RELAY_CHUNK_SIZE):
yield chunk
except Exception as e:
logger.error("Error streaming segment from %s: %s", url, e)
finally:
await response.aclose()
await client.aclose()
return _stream_chunks(), response_headers, status_code
except HTTPException:
raise
except httpx.TimeoutException:
await client.aclose()
logger.error("Timeout relaying segment: %s", url)
raise HTTPException(status_code=504, detail="Upstream segment timeout")
except httpx.HTTPError as e:
await client.aclose()
logger.error("HTTP error relaying segment: %s - %s", url, e)
raise HTTPException(status_code=502, detail=f"Upstream error: {e}")
except Exception as e:
await client.aclose()
logger.exception("Unexpected error relaying segment: %s", url)
raise HTTPException(status_code=500, detail=f"Internal error: {e}")