# freedify/app/spotify_service.py
# 2026-01-13 22:26:48 +00:00
#
# 496 lines
# 21 KiB
# Python

"""
Spotify service for Freedify.
Provides playlist/album fetching and URL parsing.
ONLY used when a Spotify URL is pasted - not for search (to avoid rate limits).
"""
import httpx
import re
from typing import Optional, Dict, List, Any, Tuple
import logging
from random import randrange
logger = logging.getLogger(__name__)
def get_random_user_agent() -> str:
    """Return a randomized Chrome-on-macOS style User-Agent string.

    Every numeric component (macOS minor/patch, AppleWebKit, Chrome and
    Safari version parts) is drawn with ``randrange`` on each call, so
    successive calls usually yield different strings.

    Returns:
        A User-Agent header value of the form
        ``Mozilla/5.0 (Macintosh; Intel Mac OS X 10_x_y) AppleWebKit/a.b
        (KHTML, like Gecko) Chrome/c.0.d.e Safari/f.g``.
    """
    # Draw the values in the order they appear in the string so that a seeded
    # random generator produces a stable, predictable result.
    os_minor = randrange(11, 15)
    os_patch = randrange(4, 9)
    webkit_major = randrange(530, 537)
    webkit_minor = randrange(30, 37)
    chrome_major = randrange(80, 105)
    chrome_build = randrange(3000, 4500)
    chrome_patch = randrange(60, 125)
    safari_major = randrange(530, 537)
    safari_minor = randrange(30, 36)

    return " ".join([
        "Mozilla/5.0",
        f"(Macintosh; Intel Mac OS X 10_{os_minor}_{os_patch})",
        f"AppleWebKit/{webkit_major}.{webkit_minor}",
        "(KHTML, like Gecko)",
        f"Chrome/{chrome_major}.0.{chrome_build}.{chrome_patch}",
        f"Safari/{safari_major}.{safari_minor}",
    ])
