- Phase 6: CDN token lifecycle with 3-strategy URL matching and periodic refresh - Phase 7: SvelteKit 2/Svelte 5 frontend with schedule calendar and hls.js player - Phase 8: Multi-stream layout supporting up to 4 simultaneous HLS streams - Update Dockerfile to multi-stage build (Node.js frontend + Python backend) - Switch deployment to :latest tag with Always pull policy for CI-driven deploys - Update Woodpecker CI to use explicit latest tag
362 lines
13 KiB
Python
362 lines
13 KiB
Python
"""Token refresh manager - keeps CDN tokens fresh for active streams.
|
|
|
|
CDN tokens embedded in stream URLs expire after 5-30 minutes. During a 2+ hour
|
|
F1 session, URLs must be refreshed before they expire. This manager periodically
|
|
re-runs the extractor that found each active stream to get a fresh URL with a
|
|
new CDN token.
|
|
|
|
Usage:
|
|
1. When a user starts watching, call mark_stream_active(url, site_key)
|
|
2. The background scheduler calls refresh_active_streams() every 4 minutes
|
|
3. The proxy calls get_fresh_url(url) to resolve the latest URL
|
|
4. When the user stops watching, call mark_stream_inactive(url)
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
|
|
# Module-level logger named after this module; the refresh manager below
# reports activation, refresh, and failure events through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ActiveStream:
    """State for one stream a user currently has open.

    ``original_url`` is the URL the user activated and stays the lookup key
    for the entry's whole lifetime. ``current_url`` starts out equal to it and
    is swapped out whenever a refresh produces a newer URL (fresh CDN token,
    another edge server, and so on).
    """

    original_url: str
    current_url: str  # Diverges from original_url once a refresh changes it
    site_key: str  # Key of the extractor that produced this stream
    last_refreshed: str  # Timestamp string of the last activation/refresh
    refresh_count: int = 0
    last_error: str = ""

    def to_dict(self) -> dict:
        """Return a new plain ``dict`` of every field, in declaration order, for JSON."""
        field_names = (
            "original_url",
            "current_url",
            "site_key",
            "last_refreshed",
            "refresh_count",
            "last_error",
        )
        return {name: getattr(self, name) for name in field_names}
|
|
|
|
|
|
class TokenRefreshManager:
    """Manages background token refresh for active streams.

    When a user is watching a stream, the manager periodically re-runs
    the extractor that found it to get a fresh URL with a new token.
    The fresh URL is stored so the /proxy endpoint can use it on the
    next playlist fetch (via get_fresh_url()).

    Every URL an active stream has been served under is remembered as an
    alias of its original URL. A caller still holding an intermediate,
    already-refreshed URL is therefore resolved to the latest one too.
    """

    def __init__(self, extraction_service) -> None:
        """Initialize the token refresh manager.

        Args:
            extraction_service: The ExtractionService instance used to
                re-run extractors and look up streams by site_key.
        """
        # Import here to avoid circular imports at module level
        from backend.extractors.service import ExtractionService

        self._extraction_service: ExtractionService = extraction_service
        # Active streams, keyed by the URL the user originally activated.
        self._active_streams: dict[str, ActiveStream] = {}
        # Any URL an active stream has been served under -> its original URL.
        # Lets get_fresh_url() resolve URLs handed out in earlier cycles.
        self._url_aliases: dict[str, str] = {}
        # original_url -> "quality|title" match key, remembered once found so
        # matching still works if the extraction cache stops listing our URL.
        self._match_keys: dict[str, str] = {}
        self._refresh_interval = 240  # 4 minutes (safe margin for 5-min tokens)

    @property
    def refresh_interval(self) -> int:
        """Refresh interval in seconds."""
        return self._refresh_interval

    @property
    def has_active_streams(self) -> bool:
        """Whether there are any active streams being watched."""
        return bool(self._active_streams)

    def mark_stream_active(self, url: str, site_key: str) -> None:
        """Mark a stream as being actively watched.

        If the stream is already active, this is a no-op (idempotent).

        Args:
            url: The stream URL the user is watching.
            site_key: The extractor site_key that found this stream.
        """
        if url in self._active_streams:
            logger.debug("Stream already active: %s", url)
            return

        now = datetime.now(timezone.utc).isoformat()
        self._active_streams[url] = ActiveStream(
            original_url=url,
            current_url=url,
            site_key=site_key,
            last_refreshed=now,
        )
        logger.info(
            "Stream marked active: %s (site_key=%s, total_active=%d)",
            url,
            site_key,
            len(self._active_streams),
        )

    def mark_stream_inactive(self, url: str) -> None:
        """Mark a stream as no longer watched.

        Also forgets the stream's URL aliases and remembered match key.
        If the stream is not active, this is a no-op.

        Args:
            url: The original stream URL to deactivate.
        """
        removed = self._active_streams.pop(url, None)
        if removed is None:
            logger.debug("Stream was not active, nothing to deactivate: %s", url)
            return

        self._match_keys.pop(url, None)
        stale_aliases = [
            alias for alias, original in self._url_aliases.items() if original == url
        ]
        for alias in stale_aliases:
            del self._url_aliases[alias]

        logger.info(
            "Stream marked inactive: %s (was refreshed %d times, total_active=%d)",
            url,
            removed.refresh_count,
            len(self._active_streams),
        )

    async def refresh_active_streams(self) -> None:
        """Re-run extractors for all active streams to get fresh URLs.

        For each active stream, re-runs the extractor that originally found it
        and tries to match the stream in the new results. If a match is found,
        updates the current_url. If not, the previous URL is kept (it may still
        work until its token expires).

        This method is called by the background scheduler every 4 minutes.
        Token refresh failures are logged but never crash the process.
        """
        if not self._active_streams:
            logger.debug("No active streams to refresh")
            return

        logger.info(
            "Refreshing tokens for %d active stream(s)...",
            len(self._active_streams),
        )

        # Group active streams by site_key to avoid re-running the same
        # extractor multiple times. This snapshot is taken before any await.
        streams_by_site: dict[str, list[ActiveStream]] = {}
        for stream in self._active_streams.values():
            streams_by_site.setdefault(stream.site_key, []).append(stream)

        now = datetime.now(timezone.utc).isoformat()

        for site_key, active_list in streams_by_site.items():
            try:
                await self._refresh_site(site_key, active_list, now)
            except Exception:
                logger.exception(
                    "Failed to refresh tokens for site_key=%s", site_key
                )
                # Mark the error on all streams from this site
                for stream in active_list:
                    stream.last_error = f"Refresh failed at {now}"

    async def _refresh_site(
        self, site_key: str, active_list: list[ActiveStream], now: str
    ) -> None:
        """Re-run a single extractor and update active streams from its results.

        Matching strategies, tried in order for each stream:
            1. Same quality+title as the stream's cached metadata. If several
               fresh streams share that key, the one on the same host and
               path is preferred, otherwise the first one returned.
            2. Same host and URL path (only the query string/token differs).
            3. The extractor returned exactly one stream.

        Args:
            site_key: The extractor's site_key.
            active_list: List of ActiveStream objects from this extractor.
            now: ISO timestamp for this refresh cycle.
        """
        registry = self._extraction_service._registry
        extractor = registry.get(site_key)

        if extractor is None:
            logger.warning(
                "Extractor '%s' not found in registry, skipping refresh",
                site_key,
            )
            for stream in active_list:
                stream.last_error = f"Extractor '{site_key}' not found"
            return

        logger.info(
            "Re-running extractor '%s' for token refresh (%d active stream(s))",
            site_key,
            len(active_list),
        )

        # Re-run the extractor to get fresh URLs
        try:
            fresh_streams = await extractor.extract()
        except Exception as e:
            logger.error(
                "Extractor '%s' failed during token refresh: %s", site_key, e
            )
            for stream in active_list:
                stream.last_error = f"Extraction failed: {e}"
            return

        if not fresh_streams:
            logger.warning(
                "Extractor '%s' returned no streams during token refresh",
                site_key,
            )
            for stream in active_list:
                stream.last_error = "Extractor returned no streams"
            return

        # Group fresh URLs by quality+title. The URL itself changes with every
        # new token, but this metadata stays the same across refreshes. Keep
        # every candidate: mirrors can share a key, and keeping only the last
        # one could silently move the viewer onto a different stream.
        fresh_by_key: dict[str, list[str]] = {}
        for fs in fresh_streams:
            fresh_by_key.setdefault(f"{fs.quality}|{fs.title}", []).append(fs.url)

        # Also keep all fresh URLs for the fallback strategies
        all_fresh_urls = [fs.url for fs in fresh_streams]

        for stream in active_list:
            # The extractor call above awaited; skip streams the user stopped
            # watching (or re-activated as a new entry) in the meantime.
            if self._active_streams.get(stream.original_url) is not stream:
                continue

            # Strategy 1: Match by quality+title
            match_key = self._build_match_key(stream)
            candidates = fresh_by_key.get(match_key, []) if match_key else []
            if candidates:
                new_url = (
                    self._find_url_by_path(stream.current_url, candidates)
                    or candidates[0]
                )
                self._apply_refresh(stream, new_url, now, "quality+title match")
                continue

            # Strategy 2: Match by URL path similarity (ignoring query params / tokens)
            matched_url = self._find_url_by_path(stream.current_url, all_fresh_urls)
            if matched_url:
                self._apply_refresh(stream, matched_url, now, "path match")
                continue

            # Strategy 3: If only one fresh stream, assume it's the same
            if len(all_fresh_urls) == 1:
                self._apply_refresh(
                    stream, all_fresh_urls[0], now, "single result fallback"
                )
                continue

            # No match found - keep the old URL and log
            logger.warning(
                "Could not match active stream to fresh results: %s",
                stream.original_url[:80],
            )
            stream.last_error = "No matching stream in fresh results"

    def _apply_refresh(
        self, stream: ActiveStream, new_url: str, now: str, strategy: str
    ) -> None:
        """Record a successful refresh of ``stream`` to ``new_url``.

        Args:
            stream: The active stream that was matched.
            new_url: The fresh URL found for it (may equal the current one).
            now: ISO timestamp for this refresh cycle.
            strategy: Human-readable name of the matching strategy, for logs.
        """
        if new_url != stream.current_url:
            logger.info(
                "Token refreshed for stream (%s): %s -> %s",
                strategy,
                stream.current_url[:80],
                new_url[:80],
            )
            # Keep the outgoing URL resolvable for callers that still hold it.
            self._url_aliases[stream.current_url] = stream.original_url
            stream.current_url = new_url
            self._url_aliases[new_url] = stream.original_url
        stream.last_refreshed = now
        stream.refresh_count += 1
        stream.last_error = ""

    def _build_match_key(self, stream: ActiveStream) -> str:
        """Build a quality+title match key from cached stream metadata.

        Looks up the stream in the extraction service cache. Once a key is
        found it is remembered per original URL, so later cycles still match
        even if the cache no longer lists the stream's original or current URL.

        Returns:
            A match key string, or empty string if metadata was never found.
        """
        cached_streams = self._extraction_service._cache.get(stream.site_key, [])
        for cs in cached_streams:
            if cs.url in (stream.current_url, stream.original_url):
                key = f"{cs.quality}|{cs.title}"
                self._match_keys[stream.original_url] = key
                return key
        return self._match_keys.get(stream.original_url, "")

    @staticmethod
    def _find_url_by_path(current_url: str, fresh_urls: list[str]) -> str | None:
        """Find a fresh URL that matches the current URL by host and path.

        Token refreshes are expected to change query parameters but keep the
        same host and path, so this matcher ignores the query string and
        compares only the network location and path components.

        Args:
            current_url: The current (possibly expired) URL.
            fresh_urls: List of fresh URLs to match against.

        Returns:
            The first matching fresh URL, or None if no match.
        """
        from urllib.parse import urlparse

        current_parsed = urlparse(current_url)
        for fresh_url in fresh_urls:
            fresh_parsed = urlparse(fresh_url)
            if (
                fresh_parsed.netloc == current_parsed.netloc
                and fresh_parsed.path == current_parsed.path
            ):
                return fresh_url
        return None

    def get_fresh_url(self, original_url: str) -> str:
        """Get the latest URL for a stream (may have changed due to token refresh).

        If the URL does not belong to an active stream, it is returned
        unchanged.

        Args:
            original_url: The URL to look up. May be the original URL or any
                URL the stream was served under after an earlier refresh.

        Returns:
            The most recent URL for this stream.
        """
        stream = self._active_streams.get(original_url)
        if stream is None:
            # Not an original URL; it may be one handed out by an earlier
            # refresh cycle (recorded by _apply_refresh).
            aliased_original = self._url_aliases.get(original_url)
            if aliased_original is not None:
                stream = self._active_streams.get(aliased_original)

        if stream is None:
            # Not an active stream - return as-is
            return original_url
        return stream.current_url

    def get_active_streams(self) -> list[dict]:
        """Return all active streams with their refresh status.

        Returns:
            List of serialized ActiveStream dicts.
        """
        return [stream.to_dict() for stream in self._active_streams.values()]