f1-stream: consume Forgejo-registry image; drop in-monorepo source

The actively-developed f1-stream (infra files/ copy: 12 active extractors +
Playwright/chrome-service verifier) is now its own repo viktor/f1-stream and is
the deployed app (replacing the stale March github build).

- main.tf: image -> forgejo.viktorbarzin.me/viktor/f1-stream:${var.image_tag}
  + image_pull_secrets registry-credentials. Image stays in KEEL_IGNORE_IMAGE.
- Remove stacks/f1-stream/files/ (source now in viktor/f1-stream).
- docs/plans: extraction design + plan pair.

Applied via tg + kubectl set image to forgejo:24857a82; live /health green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-05 06:51:22 +00:00
parent 99f9bf8d89
commit e8bfb4d06b
51 changed files with 131 additions and 9556 deletions

View file

@ -1,93 +0,0 @@
"""Stream extraction framework.
To add a new extractor:
1. Create a new file in this package (e.g., my_site.py)
2. Subclass BaseExtractor from backend.extractors.base
3. Implement site_key, site_name, and extract()
4. Import and register it in this file's create_registry() function
Example:
from backend.extractors.my_site import MySiteExtractor
registry.register(MySiteExtractor())
"""
from backend.extractors.aceztrims import AceztrimsExtractor
from backend.extractors.chrome_browser import ChromeBrowserExtractor
from backend.extractors.curated import CuratedExtractor
from backend.extractors.dd12 import DD12Extractor
from backend.extractors.hmembeds import HmembedsExtractor
from backend.extractors.stremio import StremioAddonExtractor
from backend.extractors.subreddit import SubredditExtractor
from backend.extractors.daddylive import DaddyLiveExtractor
from backend.extractors.discord_source import DiscordExtractor
from backend.extractors.models import ExtractedStream
from backend.extractors.pitsport import PitsportExtractor
from backend.extractors.ppv import PPVExtractor
from backend.extractors.registry import ExtractorRegistry
from backend.extractors.service import ExtractionService
from backend.extractors.streamed import StreamedExtractor
from backend.extractors.timstreams import TimStreamsExtractor
# Public names exported by the extractors package: the stream model, the
# registry and service classes, and the two factory functions defined in
# this module.
__all__ = [
"ExtractedStream",
"ExtractorRegistry",
"ExtractionService",
"create_registry",
"create_extraction_service",
]
def create_registry() -> ExtractorRegistry:
    """Build an ExtractorRegistry holding every enabled extractor.

    To enable a new extractor, import its class and add it to the tuple
    below. Classes are instantiated and registered one at a time, in the
    order listed.
    """
    registry = ExtractorRegistry()

    # CuratedExtractor is deliberately NOT listed. It used to surface two
    # hmembeds 24/7 channels (Sky Sports F1, DAZN F1), but their JW Player
    # decoder produces an empty playlist in our environment (error 102630)
    # regardless of headed mode, IP, or fingerprint. The streams showed the
    # upstream ad overlay but never a video element, which confused users.
    # Re-add CuratedExtractor here if a working bypass is found.
    enabled_extractors = (
        StreamedExtractor,
        # Drives the in-cluster chrome-service (endpoint read from the
        # CHROME_CDP_URL env var) to scrape JS-rendered pages whose m3u8
        # is computed at runtime.
        ChromeBrowserExtractor,
        # Pulls live-stream posts from motorsport subreddits. Returns
        # embed-type streams; the verifier visits each via chrome-service
        # to confirm playability.
        SubredditExtractor,
        # Scrapes DD12Streams' per-channel pages for the inline JW Player
        # file URL. The m3u8 is in the HTML, so no browser is needed.
        DD12Extractor,
        # Offline-decodes hmembeds.one JWT m3u8 URLs (base64+XOR with a
        # hardcoded key per page; reverse-engineered 2026-05-07). The
        # verifier filters dead origins.
        HmembedsExtractor,
        # Calls Stremio addon HTTP APIs (TvVoo, StremVerse), which already
        # index Sky F1 / DAZN F1 / Vavoo IPTV channels. No Stremio client
        # needed, just /stream/<type>/<id>.json calls.
        StremioAddonExtractor,
        DaddyLiveExtractor,
        AceztrimsExtractor,
        PitsportExtractor,
        PPVExtractor,
        TimStreamsExtractor,
        DiscordExtractor,
    )
    for extractor_cls in enabled_extractors:
        registry.register(extractor_cls())
    return registry
def create_extraction_service() -> ExtractionService:
    """Return a new ExtractionService backed by a fully populated registry.

    This is the extraction framework's main entry point; call it once
    during app startup.
    """
    return ExtractionService(create_registry())

View file

@ -1,122 +0,0 @@
"""Aceztrims extractor — scrapes embed URLs from acestrlms.pages.dev/f11/.
The page (Cloudflare Pages, no anti-bot) hosts an iframe + a strip of
onclick channel-switcher buttons. Each button rewrites the iframe via
`document.getElementById('iframe').src = '<embed_url>'`. The initial
channel is hard-coded as `<iframe id='iframe' src='...'>`.
We strip HTML comments first because the page keeps ~20 legacy channel
buttons inside `<!-- ... -->` blocks for easy re-enablement; the previous
loose regex picked them up as false positives.
All channels are iframe embeds (no direct m3u8) → `stream_type='embed'`.
Site naming note: the extractor key stays `aceztrims` (the previous
domain) so registry/cache identifiers don't churn. The current domain
is `acestrlms.pages.dev` and the F1 path is `/f11/` (two ones — `/f1/`
is the cross-sport schedule page and has no stream buttons).
"""
import logging
import re
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Origin of the scraped site; each path from F1_PAGES is appended to it.
BASE_URL = "https://acestrlms.pages.dev"
# (path, category) pairs to scrape. The category is used to build each
# stream's title.
F1_PAGES = [
("/f11/", "Formula 1"),
]
# Browser-like User-Agent sent with every request the extractor makes.
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
# `document.getElementById('iframe').src = '<URL>'` — current channel-switcher format.
# Capture group 1 is the embed URL.
_ONCLICK_IFRAME_SRC = re.compile(
r"""document\.getElementById\(['"]iframe['"]\)\.src\s*=\s*['"]([^'"]+)['"]""",
re.IGNORECASE,
)
# `<iframe id='iframe' src='<URL>'>` — the default/initial channel.
# Capture group 1 is the embed URL. Only matches when the `id` attribute
# appears before `src` inside the tag.
_DEFAULT_IFRAME = re.compile(
r"""<iframe[^>]*id\s*=\s*['"]iframe['"][^>]*src\s*=\s*['"]([^'"]+)['"]""",
re.IGNORECASE,
)
# Matches whole `<!-- ... -->` blocks (DOTALL, so a block may span lines).
_HTML_COMMENT = re.compile(r"<!--.*?-->", re.DOTALL)
class AceztrimsExtractor(BaseExtractor):
    """Collects iframe embed URLs from the acestrlms.pages.dev F1 page."""

    @property
    def site_key(self) -> str:
        return "aceztrims"

    @property
    def site_name(self) -> str:
        return "Aceztrims"

    async def extract(self) -> list[ExtractedStream]:
        """Scrape every page in F1_PAGES and return all streams found.

        A failure on one page is logged with its traceback and does not
        stop the remaining pages from being scraped.
        """
        collected: list[ExtractedStream] = []
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            for path, category in F1_PAGES:
                try:
                    page_streams = await self._scrape_page(client, path, category)
                except Exception:
                    logger.exception("[aceztrims] Failed to scrape %s", path)
                else:
                    collected.extend(page_streams)
        logger.info("[aceztrims] Extracted %d stream(s)", len(collected))
        return collected

    async def _scrape_page(
        self, client: httpx.AsyncClient, path: str, category: str
    ) -> list[ExtractedStream]:
        """Fetch one page and return an embed stream per distinct iframe URL.

        Returns an empty list when the page does not answer with HTTP 200.
        """
        resp = await client.get(f"{BASE_URL}{path}")
        if resp.status_code != 200:
            logger.warning(
                "[aceztrims] %s returned HTTP %d", path, resp.status_code
            )
            return []

        # Legacy channel buttons are parked inside `<!-- ... -->` blocks on
        # the page; drop comments so only live buttons are matched.
        live_html = _HTML_COMMENT.sub("", resp.text)

        # Default iframe first, then the onclick switcher targets. dict
        # keys keep first-seen order while discarding duplicates.
        candidates = (
            found.group(1).strip()
            for pattern in (_DEFAULT_IFRAME, _ONCLICK_IFRAME_SRC)
            for found in pattern.finditer(live_html)
        )
        embed_urls = [link for link in dict.fromkeys(candidates) if link]

        page_streams = [
            ExtractedStream(
                url=link,
                site_key=self.site_key,
                site_name=self.site_name,
                quality="",
                title=f"{category} Stream",
                stream_type="embed",
                embed_url=link,
            )
            for link in embed_urls
        ]
        logger.info(
            "[aceztrims] Found %d stream(s) on %s", len(page_streams), path
        )
        return page_streams

View file

@ -1,118 +0,0 @@
"""Base class for all site-specific stream extractors."""
import logging
from abc import ABC, abstractmethod
import httpx
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
class BaseExtractor(ABC):
    """Common interface implemented by every site-specific stream extractor.

    Adding an extractor:
    1. Add a module under backend/extractors/
    2. Derive a class from BaseExtractor
    3. Provide site_key, site_name, and extract()
    4. Register it in backend/extractors/__init__.py
    """

    @property
    @abstractmethod
    def site_key(self) -> str:
        """Unique identifier of the site (e.g., 'sportsurge').

        Lowercase; alphanumeric plus hyphens/underscores only. Serves as
        the cache key and appears in API responses.
        """

    @property
    @abstractmethod
    def site_name(self) -> str:
        """Human-readable site name (e.g., 'SportSurge').

        Shown in the UI and in API responses.
        """

    @abstractmethod
    async def extract(self) -> list[ExtractedStream]:
        """Discover stream URLs on this site.

        Returns one ExtractedStream per discovered URL. Implementations
        should fill in url, quality, and title; site_key, site_name, and
        extracted_at are auto-populated when left empty.

        Implementations are expected to:
        - use httpx for HTTP requests
        - handle their own errors (log and return an empty list)
        - set quality when the source reveals it
        - give each stream a descriptive title
        """

    async def health_check(self, url: str) -> bool:
        """Report whether *url* appears to be a live stream.

        Issues a HEAD request. The URL is considered live when the
        response is HTTP 200 and its Content-Type is missing, media-like,
        or otherwise not text/HTML. Timeouts, HTTP errors, and unexpected
        exceptions are logged and reported as not live.
        """
        # Content-Type fragments typical of HLS playlists and media.
        media_markers = (
            "application/vnd.apple.mpegurl",
            "application/x-mpegurl",
            "video/",
            "audio/",
            "octet-stream",
        )
        try:
            async with httpx.AsyncClient(
                timeout=10.0,
                follow_redirects=True,
                headers={"User-Agent": "Mozilla/5.0"},
            ) as client:
                response = await client.head(url)
                status = response.status_code
                if status != 200:
                    logger.debug(
                        "[%s] Health check failed for %s: HTTP %d",
                        self.site_key,
                        url,
                        status,
                    )
                    return False

                content_type = response.headers.get("content-type", "").lower()
                is_media = any(marker in content_type for marker in media_markers)
                is_textual = "text/" in content_type or "html" in content_type
                # Some servers omit or mislabel Content-Type on HEAD, so a
                # missing or unrecognised type still passes. Only a
                # non-media type that is explicitly text/HTML is rejected.
                if content_type and not is_media and is_textual:
                    logger.debug(
                        "[%s] Health check suspect for %s: content-type=%s",
                        self.site_key,
                        url,
                        content_type,
                    )
                    return False
                return True
        except httpx.TimeoutException:
            logger.debug("[%s] Health check timed out for %s", self.site_key, url)
            return False
        except httpx.HTTPError as err:
            logger.debug("[%s] Health check error for %s: %s", self.site_key, url, err)
            return False
        except Exception:
            logger.exception("[%s] Unexpected error during health check for %s", self.site_key, url)
            return False

View file

@ -1,247 +0,0 @@
"""Generic chrome-service-driven extractor.
Drives the in-cluster headed Chromium pool (chrome-service) to load a list
of stream/aggregator pages, captures any HLS playlist URL the page fetches
at runtime, and returns one ExtractedStream per discovered playlist.
Unlike the API-based extractors (pitsport/streamed/ppv) this one handles
sites where the m3u8 is computed by JavaScript at page load time — the
URL only exists after the page evaluates an obfuscated decoder, fetches a
token, etc. Curl can't see it; a real browser can.
Add new targets via the `TARGETS` constant below. Each entry is a (label,
title, page_url) tuple. The extractor visits each URL with a stealthed
context, waits for the JS to settle, and yields any captured HLS URL.
"""
import asyncio
import logging
import os
import re
import urllib.parse
from dataclasses import dataclass
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Best-effort pause between navigation and capture. The decoder usually
# fires within 5s; 12s gives slow JS time to settle without dragging the
# extraction round.
# Unit is seconds; this is the default for `_Target.settle`.
DEFAULT_SETTLE_SECONDS = 12
# Safari-on-macOS User-Agent string applied to every browser context the
# extractor opens.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/17.4 Safari/605.1.15"
)
@dataclass(frozen=True)
class _Target:
    """Immutable description of one page the Chrome extractor visits.

    Fields:
        label:  site_name reported for the stream (homepage label in the UI).
        title:  human-readable stream title.
        url:    page to navigate to.
        settle: seconds to wait after navigation before collecting
                captured URLs; defaults to DEFAULT_SETTLE_SECONDS.
    """

    label: str
    title: str
    url: str
    settle: int = DEFAULT_SETTLE_SECONDS
# ---------------------------------------------------------------------------
# Target list. F1-relevant 24/7 channels and motorsport aggregator pages
# whose m3u8 is JS-computed. Add freely — each one takes ~12s to scrape.
# NOTE(review): both current entries are MotoGP (MotoMundo) pages whose
# titles name a specific race weekend; they need refreshing when the
# event changes.
# ---------------------------------------------------------------------------
TARGETS: tuple[_Target, ...] = (
# MotoMundo embed pages — the community-curated WordPress site for
# MotoGP. Each /e/<id> URL is one of the iframes their "Watch Online"
# post lists for the active session (FP/Q/Race). The m3u8 is
# JS-computed at load time so a real browser is required to capture
# it. Update IDs each weekend to match the current race; subreddit.py
# discovers them from the Reddit "[Watch / Download]" thread.
_Target(
label="MotoMundo",
title="MotoGP Live (MotoMundo) — French GP / Le Mans",
url="https://motomundo.top/e/9yzn08jk9py4",
settle=15,
),
_Target(
label="MotoMundo",
title="MotoGP Live (MotoMundo upns) — French GP / Le Mans",
url="https://motomundo.upns.xyz/#kqasde",
settle=15,
),
)
# Heuristic to recognise an HLS playlist URL from network capture. Most CDNs
# use `.m3u8`; some (pushembdz/oe1.ossfeed) disguise the playlist as `.css`
# under a /out/v… or /hls/ path. Filter out obvious junk (.css for actual
# stylesheets, .ts segments — we only want the playlist).
_HLS_URL_RE = re.compile(r"\.m3u8(\?|$)|/out/v[0-9]+/.+\.css(\?|$)|/hls/.+/master\.css(\?|$)")
_SEGMENT_EXT_RE = re.compile(r"\.(ts|m4s|aac|key)(\?|$)")


