f1-stream: add chrome-browser, subreddit, dd12 extractors; fix streamed.pk

User asked to broaden the source pipeline so f1-stream can find F1 (and
adjacent motorsport) streams from Sky Sports / DAZN / Reddit / etc.,
using the in-cluster chrome-service headed browser where needed. Four
changes:

1. **streamed.py**: BASE_URL streamed.su → streamed.pk. The .su domain
   stopped serving the API host in 2026 (only the marketing page is
   left); .pk hosts the JSON API now. Adds 3 events/round (currently
   all routed through embedsports.top — see #2 caveat).

2. **chrome_browser.py** (new): generic chrome-service-driven extractor.
   Connects to the existing chrome-service WS (CHROME_WS_URL +
   CHROME_WS_TOKEN env), navigates a list of TARGETS, captures any HLS
   playlist URL the page fetches at runtime, returns one ExtractedStream
   per discovery. Uses the same stealth init script as the verifier so
   anti-bot checks don't trip the page. Handles iframes (DD12-style
   /nas → /new-nas/jwplayer) and probes child-frame <video>/source
   elements after settle. Caveat: most aggregator sites (pooembed,
   embedsports, hmembeds, even DD12's JW Player path) use a broken
   runtime decoder that produces no m3u8 in our environment, so the
   TARGETS list currently yields 0 streams; the framework is the
   contribution, and concrete sites can be added as they're discovered
   (a chrome-service connection smoke test is sketched after this list).

3. **subreddit.py** (new): scans r/MotorsportsReplays, r/motorsports,
   r/formula1, r/motogp via the public old.reddit.com JSON API for
   posts whose flair/title indicates a live stream. Discovered URLs
   are returned as embed-type streams; the verifier visits each via
   chrome-service to confirm playability. Note: Reddit currently HTTP
   403's our cluster outbound IP for anonymous JSON requests; the
   extractor returns 0 in that state and logs a debug message. It will
   work from any IP Reddit isn't blocking (a quick probe is sketched
   after this list).

4. **dd12.py** (new): inline-HTML scraper for DD12Streams. The site
   embeds `playerInstance.setup({file: "..."})` directly in HTML — no
   JS decoder needed. Currently surfaces NASCAR Cup Series 24/7 (clean
   BunnyCDN-hosted HLS at w9329432hnf3h34.b-cdn.net/pdfs/master.m3u8);
   add new `(path, label, title)` tuples to CHANNELS as DD12 expands
   (the inline match is shown in the last sketch after this list).
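
To sanity-check the chrome-service wiring independently of the new
extractor, a minimal connect smoke test could look like this (a sketch,
assuming playwright is installed and CHROME_WS_URL / CHROME_WS_TOKEN are
set the same way the cluster sets them):

    # Smoke test: can we reach chrome-service over WS at all? Mirrors
    # _resolve_chrome_ws() + chromium.connect() from chrome_browser.py.
    import asyncio
    import os

    from playwright.async_api import async_playwright

    async def main() -> None:
        ws = (
            f"{os.environ['CHROME_WS_URL'].rstrip('/')}"
            f"/{os.environ['CHROME_WS_TOKEN']}"
        )
        async with async_playwright() as p:
            browser = await p.chromium.connect(ws, timeout=15_000)
            print("connected:", browser.is_connected())
            await browser.close()

    asyncio.run(main())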
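
To check the Reddit 403 state from a given IP, a minimal probe (assuming
only that httpx is installed; the endpoint and User-Agent are the same
ones subreddit.py uses):

    # Probe whether the current outbound IP can reach the anonymous
    # old.reddit.com JSON API that subreddit.py depends on.
    import asyncio

    import httpx

    USER_AGENT = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/17.4 Safari/605.1.15"
    )

    async def probe(sub: str) -> None:
        url = f"https://old.reddit.com/r/{sub}/new.json?limit=25"
        async with httpx.AsyncClient(
            timeout=15.0, headers={"User-Agent": USER_AGENT}
        ) as client:
            resp = await client.get(url)
        # 200: extractor yields candidates; 403: this IP is blocked.
        print(sub, resp.status_code)

    asyncio.run(probe("formula1"))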
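
And a self-contained sketch of the inline match dd12.py performs; the
regex is the one from the diff below, while the HTML sample is made up
for illustration:

    import re

    # Same pattern dd12.py compiles as _FILE_URL_RE.
    _FILE_URL_RE = re.compile(r"""file\s*:\s*["']([^"']+\.m3u8[^"']*)["']""")

    # Illustrative stand-in for a /new-nas/jwplayer response body.
    sample_html = '''
    <script>
    playerInstance.setup({
        file: "https://example.b-cdn.net/pdfs/master.m3u8",
        autostart: true,
    });
    </script>
    '''

    m = _FILE_URL_RE.search(sample_html)
    assert m is not None
    print(m.group(1))  # https://example.b-cdn.net/pdfs/master.m3u8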

Result: /streams now shows 2 verified live streams (Rally TV via
pitsport + DD12 NASCAR Cup 24/7). When the next F1 weekend (Canadian
GP, May 22-24) goes live, pitsport will surface F1 sessions
automatically via the existing pushembdz path.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Viktor Barzin 2026-05-07 16:05:25 +00:00
parent a3024d1f51
commit d67e8ddaf8
5 changed files with 526 additions and 1 deletion

View file

@@ -12,7 +12,10 @@ Example:
"""
from backend.extractors.aceztrims import AceztrimsExtractor
from backend.extractors.chrome_browser import ChromeBrowserExtractor
from backend.extractors.curated import CuratedExtractor
from backend.extractors.dd12 import DD12Extractor
from backend.extractors.subreddit import SubredditExtractor
from backend.extractors.daddylive import DaddyLiveExtractor
from backend.extractors.discord_source import DiscordExtractor
from backend.extractors.models import ExtractedStream
@@ -48,6 +51,18 @@ def create_registry() -> ExtractorRegistry:
    # disabled until/unless we find a working bypass.
    # registry.register(CuratedExtractor())
    registry.register(StreamedExtractor())
    # ChromeBrowserExtractor drives the in-cluster chrome-service via the
    # CHROME_WS_URL / CHROME_WS_TOKEN env vars to scrape JS-rendered
    # pages whose m3u8 is computed at runtime.
    registry.register(ChromeBrowserExtractor())
    # SubredditExtractor pulls live-stream posts from motorsport subreddits.
    # Returns embed-type streams; the verifier will visit each via
    # chrome-service to confirm playability.
    registry.register(SubredditExtractor())
    # DD12Extractor scrapes DD12Streams' per-channel pages for the inline
    # JW Player file URL. The site embeds the m3u8 in HTML so curl-based
    # parsing is enough — no browser needed.
    registry.register(DD12Extractor())
    registry.register(DaddyLiveExtractor())
    registry.register(AceztrimsExtractor())
    registry.register(PitsportExtractor())

View file

@@ -0,0 +1,244 @@
"""Generic chrome-service-driven extractor.

Drives the in-cluster headed Chromium pool (chrome-service) to load a list
of stream/aggregator pages, captures any HLS playlist URL the page fetches
at runtime, and returns one ExtractedStream per discovered playlist.

Unlike the API-based extractors (pitsport/streamed/ppv) this one handles
sites where the m3u8 is computed by JavaScript at page load time: the
URL only exists after the page evaluates an obfuscated decoder, fetches a
token, etc. Curl can't see it; a real browser can.

Add new targets via the `TARGETS` constant below. Each entry is a
`_Target(label, title, url, settle)` record. The extractor visits each URL
with a stealthed context, waits for the JS to settle, and yields any
captured HLS URL.
"""
import asyncio
import logging
import os
import re
import urllib.parse
from dataclasses import dataclass

from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream

logger = logging.getLogger(__name__)

# Best-effort pause between navigation and capture. The decoder usually
# fires within 5s; 12s gives slow JS time to settle without dragging the
# extraction round.
DEFAULT_SETTLE_SECONDS = 12

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15"
)


@dataclass(frozen=True)
class _Target:
    label: str    # site_name (homepage label in the UI)
    title: str    # human-readable stream title
    url: str      # page to navigate
    settle: int = DEFAULT_SETTLE_SECONDS


# ---------------------------------------------------------------------------
# Target list. F1-relevant 24/7 channels and motorsport aggregator pages
# whose m3u8 is JS-computed. Add freely — each one takes ~12s to scrape.
# ---------------------------------------------------------------------------
TARGETS: tuple[_Target, ...] = (
    # DD12streams' /nas iframe → /new-nas/jwplayer → JW Player setup with
    # an inline m3u8. The HTML is generated server-side and the URL string
    # is embedded directly, so this would also work over curl — but the
    # browser path future-proofs against dd12 starting to compute the URL
    # in JS.
    _Target(
        label="DD12Streams",
        title="NASCAR Cup (24/7) — DD12",
        url="https://dd12streams.com/nas",
        settle=10,
    ),
    # Acestrlms aggregator — always-on F1 page that re-frames pooembed.
    # Captured m3u8 only appears once the embed's JS runs.
    _Target(
        label="Acestrlms",
        title="Sky Sports F1 (24/7) — Acestrlms",
        url="https://acestrlms.pages.dev/f11/",
        settle=15,
    ),
)

# Heuristic to recognise an HLS playlist URL from network capture. Most CDNs
# use `.m3u8`; some (pushembdz/oe1.ossfeed) disguise the playlist as `.css`
# under a /out/v… or /hls/ path. Filter out obvious junk (.css for actual
# stylesheets, .ts segments — we only want the playlist).
_HLS_URL_RE = re.compile(
    r"\.m3u8(\?|$)|/out/v[0-9]+/.+\.css(\?|$)|/hls/.+/master\.css(\?|$)"
)
_SEGMENT_EXT_RE = re.compile(r"\.(ts|m4s|aac|key)(\?|$)")


def _looks_like_hls_playlist(url: str) -> bool:
    if _SEGMENT_EXT_RE.search(url):
        return False
    return bool(_HLS_URL_RE.search(url))


def _resolve_chrome_ws() -> str | None:
    base = os.getenv("CHROME_WS_URL")
    token = os.getenv("CHROME_WS_TOKEN")
    if not base or not token:
        return None
    return f"{base.rstrip('/')}/{token}"


class ChromeBrowserExtractor(BaseExtractor):
    """Drive chrome-service to capture m3u8 URLs from JS-heavy pages."""

    @property
    def site_key(self) -> str:
        return "chrome-browser"

    @property
    def site_name(self) -> str:
        return "Chrome Browser"

    async def extract(self) -> list[ExtractedStream]:
        ws_url = _resolve_chrome_ws()
        if not ws_url:
            logger.warning(
                "[chrome-browser] CHROME_WS_URL/TOKEN not set — extractor disabled"
            )
            return []
        try:
            from playwright.async_api import async_playwright
        except ImportError:
            logger.warning("[chrome-browser] playwright not installed — disabled")
            return []
        # One Playwright instance + one browser connection per extraction
        # round. Contexts are cheap; the browser is shared.
        async with async_playwright() as p:
            try:
                browser = await p.chromium.connect(ws_url, timeout=15_000)
            except Exception:
                logger.exception("[chrome-browser] connect to chrome-service failed")
                return []
            results: list[ExtractedStream] = []
            for target in TARGETS:
                try:
                    stream = await self._scrape(browser, target)
                    if stream:
                        results.append(stream)
                except Exception:
                    logger.exception(
                        "[chrome-browser] failed to scrape %s", target.url
                    )
            try:
                await browser.close()
            except Exception:
                pass
        logger.info("[chrome-browser] returned %d stream(s)", len(results))
        return results

    async def _scrape(self, browser, target: _Target) -> ExtractedStream | None:
        ctx = await browser.new_context(
            user_agent=USER_AGENT,
            viewport={"width": 1280, "height": 720},
            bypass_csp=True,
        )
        # Inject the same stealth script the verifier uses so anti-bot
        # checks don't trip the page before its decoder runs.
        try:
            from backend.stealth import STEALTH_JS

            await ctx.add_init_script(STEALTH_JS)
        except Exception:
            pass
        page = await ctx.new_page()
        captured: list[str] = []

        def on_response(resp):
            try:
                if _looks_like_hls_playlist(resp.url):
                    captured.append(resp.url)
            except Exception:
                pass

        page.on("response", on_response)
        # Some pages (DD12 variants) load the player in a child iframe;
        # frame events catch nested navigations.
        page.on(
            "framenavigated",
            lambda fr: captured.append(fr.url) if _looks_like_hls_playlist(fr.url) else None,
        )
        try:
            await page.goto(target.url, wait_until="domcontentloaded", timeout=20_000)
        except Exception as e:
            logger.debug("[chrome-browser] %s goto failed: %s", target.url, e)
            await ctx.close()
            return None
        # Let the page's JS settle.
        await asyncio.sleep(target.settle)
        # Also probe child iframes — `pushembdz`, `pooembed`, `embedsports`
        # all live behind one. Collect any HLS URL the iframes loaded.
        for fr in page.frames:
            if fr is page.main_frame:
                continue
            try:
                # JW Player and Clappr both expose the playing source via
                # a <video>/<source> element after setup completes.
                sources = await fr.evaluate(
                    "() => Array.from(document.querySelectorAll('video, source'))"
                    ".map(e => e.currentSrc || e.src || '')"
                    ".filter(s => s.includes('.m3u8') || s.includes('.css'))"
                )
                for s in sources:
                    if _looks_like_hls_playlist(s):
                        captured.append(s)
            except Exception:
                pass
        await ctx.close()
        unique = list(dict.fromkeys(captured))
        if not unique:
            logger.debug("[chrome-browser] %s yielded no HLS URL", target.url)
            return None
        # Prefer URLs that look like a master/index playlist; otherwise take
        # the first capture (later ones are usually variant playlists
        # referenced from the master).
        master = next(
            (u for u in unique if "master" in u.lower() or "index" in u.lower()),
            unique[0],
        )
        # Keep any query string as-is: some CDNs require the token, and the
        # verifier/frontend re-resolve short-lived ones per request.
        # Decode URL-encoded characters so the proxy gets a clean URL.
        m3u8 = urllib.parse.unquote(master)
        logger.info(
            "[chrome-browser] %s -> %s",
            target.url, m3u8[:120],
        )
        return ExtractedStream(
            url=m3u8,
            site_key=self.site_key,
            site_name=target.label,
            quality="",
            title=target.title,
            stream_type="m3u8",
        )

View file

@@ -0,0 +1,111 @@
"""DD12Streams extractor — scrapes inline m3u8 URLs from per-channel pages.

Each DD12 sport page (`/nas`, `/f1`, `/sky`, etc.) renders an iframe to
`/<channel>c1`, which 302-redirects to `/new-<channel>/jwplayer`. That
page contains a JW Player setup with the m3u8 URL hard-coded inline:

    playerInstance.setup({
        file: "https://...b-cdn.net/.../master.m3u8",
        ...
    });

The JW Player runtime fails in our cluster (same fingerprint trap as
hmembeds), but we don't need it — the file URL is in the HTML and any
browser with H.264 codecs can play it directly via hls.js.

Channel discovery: probe a known list. New ones can be added by checking
DD12's own homepage / nav.
"""
import logging
import re

import httpx

from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream

logger = logging.getLogger(__name__)

BASE = "https://dd12streams.com"
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15"
)

# (path, channel_label, title). Add as DD12 surfaces new channels.
CHANNELS = (
    ("nas", "DD12Streams", "NASCAR Cup Series (24/7) — DD12"),
)

_FILE_URL_RE = re.compile(r"""file\s*:\s*["']([^"']+\.m3u8[^"']*)["']""")


class DD12Extractor(BaseExtractor):
    @property
    def site_key(self) -> str:
        return "dd12"

    @property
    def site_name(self) -> str:
        return "DD12Streams"

    async def extract(self) -> list[ExtractedStream]:
        results: list[ExtractedStream] = []
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        ) as client:
            for path, label, title in CHANNELS:
                try:
                    page_url = f"{BASE}/{path}"
                    resp = await client.get(page_url)
                    if resp.status_code != 200:
                        continue
                    iframe_path = self._extract_iframe(resp.text)
                    if not iframe_path:
                        continue
                    iframe_url = (
                        iframe_path
                        if iframe_path.startswith("http")
                        else f"{BASE}{iframe_path}"
                    )
                    iframe_resp = await client.get(
                        iframe_url, headers={"Referer": page_url}
                    )
                    if iframe_resp.status_code != 200:
                        continue
                    m3u8 = self._find_m3u8(iframe_resp.text)
                    if not m3u8:
                        continue
                    results.append(
                        ExtractedStream(
                            url=m3u8,
                            site_key=self.site_key,
                            site_name=label,
                            quality="",
                            title=title,
                            stream_type="m3u8",
                        )
                    )
                except Exception:
                    logger.debug(
                        "[dd12] /%s extraction failed", path, exc_info=True
                    )
        logger.info("[dd12] Extracted %d stream(s)", len(results))
        return results

    @staticmethod
    def _extract_iframe(html: str) -> str | None:
        m = re.search(
            r'<iframe[^>]+id=["\']vplayer["\'][^>]+src=["\']([^"\']+)["\']',
            html,
        )
        return m.group(1) if m else None

    @staticmethod
    def _find_m3u8(html: str) -> str | None:
        m = _FILE_URL_RE.search(html)
        return m.group(1) if m else None

View file

@@ -9,7 +9,9 @@ from backend.extractors.models import ExtractedStream
logger = logging.getLogger(__name__)
BASE_URL = "https://streamed.su"
# Site renamed from streamed.su → streamed.pk in 2026; the .su domain
# stopped resolving the API host (only the marketing page is left).
BASE_URL = "https://streamed.pk"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "

View file

@@ -0,0 +1,153 @@
"""Subreddit extractor — pulls live-stream posts from motorsport subreddits.

Uses the public old.reddit.com JSON API (no auth required) to discover
posts in r/MotorsportsReplays, r/motorsports, r/formula1, r/motogp that
are flaired "Live" or whose title matches motorsport stream keywords.
Each candidate URL is returned as an `embed`-type stream; the verifier
then visits it via the chrome-service pipeline, so the m3u8 is captured
even when the link points to an aggregator page rather than a direct
playlist.
"""
import asyncio
import logging
import re
from typing import NamedTuple

import httpx

from backend.extractors.base import BaseExtractor
from backend.extractors.models import ExtractedStream

logger = logging.getLogger(__name__)

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15"
)

# Subreddits to scan. old.reddit.com serves the public JSON API anonymously,
# without the auth wall the new site bounces requests off.
SUBREDDITS: tuple[str, ...] = (
    "MotorsportsReplays",
    "motorsports",
    "formula1",
    "motogp",
)

# Reject post URLs we already know don't yield playable streams (Discord
# invite links, social media, paywalled F1TV, our own host).
_REJECT_HOSTS = {
    "discord.gg", "discord.com", "twitter.com", "x.com",
    "youtube.com", "youtu.be", "instagram.com", "tiktok.com",
    "f1tv.formula1.com", "viktorbarzin.me",
}

_LIVE_KEYWORDS = re.compile(
    r"\b(live|stream|fp1|fp2|fp3|qualifying|race|session|grand prix|gp\b|sprint)\b",
    re.I,
)


class _RedditPost(NamedTuple):
    title: str
    url: str
    subreddit: str
    flair: str


def _interesting(post: _RedditPost) -> bool:
    if not post.url:
        return False
    if any(host in post.url for host in _REJECT_HOSTS):
        return False
    if (post.flair or "").lower() in {"live", "live stream", "stream"}:
        return True
    text = f"{post.title} {post.flair or ''}"
    return bool(_LIVE_KEYWORDS.search(text))


class SubredditExtractor(BaseExtractor):
    """Scan motorsport subreddits for live-stream candidate URLs."""

    @property
    def site_key(self) -> str:
        return "subreddit"

    @property
    def site_name(self) -> str:
        return "Subreddit"

    async def extract(self) -> list[ExtractedStream]:
        async with httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
        ) as client:
            tasks = [self._fetch(client, sub) for sub in SUBREDDITS]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        candidates: list[_RedditPost] = []
        for r in results:
            if isinstance(r, Exception):
                logger.debug("[subreddit] fetch failed: %s", r)
                continue
            candidates.extend(r)
        # Filter to live-stream posts and dedupe by URL.
        seen: set[str] = set()
        picks: list[_RedditPost] = []
        for p in candidates:
            if not _interesting(p):
                continue
            if p.url in seen:
                continue
            seen.add(p.url)
            picks.append(p)
        logger.info(
            "[subreddit] %d post(s) across %d sub(s) — %d live-stream candidate(s)",
            len(candidates), len(SUBREDDITS), len(picks),
        )
        # Hand the discovered URLs to the existing chrome-service pipeline
        # lazily: each is registered as an `embed`-type stream so the
        # verifier visits it, captures the actual m3u8 via JS, and (if
        # successful) marks is_live=True. The frontend will iframe it for
        # playback.
        return [
            ExtractedStream(
                url=p.url,
                site_key=self.site_key,
                site_name=f"Subreddit r/{p.subreddit}",
                quality="",
                title=p.title[:100],
                stream_type="embed",
                embed_url=p.url,
            )
            for p in picks
        ]

    async def _fetch(self, client: httpx.AsyncClient, sub: str) -> list[_RedditPost]:
        url = f"https://old.reddit.com/r/{sub}/new.json?limit=25"
        try:
            resp = await client.get(url)
        except Exception as e:
            logger.debug("[subreddit] r/%s fetch failed: %s", sub, e)
            return []
        if resp.status_code != 200:
            logger.debug("[subreddit] r/%s HTTP %d", sub, resp.status_code)
            return []
        try:
            data = resp.json()
        except Exception:
            return []
        posts: list[_RedditPost] = []
        for child in (data.get("data", {}) or {}).get("children", []):
            d = child.get("data", {}) or {}
            posts.append(
                _RedditPost(
                    title=d.get("title", "") or "",
                    url=d.get("url", "") or "",
                    subreddit=sub,
                    flair=d.get("link_flair_text", "") or "",
                )
            )
        return posts