[ci skip] f1-stream: add F1 schedule subsystem (Phase 2)

- Fetch 2026 F1 race calendar from jolpica API with all sessions
  (FP1-3, Qualifying, Sprint, Race) and UTC timestamps
- Persist schedule to NFS as JSON, load on startup if fresh
- APScheduler daily refresh at 03:00 UTC
- GET /schedule endpoint with live/upcoming/past session status
- POST /schedule/refresh for manual refresh trigger
This commit is contained in:
Viktor Barzin 2026-02-23 22:55:13 +00:00
parent f423a4d60c
commit d7d347de27
No known key found for this signature in database
GPG key ID: 0EB088298288D958
6 changed files with 305 additions and 2 deletions

View file

@ -1,6 +1,54 @@
"""F1 Streams - FastAPI backend with schedule service."""
import logging
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI
app = FastAPI(title="F1 Streams")
from backend.schedule import ScheduleService
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
schedule_service = ScheduleService()
scheduler = AsyncIOScheduler()
async def _scheduled_refresh() -> None:
"""Callback for APScheduler daily refresh."""
logger.info("Running scheduled schedule refresh...")
await schedule_service.refresh()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle handler."""
# Startup: load schedule and start background scheduler
await schedule_service.initialize()
scheduler.add_job(
_scheduled_refresh,
trigger=CronTrigger(hour=3, minute=0, timezone="UTC"),
id="daily_schedule_refresh",
name="Refresh F1 schedule daily at 03:00 UTC",
replace_existing=True,
)
scheduler.start()
logger.info("APScheduler started - daily refresh at 03:00 UTC")
yield
# Shutdown
scheduler.shutdown(wait=False)
logger.info("APScheduler shut down")
app = FastAPI(title="F1 Streams", lifespan=lifespan)
@app.get("/health")
@ -13,6 +61,19 @@ async def root():
return {"service": "f1-streams", "version": "2.0.1"}
@app.get("/schedule")
async def get_schedule():
"""Return the F1 race schedule for the current season with session statuses."""
return schedule_service.get_schedule()
@app.post("/schedule/refresh")
async def refresh_schedule():
"""Manually trigger a schedule refresh from the jolpica API."""
await schedule_service.refresh()
return {"status": "refreshed"}
if __name__ == "__main__":
import uvicorn

View file

@ -1,2 +1,4 @@
fastapi==0.115.0
uvicorn[standard]
httpx>=0.27.0
apscheduler>=3.10.0,<4.0

View file

@ -0,0 +1,240 @@
"""F1 Schedule Service - fetches, caches, and serves the F1 race calendar."""
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import httpx
logger = logging.getLogger(__name__)
JOLPICA_API_URL = "https://api.jolpi.ca/ergast/f1/current.json"
SCHEDULE_PATH = Path(os.getenv("SCHEDULE_PATH", "/data/schedule.json"))
STALE_THRESHOLD = timedelta(hours=24)
# Typical session durations in minutes
SESSION_DURATIONS = {
"fp1": 60,
"fp2": 60,
"fp3": 60,
"qualifying": 60,
"sprint_qualifying": 30,
"sprint": 30,
"race": 120,
}
def _parse_session_datetime(session: dict[str, str] | None) -> str | None:
"""Parse a session dict with 'date' and 'time' fields into an ISO 8601 UTC string."""
if not session or "date" not in session or "time" not in session:
return None
# Time format from API: "14:30:00Z"
time_str = session["time"].rstrip("Z")
return f"{session['date']}T{time_str}+00:00"
def _parse_race(race: dict[str, Any]) -> dict[str, Any]:
"""Transform a raw jolpica/Ergast race object into our internal format."""
circuit = race.get("Circuit", {})
location = circuit.get("Location", {})
# Build session list
sessions = []
# Map API keys to our session types, in chronological order for a race weekend
session_map = [
("FirstPractice", "fp1", "FP1"),
("SecondPractice", "fp2", "FP2"),
("ThirdPractice", "fp3", "FP3"),
("SprintQualifying", "sprint_qualifying", "Sprint Qualifying"),
("SprintShootout", "sprint_qualifying", "Sprint Qualifying"),
("Sprint", "sprint", "Sprint"),
("Qualifying", "qualifying", "Qualifying"),
]
seen_types = set()
for api_key, session_type, display_name in session_map:
if api_key in race and session_type not in seen_types:
dt_str = _parse_session_datetime(race[api_key])
if dt_str:
sessions.append(
{
"type": session_type,
"name": display_name,
"start_utc": dt_str,
"duration_minutes": SESSION_DURATIONS.get(session_type, 60),
}
)
seen_types.add(session_type)
# Race session itself (date and time are top-level)
race_dt = _parse_session_datetime({"date": race.get("date", ""), "time": race.get("time", "")})
if race_dt:
sessions.append(
{
"type": "race",
"name": "Race",
"start_utc": race_dt,
"duration_minutes": SESSION_DURATIONS["race"],
}
)
# Sort sessions chronologically
sessions.sort(key=lambda s: s["start_utc"])
return {
"round": int(race.get("round", 0)),
"race_name": race.get("raceName", ""),
"circuit": circuit.get("circuitName", ""),
"circuit_id": circuit.get("circuitId", ""),
"country": location.get("country", ""),
"locality": location.get("locality", ""),
"date": race.get("date", ""),
"url": race.get("url", ""),
"sessions": sessions,
}
def _compute_session_status(session: dict[str, Any], now: datetime) -> str:
"""Determine if a session is 'past', 'live', or 'upcoming'."""
try:
start = datetime.fromisoformat(session["start_utc"])
except (ValueError, KeyError):
return "upcoming"
duration = timedelta(minutes=session.get("duration_minutes", 60))
end = start + duration
if now >= end:
return "past"
elif now >= start:
return "live"
else:
return "upcoming"
class ScheduleService:
"""Manages the F1 schedule: fetching, caching, and serving."""
def __init__(self) -> None:
self._schedule: dict[str, Any] | None = None
async def fetch_schedule(self) -> dict[str, Any]:
"""Fetch the current season schedule from the jolpica API."""
logger.info("Fetching F1 schedule from jolpica API...")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(JOLPICA_API_URL)
response.raise_for_status()
data = response.json()
race_table = data.get("MRData", {}).get("RaceTable", {})
season = race_table.get("season", "")
raw_races = race_table.get("Races", [])
races = [_parse_race(r) for r in raw_races]
schedule = {
"season": season,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"races": races,
}
self._schedule = schedule
logger.info("Fetched schedule for %s season: %d races", season, len(races))
return schedule
def load_from_disk(self) -> bool:
"""Load schedule from NFS-backed JSON file. Returns True if loaded successfully."""
if not SCHEDULE_PATH.exists():
logger.info("No cached schedule found at %s", SCHEDULE_PATH)
return False
try:
with open(SCHEDULE_PATH, "r") as f:
self._schedule = json.load(f)
logger.info("Loaded cached schedule from %s", SCHEDULE_PATH)
return True
except (json.JSONDecodeError, OSError) as e:
logger.warning("Failed to load cached schedule: %s", e)
return False
def save_to_disk(self) -> None:
"""Persist current schedule to NFS-backed JSON file."""
if not self._schedule:
logger.warning("No schedule data to save")
return
try:
SCHEDULE_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(SCHEDULE_PATH, "w") as f:
json.dump(self._schedule, f, indent=2)
logger.info("Saved schedule to %s", SCHEDULE_PATH)
except OSError as e:
logger.error("Failed to save schedule to disk: %s", e)
def is_stale(self) -> bool:
"""Check if the cached schedule data is older than the stale threshold."""
if not self._schedule:
return True
fetched_at_str = self._schedule.get("fetched_at")
if not fetched_at_str:
return True
try:
fetched_at = datetime.fromisoformat(fetched_at_str)
return datetime.now(timezone.utc) - fetched_at > STALE_THRESHOLD
except ValueError:
return True
def get_schedule(self) -> dict[str, Any]:
"""Return the current schedule with computed session statuses."""
if not self._schedule:
return {"season": "", "races": [], "error": "No schedule data available"}
now = datetime.now(timezone.utc)
races = []
for race in self._schedule.get("races", []):
sessions = []
for session in race.get("sessions", []):
sessions.append(
{
**session,
"status": _compute_session_status(session, now),
}
)
races.append(
{
**race,
"sessions": sessions,
}
)
return {
"season": self._schedule.get("season", ""),
"fetched_at": self._schedule.get("fetched_at", ""),
"races": races,
}
async def refresh(self) -> None:
"""Fetch fresh schedule and persist to disk. Falls back to cached data on error."""
try:
await self.fetch_schedule()
self.save_to_disk()
except httpx.HTTPError as e:
logger.error("Failed to refresh schedule from API: %s", e)
if not self._schedule:
logger.warning("No cached data available either - schedule will be empty")
except Exception:
logger.exception("Unexpected error during schedule refresh")
async def initialize(self) -> None:
"""Load from disk on startup and refresh if stale."""
self.load_from_disk()
if self.is_stale():
await self.refresh()

View file

@ -36,7 +36,7 @@ resource "kubernetes_deployment" "f1-stream" {
}
spec {
container {
image = "viktorbarzin/f1-stream:v2.0.1"
image = "viktorbarzin/f1-stream:v2.0.3"
name = "f1-stream"
resources {
limits = {