def _looks_like_hls_playlist(url: str) -> bool:
    """Return True when *url* looks like an HLS playlist, not a segment.

    The URL is rejected when its *path* ends in a media-segment or key
    extension (.ts, .m4s, .aac, .key). Otherwise it is accepted when the
    full URL matches one of the known playlist shapes in ``_HLS_URL_RE``.

    The segment test looks at the path only. Testing the whole URL
    rejected real playlists whose query string happened to end in a
    segment extension (e.g. ``master.m3u8?auth=abc.key``).
    """
    try:
        path = urllib.parse.urlsplit(url).path
    except ValueError:
        # Unparseable URL (e.g. malformed IPv6 literal): fall back to
        # testing the raw string.
        path = url
    if _SEGMENT_EXT_RE.search(path):
        return False
    return bool(_HLS_URL_RE.search(url))
def _resolve_chrome_cdp() -> str | None:
"""Resolve the CHROME_CDP_URL env var (set by f1-stream's TF stack).
Migrated 2026-06-04 from CHROME_WS_URL/CHROME_WS_TOKEN. chrome-service
now runs chromium directly with CDP exposed on :9222 so its persistent
user-data-dir actually persists cookies (the old playwright launch-server
pattern created ephemeral contexts per `connect()`). NetworkPolicy
(labelled client namespaces only) is the only gate no path token.
"""
return os.getenv("CHROME_CDP_URL")
class ChromeBrowserExtractor(BaseExtractor):
    """Drive chrome-service to capture m3u8 URLs from JS-heavy pages."""

    @property
    def site_key(self) -> str:
        return "chrome-browser"

    @property
    def site_name(self) -> str:
        return "Chrome Browser"

    async def extract(self) -> list[ExtractedStream]:
        """Visit every entry in TARGETS and return the streams captured.

        Returns an empty list when CHROME_CDP_URL is unset, playwright is
        not installed, or the CDP connection fails. A failure on one
        target is logged and does not affect the others.
        """
        cdp_url = _resolve_chrome_cdp()
        if not cdp_url:
            logger.warning(
                "[chrome-browser] CHROME_CDP_URL not set — extractor disabled"
            )
            return []
        try:
            from playwright.async_api import async_playwright
        except ImportError:
            logger.warning("[chrome-browser] playwright not installed — disabled")
            return []

        results: list[ExtractedStream] = []
        # One Playwright instance + one browser connection per extraction
        # round. Contexts are cheap; the browser is shared.
        async with async_playwright() as p:
            try:
                browser = await p.chromium.connect_over_cdp(cdp_url, timeout=15_000)
            except Exception:
                logger.exception("[chrome-browser] CDP connect to chrome-service failed")
                return []
            try:
                for target in TARGETS:
                    try:
                        stream = await self._scrape(browser, target)
                    except Exception:
                        logger.exception(
                            "[chrome-browser] failed to scrape %s", target.url
                        )
                        continue
                    if stream:
                        results.append(stream)
            finally:
                # Always release the connection, including when the round
                # is cancelled. Closing is best-effort.
                try:
                    await browser.close()
                except Exception:
                    logger.debug(
                        "[chrome-browser] browser close failed", exc_info=True
                    )
        logger.info("[chrome-browser] returned %d stream(s)", len(results))
        return results

    async def _scrape(self, browser, target: _Target) -> ExtractedStream | None:
        """Load one target in a fresh context and return its stream, if any.

        Returns None when navigation fails or no HLS playlist URL was
        captured. The browser context is closed on every exit path so a
        failure cannot leak contexts in the shared browser.
        """
        ctx = await browser.new_context(
            user_agent=USER_AGENT,
            viewport={"width": 1280, "height": 720},
            bypass_csp=True,
        )
        try:
            captured = await self._capture_playlist_urls(ctx, target)
        finally:
            try:
                await ctx.close()
            except Exception:
                logger.debug(
                    "[chrome-browser] context close failed for %s",
                    target.url,
                    exc_info=True,
                )
        if captured is None:
            # Navigation failed; already logged by the capture helper.
            return None

        unique = list(dict.fromkeys(captured))
        if not unique:
            logger.debug("[chrome-browser] %s yielded no HLS URL", target.url)
            return None

        # Prefer a URL that looks like a master/index playlist; otherwise
        # take the first captured one (later ones are usually variant
        # playlists referenced from the master).
        master = next(
            (u for u in unique if "master" in u.lower() or "index" in u.lower()),
            unique[0],
        )
        # The query string is kept as captured (some CDNs require it).
        # Percent-escapes are decoded so the proxy gets a clean URL.
        # NOTE(review): unquote() also decodes escaped reserved characters
        # inside query values — confirm the proxy expects that.
        m3u8 = urllib.parse.unquote(master)
        logger.info(
            "[chrome-browser] %s -> %s",
            target.url, m3u8[:120],
        )
        return ExtractedStream(
            url=m3u8,
            site_key=self.site_key,
            site_name=target.label,
            quality="",
            title=target.title,
            stream_type="m3u8",
        )

    async def _capture_playlist_urls(self, ctx, target: _Target) -> list[str] | None:
        """Navigate to the target and collect candidate HLS playlist URLs.

        Returns the captured URLs in capture order (possibly empty, possibly
        with duplicates), or None when the initial navigation failed.
        Does not close *ctx*; the caller owns it.
        """
        # Inject the same stealth script the verifier uses so anti-bot
        # checks don't trip the page before its decoder runs. Best-effort:
        # scraping continues without it.
        try:
            from backend.stealth import STEALTH_JS
            await ctx.add_init_script(STEALTH_JS)
        except Exception:
            logger.debug(
                "[chrome-browser] stealth script not applied", exc_info=True
            )

        page = await ctx.new_page()
        captured: list[str] = []

        def record(url: str) -> None:
            if _looks_like_hls_playlist(url):
                captured.append(url)

        def on_response(resp) -> None:
            # An exception in an event handler must never break the page.
            try:
                record(resp.url)
            except Exception:
                logger.debug(
                    "[chrome-browser] response handler error", exc_info=True
                )

        page.on("response", on_response)
        # Some pages (DD12 variants) load the player in a child iframe;
        # frame events catch nested navigations.
        page.on("framenavigated", lambda fr: record(fr.url))

        try:
            await page.goto(target.url, wait_until="domcontentloaded", timeout=20_000)
        except Exception as e:
            logger.debug("[chrome-browser] %s goto failed: %s", target.url, e)
            return None

        # Let the page's JS settle.
        await asyncio.sleep(target.settle)

        # Also probe child iframes — `pushembdz`, `pooembed`, `embedsports`
        # all live behind one. Collect any HLS URL the iframes loaded.
        for fr in page.frames:
            if fr is page.main_frame:
                continue
            try:
                # JW Player and Clappr both expose the playing source via
                # a <video>/`<source>` element after setup completes.
                sources = await fr.evaluate(
                    "() => Array.from(document.querySelectorAll('video, source')).map(e => e.currentSrc || e.src || '').filter(s => s.includes('.m3u8') || s.includes('.css'))"
                )
                for s in sources:
                    record(s)
            except Exception:
                # Probing a frame is best-effort; move on to the next one.
                logger.debug(
                    "[chrome-browser] iframe probe failed on %s",
                    target.url,
                    exc_info=True,
                )
        return captured

View file

@ -1,61 +0,0 @@
"""Curated extractor — known-good 24/7 F1 channels via direct embed URLs.
Returns a small, hand-picked list of embed URLs that are reliable enough to
be served as fallback "always-on" streams when the dynamic extractors find
nothing (e.g. between race weekends, when API providers are down).
These are direct embed URLs. The frontend routes them through /embed so the
iframe-stripping proxy bypasses any frame-buster JS in the upstream player.
"""
import logging
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Curated list. Each entry is a known direct embed URL. These were sourced
# from the timstreams.py ALWAYS_INCLUDE_HASHES list (Sky Sports F1, DAZN F1)
# and are documented as 24/7 channels that play F1 content year-round.
# Every entry must provide the "url", "title" and "quality" keys;
# CuratedExtractor.extract reads all three.
_CURATED_STREAMS = [
{
"url": "https://hmembeds.one/embed/888520f36cd94c5da4c71fddc1a5fc9b",
"title": "Sky Sports F1 (24/7)",
"quality": "HD",
},
{
"url": "https://hmembeds.one/embed/fc3a54634d0867b0c02ee3223292e7c6",
"title": "DAZN F1 (24/7)",
"quality": "HD",
},
]
class CuratedExtractor(BaseExtractor):
    """Serves the hand-picked 24/7 F1 channel embeds from _CURATED_STREAMS."""

    @property
    def site_key(self) -> str:
        return "curated"

    @property
    def site_name(self) -> str:
        return "Curated 24/7 Channels"

    async def extract(self) -> list[ExtractedStream]:
        """Return one embed-type stream per curated entry; no network I/O."""
        curated: list[ExtractedStream] = []
        for channel in _CURATED_STREAMS:
            embed = channel["url"]
            curated.append(
                ExtractedStream(
                    url=embed,
                    site_key=self.site_key,
                    site_name=self.site_name,
                    quality=channel["quality"],
                    title=channel["title"],
                    stream_type="embed",
                    embed_url=embed,
                )
            )
        logger.info("[curated] Returning %d curated stream(s)", len(curated))
        return curated

View file

@ -1,181 +0,0 @@
"""DaddyLive extractor - extracts m3u8 streams from DaddyLive for F1 channels.
Extraction chain:
1. Fetch stream page → parse iframe src
2. Fetch player page → XOR-decode auth params (key=109)
3. Call server lookup API → get server_key
4. Construct m3u8 URL from server_key + channel key
"""
import logging
import re
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# F1-relevant channel IDs on DaddyLive
# Maps the numeric channel id (used in the stream-page URL and the
# `premium{id}` channel key) to the title shown for the stream.
F1_CHANNELS = {
60: "Sky Sports F1 UK",
}
# Site origin; stream pages live at {DLHD_BASE}/stream/stream-<id>.php.
DLHD_BASE = "https://dlhd.link"
# Browser-like User-Agent sent with every request in the extraction chain.
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
XOR_KEY = 109


def _xor_decode(encoded: str) -> str:
    """Return *encoded* with each character's code point XOR-ed with 109.

    XOR with a fixed key is its own inverse, so applying the function
    twice yields the original string.
    """
    return "".join(chr(code ^ XOR_KEY) for code in map(ord, encoded))
class DaddyLiveExtractor(BaseExtractor):
    """Extracts m3u8 streams from DaddyLive for Sky Sports F1.

    Each request in the chain sends the previous step's URL as Referer:
    1. Fetch the stream page at dlhd.link
    2. Parse the iframe src that points to the player page
    3. Fetch the player page (only its status is checked; the channel key
       is derived directly as ``premium{channel_id}``)
    4. Call the server lookup API to get server_key
    5. Construct the final m3u8 URL
    """

    @property
    def site_key(self) -> str:
        return "daddylive"

    @property
    def site_name(self) -> str:
        return "DaddyLive"

    async def extract(self) -> list[ExtractedStream]:
        """Extract m3u8 URLs for all configured F1 channels.

        A failure on one channel is logged with its traceback and does
        not prevent the remaining channels from being processed.
        """
        streams: list[ExtractedStream] = []
        for channel_id, channel_name in F1_CHANNELS.items():
            try:
                stream = await self._extract_channel(channel_id, channel_name)
            except Exception:
                logger.exception(
                    "[daddylive] Failed to extract channel %d (%s)",
                    channel_id,
                    channel_name,
                )
                continue
            if stream:
                streams.append(stream)
        logger.info("[daddylive] Extracted %d stream(s)", len(streams))
        return streams

    async def _extract_channel(
        self, channel_id: int, channel_name: str
    ) -> ExtractedStream | None:
        """Extract a single channel's m3u8 URL through the full chain.

        Returns None (after logging a warning) when any step answers with
        a non-200 status, the iframe is missing, or the lookup response
        has no usable ``server_key``.
        """
        # Imported locally so this block does not depend on the module's
        # top-level import list.
        from urllib.parse import urljoin

        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            # Step 1: Fetch stream page and parse iframe src
            stream_page_url = f"{DLHD_BASE}/stream/stream-{channel_id}.php"
            resp = await client.get(
                stream_page_url,
                headers={"Referer": f"{DLHD_BASE}/"},
            )
            if resp.status_code != 200:
                logger.warning(
                    "[daddylive] Stream page returned HTTP %d for channel %d",
                    resp.status_code,
                    channel_id,
                )
                return None

            iframe_match = re.search(
                r'<iframe[^>]+src=["\']([^"\']+)["\']', resp.text, re.IGNORECASE
            )
            if not iframe_match:
                logger.warning(
                    "[daddylive] No iframe found on stream page for channel %d",
                    channel_id,
                )
                return None

            # Resolve against the stream page so protocol-relative
            # (`//host/x`), root-relative (`/x`) and relative (`x`) sources
            # all become absolute URLs. Absolute sources are unchanged.
            player_url = urljoin(stream_page_url, iframe_match.group(1))
            logger.debug("[daddylive] Player URL for channel %d: %s", channel_id, player_url)

            # Step 2: Fetch the player page. Only the status is checked;
            # the response body is not parsed.
            resp = await client.get(
                player_url,
                headers={"Referer": stream_page_url},
            )
            if resp.status_code != 200:
                logger.warning(
                    "[daddylive] Player page returned HTTP %d for channel %d",
                    resp.status_code,
                    channel_id,
                )
                return None

            # The channel key is built directly from the channel id
            # instead of being decoded from the player page.
            channel_key = f"premium{channel_id}"

            # Step 3: Call server lookup API
            lookup_url = f"https://chevy.vovlacosa.sbs/server_lookup?channel_id={channel_key}"
            resp = await client.get(
                lookup_url,
                headers={"Referer": player_url},
            )
            if resp.status_code != 200:
                logger.warning(
                    "[daddylive] Server lookup returned HTTP %d for channel %d",
                    resp.status_code,
                    channel_id,
                )
                return None

            try:
                lookup_data = resp.json()
                server_key = lookup_data.get("server_key", "")
            except Exception:
                # Covers invalid JSON and a JSON body that is not an object.
                logger.warning(
                    "[daddylive] Failed to parse server lookup response for channel %d",
                    channel_id,
                )
                return None

            if not server_key:
                logger.warning(
                    "[daddylive] No server_key in lookup response for channel %d",
                    channel_id,
                )
                return None

            # Step 4: Construct m3u8 URL
            m3u8_url = (
                f"https://chevy.adsfadfds.cfd/proxy/{server_key}/{channel_key}/mono.css"
            )
            logger.info(
                "[daddylive] Constructed m3u8 for channel %d: %s", channel_id, m3u8_url
            )
            return ExtractedStream(
                url=m3u8_url,
                site_key=self.site_key,
                site_name=self.site_name,
                quality="HD",
                title=channel_name,
                stream_type="m3u8",
            )