class SpotifyService:
    """Service for fetching metadata from Spotify URLs (not for search).

    Wraps the Spotify Web API behind a single shared ``httpx.AsyncClient``.
    An access token is obtained lazily on the first API call and cached on
    the instance until a request comes back with HTTP 401.

    Configuration is read from the environment in ``__init__``:
    ``SPOTIFY_CLIENT_ID`` / ``SPOTIFY_CLIENT_SECRET`` (Client Credentials
    flow) and ``SPOTIFY_SP_DC`` (web-player cookie). All are optional.
    """

    TOKEN_URL = "https://open.spotify.com/get_access_token?reason=transport&productType=web_player"
    AUTH_URL = "https://accounts.spotify.com/api/token"
    API_BASE = "https://api.spotify.com/v1"

    # Regex patterns for Spotify URLs (web URLs and ``spotify:`` URIs).
    # Group 1 of each pattern is the resource ID.
    URL_PATTERNS = {
        'track': re.compile(r'(?:spotify\.com/track/|spotify:track:)([a-zA-Z0-9]+)'),
        'album': re.compile(r'(?:spotify\.com/album/|spotify:album:)([a-zA-Z0-9]+)'),
        'playlist': re.compile(r'(?:spotify\.com/playlist/|spotify:playlist:)([a-zA-Z0-9]+)'),
        'artist': re.compile(r'(?:spotify\.com/artist/|spotify:artist:)([a-zA-Z0-9]+)'),
    }

    def __init__(self):
        """Read credentials from the environment and create the HTTP client."""
        import os

        self.access_token: Optional[str] = None
        self.client_id = os.environ.get("SPOTIFY_CLIENT_ID")
        self.client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET")
        self.sp_dc = os.environ.get("SPOTIFY_SP_DC")
        self.client = httpx.AsyncClient(timeout=30.0)

    # ========== AUTHENTICATION ==========

    async def _get_access_token(self) -> str:
        """Get access token (Client Creds > Cookie > Web Player > Embed).

        Returns the cached token when one is present. Otherwise the
        strategies are tried in order and the first non-empty token is
        cached on ``self.access_token`` and returned.

        Raises:
            Exception: if every strategy fails to produce a token.
        """
        if self.access_token:
            return self.access_token

        token: Optional[str] = None

        # 1. Client Credentials flow (only when both values are configured).
        if self.client_id and self.client_secret:
            token = await self._token_from_client_credentials()

        # 2./3. Web-player token endpoint, authenticated via sp_dc if set.
        if not token:
            token = await self._token_from_web_player()

        # 4. Last resort: scrape a token out of an embed page.
        if not token:
            token = await self._token_from_embed_page()

        if not token:
            raise Exception("Failed to get Spotify access token")

        self.access_token = token
        return token

    async def _token_from_client_credentials(self) -> Optional[str]:
        """Request a token via the Client Credentials flow; None on failure."""
        import base64

        try:
            auth_str = f"{self.client_id}:{self.client_secret}"
            b64_auth = base64.b64encode(auth_str.encode()).decode()
            headers = {
                "Authorization": f"Basic {b64_auth}",
                "Content-Type": "application/x-www-form-urlencoded"
            }
            data = {"grant_type": "client_credentials"}
            response = await self.client.post(self.AUTH_URL, headers=headers, data=data)
            if response.status_code == 200:
                token = response.json().get("access_token")
                # A 200 without a token must not be treated as success,
                # otherwise callers would send "Bearer None".
                if token:
                    logger.info("Got Spotify token via Client Credentials")
                    return token
        except Exception as e:
            logger.error(f"Client Credentials auth failed: {e}")
        return None

    async def _token_from_web_player(self) -> Optional[str]:
        """Fetch a token from the web-player endpoint; None on failure.

        When ``self.sp_dc`` is set it is sent as the ``sp_dc`` cookie, which
        makes the request authenticated instead of anonymous.
        """
        # Cookie auth (sp_dc) - mimics a logged-in Web Player.
        # This is the best fallback if Developer App creation is blocked.
        cookies = None
        if self.sp_dc:
            cookies = {"sp_dc": self.sp_dc}
            logger.info("Using provided sp_dc cookie for authentication")

        headers = {
            "User-Agent": get_random_user_agent(),
            "Accept": "application/json",
            "Referer": "https://open.spotify.com/",
        }
        try:
            response = await self.client.get(self.TOKEN_URL, headers=headers, cookies=cookies)
            if response.status_code == 200:
                token = response.json().get("accessToken")
                if token:
                    logger.info(f"Got Spotify token via Web Player ({'Authenticated' if cookies else 'Anonymous'})")
                    return token
        except Exception as e:
            logger.warning(f"Web Player token fetch failed: {e}")
        return None

    async def _token_from_embed_page(self) -> Optional[str]:
        """Extract an ``accessToken`` value from an embed page; None on failure."""
        try:
            embed_url = "https://open.spotify.com/embed/track/4cOdK2wGLETKBW3PvgPWqT"
            response = await self.client.get(embed_url, headers={"User-Agent": get_random_user_agent()})
            if response.status_code == 200:
                token_match = re.search(r'"accessToken":"([^"]+)"', response.text)
                if token_match:
                    logger.info("Got Spotify token via embed page")
                    return token_match.group(1)
        except Exception as e:
            logger.warning(f"Embed token fetch failed: {e}")
        return None

    # ========== HTTP ==========

    @staticmethod
    def _parse_retry_after(value: Optional[str], default: int) -> int:
        """Parse a ``Retry-After`` header value as whole seconds.

        Returns ``default`` when the header is missing or is not an integer
        (for example an HTTP-date). Negative values are clamped to 0.
        """
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            return default

    async def _api_request(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """Make authenticated API request with rate limit handling.

        Up to three attempts are made. A 401 clears the cached token so the
        next attempt fetches a fresh one. A 429 sleeps for the server's
        ``Retry-After`` (capped at 10 seconds) before retrying, and doubles
        the fallback delay used when that header is absent or unparsable.

        Args:
            endpoint: Path appended to ``API_BASE`` (e.g. ``"/tracks/<id>"``).
            params: Optional query parameters.

        Returns:
            The decoded JSON body of the first non-401/429 successful response.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error
            status, and any exception from ``_get_access_token``.
        """
        import asyncio

        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            token = await self._get_access_token()
            headers = {
                "Authorization": f"Bearer {token}",
                "User-Agent": get_random_user_agent(),
                "Accept": "application/json",
            }
            response = await self.client.get(f"{self.API_BASE}{endpoint}", headers=headers, params=params)

            if response.status_code == 401:
                logger.warning("Got 401, refreshing Spotify token...")
                self.access_token = None
                continue

            if response.status_code == 429:
                header_value = response.headers.get("Retry-After")
                retry_after = min(self._parse_retry_after(header_value, retry_delay), 10)
                logger.warning(f"Rate limited (429). Waiting {retry_after}s before retry {attempt + 1}/{max_retries}")
                await asyncio.sleep(retry_after)
                retry_delay *= 2
                continue

            response.raise_for_status()
            return response.json()

        # Retries exhausted: the last response was a 401 or 429, so this
        # surfaces it to the caller as an exception.
        response.raise_for_status()
        return response.json()

    # ========== URL PARSING ==========

    def parse_spotify_url(self, url: str) -> Optional[Tuple[str, str]]:
        """Parse Spotify URL and return (type, id) or None.

        ``type`` is one of the keys of ``URL_PATTERNS``; patterns are tried
        in that dict's order and the first match wins.
        """
        for url_type, pattern in self.URL_PATTERNS.items():
            match = pattern.search(url)
            if match:
                return (url_type, match.group(1))
        return None

    def is_spotify_url(self, url: str) -> bool:
        """Check if a URL is a Spotify URL (contains ``spotify.com/`` or ``spotify:``)."""
        return 'spotify.com/' in url or 'spotify:' in url

    # ========== TRACK METHODS ==========

    async def get_track_by_id(self, track_id: str) -> Optional[Dict[str, Any]]:
        """Get a single track by ID.

        Returns the formatted track dict, or None if the request or the
        formatting fails (the error is logged).
        """
        try:
            data = await self._api_request(f"/tracks/{track_id}", {"market": "US"})
            return self._format_track(data)
        except Exception as e:
            # Narrower than a bare ``except`` so task cancellation and
            # KeyboardInterrupt still propagate.
            logger.error(f"Error fetching Spotify track {track_id}: {e}")
            return None

    def _format_track(self, item: dict) -> dict:
        """Format track data for frontend.

        Raises:
            KeyError: if a required field (``id``, ``name``, ``artists``,
                ``album``, ``duration_ms``) is missing from ``item``.
        """
        album = item["album"]
        artist_names = [a["name"] for a in item["artists"]]
        return {
            "id": item["id"],
            "type": "track",
            "name": item["name"],
            "artists": ", ".join(artist_names),
            "artist_names": artist_names,
            "album": album["name"],
            "album_id": album["id"],
            "album_art": self._get_best_image(album.get("images") or []),
            "duration_ms": item["duration_ms"],
            "duration": self._format_duration(item["duration_ms"]),
            "isrc": (item.get("external_ids") or {}).get("isrc"),
            "source": "spotify",
        }

    # ========== ALBUM METHODS ==========

    async def get_album(self, album_id: str) -> Optional[Dict[str, Any]]:
        """Get an album together with the tracks embedded in the album response.

        Only the tracks included in the single ``/albums/{id}`` response are
        returned; no further pages are requested. Track ``isrc`` is always
        None here because it is not read from the album payload.

        Returns None (after logging) on any error.
        """
        try:
            data = await self._api_request(f"/albums/{album_id}", {"market": "US"})
            album = self._format_album(data)
            tracks = []
            for item in data.get("tracks", {}).get("items", []):
                artist_names = [a["name"] for a in item["artists"]]
                tracks.append({
                    "id": item["id"],
                    "type": "track",
                    "name": item["name"],
                    "artists": ", ".join(artist_names),
                    "artist_names": artist_names,
                    "album": data["name"],
                    "album_id": album_id,
                    "album_art": album["album_art"],
                    "duration_ms": item["duration_ms"],
                    "duration": self._format_duration(item["duration_ms"]),
                    "isrc": None,
                    "source": "spotify",
                })
            album["tracks"] = tracks
            return album
        except Exception as e:
            logger.error(f"Error fetching Spotify album {album_id}: {e}")
            return None

    def _format_album(self, item: dict) -> dict:
        """Format album data for frontend (without the track list)."""
        return {
            "id": item["id"],
            "type": "album",
            "name": item["name"],
            "artists": ", ".join(a["name"] for a in item.get("artists", [])),
            "album_art": self._get_best_image(item.get("images", [])),
            "release_date": item.get("release_date", ""),
            "total_tracks": item.get("total_tracks", 0),
            "source": "spotify",
        }

    # ========== PLAYLIST METHODS ==========

    async def get_playlist(self, playlist_id: str) -> Optional[Dict[str, Any]]:
        """Get a playlist together with the tracks embedded in the response.

        Only the tracks included in the single ``/playlists/{id}`` response
        are returned; no further pages are requested, so ``total_tracks``
        may exceed ``len(tracks)``. Entries without a track object or track
        ID are skipped, as are entries that cannot be formatted.

        Returns None (after logging) on any request-level error.
        """
        try:
            data = await self._api_request(f"/playlists/{playlist_id}", {"market": "US"})
            playlist = {
                "id": data["id"],
                "type": "playlist",
                "name": data["name"],
                "description": data.get("description", ""),
                "album_art": self._get_best_image(data.get("images", [])),
                "owner": data.get("owner", {}).get("display_name", ""),
                "total_tracks": data.get("tracks", {}).get("total", 0),
                "source": "spotify",
            }
            tracks = []
            for item in data.get("tracks", {}).get("items", []):
                track_data = item.get("track")
                if not (track_data and track_data.get("id")):
                    continue
                try:
                    tracks.append(self._format_track(track_data))
                except (KeyError, TypeError) as e:
                    # One malformed entry should not discard the whole playlist.
                    logger.warning(f"Skipping malformed track in playlist {playlist_id}: {e}")
            playlist["tracks"] = tracks
            return playlist
        except Exception as e:
            logger.error(f"Error fetching Spotify playlist {playlist_id}: {e}")
            return None

    # ========== ARTIST METHODS ==========

    async def get_artist(self, artist_id: str) -> Optional[Dict[str, Any]]:
        """Get artist info with top tracks.

        Issues two requests: the artist object, then its top tracks for
        market ``US``. Returns None (after logging) on any error.
        """
        try:
            artist_data = await self._api_request(f"/artists/{artist_id}")
            artist = {
                "id": artist_data["id"],
                "type": "artist",
                "name": artist_data["name"],
                "image": self._get_best_image(artist_data.get("images", [])),
                "genres": artist_data.get("genres", []),
                "followers": artist_data.get("followers", {}).get("total", 0),
                "source": "spotify",
            }
            top_tracks = await self._api_request(f"/artists/{artist_id}/top-tracks", {"market": "US"})
            artist["tracks"] = [self._format_track(t) for t in top_tracks.get("tracks", [])]
            return artist
        except Exception as e:
            logger.error(f"Error fetching Spotify artist {artist_id}: {e}")
            return None

    # ========== AUDIO FEATURES & CAMELOT ==========

    # Camelot Wheel: Maps (pitch_class, mode) to Camelot notation
    # pitch_class: 0=C, 1=C#, 2=D, ..., 11=B
    # mode: 1=Major (B), 0=Minor (A)
    CAMELOT_MAP = {
        (0, 1): "8B", (0, 0): "5A",    # C Major / C Minor
        (1, 1): "3B", (1, 0): "12A",   # C# Major / C# Minor
        (2, 1): "10B", (2, 0): "7A",   # D Major / D Minor
        (3, 1): "5B", (3, 0): "2A",    # D# Major / D# Minor
        (4, 1): "12B", (4, 0): "9A",   # E Major / E Minor
        (5, 1): "7B", (5, 0): "4A",    # F Major / F Minor
        (6, 1): "2B", (6, 0): "11A",   # F# Major / F# Minor
        (7, 1): "9B", (7, 0): "6A",    # G Major / G Minor
        (8, 1): "4B", (8, 0): "1A",    # G# Major / G# Minor
        (9, 1): "11B", (9, 0): "8A",   # A Major / A Minor
        (10, 1): "6B", (10, 0): "3A",  # A# Major / A# Minor
        (11, 1): "1B", (11, 0): "10A", # B Major / B Minor
    }

    def _to_camelot(self, key: int, mode: int) -> str:
        """Convert Spotify key/mode to Camelot notation ("?" if unknown)."""
        return self.CAMELOT_MAP.get((key, mode), "?")

    async def search_track_by_isrc(self, isrc: str) -> Optional[str]:
        """Search for a track by ISRC and return Spotify track ID (or None)."""
        try:
            data = await self._api_request("/search", {"q": f"isrc:{isrc}", "type": "track", "limit": 1})
            tracks = data.get("tracks", {}).get("items", [])
            if tracks:
                return tracks[0].get("id")
        except Exception as e:
            logger.warning(f"ISRC search failed for {isrc}: {e}")
        return None

    async def search_track_by_name(self, name: str, artist: str) -> Optional[str]:
        """Search for a track by name and artist, return Spotify track ID.

        Tries a field-qualified query first (``track:… artist:…``), then a
        looser free-text query built from the title before any ``(`` or
        ``-`` and the first comma-separated artist. Returns None when
        neither finds a result or a request fails.
        """
        try:
            # 1. Try strict search first
            query = f"track:{name} artist:{artist}"
            data = await self._api_request("/search", {"q": query, "type": "track", "limit": 1, "market": "US"})
            tracks = data.get("tracks", {}).get("items", [])
            if tracks:
                return tracks[0].get("id")

            # 2. Fallback to loose search (just string matching)
            # Remove special chars and extra artists for better matching
            clean_name = name.split('(')[0].split('-')[0].strip()
            clean_artist = artist.split(',')[0].strip()
            query = f"{clean_name} {clean_artist}"
            data = await self._api_request("/search", {"q": query, "type": "track", "limit": 1, "market": "US"})
            tracks = data.get("tracks", {}).get("items", [])
            if tracks:
                return tracks[0].get("id")
        except Exception as e:
            logger.warning(f"Name search failed for {name} by {artist}: {e}")
        return None

    async def get_audio_features(self, track_id: str, isrc: str = None, name: str = None, artist: str = None) -> Optional[Dict[str, Any]]:
        """Get audio features (BPM, key, energy) for a single track.

        If track_id starts with 'dz_' (Deezer), the Spotify ID is looked up
        first: by ISRC when given, then by name/artist when both are given.

        Returns the formatted features, or None when no Spotify ID could be
        resolved or the request fails.
        """
        spotify_id = track_id

        # Handle Deezer tracks - need to find Spotify equivalent
        if track_id.startswith("dz_"):
            spotify_id = None
            # Try ISRC first
            if isrc:
                spotify_id = await self.search_track_by_isrc(isrc)
            # Fallback to name/artist search
            if not spotify_id and name and artist:
                spotify_id = await self.search_track_by_name(name, artist)
            if not spotify_id:
                logger.warning(f"Could not find Spotify ID for Deezer track {track_id}")
                return None

        try:
            data = await self._api_request(f"/audio-features/{spotify_id}")
            return self._format_audio_features(data)
        except Exception as e:
            # 403 Forbidden likely means token lacks permission (scraper token) or ID is invalid
            if "403" in str(e):
                logger.warning(f"Spotify 403 Forbidden for {spotify_id} (token permissions?)")
            else:
                logger.error(f"Error fetching audio features for {spotify_id}: {e}")
            return None

    async def get_audio_features_batch(self, track_ids: List[str]) -> List[Optional[Dict[str, Any]]]:
        """Get audio features for multiple tracks (max 100 per request).

        IDs are sent in chunks of 100. A chunk whose request fails
        contributes one None per ID in that chunk.
        """
        if not track_ids:
            return []

        # Spotify API limit is 100 tracks per request
        results = []
        for i in range(0, len(track_ids), 100):
            batch = track_ids[i:i+100]
            try:
                data = await self._api_request("/audio-features", {"ids": ",".join(batch)})
                results.extend(
                    self._format_audio_features(features) if features else None
                    for features in data.get("audio_features", [])
                )
            except Exception as e:
                logger.error(f"Error fetching batch audio features: {e}")
                results.extend([None] * len(batch))
        return results

    def _format_audio_features(self, data: dict) -> dict:
        """Format audio features for frontend.

        Missing or null numeric fields are treated as 0; a missing, null or
        negative ``key`` yields ``camelot == "?"``.
        """
        key = data.get("key")
        if key is None:
            key = -1
        mode = data.get("mode") or 0
        return {
            "track_id": data.get("id"),
            "bpm": round(data.get("tempo") or 0),
            "key": key,
            "mode": mode,
            "camelot": self._to_camelot(key, mode) if key >= 0 else "?",
            "energy": round(data.get("energy") or 0, 2),
            "danceability": round(data.get("danceability") or 0, 2),
            "valence": round(data.get("valence") or 0, 2),  # "happiness"
        }

    # ========== UTILITIES ==========

    def _get_best_image(self, images: List[Dict]) -> Optional[str]:
        """Return the URL of the widest image, or None if there are none.

        A missing or null ``width`` counts as 0. Ties keep the earliest
        image in the list.
        """
        if not images:
            return None
        # ``or 0`` (rather than a .get default) also covers an explicit
        # ``"width": None``, which would otherwise break the comparison.
        widest = max(images, key=lambda img: img.get("width") or 0)
        return widest.get("url")

    def _format_duration(self, ms: int) -> str:
        """Format a duration in milliseconds as ``M:SS``."""
        minutes, secs = divmod(ms // 1000, 60)
        return f"{minutes}:{secs:02d}"

    async def get_made_for_you_playlists(self) -> List[Dict[str, Any]]:
        """
        Get 'Made For You' playlists (Daily Mix, Discover Weekly, etc.).

        Runs one playlist search per keyword and keeps results that are
        either owned by the ``spotify`` user or whose name starts with one
        of the keywords. Results are de-duplicated by playlist ID.
        Requires authenticated token (sp_dc cookie).

        Returns an empty list when no token is available or on error.
        """
        try:
            # Check if we have a valid token (will try cookie auth)
            token = await self._get_access_token()
            if not token:
                logger.warning("No Spotify token available for Made For You")
                return []

            mixes = []
            queries = ["Daily Mix", "Discover Weekly", "Release Radar", "On Repeat", "Repeat Rewind"]
            keyword_prefixes = tuple(queries)

            for q in queries:
                try:
                    data = await self._api_request("/search", {"q": q, "type": "playlist", "limit": 10})
                    if not data:
                        continue
                    items = data.get("playlists", {}).get("items", [])
                    for item in items:
                        if not item:
                            continue
                        # ``or`` guards against explicit nulls in the payload.
                        owner_id = (item.get("owner") or {}).get("id", "")
                        name = item.get("name") or ""

                        # Filter: owned by "spotify" OR name starts with one of our keywords
                        # (Daily Mix 1, Daily Mix 2, etc. are personalized)
                        is_spotify_owned = owner_id == "spotify"
                        name_matches = name.startswith(keyword_prefixes)
                        if is_spotify_owned or name_matches:
                            mixes.append({
                                "id": item["id"],
                                "name": name,
                                "description": item.get("description", ""),
                                "image": self._get_best_image(item.get("images", [])),
                                "owner": "Spotify",
                                "type": "playlist",
                                "source": "spotify"
                            })
                except Exception as e:
                    logger.warning(f"Failed to fetch mix '{q}': {e}")

            # Deduplicate by ID (the last occurrence of an ID wins).
            unique_mixes = list({m['id']: m for m in mixes}.values())
            logger.info(f"Found {len(unique_mixes)} Made For You playlists")
            return unique_mixes
        except Exception as e:
            logger.error(f"Error fetching Made For You playlists: {e}")
            return []

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()
# Singleton instance, created when this module is imported.
# Constructing it runs SpotifyService.__init__, which reads the SPOTIFY_*
# environment variables and creates the shared httpx.AsyncClient; release
# that client with `await spotify_service.close()`.
spotify_service = SpotifyService()