freedify/app/ytmusic_service.py
2026-01-13 22:26:48 +00:00

158 lines
6.2 KiB
Python

"""
YouTube Music service for Freedify.
Uses ytmusicapi for searching YouTube Music catalog.
Streaming is handled by existing audio_service (yt-dlp).
"""
from ytmusicapi import YTMusic
from typing import Optional, Dict, List, Any
import logging
logger = logging.getLogger(__name__)
class YTMusicService:
"""Service for searching YouTube Music."""
def __init__(self):
# Initialize without auth (works for search)
self.ytm = YTMusic()
async def search_tracks(self, query: str, limit: int = 20, offset: int = 0) -> List[Dict[str, Any]]:
"""Search for songs on YouTube Music."""
try:
# YTMusic doesn't have native offset, so we fetch more and slice
total_needed = offset + limit
results = self.ytm.search(query, filter="songs", limit=total_needed)
# Slice to get the offset range
sliced = results[offset:offset + limit] if offset > 0 else results[:limit]
return [self._format_track(item) for item in sliced if item.get("videoId")]
except Exception as e:
logger.error(f"YTMusic search error: {e}")
return []
async def search_albums(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
"""Search for albums on YouTube Music."""
try:
results = self.ytm.search(query, filter="albums", limit=limit)
return [self._format_album(item) for item in results if item.get("browseId")]
except Exception as e:
logger.error(f"YTMusic album search error: {e}")
return []
async def get_album(self, album_id: str) -> Optional[Dict[str, Any]]:
"""Get album details with tracks."""
try:
# Remove ytm_ prefix if present
clean_id = album_id.replace("ytm_", "")
data = self.ytm.get_album(clean_id)
album = {
"id": f"ytm_{clean_id}",
"type": "album",
"name": data.get("title", ""),
"artists": ", ".join([a.get("name", "") for a in data.get("artists", [])]),
"album_art": self._get_thumbnail(data.get("thumbnails")),
"total_tracks": data.get("trackCount", 0),
"release_date": data.get("year", ""),
"source": "ytmusic",
}
tracks = []
for item in data.get("tracks", []):
if not item.get("videoId"):
continue
track = self._format_track(item)
track["album"] = album["name"]
track["album_art"] = album["album_art"]
tracks.append(track)
album["tracks"] = tracks
return album
except Exception as e:
logger.error(f"YTMusic get_album error: {e}")
return None
def _format_track(self, item: dict) -> dict:
"""Format track data for frontend."""
artists = item.get("artists", [])
artist_str = ", ".join([a.get("name", "") for a in artists]) if artists else ""
# Get album info if available
album = item.get("album", {}) or {}
# Duration can be in seconds or as string "3:45"
duration_str = item.get("duration", "0:00")
duration_ms = self._parse_duration(duration_str)
return {
"id": f"ytm_{item.get('videoId', '')}",
"type": "track",
"name": item.get("title", ""),
"artists": artist_str,
"artist_names": [a.get("name", "") for a in artists],
"album": album.get("name", "") if isinstance(album, dict) else str(album),
"album_id": f"ytm_{album.get('id', '')}" if isinstance(album, dict) else "",
"album_art": self._get_thumbnail(item.get("thumbnails")),
"duration_ms": duration_ms,
"duration": duration_str if isinstance(duration_str, str) else self._format_duration(duration_ms),
"isrc": f"ytm_{item.get('videoId', '')}", # Use prefixed videoId for streaming
"source": "ytmusic",
"video_id": item.get("videoId", ""), # Keep for reference
}
def _format_album(self, item: dict) -> dict:
"""Format album data for frontend."""
artists = item.get("artists", [])
artist_str = ", ".join([a.get("name", "") for a in artists]) if artists else ""
return {
"id": f"ytm_{item.get('browseId', '')}",
"type": "album",
"name": item.get("title", ""),
"artists": artist_str,
"album_art": self._get_thumbnail(item.get("thumbnails")),
"release_date": item.get("year", ""),
"source": "ytmusic",
}
def _get_thumbnail(self, thumbnails: list) -> str:
"""Get highest quality thumbnail."""
if not thumbnails:
return "/static/icon.svg"
# Sort by width descending and get the largest
sorted_thumbs = sorted(thumbnails, key=lambda x: x.get("width", 0), reverse=True)
url = sorted_thumbs[0].get("url", "/static/icon.svg")
# Proxy googleusercontent images to avoid 429
if "googleusercontent.com" in url or "ggpht.com" in url:
import urllib.parse
return f"/api/proxy_image?url={urllib.parse.quote(url)}"
return url
def _parse_duration(self, duration) -> int:
"""Parse duration string to milliseconds."""
if isinstance(duration, int):
return duration * 1000
if not duration or not isinstance(duration, str):
return 0
try:
parts = duration.split(":")
if len(parts) == 2:
return (int(parts[0]) * 60 + int(parts[1])) * 1000
elif len(parts) == 3:
return (int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])) * 1000
return 0
except:
return 0
def _format_duration(self, ms: int) -> str:
"""Format milliseconds to mm:ss."""
seconds = ms // 1000
mins = seconds // 60
secs = seconds % 60
return f"{mins}:{secs:02d}"
# Singleton instance
ytmusic_service = YTMusicService()