View file

@ -1,111 +0,0 @@
"""DD12Streams extractor — scrapes inline m3u8 URLs from per-channel pages.
Each DD12 sport page (`/nas`, `/f1`, `/sky`, etc.) renders an iframe to
`/<channel>c1` which 302-redirects to `/new-<channel>/jwplayer`. That
page contains a JW Player setup with the m3u8 URL hard-coded inline:
playerInstance.setup({
file: "https://...b-cdn.net/.../master.m3u8",
...
});
The JW Player runtime fails in our cluster (same fingerprint trap as
hmembeds), but we don't need it — the file URL is in the HTML and any
browser with H.264 codecs can play it directly via hls.js.
Channel discovery: probe a known list. New ones can be added by checking
DD12's own homepage / nav.
"""
import logging
import re
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Site origin; channel paths from CHANNELS are appended as {BASE}/<path>.
BASE = "https://dd12streams.com"
# Safari-on-macOS User-Agent string sent with every request.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/17.4 Safari/605.1.15"
)
# (path, channel_label, title). Add as DD12 surfaces new channels.
# channel_label becomes the stream's site_name; title its display title.
CHANNELS = (
("nas", "DD12Streams", "NASCAR Cup Series (24/7) — DD12"),
)
# Matches the JW Player `file: "<url>"` setup entry when the URL contains
# `.m3u8`; capture group 1 is the URL (including any trailing query).
_FILE_URL_RE = re.compile(r"""file\s*:\s*["']([^"']+\.m3u8[^"']*)["']""")
class DD12Extractor(BaseExtractor):
    """Scrapes the inline JW Player m3u8 URL from each DD12 channel page."""

    @property
    def site_key(self) -> str:
        return "dd12"

    @property
    def site_name(self) -> str:
        return "DD12Streams"

    async def extract(self) -> list[ExtractedStream]:
        """Return one m3u8 stream per channel in CHANNELS that resolves.

        For each channel: fetch the channel page, locate the `vplayer`
        iframe, fetch the iframe page (with the channel page as Referer)
        and read the m3u8 URL from its JW Player setup. A channel is
        skipped when any step fails. Exceptions are logged at debug level.
        """
        # Imported locally so this block does not depend on the module's
        # top-level import list.
        from urllib.parse import urljoin

        results: list[ExtractedStream] = []
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            for path, label, title in CHANNELS:
                try:
                    page_url = f"{BASE}/{path}"
                    resp = await client.get(page_url)
                    if resp.status_code != 200:
                        continue
                    iframe_path = self._extract_iframe(resp.text)
                    if not iframe_path:
                        continue
                    # Resolve against the channel page so absolute,
                    # protocol-relative (`//host/x`), root-relative (`/x`)
                    # and relative (`x`) sources all yield a valid URL.
                    iframe_url = urljoin(page_url, iframe_path)
                    iframe_resp = await client.get(
                        iframe_url, headers={"Referer": page_url}
                    )
                    if iframe_resp.status_code != 200:
                        continue
                    m3u8 = self._find_m3u8(iframe_resp.text)
                    if not m3u8:
                        continue
                    results.append(
                        ExtractedStream(
                            url=m3u8,
                            site_key=self.site_key,
                            site_name=label,
                            quality="",
                            title=title,
                            stream_type="m3u8",
                        )
                    )
                except Exception:
                    logger.debug(
                        "[dd12] /%s extraction failed", path, exc_info=True
                    )
        logger.info("[dd12] Extracted %d stream(s)", len(results))
        return results

    @staticmethod
    def _extract_iframe(html: str) -> str | None:
        """Return the src of the `vplayer` iframe in *html*, or None.

        Only matches when the `id` attribute precedes `src` in the tag.
        """
        m = re.search(
            r'<iframe[^>]+id=["\']vplayer["\'][^>]+src=["\']([^"\']+)["\']',
            html,
        )
        return m.group(1) if m else None

    @staticmethod
    def _find_m3u8(html: str) -> str | None:
        """Return the first JW Player `file:` m3u8 URL in *html*, or None."""
        m = _FILE_URL_RE.search(html)
        return m.group(1) if m else None

View file

@ -1,75 +0,0 @@
"""Demo extractor - returns hardcoded test streams for framework testing.
This extractor exists purely for testing the extraction pipeline end-to-end.
It does NOT connect to any real streaming site. Disable it in production by
removing its registration from __init__.py or setting DEMO_EXTRACTOR_ENABLED=false.
"""
import logging
import os
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Set DEMO_EXTRACTOR_ENABLED=false to disable this extractor
# Enabled only when the value is "true", "1" or "yes" (case-insensitive);
# any other value disables it. Defaults to enabled when the variable is
# unset. Read once, at import time.
DEMO_ENABLED = os.getenv("DEMO_EXTRACTOR_ENABLED", "true").lower() in ("true", "1", "yes")
class DemoExtractor(BaseExtractor):
    """Extractor that serves a fixed set of public HLS test streams.

    Lets the extraction pipeline be exercised end-to-end without touching
    a real streaming site.
    """

    @property
    def site_key(self) -> str:
        return "demo"

    @property
    def site_name(self) -> str:
        return "Demo (Test Streams)"

    async def extract(self) -> list[ExtractedStream]:
        """Return the hardcoded test streams, each health-checked.

        Returns an empty list when the extractor is disabled through
        DEMO_EXTRACTOR_ENABLED.
        """
        if not DEMO_ENABLED:
            logger.info("[demo] Demo extractor is disabled via DEMO_EXTRACTOR_ENABLED")
            return []

        logger.info("[demo] Returning demo test streams")
        # (url, quality, title) for each test stream.
        samples = (
            (
                "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8",
                "720p",
                "Big Buck Bunny (Test Stream)",
            ),
            (
                "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8",
                "1080p",
                "Apple Bipbop (Test Stream)",
            ),
            (
                "https://demo.unified-streaming.com/k8s/features/stable/video/tears-of-steel/tears-of-steel.ism/.m3u8",
                "1080p",
                "Tears of Steel (Test Stream)",
            ),
        )
        streams = [
            ExtractedStream(
                url=url,
                site_key=self.site_key,
                site_name=self.site_name,
                quality=quality,
                title=title,
                is_live=False,
            )
            for url, quality, title in samples
        ]

        # Replace the placeholder is_live with the health-check result,
        # one stream at a time.
        for stream in streams:
            stream.is_live = await self.health_check(stream.url)
        return streams

View file

@ -1,203 +0,0 @@
"""Discord extractor - monitors Discord channels for F1 stream links.
Reads recent messages from configured Discord channels using a user token,
extracts URLs that look like stream links, and returns them as embed streams.
"""
import logging
import os
import re
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Base URL of the Discord HTTP API (v9).
DISCORD_API = "https://discord.com/api/v9"
# Token sent as the Authorization header; the extractor skips itself when
# this is empty.
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "")
# Comma-separated channel IDs to monitor
# An unset variable yields [""]; blank entries are stripped and dropped
# by DiscordExtractor.extract.
DISCORD_CHANNELS = os.getenv("DISCORD_CHANNELS", "").split(",")
# How many messages to fetch per channel
MESSAGE_LIMIT = 50
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
# Matches any http(s) URL up to the next whitespace, angle bracket,
# closing paren/bracket or quote. The pattern itself excludes nothing;
# non-stream links (Discord CDN, images, etc.) are presumably filtered
# afterwards by _is_stream_url.
URL_PATTERN = re.compile(r"https?://[^\s<>\)\]\"']+", re.IGNORECASE)
# Domains that publish news/articles, not playable streams. Discord users share
# these links during race weekends; they are NOT streams and pollute the list.
# Each entry excludes the domain itself and all of its subdomains.
EXCLUDED_DOMAINS = {
    "discord.com", "discord.gg", "cdn.discordapp.com",
    "tenor.com", "giphy.com", "imgur.com",
    "youtube.com", "youtu.be", "twitter.com", "x.com",
    "reddit.com", "instagram.com", "tiktok.com",
    "fmhy.net", "github.com", "freemotorsports.com",
    # News / official sites — never playable embeds
    "formula1.com", "fia.com", "skysports.com", "motorsport.com",
    "driverdb.com", "autosport.com", "the-race.com", "racefans.net",
    "wikipedia.org", "fantasy.formula1.com",
}
# A URL is treated as a candidate stream embed only if its path looks like
# a *direct* player/embed page — `/embed/{id}`, `/player/{...}`, `*.m3u8`,
# `*.php` (legacy iframe1.php style). Aggregator landing pages
# (`/event/...`, `/watch?session=...`, etc.) are rejected because they
# show a list of links instead of playing automatically — those produce
# verifier-passing UI without actual playback.
_PATH_KEYWORDS = (
    "/embed/", "/player/", ".m3u8", ".php",
)
# Path suffixes of static assets / media files that are never embeds.
_NON_STREAM_EXTENSIONS = (
    ".png", ".jpg", ".jpeg", ".gif", ".webp",
    ".mp4", ".webm", ".svg", ".css", ".js",
)


