Add three new extractors (Streamed.pk, DaddyLive, Aceztrims) for live F1 streams. Extend ExtractedStream model with stream_type/embed_url fields, skip health checks for embed streams, fix broken Akamai demo stream, add variant playlist validation, and add iframe player support in the frontend for embed-type streams.
202 lines
7.7 KiB
Python
202 lines
7.7 KiB
Python
"""Extraction service - manages extraction lifecycle: polling, caching, health checking, serving."""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from backend.extractors.models import ExtractedStream
|
|
from backend.extractors.registry import ExtractorRegistry
|
|
from backend.health import StreamHealthChecker
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ExtractionService:
|
|
"""Manages the extraction lifecycle: polling, caching, health checking, and serving.
|
|
|
|
Extraction runs on a background schedule (via APScheduler), never on
|
|
client request path. After extraction, health checks verify each stream
|
|
is live. Results are cached in memory, keyed by site_key.
|
|
|
|
GET /streams only returns streams that passed health checks, sorted by:
|
|
1. is_live (live streams first)
|
|
2. response_time_ms (fastest first)
|
|
"""
|
|
|
|
def __init__(self, registry: ExtractorRegistry) -> None:
|
|
self._registry = registry
|
|
# Cache: site_key -> list of ExtractedStream
|
|
self._cache: dict[str, list[ExtractedStream]] = {}
|
|
self._last_run: str | None = None
|
|
self._last_run_stream_count: int = 0
|
|
self._health_checker = StreamHealthChecker()
|
|
|
|
async def run_extraction(self) -> None:
|
|
"""Run all extractors, health-check results, and cache them.
|
|
|
|
This is called by the background scheduler. Each extractor's
|
|
results replace its previous cache entry entirely. After extraction,
|
|
health checks are run to verify streams are live and measure
|
|
response times.
|
|
"""
|
|
logger.info("Starting extraction run...")
|
|
start = datetime.now(timezone.utc)
|
|
|
|
streams = await self._registry.extract_all()
|
|
|
|
# Run health checks on all extracted streams
|
|
if streams:
|
|
# Separate m3u8 streams (need health check) from embed streams (skip)
|
|
m3u8_streams = [s for s in streams if s.stream_type != "embed"]
|
|
embed_streams = [s for s in streams if s.stream_type == "embed"]
|
|
|
|
# Mark embed streams as live (no health check possible for iframes)
|
|
for stream in embed_streams:
|
|
stream.is_live = True
|
|
stream.response_time_ms = 0
|
|
stream.checked_at = start.isoformat()
|
|
|
|
# Health-check only m3u8 streams
|
|
if m3u8_streams:
|
|
stream_dicts = [s.to_dict() for s in m3u8_streams]
|
|
health_map = await self._health_checker.check_all(stream_dicts)
|
|
|
|
for stream in m3u8_streams:
|
|
health = health_map.get(stream.url)
|
|
if health:
|
|
stream.is_live = health.is_live
|
|
stream.response_time_ms = health.response_time_ms
|
|
stream.checked_at = health.checked_at
|
|
if health.bitrate > 0:
|
|
stream.bitrate = health.bitrate
|
|
|
|
# Group streams by site_key and update cache
|
|
new_cache: dict[str, list[ExtractedStream]] = {}
|
|
for stream in streams:
|
|
new_cache.setdefault(stream.site_key, []).append(stream)
|
|
|
|
# Replace cache for extractors that returned results.
|
|
# Clear cache for extractors that returned nothing (site went down, etc.)
|
|
for extractor_info in self._registry.list_extractors():
|
|
key = extractor_info["site_key"]
|
|
if key in new_cache:
|
|
self._cache[key] = new_cache[key]
|
|
else:
|
|
# Extractor returned nothing - clear its cache
|
|
self._cache.pop(key, None)
|
|
|
|
self._last_run = start.isoformat()
|
|
self._last_run_stream_count = len(streams)
|
|
|
|
live_count = sum(
|
|
1 for streams_list in self._cache.values()
|
|
for s in streams_list if s.is_live
|
|
)
|
|
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
|
|
logger.info(
|
|
"Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs (%d live)",
|
|
len(streams),
|
|
len(new_cache),
|
|
elapsed,
|
|
live_count,
|
|
)
|
|
|
|
def get_streams(self) -> list[dict]:
|
|
"""Return all cached streams as a sorted list of dicts.
|
|
|
|
Only returns streams that passed health checks (is_live=True).
|
|
Sorted by fallback priority:
|
|
1. is_live (live streams first) - filters to live only
|
|
2. response_time_ms (fastest first)
|
|
|
|
Returns:
|
|
List of serialized ExtractedStream dicts from all extractors,
|
|
filtered to live-only and sorted by response time.
|
|
"""
|
|
all_streams: list[ExtractedStream] = []
|
|
for streams in self._cache.values():
|
|
all_streams.extend(streams)
|
|
|
|
# Sort by fallback priority: live first, then fastest response
|
|
all_streams.sort(
|
|
key=lambda s: (not s.is_live, s.response_time_ms)
|
|
)
|
|
|
|
# Only return live streams to clients
|
|
live_streams = [s for s in all_streams if s.is_live]
|
|
return [s.to_dict() for s in live_streams]
|
|
|
|
def get_all_streams_unfiltered(self) -> list[dict]:
|
|
"""Return ALL cached streams including unhealthy ones.
|
|
|
|
Used for debugging and status endpoints. Sorted by fallback priority
|
|
but includes streams that failed health checks.
|
|
|
|
Returns:
|
|
List of all serialized ExtractedStream dicts.
|
|
"""
|
|
all_streams: list[ExtractedStream] = []
|
|
for streams in self._cache.values():
|
|
all_streams.extend(streams)
|
|
|
|
# Sort by fallback priority: live first, then fastest response
|
|
all_streams.sort(
|
|
key=lambda s: (not s.is_live, s.response_time_ms)
|
|
)
|
|
|
|
return [s.to_dict() for s in all_streams]
|
|
|
|
def get_streams_for_session(self, session_type: str) -> list[dict]:
|
|
"""Return cached streams filtered/annotated for a specific session type.
|
|
|
|
Currently returns all live streams (extractors don't yet differentiate by
|
|
session type). This method exists as a hook for future filtering,
|
|
e.g., some extractors might only have race streams but not FP streams.
|
|
|
|
Args:
|
|
session_type: The F1 session type (e.g., "race", "qualifying", "fp1").
|
|
|
|
Returns:
|
|
List of serialized ExtractedStream dicts (live only, sorted).
|
|
"""
|
|
# For now, all streams are potentially relevant to any session.
|
|
# Future extractors may tag streams with session types, at which
|
|
# point this method will filter accordingly.
|
|
streams = self.get_streams()
|
|
logger.debug(
|
|
"Returning %d stream(s) for session type '%s'",
|
|
len(streams),
|
|
session_type,
|
|
)
|
|
return streams
|
|
|
|
def get_status(self) -> dict:
|
|
"""Return extraction service status for the /extractors endpoint."""
|
|
extractor_list = self._registry.list_extractors()
|
|
extractor_statuses = []
|
|
|
|
for info in extractor_list:
|
|
key = info["site_key"]
|
|
cached = self._cache.get(key, [])
|
|
live_count = sum(1 for s in cached if s.is_live)
|
|
extractor_statuses.append(
|
|
{
|
|
"site_key": key,
|
|
"site_name": info["site_name"],
|
|
"cached_streams": len(cached),
|
|
"live_streams": live_count,
|
|
}
|
|
)
|
|
|
|
total_cached = sum(len(streams) for streams in self._cache.values())
|
|
total_live = sum(
|
|
1 for streams in self._cache.values()
|
|
for s in streams if s.is_live
|
|
)
|
|
|
|
return {
|
|
"extractors": extractor_statuses,
|
|
"total_cached_streams": total_cached,
|
|
"total_live_streams": total_live,
|
|
"last_run": self._last_run,
|
|
"last_run_stream_count": self._last_run_stream_count,
|
|
}
|