[ci skip] f1-stream: add stream health checker and HLS proxy (Phases 4-5)
Phase 4 - Stream Health and Fallback: - StreamHealthChecker with partial GET validation of m3u8 content - Bitrate extraction from BANDWIDTH tags - Response time measurement for quality ranking - Fallback ordering: live first, fastest response time first - GET /streams now only returns health-verified streams Phase 5 - HLS Proxy Core: - GET /proxy?url= - m3u8 playlist fetch with full URI rewriting - GET /relay?url= - chunked segment relay (never buffers full segment) - m3u8 rewriter handles master, variant, and segment URIs - Base64url encoding for URL parameters - CORS middleware for browser playback - Range header forwarding for seeking support
This commit is contained in:
parent
a9a4ac37a2
commit
6867036087
6 changed files with 926 additions and 20 deletions
|
|
@ -15,6 +15,9 @@ class ExtractedStream:
|
||||||
title: str = "" # e.g., "F1 Race Live"
|
title: str = "" # e.g., "F1 Race Live"
|
||||||
extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
||||||
is_live: bool = False # Whether it passed health check
|
is_live: bool = False # Whether it passed health check
|
||||||
|
response_time_ms: int = 0 # Health check response time (lower = better)
|
||||||
|
checked_at: str = "" # ISO timestamp of last health check
|
||||||
|
bitrate: int = 0 # Bitrate in bps if detectable from m3u8 playlist
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
"""Serialize to a plain dictionary for JSON responses."""
|
"""Serialize to a plain dictionary for JSON responses."""
|
||||||
|
|
@ -26,4 +29,7 @@ class ExtractedStream:
|
||||||
"title": self.title,
|
"title": self.title,
|
||||||
"extracted_at": self.extracted_at,
|
"extracted_at": self.extracted_at,
|
||||||
"is_live": self.is_live,
|
"is_live": self.is_live,
|
||||||
|
"response_time_ms": self.response_time_ms,
|
||||||
|
"checked_at": self.checked_at,
|
||||||
|
"bitrate": self.bitrate,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,25 @@
|
||||||
"""Extraction service - manages extraction lifecycle: polling, caching, serving."""
|
"""Extraction service - manages extraction lifecycle: polling, caching, health checking, serving."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from backend.extractors.models import ExtractedStream
|
from backend.extractors.models import ExtractedStream
|
||||||
from backend.extractors.registry import ExtractorRegistry
|
from backend.extractors.registry import ExtractorRegistry
|
||||||
|
from backend.health import StreamHealthChecker
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ExtractionService:
|
class ExtractionService:
|
||||||
"""Manages the extraction lifecycle: polling, caching, and serving results.
|
"""Manages the extraction lifecycle: polling, caching, health checking, and serving.
|
||||||
|
|
||||||
Extraction runs on a background schedule (via APScheduler), never on
|
Extraction runs on a background schedule (via APScheduler), never on
|
||||||
client request path. Results are cached in memory, keyed by site_key.
|
client request path. After extraction, health checks verify each stream
|
||||||
|
is live. Results are cached in memory, keyed by site_key.
|
||||||
|
|
||||||
|
GET /streams only returns streams that passed health checks, sorted by:
|
||||||
|
1. is_live (live streams first)
|
||||||
|
2. response_time_ms (fastest first)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, registry: ExtractorRegistry) -> None:
|
def __init__(self, registry: ExtractorRegistry) -> None:
|
||||||
|
|
@ -22,18 +28,36 @@ class ExtractionService:
|
||||||
self._cache: dict[str, list[ExtractedStream]] = {}
|
self._cache: dict[str, list[ExtractedStream]] = {}
|
||||||
self._last_run: str | None = None
|
self._last_run: str | None = None
|
||||||
self._last_run_stream_count: int = 0
|
self._last_run_stream_count: int = 0
|
||||||
|
self._health_checker = StreamHealthChecker()
|
||||||
|
|
||||||
async def run_extraction(self) -> None:
|
async def run_extraction(self) -> None:
|
||||||
"""Run all extractors and cache their results.
|
"""Run all extractors, health-check results, and cache them.
|
||||||
|
|
||||||
This is called by the background scheduler. Each extractor's
|
This is called by the background scheduler. Each extractor's
|
||||||
results replace its previous cache entry entirely.
|
results replace its previous cache entry entirely. After extraction,
|
||||||
|
health checks are run to verify streams are live and measure
|
||||||
|
response times.
|
||||||
"""
|
"""
|
||||||
logger.info("Starting extraction run...")
|
logger.info("Starting extraction run...")
|
||||||
start = datetime.now(timezone.utc)
|
start = datetime.now(timezone.utc)
|
||||||
|
|
||||||
streams = await self._registry.extract_all()
|
streams = await self._registry.extract_all()
|
||||||
|
|
||||||
|
# Run health checks on all extracted streams
|
||||||
|
if streams:
|
||||||
|
stream_dicts = [s.to_dict() for s in streams]
|
||||||
|
health_map = await self._health_checker.check_all(stream_dicts)
|
||||||
|
|
||||||
|
# Update stream objects with health check results
|
||||||
|
for stream in streams:
|
||||||
|
health = health_map.get(stream.url)
|
||||||
|
if health:
|
||||||
|
stream.is_live = health.is_live
|
||||||
|
stream.response_time_ms = health.response_time_ms
|
||||||
|
stream.checked_at = health.checked_at
|
||||||
|
if health.bitrate > 0:
|
||||||
|
stream.bitrate = health.bitrate
|
||||||
|
|
||||||
# Group streams by site_key and update cache
|
# Group streams by site_key and update cache
|
||||||
new_cache: dict[str, list[ExtractedStream]] = {}
|
new_cache: dict[str, list[ExtractedStream]] = {}
|
||||||
for stream in streams:
|
for stream in streams:
|
||||||
|
|
@ -52,29 +76,68 @@ class ExtractionService:
|
||||||
self._last_run = start.isoformat()
|
self._last_run = start.isoformat()
|
||||||
self._last_run_stream_count = len(streams)
|
self._last_run_stream_count = len(streams)
|
||||||
|
|
||||||
|
live_count = sum(
|
||||||
|
1 for streams_list in self._cache.values()
|
||||||
|
for s in streams_list if s.is_live
|
||||||
|
)
|
||||||
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
|
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs",
|
"Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs (%d live)",
|
||||||
len(streams),
|
len(streams),
|
||||||
len(new_cache),
|
len(new_cache),
|
||||||
elapsed,
|
elapsed,
|
||||||
|
live_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_streams(self) -> list[dict]:
|
def get_streams(self) -> list[dict]:
|
||||||
"""Return all cached streams as a flat list of dicts.
|
"""Return all cached streams as a sorted list of dicts.
|
||||||
|
|
||||||
|
Only returns streams that passed health checks (is_live=True).
|
||||||
|
Sorted by fallback priority:
|
||||||
|
1. is_live (live streams first) - filters to live only
|
||||||
|
2. response_time_ms (fastest first)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of serialized ExtractedStream dicts from all extractors.
|
List of serialized ExtractedStream dicts from all extractors,
|
||||||
|
filtered to live-only and sorted by response time.
|
||||||
"""
|
"""
|
||||||
all_streams: list[dict] = []
|
all_streams: list[ExtractedStream] = []
|
||||||
for streams in self._cache.values():
|
for streams in self._cache.values():
|
||||||
all_streams.extend(s.to_dict() for s in streams)
|
all_streams.extend(streams)
|
||||||
return all_streams
|
|
||||||
|
# Sort by fallback priority: live first, then fastest response
|
||||||
|
all_streams.sort(
|
||||||
|
key=lambda s: (not s.is_live, s.response_time_ms)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only return live streams to clients
|
||||||
|
live_streams = [s for s in all_streams if s.is_live]
|
||||||
|
return [s.to_dict() for s in live_streams]
|
||||||
|
|
||||||
|
def get_all_streams_unfiltered(self) -> list[dict]:
|
||||||
|
"""Return ALL cached streams including unhealthy ones.
|
||||||
|
|
||||||
|
Used for debugging and status endpoints. Sorted by fallback priority
|
||||||
|
but includes streams that failed health checks.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all serialized ExtractedStream dicts.
|
||||||
|
"""
|
||||||
|
all_streams: list[ExtractedStream] = []
|
||||||
|
for streams in self._cache.values():
|
||||||
|
all_streams.extend(streams)
|
||||||
|
|
||||||
|
# Sort by fallback priority: live first, then fastest response
|
||||||
|
all_streams.sort(
|
||||||
|
key=lambda s: (not s.is_live, s.response_time_ms)
|
||||||
|
)
|
||||||
|
|
||||||
|
return [s.to_dict() for s in all_streams]
|
||||||
|
|
||||||
def get_streams_for_session(self, session_type: str) -> list[dict]:
|
def get_streams_for_session(self, session_type: str) -> list[dict]:
|
||||||
"""Return cached streams filtered/annotated for a specific session type.
|
"""Return cached streams filtered/annotated for a specific session type.
|
||||||
|
|
||||||
Currently returns all streams (extractors don't yet differentiate by
|
Currently returns all live streams (extractors don't yet differentiate by
|
||||||
session type). This method exists as a hook for future filtering,
|
session type). This method exists as a hook for future filtering,
|
||||||
e.g., some extractors might only have race streams but not FP streams.
|
e.g., some extractors might only have race streams but not FP streams.
|
||||||
|
|
||||||
|
|
@ -82,7 +145,7 @@ class ExtractionService:
|
||||||
session_type: The F1 session type (e.g., "race", "qualifying", "fp1").
|
session_type: The F1 session type (e.g., "race", "qualifying", "fp1").
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of serialized ExtractedStream dicts.
|
List of serialized ExtractedStream dicts (live only, sorted).
|
||||||
"""
|
"""
|
||||||
# For now, all streams are potentially relevant to any session.
|
# For now, all streams are potentially relevant to any session.
|
||||||
# Future extractors may tag streams with session types, at which
|
# Future extractors may tag streams with session types, at which
|
||||||
|
|
@ -103,19 +166,26 @@ class ExtractionService:
|
||||||
for info in extractor_list:
|
for info in extractor_list:
|
||||||
key = info["site_key"]
|
key = info["site_key"]
|
||||||
cached = self._cache.get(key, [])
|
cached = self._cache.get(key, [])
|
||||||
|
live_count = sum(1 for s in cached if s.is_live)
|
||||||
extractor_statuses.append(
|
extractor_statuses.append(
|
||||||
{
|
{
|
||||||
"site_key": key,
|
"site_key": key,
|
||||||
"site_name": info["site_name"],
|
"site_name": info["site_name"],
|
||||||
"cached_streams": len(cached),
|
"cached_streams": len(cached),
|
||||||
|
"live_streams": live_count,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
total_cached = sum(len(streams) for streams in self._cache.values())
|
||||||
|
total_live = sum(
|
||||||
|
1 for streams in self._cache.values()
|
||||||
|
for s in streams if s.is_live
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"extractors": extractor_statuses,
|
"extractors": extractor_statuses,
|
||||||
"total_cached_streams": sum(
|
"total_cached_streams": total_cached,
|
||||||
len(streams) for streams in self._cache.values()
|
"total_live_streams": total_live,
|
||||||
),
|
|
||||||
"last_run": self._last_run,
|
"last_run": self._last_run,
|
||||||
"last_run_stream_count": self._last_run_stream_count,
|
"last_run_stream_count": self._last_run_stream_count,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
233
stacks/f1-stream/files/backend/health.py
Normal file
233
stacks/f1-stream/files/backend/health.py
Normal file
|
|
@ -0,0 +1,233 @@
|
||||||
|
"""Stream health checker - verifies extracted streams are live and responsive.
|
||||||
|
|
||||||
|
Performs GET requests against m3u8 URLs to verify they contain valid HLS
|
||||||
|
playlists (#EXTM3U header), measures response times for quality ranking,
|
||||||
|
and supports concurrent checking of multiple streams.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# How long to wait for a single health check (seconds)
|
||||||
|
HEALTH_CHECK_TIMEOUT = 10.0
|
||||||
|
|
||||||
|
# Maximum bytes to read when verifying m3u8 content
|
||||||
|
# We only need to see the #EXTM3U header and a few lines
|
||||||
|
MAX_CONTENT_BYTES = 8192
|
||||||
|
|
||||||
|
# User-Agent to send with health check requests
|
||||||
|
USER_AGENT = (
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/120.0.0.0 Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamHealth:
|
||||||
|
"""Result of a single stream health check."""
|
||||||
|
|
||||||
|
url: str
|
||||||
|
is_live: bool
|
||||||
|
response_time_ms: int # Lower = better quality indicator
|
||||||
|
checked_at: str = field(
|
||||||
|
default_factory=lambda: datetime.now(timezone.utc).isoformat()
|
||||||
|
)
|
||||||
|
error: str = "" # Error message if not live
|
||||||
|
bitrate: int = 0 # Bitrate in bps if detectable from playlist
|
||||||
|
|
||||||
|
def to_dict(self) -> dict:
|
||||||
|
"""Serialize to a plain dictionary for JSON responses."""
|
||||||
|
return {
|
||||||
|
"url": self.url,
|
||||||
|
"is_live": self.is_live,
|
||||||
|
"response_time_ms": self.response_time_ms,
|
||||||
|
"checked_at": self.checked_at,
|
||||||
|
"error": self.error,
|
||||||
|
"bitrate": self.bitrate,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_bitrate(content: str) -> int:
|
||||||
|
"""Try to extract bitrate from m3u8 playlist content.
|
||||||
|
|
||||||
|
Looks for BANDWIDTH= in #EXT-X-STREAM-INF tags. Returns the highest
|
||||||
|
bitrate found, or 0 if none detected.
|
||||||
|
"""
|
||||||
|
max_bitrate = 0
|
||||||
|
for line in content.splitlines():
|
||||||
|
if "BANDWIDTH=" in line:
|
||||||
|
try:
|
||||||
|
# Parse BANDWIDTH=<number> from the tag
|
||||||
|
for part in line.split(","):
|
||||||
|
part = part.strip()
|
||||||
|
if part.startswith("BANDWIDTH="):
|
||||||
|
bw = int(part.split("=", 1)[1])
|
||||||
|
max_bitrate = max(max_bitrate, bw)
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
continue
|
||||||
|
return max_bitrate
|
||||||
|
|
||||||
|
|
||||||
|
class StreamHealthChecker:
|
||||||
|
"""Background health checker for extracted streams.
|
||||||
|
|
||||||
|
Verifies streams are live by performing a partial GET on the m3u8 URL,
|
||||||
|
checking for valid HLS content (#EXTM3U header), and measuring response
|
||||||
|
time as a quality indicator.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> None:
|
||||||
|
self._timeout = timeout
|
||||||
|
|
||||||
|
async def check_stream(self, url: str) -> StreamHealth:
|
||||||
|
"""Check if a stream URL is live by doing a partial GET on the m3u8.
|
||||||
|
|
||||||
|
Verification steps:
|
||||||
|
1. GET the m3u8 URL (not just HEAD - need to verify playlist content)
|
||||||
|
2. Check if response contains #EXTM3U header
|
||||||
|
3. Measure response time as a quality indicator
|
||||||
|
4. Extract bitrate info if available
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: The m3u8 stream URL to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
StreamHealth with is_live, response_time_ms, checked_at, and
|
||||||
|
optional bitrate and error information.
|
||||||
|
"""
|
||||||
|
start_time = time.monotonic()
|
||||||
|
checked_at = datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
timeout=self._timeout,
|
||||||
|
follow_redirects=True,
|
||||||
|
headers={
|
||||||
|
"User-Agent": USER_AGENT,
|
||||||
|
"Accept": "*/*",
|
||||||
|
},
|
||||||
|
) as client:
|
||||||
|
# Use a partial GET with Range header to limit download
|
||||||
|
# but fall back to reading limited bytes if Range not supported
|
||||||
|
response = await client.get(
|
||||||
|
url,
|
||||||
|
headers={"Range": f"bytes=0-{MAX_CONTENT_BYTES - 1}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
elapsed_ms = int((time.monotonic() - start_time) * 1000)
|
||||||
|
|
||||||
|
# Accept 200 (full content) or 206 (partial content)
|
||||||
|
if response.status_code not in (200, 206):
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
error=f"HTTP {response.status_code}",
|
||||||
|
)
|
||||||
|
|
||||||
|
content = response.text[:MAX_CONTENT_BYTES]
|
||||||
|
|
||||||
|
# Verify it's a valid HLS playlist
|
||||||
|
if "#EXTM3U" not in content:
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
error="Response does not contain #EXTM3U header",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Extract bitrate info if available
|
||||||
|
bitrate = _extract_bitrate(content)
|
||||||
|
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=True,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
bitrate=bitrate,
|
||||||
|
)
|
||||||
|
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
elapsed_ms = int((time.monotonic() - start_time) * 1000)
|
||||||
|
logger.debug("Health check timed out for %s", url)
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
error="Timeout",
|
||||||
|
)
|
||||||
|
except httpx.HTTPError as e:
|
||||||
|
elapsed_ms = int((time.monotonic() - start_time) * 1000)
|
||||||
|
logger.debug("Health check HTTP error for %s: %s", url, e)
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
error=f"HTTP error: {e}",
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
elapsed_ms = int((time.monotonic() - start_time) * 1000)
|
||||||
|
logger.exception("Unexpected error during health check for %s", url)
|
||||||
|
return StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=elapsed_ms,
|
||||||
|
checked_at=checked_at,
|
||||||
|
error=f"Unexpected error: {e}",
|
||||||
|
)
|
||||||
|
|
||||||
|
async def check_all(
|
||||||
|
self, streams: list[dict],
|
||||||
|
) -> dict[str, StreamHealth]:
|
||||||
|
"""Check all streams concurrently, return health map keyed by URL.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
streams: List of stream dicts (must have a "url" key).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping stream URL to its StreamHealth result.
|
||||||
|
"""
|
||||||
|
urls = [s["url"] for s in streams if "url" in s]
|
||||||
|
|
||||||
|
if not urls:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
logger.info("Running health checks on %d stream(s)...", len(urls))
|
||||||
|
|
||||||
|
# Run all checks concurrently
|
||||||
|
tasks = [self.check_stream(url) for url in urls]
|
||||||
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
health_map: dict[str, StreamHealth] = {}
|
||||||
|
for url, result in zip(urls, results):
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error("Health check task failed for %s: %s", url, result)
|
||||||
|
health_map[url] = StreamHealth(
|
||||||
|
url=url,
|
||||||
|
is_live=False,
|
||||||
|
response_time_ms=0,
|
||||||
|
error=f"Task error: {result}",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
health_map[url] = result
|
||||||
|
|
||||||
|
live_count = sum(1 for h in health_map.values() if h.is_live)
|
||||||
|
logger.info(
|
||||||
|
"Health checks complete: %d/%d streams are live",
|
||||||
|
live_count,
|
||||||
|
len(health_map),
|
||||||
|
)
|
||||||
|
|
||||||
|
return health_map
|
||||||
264
stacks/f1-stream/files/backend/m3u8_rewriter.py
Normal file
264
stacks/f1-stream/files/backend/m3u8_rewriter.py
Normal file
|
|
@ -0,0 +1,264 @@
|
||||||
|
"""m3u8 playlist rewriter - rewrites URIs in HLS playlists to go through the proxy.
|
||||||
|
|
||||||
|
Handles both master playlists (containing variant stream references) and
|
||||||
|
media playlists (containing segment URLs). Resolves relative URIs to
|
||||||
|
absolute before encoding, and routes .m3u8 references through /proxy
|
||||||
|
while routing segments (.ts, .m4s, etc.) through /relay.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_url(url: str) -> str:
|
||||||
|
"""Base64url-encode a URL for safe transport as a query parameter.
|
||||||
|
|
||||||
|
Uses URL-safe base64 encoding with padding stripped to avoid
|
||||||
|
double-encoding issues when the URL contains special characters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: The raw URL to encode.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Base64url-encoded string with padding removed.
|
||||||
|
"""
|
||||||
|
return base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
|
||||||
|
|
||||||
|
|
||||||
|
def decode_url(encoded: str) -> str:
|
||||||
|
"""Decode a base64url-encoded URL.
|
||||||
|
|
||||||
|
Re-adds padding that was stripped during encoding.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
encoded: Base64url-encoded string (padding may be stripped).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The original URL string.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the encoded string is not valid base64url.
|
||||||
|
"""
|
||||||
|
# Add padding back - base64 requires length to be multiple of 4
|
||||||
|
padding = 4 - len(encoded) % 4
|
||||||
|
if padding != 4:
|
||||||
|
encoded += "=" * padding
|
||||||
|
return base64.urlsafe_b64decode(encoded).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_uri(uri: str, base_url: str) -> str:
|
||||||
|
"""Resolve a potentially relative URI against a base URL.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uri: The URI from the m3u8 playlist (may be relative or absolute).
|
||||||
|
base_url: The URL of the playlist itself (used as base for relative URIs).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Absolute URL.
|
||||||
|
"""
|
||||||
|
if uri.startswith("http://") or uri.startswith("https://"):
|
||||||
|
return uri
|
||||||
|
return urljoin(base_url, uri)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_playlist_uri(uri: str) -> bool:
|
||||||
|
"""Determine if a URI likely points to another playlist (vs a segment).
|
||||||
|
|
||||||
|
Playlist URIs end in .m3u8 or .m3u. Everything else is treated as a
|
||||||
|
segment (TS, fMP4, init segment, etc.).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uri: The URI to classify.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the URI appears to be a playlist reference.
|
||||||
|
"""
|
||||||
|
# Strip query string for extension check
|
||||||
|
path = uri.split("?")[0].split("#")[0].lower()
|
||||||
|
return path.endswith(".m3u8") or path.endswith(".m3u")
|
||||||
|
|
||||||
|
|
||||||
|
def _build_proxy_url(absolute_uri: str, proxy_base: str) -> str:
|
||||||
|
"""Build a /proxy URL for a playlist reference.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
absolute_uri: The absolute URL of the upstream playlist.
|
||||||
|
proxy_base: The base URL of our proxy service.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Rewritten URL pointing to our /proxy endpoint.
|
||||||
|
"""
|
||||||
|
encoded = encode_url(absolute_uri)
|
||||||
|
return f"{proxy_base}/proxy?url={encoded}"
|
||||||
|
|
||||||
|
|
||||||
|
def _build_relay_url(absolute_uri: str, proxy_base: str) -> str:
|
||||||
|
"""Build a /relay URL for a segment reference.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
absolute_uri: The absolute URL of the upstream segment.
|
||||||
|
proxy_base: The base URL of our proxy service.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Rewritten URL pointing to our /relay endpoint.
|
||||||
|
"""
|
||||||
|
encoded = encode_url(absolute_uri)
|
||||||
|
return f"{proxy_base}/relay?url={encoded}"
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_uri(uri: str, base_url: str, proxy_base: str) -> str:
|
||||||
|
"""Rewrite a single URI from an m3u8 playlist.
|
||||||
|
|
||||||
|
Resolves relative URIs, then routes playlists through /proxy and
|
||||||
|
segments through /relay.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uri: The raw URI from the playlist.
|
||||||
|
base_url: The URL of the playlist containing this URI.
|
||||||
|
proxy_base: The base URL of our proxy service.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Rewritten URI pointing to our proxy.
|
||||||
|
"""
|
||||||
|
absolute = _resolve_uri(uri, base_url)
|
||||||
|
if _is_playlist_uri(uri):
|
||||||
|
return _build_proxy_url(absolute, proxy_base)
|
||||||
|
return _build_relay_url(absolute, proxy_base)
|
||||||
|
|
||||||
|
|
||||||
|
def rewrite_playlist(content: str, base_url: str, proxy_base: str) -> str:
|
||||||
|
"""Rewrite all URIs in an m3u8 playlist to go through the proxy.
|
||||||
|
|
||||||
|
Handles both master playlists (with #EXT-X-STREAM-INF variant
|
||||||
|
references) and media playlists (with segment URIs). Also handles
|
||||||
|
#EXT-X-MAP:URI= init segment references.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: The raw m3u8 playlist text.
|
||||||
|
base_url: The original URL of this playlist (for resolving relative URIs).
|
||||||
|
proxy_base: The base URL of our proxy (e.g., "https://f1.viktorbarzin.me").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The rewritten m3u8 playlist text with all URIs proxied.
|
||||||
|
"""
|
||||||
|
proxy_base = proxy_base.rstrip("/")
|
||||||
|
lines = content.splitlines()
|
||||||
|
output_lines: list[str] = []
|
||||||
|
|
||||||
|
# Track if the previous line was #EXT-X-STREAM-INF (next line is a variant URI)
|
||||||
|
next_is_variant = False
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
stripped = line.strip()
|
||||||
|
|
||||||
|
# Handle #EXT-X-MAP:URI="..." (init segment)
|
||||||
|
if stripped.startswith("#EXT-X-MAP:"):
|
||||||
|
output_lines.append(_rewrite_ext_x_map(stripped, base_url, proxy_base))
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle #EXT-X-STREAM-INF (marks next line as variant playlist URI)
|
||||||
|
if stripped.startswith("#EXT-X-STREAM-INF:"):
|
||||||
|
output_lines.append(line)
|
||||||
|
next_is_variant = True
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle #EXT-X-MEDIA with URI= attribute
|
||||||
|
if stripped.startswith("#EXT-X-MEDIA:") and "URI=" in stripped:
|
||||||
|
output_lines.append(_rewrite_ext_x_media(stripped, base_url, proxy_base))
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle #EXT-X-I-FRAME-STREAM-INF with URI= attribute
|
||||||
|
if stripped.startswith("#EXT-X-I-FRAME-STREAM-INF:") and "URI=" in stripped:
|
||||||
|
output_lines.append(
|
||||||
|
_rewrite_tag_with_uri(stripped, base_url, proxy_base, is_playlist=True)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If previous line was #EXT-X-STREAM-INF, this line is a variant playlist URI
|
||||||
|
if next_is_variant and stripped and not stripped.startswith("#"):
|
||||||
|
absolute = _resolve_uri(stripped, base_url)
|
||||||
|
output_lines.append(_build_proxy_url(absolute, proxy_base))
|
||||||
|
next_is_variant = False
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Regular URI line (non-comment, non-empty, not a tag)
|
||||||
|
if stripped and not stripped.startswith("#"):
|
||||||
|
# This is a segment URI (TS, fMP4, etc.)
|
||||||
|
absolute = _resolve_uri(stripped, base_url)
|
||||||
|
output_lines.append(_build_relay_url(absolute, proxy_base))
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Tags and comments pass through unchanged
|
||||||
|
output_lines.append(line)
|
||||||
|
# Reset variant flag if we hit another tag
|
||||||
|
if stripped.startswith("#") and not stripped.startswith("#EXT-X-STREAM-INF:"):
|
||||||
|
next_is_variant = False
|
||||||
|
|
||||||
|
return "\n".join(output_lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_ext_x_map(line: str, base_url: str, proxy_base: str) -> str:
|
||||||
|
"""Rewrite the URI in an #EXT-X-MAP tag.
|
||||||
|
|
||||||
|
#EXT-X-MAP:URI="init.mp4" -> #EXT-X-MAP:URI="<relay_url>"
|
||||||
|
The init segment goes through /relay since it's binary data.
|
||||||
|
"""
|
||||||
|
# Match URI="..." or URI=... (with or without quotes)
|
||||||
|
match = re.search(r'URI="([^"]+)"', line)
|
||||||
|
if not match:
|
||||||
|
match = re.search(r"URI=([^,\s]+)", line)
|
||||||
|
|
||||||
|
if not match:
|
||||||
|
return line
|
||||||
|
|
||||||
|
original_uri = match.group(1)
|
||||||
|
absolute = _resolve_uri(original_uri, base_url)
|
||||||
|
relay_url = _build_relay_url(absolute, proxy_base)
|
||||||
|
|
||||||
|
return line[:match.start(1)] + relay_url + line[match.end(1):]
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_ext_x_media(line: str, base_url: str, proxy_base: str) -> str:
|
||||||
|
"""Rewrite the URI in an #EXT-X-MEDIA tag.
|
||||||
|
|
||||||
|
#EXT-X-MEDIA:TYPE=AUDIO,...,URI="audio.m3u8" -> rewrite URI to /proxy
|
||||||
|
"""
|
||||||
|
return _rewrite_tag_with_uri(line, base_url, proxy_base, is_playlist=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_tag_with_uri(
|
||||||
|
line: str, base_url: str, proxy_base: str, is_playlist: bool = False,
|
||||||
|
) -> str:
|
||||||
|
"""Rewrite the URI attribute within an HLS tag line.
|
||||||
|
|
||||||
|
Generic handler for any tag that contains a URI="..." attribute.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
line: The full tag line.
|
||||||
|
base_url: Base URL for resolving relative URIs.
|
||||||
|
proxy_base: Our proxy base URL.
|
||||||
|
is_playlist: If True, route through /proxy; otherwise /relay.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The tag line with the URI rewritten.
|
||||||
|
"""
|
||||||
|
match = re.search(r'URI="([^"]+)"', line)
|
||||||
|
if not match:
|
||||||
|
match = re.search(r"URI=([^,\s]+)", line)
|
||||||
|
|
||||||
|
if not match:
|
||||||
|
return line
|
||||||
|
|
||||||
|
original_uri = match.group(1)
|
||||||
|
absolute = _resolve_uri(original_uri, base_url)
|
||||||
|
|
||||||
|
if is_playlist:
|
||||||
|
new_url = _build_proxy_url(absolute, proxy_base)
|
||||||
|
else:
|
||||||
|
new_url = _build_relay_url(absolute, proxy_base)
|
||||||
|
|
||||||
|
return line[:match.start(1)] + new_url + line[match.end(1):]
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
"""F1 Streams - FastAPI backend with schedule and stream extraction services."""
|
"""F1 Streams - FastAPI backend with schedule, stream extraction, health checking, and HLS proxy."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
@ -6,9 +6,12 @@ from contextlib import asynccontextmanager
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from apscheduler.triggers.cron import CronTrigger
|
from apscheduler.triggers.cron import CronTrigger
|
||||||
from apscheduler.triggers.interval import IntervalTrigger
|
from apscheduler.triggers.interval import IntervalTrigger
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI, Query, Request
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
from starlette.responses import Response, StreamingResponse
|
||||||
|
|
||||||
from backend.extractors import create_extraction_service
|
from backend.extractors import create_extraction_service
|
||||||
|
from backend.proxy import proxy_playlist, relay_stream
|
||||||
from backend.schedule import ScheduleService
|
from backend.schedule import ScheduleService
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
|
|
@ -108,6 +111,16 @@ async def lifespan(app: FastAPI):
|
||||||
|
|
||||||
app = FastAPI(title="F1 Streams", lifespan=lifespan)
|
app = FastAPI(title="F1 Streams", lifespan=lifespan)
|
||||||
|
|
||||||
|
# --- CORS Middleware ---
|
||||||
|
# Required for browser-based HLS players to access proxy/relay endpoints
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=["*"],
|
||||||
|
allow_methods=["GET", "OPTIONS"],
|
||||||
|
allow_headers=["Range"],
|
||||||
|
expose_headers=["Content-Range", "Content-Length", "Content-Type"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# --- Health & Info ---
|
# --- Health & Info ---
|
||||||
|
|
||||||
|
|
@ -119,7 +132,7 @@ async def health():
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
async def root():
|
async def root():
|
||||||
return {"service": "f1-streams", "version": "3.0.0"}
|
return {"service": "f1-streams", "version": "4.0.0"}
|
||||||
|
|
||||||
|
|
||||||
# --- Schedule ---
|
# --- Schedule ---
|
||||||
|
|
@ -143,7 +156,12 @@ async def refresh_schedule():
|
||||||
|
|
||||||
@app.get("/streams")
|
@app.get("/streams")
|
||||||
async def get_streams():
|
async def get_streams():
|
||||||
"""Return all currently cached streams from all extractors."""
|
"""Return all currently cached streams that passed health checks.
|
||||||
|
|
||||||
|
Streams are sorted by fallback priority:
|
||||||
|
1. Live streams only (is_live=True)
|
||||||
|
2. Fastest response time first (lowest response_time_ms)
|
||||||
|
"""
|
||||||
streams = extraction_service.get_streams()
|
streams = extraction_service.get_streams()
|
||||||
return {
|
return {
|
||||||
"streams": streams,
|
"streams": streams,
|
||||||
|
|
@ -151,6 +169,20 @@ async def get_streams():
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/streams/all")
|
||||||
|
async def get_all_streams():
|
||||||
|
"""Return ALL cached streams including unhealthy ones (for debugging).
|
||||||
|
|
||||||
|
Unlike GET /streams, this endpoint includes streams that failed health
|
||||||
|
checks. Useful for diagnosing extraction or health check issues.
|
||||||
|
"""
|
||||||
|
streams = extraction_service.get_all_streams_unfiltered()
|
||||||
|
return {
|
||||||
|
"streams": streams,
|
||||||
|
"count": len(streams),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/extractors")
|
@app.get("/extractors")
|
||||||
async def get_extractors():
|
async def get_extractors():
|
||||||
"""List registered extractors and their current status."""
|
"""List registered extractors and their current status."""
|
||||||
|
|
@ -165,10 +197,82 @@ async def trigger_extraction():
|
||||||
return {
|
return {
|
||||||
"status": "extraction_complete",
|
"status": "extraction_complete",
|
||||||
"streams_found": status["total_cached_streams"],
|
"streams_found": status["total_cached_streams"],
|
||||||
|
"live_streams": status["total_live_streams"],
|
||||||
"extractors_run": len(status["extractors"]),
|
"extractors_run": len(status["extractors"]),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# --- HLS Proxy ---
|
||||||
|
|
||||||
|
|
||||||
|
def _get_proxy_base(request: Request) -> str:
|
||||||
|
"""Derive the proxy base URL from the incoming request.
|
||||||
|
|
||||||
|
Uses X-Forwarded-Proto and X-Forwarded-Host headers if present
|
||||||
|
(behind a reverse proxy), otherwise falls back to request URL.
|
||||||
|
"""
|
||||||
|
proto = request.headers.get("x-forwarded-proto", request.url.scheme)
|
||||||
|
host = request.headers.get("x-forwarded-host", request.url.netloc)
|
||||||
|
return f"{proto}://{host}"
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/proxy")
|
||||||
|
async def proxy_endpoint(
|
||||||
|
request: Request,
|
||||||
|
url: str = Query(..., description="Base64url-encoded m3u8 playlist URL"),
|
||||||
|
):
|
||||||
|
"""Proxy an upstream m3u8 playlist with URI rewriting.
|
||||||
|
|
||||||
|
Fetches the upstream m3u8 playlist, rewrites all URIs to route through
|
||||||
|
our /proxy (for sub-playlists) and /relay (for segments) endpoints,
|
||||||
|
and returns the rewritten playlist.
|
||||||
|
|
||||||
|
The `url` parameter must be base64url-encoded to avoid URL encoding issues.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
GET /proxy?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zdHJlYW0ubTN1OA
|
||||||
|
"""
|
||||||
|
proxy_base = _get_proxy_base(request)
|
||||||
|
rewritten = await proxy_playlist(url, proxy_base)
|
||||||
|
|
||||||
|
return Response(
|
||||||
|
content=rewritten,
|
||||||
|
media_type="application/vnd.apple.mpegurl",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/relay")
|
||||||
|
async def relay_endpoint(
|
||||||
|
request: Request,
|
||||||
|
url: str = Query(..., description="Base64url-encoded segment URL"),
|
||||||
|
):
|
||||||
|
"""Relay an upstream media segment as a chunked byte stream.
|
||||||
|
|
||||||
|
Fetches the upstream segment (TS, fMP4, init segment, etc.) and streams
|
||||||
|
it to the client using chunked transfer encoding. Never buffers the
|
||||||
|
full segment in memory.
|
||||||
|
|
||||||
|
The `url` parameter must be base64url-encoded to avoid URL encoding issues.
|
||||||
|
|
||||||
|
Supports HTTP Range requests for seeking.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
GET /relay?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zZWdtZW50LnRz
|
||||||
|
"""
|
||||||
|
range_header = request.headers.get("range")
|
||||||
|
|
||||||
|
stream_gen, headers, status_code = await relay_stream(url, range_header)
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
stream_gen,
|
||||||
|
status_code=status_code,
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
|
|
|
||||||
229
stacks/f1-stream/files/backend/proxy.py
Normal file
229
stacks/f1-stream/files/backend/proxy.py
Normal file
|
|
@ -0,0 +1,229 @@
|
||||||
|
"""HLS proxy - fetches upstream m3u8 playlists and relays media segments.
|
||||||
|
|
||||||
|
Two core functions:
|
||||||
|
1. Playlist proxy: fetches an upstream m3u8 playlist, rewrites all URIs
|
||||||
|
to route through our /proxy and /relay endpoints, returns the rewritten
|
||||||
|
playlist to the client.
|
||||||
|
2. Segment relay: fetches an upstream media segment (TS, fMP4, init) and
|
||||||
|
streams it to the client using chunked transfer encoding, never buffering
|
||||||
|
the full segment in memory.
|
||||||
|
|
||||||
|
All responses include CORS headers for browser playback.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
|
from backend.m3u8_rewriter import decode_url, rewrite_playlist
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Chunk size for relay streaming (64 KB)
|
||||||
|
RELAY_CHUNK_SIZE = 65536
|
||||||
|
|
||||||
|
# Timeout for upstream playlist fetches (seconds)
|
||||||
|
PLAYLIST_TIMEOUT = 15.0
|
||||||
|
|
||||||
|
# Timeout for upstream segment relay - longer because segments are bigger
|
||||||
|
RELAY_TIMEOUT = 30.0
|
||||||
|
|
||||||
|
# User-Agent for upstream requests
|
||||||
|
USER_AGENT = (
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/120.0.0.0 Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def proxy_playlist(encoded_url: str, proxy_base: str) -> str:
|
||||||
|
"""Fetch an upstream m3u8 playlist and rewrite all URIs through our proxy.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
encoded_url: Base64url-encoded URL of the upstream m3u8 playlist.
|
||||||
|
proxy_base: The base URL of our proxy service for rewriting URIs
|
||||||
|
(e.g., "https://f1.viktorbarzin.me").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The rewritten m3u8 playlist text.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If the URL can't be decoded, upstream fails, or
|
||||||
|
content is not a valid HLS playlist.
|
||||||
|
"""
|
||||||
|
# Decode the URL
|
||||||
|
try:
|
||||||
|
url = decode_url(encoded_url)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to decode proxy URL: %s", e)
|
||||||
|
raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}")
|
||||||
|
|
||||||
|
logger.info("Proxying playlist: %s", url)
|
||||||
|
|
||||||
|
# Fetch the upstream playlist
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
timeout=PLAYLIST_TIMEOUT,
|
||||||
|
follow_redirects=True,
|
||||||
|
headers={
|
||||||
|
"User-Agent": USER_AGENT,
|
||||||
|
"Accept": "*/*",
|
||||||
|
},
|
||||||
|
) as client:
|
||||||
|
response = await client.get(url)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.warning(
|
||||||
|
"Upstream playlist returned HTTP %d for %s",
|
||||||
|
response.status_code,
|
||||||
|
url,
|
||||||
|
)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=502,
|
||||||
|
detail=f"Upstream returned HTTP {response.status_code}",
|
||||||
|
)
|
||||||
|
|
||||||
|
content = response.text
|
||||||
|
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
logger.error("Timeout fetching upstream playlist: %s", url)
|
||||||
|
raise HTTPException(status_code=504, detail="Upstream playlist timeout")
|
||||||
|
except httpx.HTTPError as e:
|
||||||
|
logger.error("HTTP error fetching upstream playlist: %s - %s", url, e)
|
||||||
|
raise HTTPException(status_code=502, detail=f"Upstream error: {e}")
|
||||||
|
except HTTPException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Unexpected error fetching playlist: %s", url)
|
||||||
|
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
|
||||||
|
|
||||||
|
# Validate it looks like an m3u8 playlist
|
||||||
|
if "#EXTM3U" not in content:
|
||||||
|
logger.warning("Upstream response is not a valid m3u8 playlist: %s", url)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=502,
|
||||||
|
detail="Upstream response is not a valid HLS playlist",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rewrite all URIs to go through our proxy
|
||||||
|
rewritten = rewrite_playlist(content, url, proxy_base)
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"Proxied playlist from %s: %d bytes -> %d bytes",
|
||||||
|
url,
|
||||||
|
len(content),
|
||||||
|
len(rewritten),
|
||||||
|
)
|
||||||
|
|
||||||
|
return rewritten
|
||||||
|
|
||||||
|
|
||||||
|
async def relay_stream(
|
||||||
|
encoded_url: str,
|
||||||
|
range_header: str | None = None,
|
||||||
|
) -> tuple[AsyncGenerator[bytes, None], dict[str, str], int]:
|
||||||
|
"""Relay an upstream media segment as a chunked byte stream.
|
||||||
|
|
||||||
|
Never buffers the full segment in memory. Streams chunks as they
|
||||||
|
arrive from the upstream server.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
encoded_url: Base64url-encoded URL of the upstream segment.
|
||||||
|
range_header: Optional HTTP Range header from the client to
|
||||||
|
forward to upstream.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A tuple of (async_generator, headers_dict, status_code) where:
|
||||||
|
- async_generator yields bytes chunks
|
||||||
|
- headers_dict contains content-type and other relevant headers
|
||||||
|
- status_code is the HTTP status (200 or 206)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If the URL can't be decoded or upstream fails.
|
||||||
|
"""
|
||||||
|
# Decode the URL
|
||||||
|
try:
|
||||||
|
url = decode_url(encoded_url)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to decode relay URL: %s", e)
|
||||||
|
raise HTTPException(status_code=400, detail=f"Invalid encoded URL: {e}")
|
||||||
|
|
||||||
|
logger.debug("Relaying segment: %s", url)
|
||||||
|
|
||||||
|
# Build upstream request headers
|
||||||
|
headers = {
|
||||||
|
"User-Agent": USER_AGENT,
|
||||||
|
"Accept": "*/*",
|
||||||
|
}
|
||||||
|
if range_header:
|
||||||
|
headers["Range"] = range_header
|
||||||
|
|
||||||
|
# Create the client and stream - caller is responsible for cleanup
|
||||||
|
# via the async generator protocol
|
||||||
|
client = httpx.AsyncClient(
|
||||||
|
timeout=RELAY_TIMEOUT,
|
||||||
|
follow_redirects=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = await client.send(
|
||||||
|
client.build_request("GET", url, headers=headers),
|
||||||
|
stream=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code not in (200, 206):
|
||||||
|
await response.aclose()
|
||||||
|
await client.aclose()
|
||||||
|
logger.warning(
|
||||||
|
"Upstream segment returned HTTP %d for %s",
|
||||||
|
response.status_code,
|
||||||
|
url,
|
||||||
|
)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=502,
|
||||||
|
detail=f"Upstream returned HTTP {response.status_code}",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Collect relevant response headers to forward
|
||||||
|
response_headers: dict[str, str] = {}
|
||||||
|
|
||||||
|
content_type = response.headers.get("content-type", "video/mp2t")
|
||||||
|
response_headers["Content-Type"] = content_type
|
||||||
|
|
||||||
|
if "content-length" in response.headers:
|
||||||
|
response_headers["Content-Length"] = response.headers["content-length"]
|
||||||
|
|
||||||
|
if "content-range" in response.headers:
|
||||||
|
response_headers["Content-Range"] = response.headers["content-range"]
|
||||||
|
|
||||||
|
status_code = response.status_code
|
||||||
|
|
||||||
|
async def _stream_chunks() -> AsyncGenerator[bytes, None]:
|
||||||
|
"""Yield chunks from the upstream response, then clean up."""
|
||||||
|
try:
|
||||||
|
async for chunk in response.aiter_bytes(chunk_size=RELAY_CHUNK_SIZE):
|
||||||
|
yield chunk
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error streaming segment from %s: %s", url, e)
|
||||||
|
finally:
|
||||||
|
await response.aclose()
|
||||||
|
await client.aclose()
|
||||||
|
|
||||||
|
return _stream_chunks(), response_headers, status_code
|
||||||
|
|
||||||
|
except HTTPException:
|
||||||
|
raise
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
await client.aclose()
|
||||||
|
logger.error("Timeout relaying segment: %s", url)
|
||||||
|
raise HTTPException(status_code=504, detail="Upstream segment timeout")
|
||||||
|
except httpx.HTTPError as e:
|
||||||
|
await client.aclose()
|
||||||
|
logger.error("HTTP error relaying segment: %s - %s", url, e)
|
||||||
|
raise HTTPException(status_code=502, detail=f"Upstream error: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
await client.aclose()
|
||||||
|
logger.exception("Unexpected error relaying segment: %s", url)
|
||||||
|
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
|
||||||
Loading…
Add table
Add a link
Reference in a new issue