def _is_stream_url(url: str) -> bool:
    """Heuristic: does this URL look like an actual stream/embed/player link?

    Discord users share lots of news links during race weekends, so the
    filter is positive-match. A URL is accepted only when all of these hold:

    - it has a hostname that is not an excluded domain or a subdomain of
      one;
    - its path does not end in a static-asset/media extension;
    - its lowercased path, plus the query string as given, contains at
      least one stream-shaped keyword from ``_PATH_KEYWORDS``.

    Excluded domains are matched on the hostname exactly or as a
    dot-separated suffix. A plain substring test would wrongly reject
    unrelated hosts such as ``netflix.com`` because they contain ``x.com``.
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
        # `hostname` drops any port/userinfo and may be None.
        host = (parsed.hostname or "").lower()
        path = parsed.path.lower()
    except (ValueError, TypeError, AttributeError):
        return False

    if not host:
        return False
    if any(host == d or host.endswith("." + d) for d in EXCLUDED_DOMAINS):
        return False
    if path.endswith(_NON_STREAM_EXTENSIONS):
        return False

    full = path + ("?" + parsed.query if parsed.query else "")
    return any(kw in full for kw in _PATH_KEYWORDS)
class DiscordExtractor(BaseExtractor):
    """Turns links shared in Discord channels into embed-type streams.

    Reads the most recent messages of every configured channel, keeps the
    URLs accepted by ``_is_stream_url`` and reports each unique one once.
    """

    @property
    def site_key(self) -> str:
        """Identifier of this extractor."""
        return "discord"

    @property
    def site_name(self) -> str:
        """Human-readable name of this extractor."""
        return "Discord Community"

    async def extract(self) -> list[ExtractedStream]:
        """Collect stream links from all configured channels.

        Returns:
            The streams found across all channels, or an empty list when no
            token or no channel is configured. A failure in one channel is
            logged and does not affect the others.
        """
        if not DISCORD_TOKEN:
            logger.info("[discord] No DISCORD_TOKEN set, skipping")
            return []

        channel_ids = [cid for cid in (raw.strip() for raw in DISCORD_CHANNELS) if cid]
        if not channel_ids:
            logger.info("[discord] No DISCORD_CHANNELS configured, skipping")
            return []

        collected: list[ExtractedStream] = []
        # Shared across channels so a URL posted in two channels is emitted once.
        already_seen: set[str] = set()
        request_headers = {
            "Authorization": DISCORD_TOKEN,
            "User-Agent": USER_AGENT,
        }
        try:
            async with httpx.AsyncClient(
                timeout=15.0,
                follow_redirects=True,
                headers=request_headers,
            ) as client:
                for cid in channel_ids:
                    try:
                        collected.extend(
                            await self._fetch_channel(client, cid, already_seen)
                        )
                    except Exception:
                        logger.debug(
                            "[discord] Failed to fetch channel %s",
                            cid,
                            exc_info=True,
                        )
        except Exception:
            logger.exception("[discord] Failed to connect to Discord API")

        logger.info(
            "[discord] Extracted %d stream(s) from %d channel(s)",
            len(collected),
            len(channel_ids),
        )
        return collected

    async def _fetch_channel(
        self,
        client: httpx.AsyncClient,
        channel_id: str,
        seen_urls: set[str],
    ) -> list[ExtractedStream]:
        """Read one channel's recent messages and return its new stream links.

        Args:
            client: HTTP client already carrying the authorization headers.
            channel_id: ID of the channel to read.
            seen_urls: URLs already emitted; updated in place with new ones.

        Returns:
            One embed-type stream per previously unseen, accepted URL. Empty
            on a non-200 response or when the body is not a JSON list.
        """
        response = await client.get(
            f"{DISCORD_API}/channels/{channel_id}/messages",
            params={"limit": MESSAGE_LIMIT},
        )
        if response.status_code != 200:
            logger.warning(
                "[discord] Channel %s returned HTTP %d", channel_id, response.status_code
            )
            return []

        payload = response.json()
        if not isinstance(payload, list):
            return []

        found: list[ExtractedStream] = []
        for message in payload:
            text = message.get("content", "")
            poster = message.get("author", {}).get("username", "unknown")

            # Candidates come from the message text first, then from embeds.
            candidates = URL_PATTERN.findall(text)
            candidates.extend(
                embed["url"] for embed in message.get("embeds", []) if embed.get("url")
            )

            for raw_url in candidates:
                # Drop sentence punctuation that the regex swallowed.
                cleaned = raw_url.rstrip(".,;:!?)")
                if cleaned in seen_urls or not _is_stream_url(cleaned):
                    continue
                seen_urls.add(cleaned)
                found.append(
                    ExtractedStream(
                        url=cleaned,
                        site_key=self.site_key,
                        site_name=self.site_name,
                        quality="",
                        title=f"Shared by {poster}",
                        stream_type="embed",
                        embed_url=cleaned,
                    )
                )
        return found

View file

@ -1,131 +0,0 @@
"""hmembeds.one decoder + extractor.
Reverse-engineered 2026-05-07 (4-agent parallel session). The hmembeds
embed page contains an inline `<script>` block of the form:
var k = "<16-char ASCII key>";
var b = atob("<URI-encoded XOR-encrypted blob>");
var c = decodeURIComponent(escape(b));
var d = "";
for (var i = 0; i < c.length; i++)
d += String.fromCharCode(c.charCodeAt(i) ^ k.charCodeAt(i % k.length));
(new Function(d))();
The decoded `d` is plain JavaScript that calls `jwplayer('player').setup({
file: <m3u8_url>, ... })`. The `<m3u8_url>` is a JWT-bound URL on
`amsterdam-0183.zulo-0084.online/sec/<JWT>/<embed_id>.m3u8` where the
JWT pins the request to a /24 of the requestor's IP.
So: pure client-side decoding. No fingerprint check, no canvas hash, no
browser-derived input. We can produce the m3u8 URL with curl + Python
faster than launching Chromium.
**Caveat (2026-05-07 reality)**: the hmembeds backend issues JWT URLs
for the curated `888520f3...` (Sky Sports F1 24/7) and `fc3a5463...`
(DAZN F1 24/7) embeds, but the origin (`amsterdam-0183.zulo-0084.online`)
returns 404/403 on the m3u8 fetch from any IP we tested (cluster IPv4
176.12.22.x, dev VM IPv6 2001:470:6f:43d::). Both legacy embed IDs
appear to be offline upstream. This extractor will produce JWT URLs
that the verifier marks unplayable for those specific embeds; if the
upstream broadcasts come back online or fresh IDs are added, the same
extractor logic just works.
"""
import base64
import logging
import re
import urllib.parse
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
# Module-level logger for the hmembeds extractor.
logger = logging.getLogger(__name__)
# User-Agent header sent with every request to hmembeds.one.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15"
)
# Curated hmembeds embed IDs that the community treats as 24/7 channels,
# as (embed_id, display label) pairs.
# `_CHANNELS` mirrors the legacy `CuratedExtractor` list — keeping them
# here means the resolver can attempt offline-decoded JWT URLs and the
# verifier filters out the ones that are upstream-offline.
_CHANNELS = (
    ("888520f36cd94c5da4c71fddc1a5fc9b", "Sky Sports F1 (24/7) — hmembeds"),
    ("fc3a54634d0867b0c02ee3223292e7c6", "DAZN F1 (24/7) — hmembeds"),
)
_KEY_RE = re.compile(r'k\s*=\s*"([a-z0-9]+)"')
_BLOB_RE = re.compile(r'b\s*=\s*atob\("([^"]+)"\)')
_URL_RE = re.compile(r'streamUrl\s*=\s*"([^"]+)"')
def decode_embed(html: str) -> str | None:
"""Pull the m3u8 URL out of an hmembeds embed HTML.
Returns the JWT-bound m3u8 URL the page would tell JW Player to
play, or None if the page doesn't match the expected shape.
"""
km = _KEY_RE.search(html)
bm = _BLOB_RE.search(html)
if not km or not bm:
return None
key = km.group(1)
blob = bm.group(1)
try:
# b = atob(blob) — base64-decode bytes
# c = decodeURIComponent(escape(b)) — Latin-1 → UTF-8 round-trip
# d[i] = c[i] ^ k[i % len(k)] — XOR with rotating key
raw = base64.b64decode(blob).decode("latin-1")
deuri = urllib.parse.unquote(raw)
decoded = "".join(
chr(ord(c) ^ ord(key[i % len(key)])) for i, c in enumerate(deuri)
)
except Exception:
return None
m = _URL_RE.search(decoded)
return m.group(1) if m else None
class HmembedsExtractor(BaseExtractor):
    """Resolves the curated hmembeds channels to direct m3u8 URLs."""

    @property
    def site_key(self) -> str:
        """Identifier of this extractor."""
        return "hmembeds"

    @property
    def site_name(self) -> str:
        """Human-readable name of this extractor."""
        return "hmembeds.one"

    async def extract(self) -> list[ExtractedStream]:
        """Fetch every channel in ``_CHANNELS`` and decode its stream URL.

        Returns:
            One m3u8 stream per channel whose embed page answered HTTP 200
            and could be decoded; channels that fail are skipped.
        """
        resolved: list[ExtractedStream] = []
        client_headers = {"User-Agent": USER_AGENT, "Referer": "https://hmembeds.one/"}
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers=client_headers,
        ) as client:
            for embed_id, label in _CHANNELS:
                try:
                    page = await client.get(f"https://hmembeds.one/embed/{embed_id}")
                except Exception:
                    logger.debug("[hmembeds] embed %s fetch failed", embed_id, exc_info=True)
                    continue

                # Only a successful page is worth decoding.
                playlist_url = decode_embed(page.text) if page.status_code == 200 else None
                if not playlist_url:
                    continue

                resolved.append(
                    ExtractedStream(
                        url=playlist_url,
                        site_key=self.site_key,
                        site_name=self.site_name,
                        quality="",
                        title=label,
                        stream_type="m3u8",
                    )
                )
        logger.info("[hmembeds] resolved %d JWT URL(s) (verifier filters dead origins)", len(resolved))
        return resolved

View file

@ -1,39 +0,0 @@
"""Data models for the stream extraction framework."""
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class ExtractedStream:
    """One stream discovered by an extractor, plus its health-check state."""

    # Stream location: an m3u8 playlist URL, or the embed page URL for
    # embed-type streams.
    url: str
    # Key of the extractor that produced this stream.
    site_key: str
    # Display name of that extractor.
    site_name: str
    # Quality label such as "720p" or "1080p"; empty when unknown.
    quality: str = ""
    # Display title, e.g. "F1 Race Live".
    title: str = ""
    # ISO-8601 UTC timestamp taken when the instance is created.
    extracted_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    # Whether the stream passed the health check.
    is_live: bool = False
    # Health check response time in milliseconds (lower is better).
    response_time_ms: int = 0
    # ISO timestamp of the last health check; empty until checked.
    checked_at: str = ""
    # Bitrate in bps when detectable from the m3u8 playlist.
    bitrate: int = 0
    # "m3u8" for direct HLS, "embed" for an iframe embed URL.
    stream_type: str = "m3u8"
    # The iframe-embeddable URL (used when stream_type is "embed").
    embed_url: str = ""

    def to_dict(self) -> dict:
        """Return the stream as a plain dict suitable for JSON responses."""
        exported = (
            "url",
            "site_key",
            "site_name",
            "quality",
            "title",
            "extracted_at",
            "is_live",
            "response_time_ms",
            "checked_at",
            "bitrate",
            "stream_type",
            "embed_url",
        )
        return {name: getattr(self, name) for name in exported}

View file

@ -1,595 +0,0 @@
"""Pitsport.xyz extractor - fetches F1 streams from the Next.js RSC payload.
Architecture:
- Main page (pitsport.xyz) has a "Live Now" section with event cards containing
category, title, time, imageUrl props and /watch/{UUID} links.
- Schedule page (pitsport.xyz/schedule) lists all events grouped by category
(h2 headings) with /watch/{UUID} links and event titles.
- Watch pages (/watch/{UUID}) embed iframes from pushembdz.store/embed/{EMBED_UUID}.
- Embed pages contain an RSC payload with a stream config: {title, link, method}.
- When method is "player" or "hls", the link field points to a serveplay.site
m3u8 playlist. Otherwise we return the embed URL for iframe playback.
"""
import logging
import re
from dataclasses import dataclass
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
# Module-level logger for the Pitsport extractor.
logger = logging.getLogger(__name__)
# Site that lists the events (main page, /live-now, /schedule, /watch/{UUID}).
PITSPORT_BASE = "https://pitsport.xyz"
# Host of the embed pages and of the api/stream endpoint.
EMBED_BASE = "https://pushembdz.store"
# User-Agent header sent with every request.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
# Categories to include (case-insensitive match). Broadened beyond F1
# to also surface MotoGP and adjacent motorsports — keeps the f1-stream
# UI useful between race weekends and during the off-season.
# Consulted by _is_motorsport_category(), which compares the stripped,
# lower-cased category for exact membership.
MOTORSPORT_CATEGORIES = {
    "f1", "formula 1", "formula 2", "formula 3",
    "motogp", "moto gp", "moto2", "moto3", "motoe",
    "world rally championship", "wrc",
    "world endurance championship", "wec",
    "indycar series", "indycar", "indynxt",
    "nascar cup series", "nascar truck series", "nascar o'reilly auto parts series",
    "nascar xfinity series", "nascar",
}
# Title keywords that are strong positives even when the category text
# is missing (live-now cards sometimes elide it).
# NOTE(review): neither this set nor GP_KEYWORD is referenced by the
# functions visible in this module — confirm whether they are still needed.
MOTORSPORT_KEYWORDS = {
    "formula 1", "formula one", "f1",
    "motogp", "moto gp", "moto2", "moto3",
    "rally", "wrc",
    "indycar", "indy car",
    "nascar",
    "le mans", "lemans", "wec", "endurance",
}
GP_KEYWORD = "grand prix"
@dataclass
class _PitsportEvent:
    """An event discovered from the Pitsport site."""

    # Category text as scraped (section heading or card "category" value).
    category: str
    # Event title as scraped.
    title: str
    # 36-character UUID from the event's /watch/{UUID} link.
    watch_uuid: str
def _is_motorsport_category(category: str) -> bool:
    """Tell whether *category* names one of the included motorsport series.

    The comparison ignores case and surrounding whitespace and requires an
    exact match against ``MOTORSPORT_CATEGORIES``.
    """
    normalized = category.strip().lower()
    return normalized in MOTORSPORT_CATEGORIES
def _is_motorsport_event(category: str, title: str) -> bool:
"""Accept anything pitsport.xyz lists. Pitsport curates sports
broadcasts (WRC, MotoGP, IndyCar, NASCAR, Premier League Darts,
Premier League football, etc.) the site's own selection is the
filter we want. Empty/garbage events still get filtered downstream
when `_resolve_event_streams` produces no playable URL."""
return bool(category or title)
# Aliases kept so call-sites written against the old F1-only names keep
# working. `_is_f1_category` is the exact-match category check and
# `_is_f1_event` is the accept-anything event filter.
_is_f1_category = _is_motorsport_category
_is_f1_event = _is_motorsport_event
def _decode_rsc_payload(html: str) -> str:
"""Concatenate and unescape all `self.__next_f.push([1, "..."])` chunks.
Next.js RSC ships its tree as escape-encoded strings inside repeated
`self.__next_f.push` calls. Regex over the raw HTML misses everything
interesting; we have to decode unicode escapes first.
"""
chunks = re.findall(r'self\.__next_f\.push\(\[1,"(.*?)"\]\)', html, re.DOTALL)
if not chunks:
return ""
payload = ""
for chunk in chunks:
try:
payload += chunk.encode().decode("unicode_escape")
except Exception:
payload += chunk
return payload
def _parse_live_events(html: str) -> list[_PitsportEvent]:
    """Collect event cards from the main page or `/live-now`.

    The cards live inside the Next.js RSC payload, which is escape-encoded
    in the raw HTML, so the payload is decoded before matching (the raw
    HTML is used when no payload is found). Three card shapes are matched,
    in this order:

      1) Card props: "href":"/watch/UUID" followed by
         "category":"...","title":"...".
      2) An `event` object listing title, then uri, then category.
      3) An `event` object listing category, then title, then uri.

    Returns:
        All matches, grouped by shape; the same event may appear more than
        once.
    """
    text = _decode_rsc_payload(html) or html

    # (regex, category group, title group, uuid group)
    shapes = (
        (
            r'"href":"(/watch/([0-9a-f-]{36}))"[^}]*?"category":"([^"]+)","title":"([^"]+)"',
            3, 4, 2,
        ),
        (
            r'"event":\{[^{}]*?"title":"([^"]+)"[^{}]*?"uri":"/watch/([0-9a-f-]{36})"[^{}]*?"category":"([^"]+)"',
            3, 1, 2,
        ),
        (
            r'"event":\{[^{}]*?"category":"([^"]+)"[^{}]*?"title":"([^"]+)"[^{}]*?"uri":"/watch/([0-9a-f-]{36})"',
            1, 2, 3,
        ),
    )

    found: list[_PitsportEvent] = []
    for regex, category_group, title_group, uuid_group in shapes:
        for hit in re.finditer(regex, text):
            found.append(
                _PitsportEvent(
                    category=hit.group(category_group),
                    title=hit.group(title_group),
                    watch_uuid=hit.group(uuid_group),
                )
            )
    return found
def _parse_schedule_events(html: str) -> list[_PitsportEvent]:
    """Collect events from the schedule page.

    The rendered page groups events under category headings:
        <h2 ...>Formula 1</h2>
        <div ... class="flex flex-wrap gap-6">
          <a href="/watch/UUID">... <h1 ...>Title</h1> ...</a>
          ...
        </div>

    The rendered HTML is tried first. Only when it yields nothing is the
    RSC payload parsed instead, via `_parse_schedule_rsc`.
    """
    # A heading followed by its card container, up to the container's end.
    section_re = re.compile(
        r'>([^<]+)</h2>\s*<div[^>]*class="flex flex-wrap gap-6">(.*?)(?=</div>\s*</div>\s*(?:<div|</div>|$))',
        re.DOTALL,
    )
    # A watch link and the first <h1> title that follows it.
    link_re = re.compile(
        r'href="/watch/([0-9a-f-]{36})".*?<h1[^>]*>([^<]+)</h1>',
        re.DOTALL,
    )

    from_html = [
        _PitsportEvent(
            category=section.group(1).strip(),
            title=link.group(2).strip(),
            watch_uuid=link.group(1),
        )
        for section in section_re.finditer(html)
        for link in link_re.finditer(section.group(2))
    ]
    return from_html or _parse_schedule_rsc(html)
def _parse_schedule_rsc(html: str) -> list[_PitsportEvent]:
    """Parse events from the schedule page's RSC payload (fallback path).

    The payload is split into sections at each category heading, recognised
    by the heading's `border-gray-700 pb-2` class followed by its text.
    Within a section, every `/watch/UUID` is paired with the next
    `"title":"..."` that follows it.

    Args:
        html: Raw HTML of the schedule page.

    Returns:
        The events found, or an empty list when the page has no RSC payload.
    """
    # Reuse the shared decoder so both RSC code paths unescape identically.
    payload = _decode_rsc_payload(html)
    if not payload:
        return []

    # One match per category: heading text plus everything up to the next heading.
    section_re = re.compile(
        r'border-gray-700 pb-2","children":"([^"]+)"\}.*?'
        r'(?=border-gray-700 pb-2","children"|$)',
        re.DOTALL,
    )
    # Compiled once, not once per section.
    event_re = re.compile(
        r'/watch/([0-9a-f-]{36}).*?"title":"([^"]+)"',
    )

    events: list[_PitsportEvent] = []
    for section in section_re.finditer(payload):
        category = section.group(1)
        for hit in event_re.finditer(section.group(0)):
            events.append(
                _PitsportEvent(
                    category=category,
                    title=hit.group(2),
                    watch_uuid=hit.group(1),
                )
            )
    return events
def _parse_embed_uuids(html: str) -> list[str]:
"""Extract embed UUIDs from a watch page.
Watch pages contain iframes like:
<iframe src="https://pushembdz.store/embed/{EMBED_UUID}" ...>
And in the RSC payload:
"iframe":"https://pushembdz.store/embed/{EMBED_UUID}"
"""
uuids: list[str] = []
# From rendered HTML
iframe_pattern = re.compile(
r'pushembdz\.store/embed/([0-9a-f-]{36})',
)
for match in iframe_pattern.finditer(html):
uuid = match.group(1)
if uuid not in uuids:
uuids.append(uuid)
return uuids
@dataclass
class _StreamConfig:
    """Stream configuration extracted from an embed page."""

    # Stream title from the embed page payload.
    title: str
    # Stream URL; empty when the payload omits it and it must be looked up.
    link: str
    # Playback method reported by the payload (e.g. "jwp", "player", "hls").
    method: str
def _parse_stream_config(html: str) -> _StreamConfig | None:
    """Read the stream configuration out of an embed page.

    The current page format carries a `safeStream` object with only a title
    and a method, e.g.
        {"safeStream":{"title":"Rally TV","method":"jwp"},"error":null,...}
    and leaves the link out; in that case the returned config has an empty
    `link` for the caller to fill in. Older pages used a `stream` object
    with title, link and method inline; three spellings of it (differing in
    quote escaping and whitespace) are tried in turn as a fallback.

    Returns:
        The first configuration that matches, or None when nothing matches.
    """
    # Current format: safeStream with title + method only.
    safe = re.search(
        r'\\?"safeStream\\?"\s*:\s*\{'
        r'\\?"title\\?"\s*:\s*\\?"([^"\\]+)\\?"\s*,\s*'
        r'\\?"method\\?"\s*:\s*\\?"([^"\\]+)\\?"',
        html,
    )
    if safe:
        return _StreamConfig(title=safe.group(1), link="", method=safe.group(2))

    # Legacy formats, each capturing (title, link, method). Order matters.
    legacy_patterns = (
        (
            r'"stream":\{["\']?\\?"title\\?"["\']?:["\']?\\?"([^"\\]+)\\?"["\']?,'
            r'["\']?\\?"link\\?"["\']?:["\']?\\?"([^"\\]+)\\?"["\']?,'
            r'["\']?\\?"method\\?"["\']?:["\']?\\?"([^"\\]+)\\?"'
        ),
        (
            r'\\?"stream\\?":\{\\?"title\\?":\\?"([^\\]+)\\?",'
            r'\\?"link\\?":\\?"([^\\]+)\\?",'
            r'\\?"method\\?":\\?"([^\\]+)\\?"'
        ),
        (
            r'"stream"\s*:\s*\{\s*"title"\s*:\s*"([^"]+)"\s*,'
            r'\s*"link"\s*:\s*"([^"]+)"\s*,'
            r'\s*"method"\s*:\s*"([^"]+)"'
        ),
    )
    for regex in legacy_patterns:
        hit = re.search(regex, html)
        if hit:
            title, link, method = hit.groups()
            return _StreamConfig(title=title, link=link, method=method)
    return None
def _is_m3u8_method(method: str) -> bool:
"""Check if the stream method indicates a direct HLS stream."""
# `jwp` (current pushembdz format) returns an m3u8 from the api/stream
# endpoint regardless of player UI; treat it as HLS.
return method.lower() in ("player", "hls", "jwp")
def _extract_m3u8_url(link: str) -> str:
    """Pass through the link from pushembdz's `api/stream/<slug>` response.

    The host has rotated over time (serveplay.site, then oe1.ossfeed.store,
    and possibly others); the response is always a master playlist URL that
    is handed to the player as-is. The Content-Type may be `text/css` or
    `application/json`, so treat the target as HLS based on body sniffing
    (`#EXTM3U`), not on MIME type.

    Returns:
        *link*, unchanged.
    """
    return link
class PitsportExtractor(BaseExtractor):
    """Extracts streams for the events listed on Pitsport.xyz.

    Events are discovered on the main page, `/live-now` and `/schedule`.
    Each event's watch page is then resolved to its embeds, and each embed
    to either a direct m3u8 URL or an iframe embed URL.
    """

    @property
    def site_key(self) -> str:
        """Identifier of this extractor."""
        return "pitsport"

    @property
    def site_name(self) -> str:
        """Human-readable name of this extractor."""
        return "Pitsport"

    async def extract(self) -> list[ExtractedStream]:
        """Discover events and resolve each unique one to its streams.

        Returns:
            All streams resolved before any failure; errors are logged
            rather than raised.
        """
        collected: list[ExtractedStream] = []
        try:
            async with httpx.AsyncClient(
                timeout=20.0,
                follow_redirects=True,
                headers={"User-Agent": USER_AGENT},
            ) as client:
                discovered = await self._discover_events(client)
                logger.info(
                    "[pitsport] Found %d F1 event(s) to process", len(discovered)
                )

                # The same event can be listed on several pages. Keep the
                # first occurrence of every watch UUID, in discovery order.
                first_by_uuid: dict[str, _PitsportEvent] = {}
                for candidate in discovered:
                    first_by_uuid.setdefault(candidate.watch_uuid, candidate)

                for event in first_by_uuid.values():
                    collected.extend(
                        await self._resolve_event_streams(client, event)
                    )
        except Exception:
            logger.exception("[pitsport] Failed to extract streams")

        logger.info("[pitsport] Extracted %d stream(s)", len(collected))
        return collected

    async def _discover_events(
        self, client: httpx.AsyncClient
    ) -> list[_PitsportEvent]:
        """Gather events from the main, live-now and schedule pages.

        Each page is fetched independently; a failing page is logged and
        skipped. Events are kept when `_is_f1_event` accepts them.
        """
        # (url, parser, "found" log, "bad status" log, "fetch failed" log)
        pages = (
            (
                PITSPORT_BASE,
                _parse_live_events,
                "[pitsport] Main page: %d live event(s)",
                "[pitsport] Main page returned HTTP %d",
                "[pitsport] Failed to fetch main page",
            ),
            (
                # Canonical "currently live" list, added 2026.
                f"{PITSPORT_BASE}/live-now",
                _parse_live_events,
                "[pitsport] Live-now page: %d event(s)",
                "[pitsport] Live-now page returned HTTP %d",
                "[pitsport] Failed to fetch live-now page",
            ),
            (
                # Upcoming events.
                f"{PITSPORT_BASE}/schedule",
                _parse_schedule_events,
                "[pitsport] Schedule page: %d total event(s)",
                "[pitsport] Schedule page returned HTTP %d",
                "[pitsport] Failed to fetch schedule page",
            ),
        )

        gathered: list[_PitsportEvent] = []
        for url, parse, found_msg, status_msg, failure_msg in pages:
            try:
                resp = await client.get(url)
                if resp.status_code != 200:
                    logger.warning(status_msg, resp.status_code)
                    continue
                page_events = parse(resp.text)
                logger.info(found_msg, len(page_events))
                for candidate in page_events:
                    if _is_f1_event(candidate.category, candidate.title):
                        gathered.append(candidate)
            except Exception:
                logger.exception(failure_msg)
        return gathered

    async def _resolve_event_streams(
        self, client: httpx.AsyncClient, event: _PitsportEvent
    ) -> list[ExtractedStream]:
        """Resolve one event's watch page to its streams.

        Returns:
            One stream per embed that could be resolved; empty when the
            watch page is unavailable or lists no embeds.
        """
        resolved: list[ExtractedStream] = []
        try:
            resp = await client.get(f"{PITSPORT_BASE}/watch/{event.watch_uuid}")
            if resp.status_code != 200:
                logger.debug(
                    "[pitsport] Watch page %s returned HTTP %d",
                    event.watch_uuid,
                    resp.status_code,
                )
                return []

            embed_uuids = _parse_embed_uuids(resp.text)
            if not embed_uuids:
                logger.debug(
                    "[pitsport] No embed UUIDs found for %s", event.watch_uuid
                )
                return []

            logger.debug(
                "[pitsport] Event '%s' has %d embed(s)",
                event.title,
                len(embed_uuids),
            )

            # Embeds are numbered from 1 so titles can tell them apart.
            for position, embed_uuid in enumerate(embed_uuids, start=1):
                stream = await self._resolve_embed(
                    client, embed_uuid, event, stream_num=position
                )
                if stream:
                    resolved.append(stream)
        except Exception:
            logger.debug(
                "[pitsport] Failed to resolve event %s",
                event.watch_uuid,
                exc_info=True,
            )
        return resolved

    async def _resolve_embed(
        self,
        client: httpx.AsyncClient,
        embed_uuid: str,
        event: _PitsportEvent,
        stream_num: int,
    ) -> ExtractedStream | None:
        """Resolve one embed UUID to a stream.

        Args:
            client: Shared HTTP client.
            embed_uuid: UUID of the embed page.
            event: Event the embed belongs to; used for the title.
            stream_num: 1-based position of the embed within the event.

        Returns:
            An m3u8 stream when the method is HLS-like and a usable link is
            known, otherwise an embed stream pointing at the embed page.
            None when the embed page is unavailable, has no recognisable
            config, or resolution fails.
        """
        try:
            embed_url = f"{EMBED_BASE}/embed/{embed_uuid}"
            resp = await client.get(embed_url)
            if resp.status_code != 200:
                logger.debug(
                    "[pitsport] Embed page %s returned HTTP %d",
                    embed_uuid,
                    resp.status_code,
                )
                return None

            config = _parse_stream_config(resp.text)
            if not config:
                logger.debug(
                    "[pitsport] No stream config found in embed %s",
                    embed_uuid,
                )
                return None

            # "<category> - <title>[ (<config title>)][ #<n>]"
            title_parts = [f"{event.category} - {event.title}"]
            if config.title:
                title_parts.append(f" ({config.title})")
            if stream_num > 1:
                title_parts.append(f" #{stream_num}")
            stream_title = "".join(title_parts)

            wants_hls = _is_m3u8_method(config.method)

            # The `safeStream` payload leaves the link out; ask the
            # api/stream endpoint for it. Older `stream` payloads carry the
            # link inline and skip this request.
            link = config.link
            if wants_hls and not link:
                try:
                    api_resp = await client.get(
                        f"{EMBED_BASE}/api/stream/{embed_uuid}",
                        headers={"Referer": embed_url, "Accept": "application/json"},
                    )
                    if api_resp.status_code == 200:
                        link = (api_resp.json() or {}).get("link", "")
                except Exception:
                    logger.debug(
                        "[pitsport] api/stream lookup failed for %s",
                        embed_uuid,
                        exc_info=True,
                    )

            # Treat any HLS-ish URL (m3u8, or pushembdz's .css disguise) as m3u8.
            looks_hls = link and (".m3u8" in link or link.endswith(".css") or "serveplay.site" in link)
            if wants_hls and looks_hls:
                return ExtractedStream(
                    url=link,
                    site_key=self.site_key,
                    site_name=self.site_name,
                    quality="",
                    title=stream_title,
                    stream_type="m3u8",
                )

            # Iframe embed fallback.
            return ExtractedStream(
                url=embed_url,
                site_key=self.site_key,
                site_name=self.site_name,
                quality="",
                title=stream_title,
                stream_type="embed",
                embed_url=embed_url,
            )
        except Exception:
            logger.debug(
                "[pitsport] Failed to resolve embed %s",
                embed_uuid,
                exc_info=True,
            )
            return None

View file

@ -1,273 +0,0 @@
"""PPV.to extractor - fetches F1 streams via the public PPV API.
Returns embed URLs (pooembed.eu) for iframe playback.
The API at api.ppv.to/api/streams requires no authentication.
Falls back to api.ppv.st if the primary API is unreachable.
"""
import logging
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
# Module-level logger for the PPV extractor.
logger = logging.getLogger(__name__)
# Stream listing endpoints, tried in this order.
PRIMARY_API = "https://api.ppv.to/api/streams"
FALLBACK_API = "https://api.ppv.st/api/streams"
# Prefix for embed URLs that have to be built from a stream's uri or id.
EMBED_BASE = "https://pooembed.eu/embed"
# User-Agent header sent with every API request.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
# Category name for motorsport on PPV.to (compared as a lower-case substring)
MOTORSPORT_CATEGORY = "motorsports"
# Only include events matching these keywords (case-insensitive substring match)
F1_KEYWORDS = {"formula 1", "formula one", "f1", "sky sports f1"}
# Grand Prix is shared with MotoGP/IndyCar — only match if no other series keywords
GP_KEYWORD = "grand prix"
# Any of these in a stream name rules the stream out before other checks.
NON_F1_KEYWORDS = {
    "motogp", "moto gp", "moto2", "moto3", "motoe",
    "indycar", "indy car", "firestone", "nascar",
    "rally", "wrc", "wec", "lemans", "le mans",
    "superbike", "dtm", "supercars",
}
def _is_f1_stream(name: str, category_name: str = "") -> bool:
    """Decide whether a stream is Formula 1 related.

    All comparisons are case-insensitive substring checks, applied in order:
      1. A name containing any ``NON_F1_KEYWORDS`` entry is rejected.
      2. A name containing any ``F1_KEYWORDS`` entry is accepted,
         whatever its category.
      3. Within the motorsports category only, the stream is accepted when
         the name contains "grand prix" or the category text itself
         contains an F1 keyword.
      4. Everything else is rejected.
    """
    title = name.lower()
    category = category_name.lower()

    def mentions(text: str, keywords: set[str]) -> bool:
        return any(keyword in text for keyword in keywords)

    if mentions(title, NON_F1_KEYWORDS):
        return False
    if mentions(title, F1_KEYWORDS):
        return True
    if MOTORSPORT_CATEGORY not in category:
        return False
    return GP_KEYWORD in title or mentions(category, F1_KEYWORDS)
class PPVExtractor(BaseExtractor):
    """Extracts embed URLs from PPV.to's public JSON API.

    Uses the endpoint:
    - GET https://api.ppv.to/api/streams -> all streams grouped by category
    - Fallback: https://api.ppv.st/api/streams

    Each stream object contains an `iframe` field with the embed URL,
    or a `uri_name` from which the embed URL can be constructed.
    """

    @property
    def site_key(self) -> str:
        """Identifier of this extractor."""
        return "ppv"

    @property
    def site_name(self) -> str:
        """Human-readable name of this extractor."""
        return "PPV.to"

    async def _fetch_streams(self, client: httpx.AsyncClient) -> dict | None:
        """Try the primary and then the fallback API.

        Returns:
            The parsed JSON of the first endpoint answering HTTP 200, or
            None when neither endpoint delivers one.
        """
        for api_url in (PRIMARY_API, FALLBACK_API):
            try:
                resp = await client.get(api_url)
                if resp.status_code == 200:
                    data = resp.json()
                    logger.info("[ppv] Fetched streams from %s", api_url)
                    return data
                logger.warning(
                    "[ppv] %s returned HTTP %d", api_url, resp.status_code
                )
            except Exception:
                logger.debug(
                    "[ppv] Failed to reach %s", api_url, exc_info=True
                )
        return None

    async def extract(self) -> list[ExtractedStream]:
        """Fetch F1 streams and return embed URLs for iframe playback.

        Returns:
            One embed stream per matching parent stream plus one per
            substream. Empty when no API could be reached; on an unexpected
            error the streams gathered so far are returned.
        """
        streams: list[ExtractedStream] = []
        try:
            async with httpx.AsyncClient(
                timeout=15.0,
                follow_redirects=True,
                headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
            ) as client:
                data = await self._fetch_streams(client)
                if data is None:
                    logger.warning("[ppv] Could not fetch streams from any API")
                    return []

                # The API returns:
                # { "streams": [ { "category": "Name", "id": N, "streams": [...] }, ... ] }
                # Flatten into (category_name, stream_obj) tuples.
                all_streams = self._normalize_streams(data)
                logger.info(
                    "[ppv] Found %d total stream(s) across all categories",
                    len(all_streams),
                )

                for category_name, stream_obj in all_streams:
                    name = stream_obj.get("name", "") or stream_obj.get("title", "")
                    if not _is_f1_stream(name, category_name):
                        continue

                    embed_url = self._get_embed_url(stream_obj)
                    if not embed_url:
                        logger.debug("[ppv] No embed URL for stream: %s", name)
                        continue

                    streams.extend(self._build_streams(stream_obj, name, embed_url))
        except Exception:
            logger.exception("[ppv] Failed to extract streams")

        logger.info("[ppv] Extracted %d F1 stream(s)", len(streams))
        return streams

    def _build_streams(
        self, stream_obj: dict, name: str, embed_url: str
    ) -> list[ExtractedStream]:
        """Build the parent stream and its substream variants.

        Args:
            stream_obj: Raw stream object from the API.
            name: Stream name already chosen by the caller.
            embed_url: Embed URL of the parent stream.

        Returns:
            The parent stream followed by one entry per dict-shaped
            substream.
        """
        quality = stream_obj.get("tag", "") or ""

        title = name
        viewers = stream_obj.get("viewers")
        # A malformed viewer count must not abort the whole extraction.
        if self._viewer_count(viewers) > 0:
            title += f" ({viewers} viewers)"

        # Always emit the parent stream — substreams are additional
        # language/source variants, not replacements.
        built = [
            ExtractedStream(
                url=embed_url,
                site_key=self.site_key,
                site_name=self.site_name,
                quality=quality,
                title=title,
                stream_type="embed",
                embed_url=embed_url,
            )
        ]

        substreams = stream_obj.get("substreams")
        if not isinstance(substreams, list):
            return built

        for index, sub in enumerate(substreams):
            if not isinstance(sub, dict):
                # Skip malformed entries instead of failing on `.get`.
                continue
            # A substream without its own URL falls back to the parent's.
            sub_embed = sub.get("iframe", "") or sub.get("embed_url", "") or embed_url
            sub_name = (
                sub.get("source_tag", "")
                or sub.get("name", "")
                or sub.get("label", "")
            )
            sub_quality = sub.get("tag", "") or sub.get("quality", "") or quality
            # Unnamed substreams are numbered from #2; the parent is #1.
            sub_title = f"{name} - {sub_name}" if sub_name else f"{name} #{index + 2}"
            built.append(
                ExtractedStream(
                    url=sub_embed,
                    site_key=self.site_key,
                    site_name=self.site_name,
                    quality=sub_quality,
                    title=sub_title,
                    stream_type="embed",
                    embed_url=sub_embed,
                )
            )
        return built

    @staticmethod
    def _viewer_count(viewers: object) -> int:
        """Return *viewers* as an int, or 0 when it is missing or not numeric."""
        if not viewers:
            return 0
        try:
            return int(viewers)
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _normalize_streams(data: dict | list) -> list[tuple[str, dict]]:
        """Normalize the API response into a flat list of (category_name, stream_dict) tuples.

        The PPV API returns data in this shape:
        {
            "streams": [
                {
                    "category": "Motorsports",
                    "id": 35,
                    "streams": [ { stream objects... } ]
                },
                ...
            ]
        }

        Each category group has a "category" string and a nested "streams"
        list. An item without a nested list but with a "name" is treated as
        a stream itself (flat list format).
        """
        result: list[tuple[str, dict]] = []

        # Handle the top-level wrapper
        if isinstance(data, dict):
            categories = data.get("streams", [])
        elif isinstance(data, list):
            categories = data
        else:
            return result
        if not isinstance(categories, list):
            return result

        for category_group in categories:
            if not isinstance(category_group, dict):
                continue

            category_name = category_group.get("category", "") or ""
            # No default here: defaulting to [] would make a missing
            # "streams" key look like an empty category and the flat-list
            # fallback below could never run.
            inner_streams = category_group.get("streams")

            if isinstance(inner_streams, list):
                result.extend(
                    (category_name, stream_obj)
                    for stream_obj in inner_streams
                    if isinstance(stream_obj, dict)
                )
            elif "name" in category_group:
                # Fallback: the item itself is a stream (flat list format)
                result.append((category_name, category_group))

        return result

    @staticmethod
    def _get_embed_url(stream: dict) -> str:
        """Extract or construct the embed URL for a stream.

        Preference order: the `iframe` field, then `uri_name`/`uri` appended
        to ``EMBED_BASE``, then the stream `id` appended to ``EMBED_BASE``.

        Returns:
            The embed URL, or "" when none of those fields is usable.
        """
        # Prefer the iframe field directly
        iframe = stream.get("iframe", "")
        if iframe:
            return iframe

        # Construct from uri_name
        uri_name = stream.get("uri_name", "") or stream.get("uri", "")
        if uri_name:
            # Strip leading slash if present
            uri_name = uri_name.lstrip("/")
            return f"{EMBED_BASE}/{uri_name}"

        # Last resort: use the stream id
        stream_id = stream.get("id")
        if stream_id:
            return f"{EMBED_BASE}/{stream_id}"

        return ""

View file

@ -1,116 +0,0 @@
"""Central registry for stream extractors."""
import asyncio
import logging
from datetime import datetime, timezone
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
class ExtractorRegistry:
    """Keeps one extractor instance per ``site_key`` and runs them as a group.

    Extraction is fanned out to every registered extractor at once. A
    failure inside one extractor is logged and contributes no streams; the
    remaining extractors are unaffected.
    """

    def __init__(self) -> None:
        self._extractors: dict[str, BaseExtractor] = {}

    def register(self, extractor: BaseExtractor) -> None:
        """Add ``extractor`` under its ``site_key``.

        Args:
            extractor: A BaseExtractor subclass instance.

        Raises:
            ValueError: If that ``site_key`` is already registered.
        """
        site_key = extractor.site_key
        if site_key in self._extractors:
            current = self._extractors[site_key]
            raise ValueError(
                f"Extractor with site_key '{site_key}' is already registered "
                f"(existing: {current.site_name}, "
                f"new: {extractor.site_name})"
            )
        self._extractors[site_key] = extractor
        logger.info("Registered extractor: %s (%s)", extractor.site_name, site_key)

    def get(self, site_key: str) -> BaseExtractor | None:
        """Look up an extractor.

        Args:
            site_key: The unique identifier of the extractor.

        Returns:
            The registered instance, or None when the key is unknown.
        """
        return self._extractors.get(site_key)

    def list_extractors(self) -> list[dict]:
        """Describe every registered extractor.

        Returns:
            One ``{"site_key": ..., "site_name": ...}`` dict per extractor,
            in registration order.
        """
        summary: list[dict] = []
        for ext in self._extractors.values():
            summary.append({"site_key": ext.site_key, "site_name": ext.site_name})
        return summary

    async def _extract_one(self, extractor: BaseExtractor) -> list[ExtractedStream]:
        """Run a single extractor, turning any exception into an empty list."""
        try:
            streams = await extractor.extract()
            # Back-fill identity and timestamp only where the extractor
            # left them empty.
            stamp = datetime.now(timezone.utc).isoformat()
            for stream in streams:
                if not stream.site_key:
                    stream.site_key = extractor.site_key
                if not stream.site_name:
                    stream.site_name = extractor.site_name
                if not stream.extracted_at:
                    stream.extracted_at = stamp
            logger.info(
                "[%s] Extracted %d stream(s)", extractor.site_key, len(streams)
            )
            return streams
        except Exception:
            logger.exception(
                "[%s] Extractor failed during extraction", extractor.site_key
            )
            return []

    async def extract_all(self) -> list[ExtractedStream]:
        """Run all registered extractors concurrently and merge their output.

        Returns:
            Every ExtractedStream produced, grouped in registration order of
            the extractors. Empty when nothing is registered.
        """
        if not self._extractors:
            logger.warning("No extractors registered, nothing to extract")
            return []

        logger.info(
            "Running extraction across %d extractor(s): %s",
            len(self._extractors),
            ", ".join(self._extractors),
        )

        batches = await asyncio.gather(
            *(self._extract_one(ext) for ext in self._extractors.values())
        )
        combined: list[ExtractedStream] = [
            stream for batch in batches for stream in batch
        ]

        logger.info("Extraction complete: %d total stream(s) found", len(combined))
        return combined

View file

@ -1,270 +0,0 @@
"""Extraction service - manages extraction lifecycle: polling, caching, health checking, serving."""
import logging
from datetime import datetime, timezone
from backend.extractors.models import ExtractedStream
from backend.extractors.registry import ExtractorRegistry
from backend.health import StreamHealthChecker
from backend.playback_verifier import PlaybackVerifier
logger = logging.getLogger(__name__)
class ExtractionService:
    """Manages the extraction lifecycle: polling, caching, health checking, and serving.

    Extraction runs on a background schedule (via APScheduler), never on
    client request path. After extraction, health checks verify each stream
    is live. Results are cached in memory, keyed by site_key.

    GET /streams only returns streams that passed health checks, sorted by:
    1. is_live (live streams first)
    2. response_time_ms (fastest first)
    """

    # Curated streams skip the verifier: they are hand-picked 24/7 channels
    # whose embed pages aggressively detect headless automation. We can't
    # reliably confirm playback server-side, but we trust the curator. The
    # user's real browser does NOT trigger the same anti-bot heuristics.
    _CURATED_BYPASS = frozenset({"curated"})

    def __init__(self, registry: ExtractorRegistry) -> None:
        self._registry = registry
        # Cache: site_key -> list of ExtractedStream
        self._cache: dict[str, list[ExtractedStream]] = {}
        self._last_run: str | None = None
        self._last_run_stream_count: int = 0
        self._health_checker = StreamHealthChecker()
        self._playback_verifier = PlaybackVerifier()

    async def shutdown(self) -> None:
        """Release the headless browser instance owned by the verifier."""
        await self._playback_verifier.shutdown()

    async def run_extraction(self) -> None:
        """Run all extractors, verify the results, and refresh the cache.

        Called by the background scheduler. Steps, in order: extract,
        de-duplicate by URL, structural health check (m3u8 only), browser
        playback verification, then replace each extractor's cache entry
        (or clear it when the extractor produced nothing this run).
        """
        logger.info("Starting extraction run...")
        start = datetime.now(timezone.utc)

        streams = self._dedupe_streams(await self._registry.extract_all())

        # Both stream types are verified end-to-end so the user only ever
        # sees streams that actually play in a browser.
        if streams:
            m3u8_streams = [s for s in streams if s.stream_type != "embed"]
            embed_streams = [s for s in streams if s.stream_type == "embed"]
            if m3u8_streams:
                await self._apply_health_checks(m3u8_streams)
            await self._apply_playback_verdicts(m3u8_streams, embed_streams)

        # Group streams by site_key and update cache
        new_cache: dict[str, list[ExtractedStream]] = {}
        for stream in streams:
            new_cache.setdefault(stream.site_key, []).append(stream)

        # Replace cache for extractors that returned results.
        # Clear cache for extractors that returned nothing (site went down, etc.)
        for extractor_info in self._registry.list_extractors():
            key = extractor_info["site_key"]
            if key in new_cache:
                self._cache[key] = new_cache[key]
            else:
                self._cache.pop(key, None)

        self._last_run = start.isoformat()
        self._last_run_stream_count = len(streams)

        live_count = sum(
            1 for cached in self._cache.values() for s in cached if s.is_live
        )
        elapsed = (datetime.now(timezone.utc) - start).total_seconds()
        logger.info(
            "Extraction run complete: %d stream(s) from %d extractor(s) in %.1fs (%d live)",
            len(streams),
            len(new_cache),
            elapsed,
            live_count,
        )

    @staticmethod
    def _dedupe_streams(streams: list[ExtractedStream]) -> list[ExtractedStream]:
        """Keep the first stream per canonical URL; drop streams with no URL.

        pitsport surfaces every WRC stage as a separate event but they all
        point at the same RallyTV master.m3u8 (and similar for MotoGP
        weekend sessions), so the user should see one entry per actual
        stream. The canonical URL is ``embed_url`` when set, else ``url``.
        """
        deduped: list[ExtractedStream] = []
        seen_urls: set[str] = set()
        missing_url = 0
        for stream in streams:
            key = (stream.embed_url or "").strip() or (stream.url or "").strip()
            if not key:
                missing_url += 1
                continue
            if key in seen_urls:
                continue
            seen_urls.add(key)
            deduped.append(stream)

        duplicates = len(streams) - len(deduped) - missing_url
        if duplicates:
            logger.info(
                "Deduped streams: %d -> %d (collapsed %d duplicate URL(s))",
                len(streams), len(deduped), duplicates,
            )
        if missing_url:
            # Reported separately so URL-less entries are not mistaken for
            # duplicates when reading the log.
            logger.warning("Dropped %d stream(s) with no URL", missing_url)
        return deduped

    async def _apply_health_checks(self, m3u8_streams: list[ExtractedStream]) -> None:
        """Copy structural health results onto each m3u8 stream.

        The health checker's ``check_all`` result is looked up by
        ``stream.url``. ``is_live`` set here is tentative; the playback
        verifier has the final word.
        """
        stream_dicts = [s.to_dict() for s in m3u8_streams]
        health_map = await self._health_checker.check_all(stream_dicts)
        for stream in m3u8_streams:
            health = health_map.get(stream.url)
            if not health:
                continue
            stream.response_time_ms = health.response_time_ms
            stream.checked_at = health.checked_at
            if health.bitrate > 0:
                stream.bitrate = health.bitrate
            stream.is_live = health.is_live

    async def _apply_playback_verdicts(
        self,
        m3u8_streams: list[ExtractedStream],
        embed_streams: list[ExtractedStream],
    ) -> None:
        """Run headless-browser verification and record the verdicts.

        m3u8 streams are verified only if they passed the structural health
        check. Embed streams are always verified, except curated ones, which
        are trusted as live and are not submitted to the verifier at all.
        A missing verdict (verifier disabled or unavailable) leaves an m3u8
        stream's health result untouched and marks an embed stream live, so
        the service stays usable without playwright.
        """
        verify_items: list[tuple[str, str]] = [
            (s.url, "m3u8") for s in m3u8_streams if s.is_live
        ]
        verify_items.extend(
            (s.embed_url or s.url, "embed")
            for s in embed_streams
            if s.site_key not in self._CURATED_BYPASS
        )

        verdicts = await self._playback_verifier.verify_many(verify_items)
        now_iso = datetime.now(timezone.utc).isoformat()

        for stream in m3u8_streams:
            if not stream.is_live:
                continue  # already failed health check
            verdict = verdicts.get(stream.url)
            if verdict is None:
                continue  # verifier disabled or unavailable
            stream.is_live = verdict.is_playable
            stream.checked_at = now_iso

        for stream in embed_streams:
            stream.checked_at = now_iso
            if stream.site_key in self._CURATED_BYPASS:
                verdict = None
            else:
                verdict = verdicts.get(stream.embed_url or stream.url)
            if verdict is None:
                # Curated, or verifier unavailable: trust the extractor.
                stream.is_live = True
                stream.response_time_ms = 0
            else:
                stream.is_live = verdict.is_playable
                stream.response_time_ms = verdict.elapsed_ms

    def _sorted_cached(self, *, live_only: bool) -> list[ExtractedStream]:
        """Return cached streams by fallback priority: live first, then fastest."""
        selected = [
            s
            for cached in self._cache.values()
            for s in cached
            if s.is_live or not live_only
        ]
        selected.sort(key=lambda s: (not s.is_live, s.response_time_ms))
        return selected

    def get_streams(self) -> list[dict]:
        """Return the live cached streams as dicts, fastest response first.

        Returns:
            Serialized ExtractedStream dicts from all extractors, restricted
            to ``is_live`` streams and sorted by ``response_time_ms``.
        """
        return [s.to_dict() for s in self._sorted_cached(live_only=True)]

    def get_all_streams_unfiltered(self) -> list[dict]:
        """Return ALL cached streams, including ones that are not live.

        Used for debugging and status endpoints. Same ordering as
        ``get_streams`` with non-live streams placed after the live ones.

        Returns:
            List of all serialized ExtractedStream dicts.
        """
        return [s.to_dict() for s in self._sorted_cached(live_only=False)]

    def get_streams_for_session(self, session_type: str) -> list[dict]:
        """Return cached streams for a specific session type.

        Currently returns all live streams (extractors don't yet
        differentiate by session type). This method exists as a hook for
        future filtering, e.g., some extractors might only have race streams
        but not FP streams.

        Args:
            session_type: The F1 session type (e.g., "race", "qualifying", "fp1").

        Returns:
            List of serialized ExtractedStream dicts (live only, sorted).
        """
        streams = self.get_streams()
        logger.debug(
            "Returning %d stream(s) for session type '%s'",
            len(streams),
            session_type,
        )
        return streams

    def get_status(self) -> dict:
        """Return extraction service status for the /extractors endpoint."""
        extractor_statuses = []
        for info in self._registry.list_extractors():
            key = info["site_key"]
            cached = self._cache.get(key, [])
            extractor_statuses.append(
                {
                    "site_key": key,
                    "site_name": info["site_name"],
                    "cached_streams": len(cached),
                    "live_streams": sum(1 for s in cached if s.is_live),
                }
            )

        total_cached = sum(len(cached) for cached in self._cache.values())
        total_live = sum(
            1 for cached in self._cache.values() for s in cached if s.is_live
        )

        return {
            "extractors": extractor_statuses,
            "total_cached_streams": total_cached,
            "total_live_streams": total_live,
            "last_run": self._last_run,
            "last_run_stream_count": self._last_run_stream_count,
        }

View file

@ -1,125 +0,0 @@
"""Streamed.pk extractor - fetches F1/motorsport streams via public JSON API."""
import logging
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
# Site renamed from streamed.su → streamed.pk in 2026; the .su domain
# stopped resolving the API host (only the marketing page is left).
BASE_URL = "https://streamed.pk"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
class StreamedExtractor(BaseExtractor):
    """Extracts streams from Streamed.pk's public JSON API.

    Uses two endpoints:
    - GET /api/matches/motor-sports — list of events with sources
    - GET /api/stream/{source}/{id} — embed URL for a specific source
    """

    @property
    def site_key(self) -> str:
        return "streamed"

    @property
    def site_name(self) -> str:
        return "Streamed"

    async def extract(self) -> list[ExtractedStream]:
        """Fetch motorsport events and resolve embed URLs for each source.

        Returns:
            One embed-type ExtractedStream per usable stream entry. Empty
            when the events endpoint fails or returns something other than
            a JSON list. Malformed events or sources are skipped
            individually, so one bad entry does not abort the whole run.
        """
        streams: list[ExtractedStream] = []

        try:
            async with httpx.AsyncClient(
                timeout=15.0,
                follow_redirects=True,
                headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
            ) as client:
                # Get motorsport events
                resp = await client.get(f"{BASE_URL}/api/matches/motor-sports")
                if resp.status_code != 200:
                    logger.warning(
                        "[streamed] Events API returned HTTP %d", resp.status_code
                    )
                    return []

                events = resp.json()
                if not isinstance(events, list):
                    logger.warning("[streamed] Unexpected events response type")
                    return []

                logger.info("[streamed] Found %d motorsport event(s)", len(events))

                for event in events:
                    if not isinstance(event, dict):
                        continue
                    title = event.get("title", "Unknown Event")
                    sources = event.get("sources")
                    if not isinstance(sources, list):
                        continue

                    for source_info in sources:
                        if not isinstance(source_info, dict):
                            continue
                        source_name = source_info.get("source", "")
                        source_id = source_info.get("id", "")
                        if not source_name or not source_id:
                            continue
                        streams.extend(
                            await self._resolve_source(
                                client, title, source_name, source_id
                            )
                        )

        except Exception:
            logger.exception("[streamed] Failed to fetch events")

        logger.info("[streamed] Extracted %d stream(s)", len(streams))
        return streams

    async def _resolve_source(
        self,
        client: httpx.AsyncClient,
        title: str,
        source_name: str,
        source_id: str,
    ) -> list[ExtractedStream]:
        """Fetch one source's stream list and turn it into ExtractedStreams.

        Any failure is logged at debug level and whatever was resolved up to
        that point is returned; a non-200 response yields an empty list.
        """
        resolved: list[ExtractedStream] = []
        try:
            stream_resp = await client.get(
                f"{BASE_URL}/api/stream/{source_name}/{source_id}"
            )
            if stream_resp.status_code != 200:
                return resolved

            stream_data = stream_resp.json()
            # The endpoint may answer with a single object instead of a list.
            if not isinstance(stream_data, list):
                stream_data = [stream_data]

            for item in stream_data:
                if not isinstance(item, dict):
                    continue
                embed_url = item.get("embedUrl", "")
                if not embed_url:
                    continue

                language = item.get("language", "")
                quality = "HD" if item.get("hd", False) else "SD"
                stream_no = item.get("streamNo", 1)

                stream_title = f"{title}"
                if language:
                    stream_title += f" ({language})"
                # Only number alternates; a non-integer streamNo is ignored
                # rather than discarding the stream.
                if isinstance(stream_no, int) and stream_no > 1:
                    stream_title += f" #{stream_no}"

                resolved.append(
                    ExtractedStream(
                        url=embed_url,
                        site_key=self.site_key,
                        site_name=self.site_name,
                        quality=quality,
                        title=stream_title,
                        stream_type="embed",
                        embed_url=embed_url,
                    )
                )
        except Exception:
            logger.debug(
                "[streamed] Failed to fetch stream for %s/%s",
                source_name,
                source_id,
                exc_info=True,
            )
        return resolved

View file

@ -1,161 +0,0 @@
"""Stremio-addon-driven extractor.
Stremio addons expose a public HTTP API: each addon has a manifest at
`<base>/manifest.json` and per-resource endpoints like
`<base>/stream/<type>/<id>.json` returning `{streams:[{url,name,...}]}`.
This extractor calls a curated set of live-TV addons that surface F1
and Sky-Sports-class motorsport channels. We treat each returned URL as
an ExtractedStream and let the playback verifier confirm playability.
We don't need a Stremio client — we just call the documented HTTP API.
Findings from initial research (2026-05-07):
- **TvVoo** (`tvvoo.hayd.uk`) wraps the Vavoo IPTV network, lists
Sky Sports F1 (UK + IT + DE), DAZN F1, Movistar F1, Canal+ F1,
Viaplay F1. The returned m3u8 URLs are IP-bound at the Vavoo CDN
(`*.ngolpdkyoctjcddxshli469r.org/sunshine/...`); they're tokenised
to whichever IP fetched the manifest. Currently their SSL certs have
expired, which fails most clients — the addon framework is right but
delivery is degraded today.
- **StremVerse** (`stremverse.onrender.com`) returns 11+ streams per
catalog id (`stremevent_591`=F1, `stremevent_866`=MotoGP). Mix of
DRM-walled DASH, JW-Player-broken-chain JWT, and apar151 HuggingFace
proxy URLs. Master playlists parse; variant URLs sometimes return 404
if they're meant to be resolved by the addon's player rather than
directly.
Adding a new addon = one entry in `_ADDONS`. Each addon's resolver only
needs the manifest + stream endpoints; the addon does the heavy lifting.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Iterable
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/17.4 Safari/605.1.15"
)
@dataclass(frozen=True)
class _Addon:
    """One Stremio addon and the stream ids to request from it."""

    name: str  # addon display name; becomes site_name on the streams it yields
    base: str  # e.g. "https://tvvoo.hayd.uk"
    stream_ids: tuple[tuple[str, str, str], ...]
    """(stream_type, stream_id, label) per F1/motorsport entry."""
