[ci skip] f1-stream: add CDN token refresh, SvelteKit frontend, multi-stream layout (Phases 6-8)

- Phase 6: CDN token lifecycle with 3-strategy URL matching and periodic refresh
- Phase 7: SvelteKit 2/Svelte 5 frontend with schedule calendar and hls.js player
- Phase 8: Multi-stream layout supporting up to 4 simultaneous HLS streams
- Update Dockerfile to multi-stage build (Node.js frontend + Python backend)
- Switch deployment to :latest tag with Always pull policy for CI-driven deploys
- Update Woodpecker CI to use explicit latest tag
This commit is contained in:
Viktor Barzin 2026-02-23 23:59:35 +00:00
parent 6867036087
commit 9fd788b158
19 changed files with 3843 additions and 17 deletions

View file

@ -1,6 +1,7 @@
"""F1 Streams - FastAPI backend with schedule, stream extraction, health checking, and HLS proxy."""
"""F1 Streams - FastAPI backend with schedule, stream extraction, health checking, HLS proxy, and token refresh."""
import logging
import os
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
@ -8,11 +9,14 @@ from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from starlette.responses import Response, StreamingResponse
from backend.extractors import create_extraction_service
from backend.proxy import proxy_playlist, relay_stream
from backend.schedule import ScheduleService
from backend.token_refresh import TokenRefreshManager
logging.basicConfig(
level=logging.INFO,
@ -22,9 +26,29 @@ logger = logging.getLogger(__name__)
schedule_service = ScheduleService()
extraction_service = create_extraction_service()
token_refresh_manager = TokenRefreshManager(extraction_service)
scheduler = AsyncIOScheduler()
# --- Pydantic models for request bodies ---
class ActivateStreamRequest(BaseModel):
"""Request body for POST /streams/activate."""
url: str
site_key: str = ""
class DeactivateStreamRequest(BaseModel):
"""Request body for POST /streams/deactivate."""
url: str
# --- Scheduled callbacks ---
async def _scheduled_refresh() -> None:
"""Callback for APScheduler daily schedule refresh."""
logger.info("Running scheduled schedule refresh...")
@ -71,6 +95,22 @@ async def _scheduled_extraction() -> None:
)
async def _scheduled_token_refresh() -> None:
"""Callback for APScheduler token refresh.
Only performs work when there are active streams. Re-runs extractors
to get fresh CDN tokens for streams being actively watched.
"""
if not token_refresh_manager.has_active_streams:
return
logger.info("Running scheduled token refresh...")
try:
await token_refresh_manager.refresh_active_streams()
except Exception:
logger.exception("Token refresh failed (non-fatal)")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle handler."""
@ -99,8 +139,20 @@ async def lifespan(app: FastAPI):
replace_existing=True,
)
# Schedule token refresh every 4 minutes (safe margin for 5-min CDN tokens).
# The callback is a no-op when there are no active streams.
scheduler.add_job(
_scheduled_token_refresh,
trigger=IntervalTrigger(minutes=4),
id="token_refresh",
name="Refresh CDN tokens for active streams",
replace_existing=True,
)
scheduler.start()
logger.info("APScheduler started - schedule refresh at 03:00 UTC, extraction every 30m")
logger.info(
"APScheduler started - schedule refresh at 03:00 UTC, extraction every 30m, token refresh every 4m"
)
yield
@ -116,8 +168,8 @@ app = FastAPI(title="F1 Streams", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "OPTIONS"],
allow_headers=["Range"],
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["Range", "Content-Type"],
expose_headers=["Content-Range", "Content-Length", "Content-Type"],
)
@ -130,11 +182,6 @@ async def health():
return {"status": "ok"}
@app.get("/")
async def root():
return {"service": "f1-streams", "version": "4.0.0"}
# --- Schedule ---
@ -183,6 +230,79 @@ async def get_all_streams():
}
@app.post("/streams/activate")
async def activate_stream(body: ActivateStreamRequest):
"""Mark a stream as actively being watched.
When a stream is active, the token refresh manager will periodically
re-run the extractor that found it to get fresh CDN tokens before
they expire.
If site_key is not provided, attempts to look it up from the cached
streams.
Body:
{"url": "https://...", "site_key": "optional-site-key"}
"""
url = body.url
site_key = body.site_key
# If site_key not provided, try to look it up from cached streams
if not site_key:
for streams in extraction_service._cache.values():
for stream in streams:
if stream.url == url:
site_key = stream.site_key
break
if site_key:
break
if not site_key:
return {
"status": "error",
"detail": "Could not determine site_key for this URL. Provide it explicitly.",
}
token_refresh_manager.mark_stream_active(url, site_key)
return {
"status": "activated",
"url": url,
"site_key": site_key,
"active_count": len(token_refresh_manager.get_active_streams()),
}
@app.post("/streams/deactivate")
async def deactivate_stream(body: DeactivateStreamRequest):
"""Mark a stream as no longer being watched.
Stops the token refresh manager from refreshing CDN tokens for this stream.
Body:
{"url": "https://..."}
"""
token_refresh_manager.mark_stream_inactive(body.url)
return {
"status": "deactivated",
"url": body.url,
"active_count": len(token_refresh_manager.get_active_streams()),
}
@app.get("/streams/active")
async def get_active_streams():
"""List currently active streams with their refresh status.
Returns all streams that are being actively watched, including
their current (potentially refreshed) URLs and refresh counts.
"""
active = token_refresh_manager.get_active_streams()
return {
"streams": active,
"count": len(active),
}
@app.get("/extractors")
async def get_extractors():
"""List registered extractors and their current status."""
@ -220,6 +340,11 @@ def _get_proxy_base(request: Request) -> str:
async def proxy_endpoint(
request: Request,
url: str = Query(..., description="Base64url-encoded m3u8 playlist URL"),
quality: int | None = Query(
None,
description="0-based quality variant index (0=highest bandwidth). "
"Only applies to master playlists.",
),
):
"""Proxy an upstream m3u8 playlist with URI rewriting.
@ -229,11 +354,22 @@ async def proxy_endpoint(
The `url` parameter must be base64url-encoded to avoid URL encoding issues.
Example:
If `quality` is specified and the upstream is a master playlist (with
multiple quality variants), the proxy will fetch the selected variant's
media playlist directly instead of returning the master playlist.
Quality index 0 = highest bandwidth, 1 = second highest, etc.
Examples:
GET /proxy?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zdHJlYW0ubTN1OA
GET /proxy?url=aHR0cHM6Ly9leGFtcGxlLmNvbS9zdHJlYW0ubTN1OA&quality=0
"""
# Check if we have a fresher URL from token refresh
fresh_url = token_refresh_manager.get_fresh_url(url)
if fresh_url != url:
logger.info("Using refreshed URL from token manager")
proxy_base = _get_proxy_base(request)
rewritten = await proxy_playlist(url, proxy_base)
rewritten = await proxy_playlist(fresh_url, proxy_base, quality=quality)
return Response(
content=rewritten,
@ -273,6 +409,19 @@ async def relay_endpoint(
)
# --- Frontend Static Files ---
# Mount the SvelteKit static build AFTER all API routes so API endpoints take priority.
_frontend_dir = os.path.join(os.path.dirname(__file__), "..", "frontend", "build")
if os.path.exists(_frontend_dir):
app.mount("/", StaticFiles(directory=_frontend_dir, html=True), name="frontend")
logger.info("Serving frontend from %s", _frontend_dir)
else:
# Fallback root when no frontend build exists
@app.get("/")
async def root():
return {"service": "f1-streams", "version": "5.0.0"}
if __name__ == "__main__":
import uvicorn