# Curated addon list — see module docstring. These IDs are documented in
# the addons' manifests / channel lists. Update when channel names/IDs
# rotate.
_ADDONS: tuple[_Addon, ...] = (
_Addon(
name="TvVoo",
base="https://tvvoo.hayd.uk",
stream_ids=(
("tv", "vavoo_SKY%20SPORTS%20F1|group:uk", "Sky Sports F1 UK (Vavoo)"),
("tv", "vavoo_SKY%20SPORTS%20F1%20HD|group:uk", "Sky Sports F1 HD UK (Vavoo)"),
("tv", "vavoo_SKY%20SPORT%20F1|group:it", "Sky Sport F1 IT (Vavoo)"),
("tv", "vavoo_SKY%20SPORT%20F1%20HD|group:de", "Sky Sport F1 DE (Vavoo)"),
("tv", "vavoo_DAZN%20F1|group:es", "DAZN F1 ES (Vavoo)"),
),
),
_Addon(
name="StremVerse",
base="https://stremverse.onrender.com",
stream_ids=(
("tv", "stremevent_591", "Formula 1 (StremVerse)"),
("tv", "stremevent_866", "MotoGP (StremVerse)"),
),
),
)
class StremioAddonExtractor(BaseExtractor):
    """Collect F1 and motorsport stream URLs from the addons in ``_ADDONS``."""

    @property
    def site_key(self) -> str:
        return "stremio"

    @property
    def site_name(self) -> str:
        return "Stremio Addon"

    async def extract(self) -> list[ExtractedStream]:
        """Query every configured (addon, stream id) pair concurrently.

        Returns:
            All streams returned by the addons; a lookup that raised is
            logged at debug level and contributes nothing.
        """
        # Some addons (TvVoo→Vavoo) hand back URLs whose origin certs are
        # expired; the default verify=True is kept on purpose so the
        # verifier sees the same TLS errors a browser would.
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            pending = [
                self._resolve(client, addon, stype, sid, label)
                for addon in _ADDONS
                for stype, sid, label in addon.stream_ids
            ]
            outcomes = await asyncio.gather(*pending, return_exceptions=True)

        found: list[ExtractedStream] = []
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                logger.debug("[stremio] resolve failed: %s", outcome)
            else:
                found.extend(outcome)

        logger.info("[stremio] surfaced %d candidate stream URL(s) across %d addon(s)",
                    len(found), len(_ADDONS))
        return found

    async def _resolve(
        self, client: httpx.AsyncClient, addon: _Addon,
        stype: str, sid: str, label: str,
    ) -> list[ExtractedStream]:
        """Fetch ``<base>/stream/<stype>/<sid>.json`` and map it to streams.

        Returns an empty list on a transport error, a non-200 status or an
        undecodable body. Entries with no URL, or whose name contains
        "DRM", are skipped.
        """
        endpoint = f"{addon.base}/stream/{stype}/{sid}.json"
        try:
            resp = await client.get(endpoint)
        except Exception as e:
            logger.debug("[stremio] %s fetch failed: %s", endpoint, e)
            return []

        if resp.status_code != 200:
            logger.debug("[stremio] %s -> HTTP %d", endpoint, resp.status_code)
            return []

        try:
            data = resp.json()
        except Exception:
            return []

        collected: list[ExtractedStream] = []
        for position, entry in enumerate(data.get("streams") or []):
            stream_url = (entry.get("url") or "").strip()
            if not stream_url:
                continue
            # Skip DRM-tagged entries — they need Widevine which neither
            # our verifier nor a clean hls.js path can play.
            entry_name = (entry.get("name") or "").upper()
            if "DRM" in entry_name:
                continue
            # Numbering follows the position in the addon's list, so gaps
            # appear where entries were skipped.
            title = f"{label} #{position + 1}" if position > 0 else label
            collected.append(
                ExtractedStream(
                    url=stream_url,
                    site_key=self.site_key,
                    site_name=f"{addon.name}",
                    quality="",
                    title=title,
                    stream_type="m3u8",
                )
            )
        return collected

View file

@ -1,249 +0,0 @@
"""Subreddit extractor — pulls community-curated live-stream URLs from
the *MotorsportsReplays* subreddit (and a few siblings).
The community follows a stable pattern: a single mod-curated post titled
`[Watch / Download] <Series> <Year> - <Round> | <Event>` goes up on or
near each race weekend with a `**Watch Online:**` link in the selftext,
pointing at an admin-run WordPress site (motomundo.net for MotoGP, the
F1 equivalent has rotated over the years). That WordPress page hosts
iframe embeds whose m3u8 is JS-computed at load time — an ideal target for
the chrome-service pipeline downstream.
This extractor:
- Hits Reddit with a real-browser User-Agent (httpx default UA + cluster
IP combo gets HTTP 403'd on r/motogp; a Safari UA does not).
- Searches for the `[Watch` thread pattern AND scans `/new.json` for
any flair set to LIVE.
- Pulls selftext URLs and returns each candidate as an `embed`-type
ExtractedStream. The verifier already drives chrome-service for embed
streams, so the m3u8 capture happens there.
"""
import asyncio
import html
import logging
import re
import urllib.parse
from typing import NamedTuple

import httpx

from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/17.4 Safari/605.1.15"
)
# Subreddits to scan.
# - r/motorsportsstreams2 is the active 12.5k-sub successor to the banned
# r/motorsportstreams; race-weekend "[F1 STREAM]" posts include
# `boxboxbox.pro/stream-1` URLs and similar fresh aggregator links.
# - r/MotorsportsReplays runs the [Watch / Download] mod-post pattern
# linking to motomundo.net (MotoGP) and sister sites.
# - The rest are low-yield but cost nothing.
SUBREDDITS: tuple[str, ...] = (
"motorsportsstreams2",
"MotorsportsReplays",
"f1streams",
"motorsports",
"formula1",
"motogp",
)
# Search queries fired against r/motorsportsstreams2 + r/MotorsportsReplays.
# The first set captures the [Watch / Download] mod posts; the second set
# catches race-weekend live discussion threads.
SEARCH_QUERIES: tuple[str, ...] = (
"Watch Download F1 2026",
"Watch Download MotoGP 2026",
"Watch Online F1 2026",
"F1 STREAM live",
"Sky Sports F1 live",
"Sky F1 stream",
)
# Hosts we accept as "interesting" stream-page URLs. These are the
# admin-curated WordPress / aggregator sites the community links to.
# Anchored to what r/motorsportsstreams2 currently posts (May 2026 sweep).
_INTERESTING_HOSTS = (
# WordPress wrappers / community-run sites
"motomundo.net", # MotoGP — admin-curated WP
"motomundo.top", # MotoMundo embed host
"motomundo.upns.xyz", # MotoMundo embed host (newer)
"freemotorsports.com", # WAC successor curated link list
"boxboxbox.pro", # F1 race-weekend aggregator (community fav)
"boxboxbox.live", # boxboxbox sister
"boxboxbox.lol",
# Aggregators we already have direct extractors for, but Reddit may
# surface event-specific deeplinks (e.g. /watch/<UUID>) we'd miss
# otherwise.
"pitsport.xyz",
"pitsport.live",
"rerace.io",
"dd12streams.com",
"ppv.to",
"streamed.pk",
"acestrlms.pages.dev",
"aceztrims.pages.dev",
# Sport-specific direct CDNs that occasionally appear in posts
"racelive.jp", # Super Formula
"cdn.sfgo.jp", # Super Formula CDN
# Speculative F1 sister sites — pattern likely if motomundo for MotoGP
"f1mundo.net",
"f1.live",
"f1live",
"skystreams",
"raceon",
"watchf1",
)
# URLs we actively never try to scrape (auth-walled, social media,
# direct downloads with no live stream).
_REJECT_HOSTS = (
"discord.gg", "discord.com",
"twitter.com", "x.com",
"youtube.com", "youtu.be",
"instagram.com", "tiktok.com",
"f1tv.formula1.com",
"viktorbarzin.me",
"gofile.io",
"mega.nz", "drive.google.com",
"1fichier.com", "rapidgator", "uploaded.net",
"magnet:",
)
_URL_RE = re.compile(r"https?://[^\s\)\]\>\"']+")
class _Candidate(NamedTuple):
    """A stream-page URL found in a Reddit post, with where it came from."""

    title: str  # title of the post the URL was found in
    url: str  # the candidate stream-page URL
    subreddit: str  # subreddit name passed to the collector for this request
    flair: str  # the post's link_flair_text ("" when absent)
def _is_interesting(url: str) -> bool:
    """Return True when ``url`` names a wanted host and no rejected one.

    Both checks are case-insensitive substring tests against the whole
    URL; a match in ``_REJECT_HOSTS`` wins over any match in
    ``_INTERESTING_HOSTS``.
    """
    lowered = url.lower()
    for rejected in _REJECT_HOSTS:
        if rejected in lowered:
            return False
    for wanted in _INTERESTING_HOSTS:
        if wanted in lowered:
            return True
    return False
def _has_live_marker(post: dict) -> bool:
title = (post.get("title") or "").lower()
flair = (post.get("link_flair_text") or "").lower()
if "[watch" in title or "watch online" in title or "live" in flair:
return True
return False
class SubredditExtractor(BaseExtractor):
    """Scan motorsport subreddits for community-curated live-stream URLs."""

    # Characters that end up glued to a URL matched in markdown selftext
    # (bold markers, sentence punctuation) and are not part of the link.
    _TRAILING_PUNCTUATION = ".,;:!?*"

    @property
    def site_key(self) -> str:
        return "subreddit"

    @property
    def site_name(self) -> str:
        return "Subreddit"

    async def extract(self) -> list[ExtractedStream]:
        """Collect candidate stream-page URLs from listings and searches.

        Fetches ``/new.json`` for every entry in ``SUBREDDITS`` and one
        search per entry in ``SEARCH_QUERIES``, concurrently.

        Returns:
            One embed-type ExtractedStream per unique URL (first occurrence
            wins), titled with the first 100 characters of the post title.
        """
        # NB: do NOT send `Accept: application/json` — Reddit's anti-bot
        # fingerprint flags that header from datacenter IPs and returns
        # HTTP 403 with HTML. Default Accept (`*/*`) gets through fine
        # and `.json` URLs always return JSON regardless.
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            tasks = [self._fetch_new(client, sub) for sub in SUBREDDITS]
            tasks.extend(self._search(client, q) for q in SEARCH_QUERIES)
            results = await asyncio.gather(*tasks, return_exceptions=True)

        candidates: list[_Candidate] = []
        for r in results:
            if isinstance(r, Exception):
                logger.debug("[subreddit] fetch failed: %s", r)
                continue
            candidates.extend(r)

        # Dedupe by URL, keep first occurrence.
        seen: set[str] = set()
        picks: list[_Candidate] = []
        for c in candidates:
            if c.url in seen:
                continue
            seen.add(c.url)
            picks.append(c)

        logger.info(
            "[subreddit] scanned %d source(s) — %d unique candidate URL(s)",
            len(SUBREDDITS) + len(SEARCH_QUERIES), len(picks),
        )

        return [
            ExtractedStream(
                url=c.url,
                site_key=self.site_key,
                site_name=f"r/{c.subreddit}",
                quality="",
                title=c.title[:100],
                stream_type="embed",
                embed_url=c.url,
            )
            for c in picks
        ]

    async def _fetch_new(self, client: httpx.AsyncClient, sub: str) -> list[_Candidate]:
        """Collect candidates from the 25 newest posts of ``r/<sub>``."""
        return await self._collect(
            client,
            f"https://www.reddit.com/r/{sub}/new.json?limit=25",
            sub,
        )

    async def _search(self, client: httpx.AsyncClient, query: str) -> list[_Candidate]:
        """Collect candidates from a newest-first search of r/MotorsportsReplays.

        NOTE(review): the comment above SEARCH_QUERIES also names
        r/motorsportsstreams2, but only r/MotorsportsReplays is searched
        here — confirm which is intended.
        """
        q = urllib.parse.quote_plus(query)
        return await self._collect(
            client,
            f"https://www.reddit.com/r/MotorsportsReplays/search.json?q={q}&restrict_sr=on&sort=new&limit=10",
            "MotorsportsReplays",
        )

    async def _collect(
        self, client: httpx.AsyncClient, url: str, sub: str
    ) -> list[_Candidate]:
        """Fetch one Reddit listing and pull interesting URLs out of it.

        Only posts accepted by ``_has_live_marker`` are considered. For each
        such post the linked URL and every URL found in the selftext are
        kept when ``_is_interesting`` accepts them.

        Returns:
            Candidates in listing order; empty on a transport error, a
            non-200 status, an undecodable body or an unexpected JSON shape.
        """
        try:
            resp = await client.get(url)
        except Exception as e:
            logger.debug("[subreddit] fetch %s failed: %s", url, e)
            return []
        if resp.status_code != 200:
            logger.debug("[subreddit] %s -> HTTP %d", url, resp.status_code)
            return []
        try:
            payload = resp.json()
        except Exception:
            return []

        listing = payload.get("data") if isinstance(payload, dict) else None
        children = listing.get("children") if isinstance(listing, dict) else None
        if not isinstance(children, list):
            logger.debug("[subreddit] %s -> unexpected JSON shape", url)
            return []

        out: list[_Candidate] = []
        for child in children:
            post = child.get("data") if isinstance(child, dict) else None
            if not isinstance(post, dict) or not _has_live_marker(post):
                continue

            # Reddit HTML-escapes &, < and > in JSON string fields unless
            # raw_json=1 is requested, so undo that before using any text:
            # otherwise query strings carry "&amp;".
            title = html.unescape(post.get("title") or "")
            flair = post.get("link_flair_text") or ""

            # First, the linked URL itself (if it's a recognised live site).
            top = html.unescape(post.get("url") or "")
            if top and _is_interesting(top):
                out.append(_Candidate(title, top, sub, flair))

            # Then any URL embedded in the selftext that points at a
            # community-curated live page.
            text = html.unescape(post.get("selftext") or "")
            for match in _URL_RE.findall(text):
                found = match.rstrip(self._TRAILING_PUNCTUATION)
                if _is_interesting(found):
                    out.append(_Candidate(title, found, sub, flair))
        return out

View file

@ -1,190 +0,0 @@
"""TimStreams extractor - fetches F1 streams from the TimStreams JSON API.
Returns embed URLs from hmembeds.one for iframe playback.
The public API at stra.viaplus.site/main requires no authentication
and returns all events/channels across Events, Replays, and 24/7 categories.
"""
import logging
import httpx
from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
API_URL = "https://stra.viaplus.site/main"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
# Direct F1 keyword matches (case-insensitive)
F1_KEYWORDS = {"formula 1", "formula one", "f1", "sky sports f1", "dazn f1"}
# "Grand prix" is F1-related only if non-F1 motorsport keywords are absent
GP_KEYWORD = "grand prix"
# Exclude these motorsport series when matching on "grand prix"
NON_F1_KEYWORDS = {
"motogp", "moto gp", "moto2", "moto3", "motoe",
"indycar", "indy car", "nascar",
"rally", "wrc", "wec", "lemans", "le mans",
"superbike", "dtm", "supercars",
}
# 24/7 channels that should always be included (embed hashes on hmembeds.one)
ALWAYS_INCLUDE_HASHES = {
"888520f36cd94c5da4c71fddc1a5fc9b", # Sky Sports F1
"fc3a54634d0867b0c02ee3223292e7c6", # DAZN F1
}
def _is_f1_event(name: str) -> bool:
    """Decide from its name whether an event/channel is Formula 1 related.

    A name qualifies when it contains any ``F1_KEYWORDS`` entry, or when
    it contains ``GP_KEYWORD`` and none of the ``NON_F1_KEYWORDS``. The
    comparison is case-insensitive substring matching.

    Note: The TimStreams API genre field (genre=2) covers ALL sports
    channels, not just motorsport, so only the name is consulted.
    """
    lowered = name.lower()

    for keyword in F1_KEYWORDS:
        if keyword in lowered:
            return True

    if GP_KEYWORD not in lowered:
        return False

    # A "grand prix" only counts when no competing series is named.
    for series in NON_F1_KEYWORDS:
        if series in lowered:
            return False
    return True
def _extract_embed_hash(url: str) -> str | None:
"""Extract the hash from an hmembeds.one embed URL.
Expected format: https://hmembeds.one/embed/{hash}
Returns the hash string, or None if the URL is not in the expected format.
"""
if not url:
return None
# Handle both with and without trailing slash
url = url.rstrip("/")
prefix = "https://hmembeds.one/embed/"
alt_prefix = "http://hmembeds.one/embed/"
if url.startswith(prefix):
return url[len(prefix):] or None
if url.startswith(alt_prefix):
return url[len(alt_prefix):] or None
return None
def _is_always_include(url: str) -> bool:
    """Return True when ``url`` embeds one of the ``ALWAYS_INCLUDE_HASHES``."""
    embed_hash = _extract_embed_hash(url)
    if not embed_hash:
        return False
    return embed_hash in ALWAYS_INCLUDE_HASHES
class TimStreamsExtractor(BaseExtractor):
    """Extracts embed URLs from TimStreams' public JSON API.

    The API at stra.viaplus.site/main returns a JSON array of categories,
    each containing events with stream URLs pointing to hmembeds.one embeds.
    """

    @property
    def site_key(self) -> str:
        return "timstreams"

    @property
    def site_name(self) -> str:
        return "TimStreams"

    @staticmethod
    def _build_title(category_name: object, event_name: str, stream_name: str) -> str:
        """Compose ``[category] event - stream``, omitting redundant parts."""
        title = event_name
        if stream_name and stream_name.lower() != event_name.lower():
            title = f"{event_name} - {stream_name}"
        if category_name:
            title = f"[{category_name}] {title}"
        return title

    async def extract(self) -> list[ExtractedStream]:
        """Fetch F1 events/channels and return embed URLs for iframe playback.

        An event is kept when its name passes ``_is_f1_event`` or any of
        its stream URLs is an always-include channel. URLs are
        de-duplicated across the whole response.

        Returns:
            One embed-type ExtractedStream per unique stream URL. Empty on
            a non-200 status, a non-list payload, a timeout or any other
            failure. Entries of an unexpected type are skipped individually
            instead of aborting the run.
        """
        streams: list[ExtractedStream] = []
        seen_urls: set[str] = set()

        try:
            async with httpx.AsyncClient(
                timeout=15.0,
                follow_redirects=True,
                headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
            ) as client:
                resp = await client.get(API_URL)
                if resp.status_code != 200:
                    logger.warning(
                        "[timstreams] API returned HTTP %d", resp.status_code
                    )
                    return []

                data = resp.json()
                if not isinstance(data, list):
                    logger.warning("[timstreams] Unexpected API response type: %s", type(data).__name__)
                    return []

                logger.info("[timstreams] API returned %d categorie(s)", len(data))

                for category in data:
                    if not isinstance(category, dict):
                        continue
                    category_name = category.get("category", "Unknown")
                    events = category.get("events", [])
                    if not isinstance(events, list):
                        continue

                    for event in events:
                        if not isinstance(event, dict):
                            continue
                        raw_name = event.get("name", "Unknown")
                        event_name = raw_name if isinstance(raw_name, str) else "Unknown"

                        raw_streams = event.get("streams", [])
                        if not isinstance(raw_streams, list):
                            continue
                        # Keep only entries that carry a usable string URL.
                        event_streams = [
                            s for s in raw_streams
                            if isinstance(s, dict)
                            and isinstance(s.get("url"), str)
                            and s["url"]
                        ]
                        if not event_streams:
                            continue

                        # Filter: must be F1-related or an always-include channel
                        always_include = any(
                            _is_always_include(s["url"]) for s in event_streams
                        )
                        if not always_include and not _is_f1_event(event_name):
                            continue

                        for stream_info in event_streams:
                            stream_url = stream_info["url"]
                            # Deduplicate by URL
                            if stream_url in seen_urls:
                                continue
                            seen_urls.add(stream_url)

                            raw_stream_name = stream_info.get("name", "")
                            stream_name = (
                                raw_stream_name
                                if isinstance(raw_stream_name, str)
                                else ""
                            )

                            streams.append(
                                ExtractedStream(
                                    url=stream_url,
                                    site_key=self.site_key,
                                    site_name=self.site_name,
                                    quality="",
                                    title=self._build_title(
                                        category_name, event_name, stream_name
                                    ),
                                    stream_type="embed",
                                    embed_url=stream_url,
                                )
                            )

        except httpx.TimeoutException:
            logger.warning("[timstreams] API request timed out")
        except Exception:
            logger.exception("[timstreams] Failed to fetch from API")

        logger.info("[timstreams] Extracted %d stream(s)", len(streams))
        return streams