Files
musicbrainz-ratings-helper/musicbrainz-ratings-helper.py

1237 lines
49 KiB
Python

#!/usr/bin/env python3
"""Push Navidrome ratings to MusicBrainz.
This script reads ratings from Navidrome over its Subsonic API and submits them
to the MusicBrainz XML API. It supports song, album, and artist ratings.
Song ratings are pushed to recording MBIDs. If a song belongs to a release group
and multiple releases in that group contain the same track title, all matching
recordings are rated instead of stopping at the first match.
"""
from __future__ import annotations
import argparse
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass
import time
import xml.etree.ElementTree as ET
import requests
from requests.auth import HTTPDigestAuth
from colorama import init, Fore, Style
# Initialize colorama (autoreset stays off at import time).
init(autoreset=False)
# Root URL that every MusicBrainz request path in this file is appended to.
MUSICBRAINZ_BASE_URL = "https://musicbrainz.org/ws/2"
# XML namespace set on the <metadata> root element of rating submissions.
MUSICBRAINZ_XML_NS = "http://musicbrainz.org/ns/mmd-2.0#"
# Client identifier handed to both the MusicBrainz and the Navidrome client.
CLIENT_NAME = "musicbrainz-ratings-helper-0.1.0"
# Version string written to the log at startup.
SCRIPT_VERSION = "v0.1.0"
# Colors for logging
LIGHT_PURPLE = Fore.MAGENTA + Style.BRIGHT
LIGHT_GREEN = Fore.GREEN + Style.BRIGHT
LIGHT_RED = Fore.RED + Style.BRIGHT
LIGHT_BLUE = Fore.BLUE + Style.BRIGHT
LIGHT_CYAN = Fore.CYAN + Style.BRIGHT
LIGHT_YELLOW = Fore.YELLOW + Style.BRIGHT
BOLD = Style.BRIGHT
# Appended to colored log messages to switch styling back off.
RESET = Style.RESET_ALL
class SafeAsciiFormatter(logging.Formatter):
    """Formatter whose output is pure ASCII with ANSI escape sequences removed."""

    # Matches CSI escape sequences (ESC [ ... final byte), e.g. color codes.
    ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

    def format(self, record):
        """Format *record*, drop ANSI codes, and backslash-escape non-ASCII text."""
        without_ansi = self.ansi_escape.sub("", super().format(record))
        ascii_bytes = without_ansi.encode("ascii", "backslashreplace")
        return ascii_bytes.decode("ascii")
# Setup logs (match sptnr logging behavior)
LOG_DIR = "logs"
# exist_ok=True replaces the separate exists() check, so a directory created
# between the check and the call (e.g. by a parallel run) is not an error.
os.makedirs(LOG_DIR, exist_ok=True)
# One log file per run, named after the start time in whole seconds.
LOGFILE = os.path.join(LOG_DIR, f"musicbrainz-ratings-helper_{int(time.time())}.log")
@dataclass(frozen=True)
class RatingRow:
    """Immutable record describing one rated Navidrome item and its MusicBrainz IDs."""

    # Row kind produced by NavidromeClient.build_rows: "artist", "album",
    # "song", or the "album-boundary" marker emitted before an album's rows.
    entity_type: str
    # Navidrome's ID for the artist, album, or song.
    navidrome_id: str
    # MusicBrainz ID the rating targets; empty when no direct MBID is known.
    mbid: str
    title: str
    artist: str
    # Release-group MBID resolved for the containing album, if any.
    release_group_mbid: str | None
    # Release MBID reported by Navidrome for the containing album, if any.
    release_mbid: str | None
    # Rating taken from Navidrome's userRating field (0 on boundary rows).
    rating: int
# One submission target: (MusicBrainz entity type, entity MBID, MusicBrainz
# rating, originating RatingRow).
PreparedRow = tuple[str, str, int, RatingRow]
# Rating counts keyed by MusicBrainz entity type
# ("artist", "release-group", "recording").
SubmissionCounts = dict[str, int]
def empty_submission_counts() -> SubmissionCounts:
    """Return a new tally with each submittable entity type set to zero."""
    return dict.fromkeys(("artist", "release-group", "recording"), 0)
def add_submission_counts(total: SubmissionCounts, increment: SubmissionCounts) -> None:
    """Add the artist/release-group/recording counts of *increment* onto *total* in place."""
    for entity in ("artist", "release-group", "recording"):
        running = total.get(entity, 0)
        total[entity] = running + increment.get(entity, 0)
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    Credential and URL options default to None so the caller can fall back to
    environment variables.
    """
    parser = argparse.ArgumentParser(description="Push Navidrome ratings to MusicBrainz.")
    option = parser.add_argument

    # Navidrome connection settings.
    option(
        "--navidrome-base-url",
        default=None,
        help="Base URL for Navidrome, for example https://navidrome.example.com.",
    )
    option("--navidrome-username", default=None, help="Navidrome username for Subsonic API auth.")
    option("--navidrome-password", default=None, help="Navidrome password for Subsonic API auth.")

    # What to collect and how to submit it.
    option(
        "--entity",
        action="append",
        choices=["song", "album", "artist"],
        help="Limit export to one or more entity types. Can be repeated.",
    )
    option(
        "--expand-release-groups",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Expand song ratings to all matching recordings in the same release group.",
    )
    option(
        "--dry-run",
        action="store_true",
        help="Show what would be submitted without posting to MusicBrainz.",
    )

    # Limits that keep test runs short.
    option(
        "--max-artists",
        type=int,
        default=None,
        help="Limit how many artist ratings are collected. Useful for short artist-only tests.",
    )
    option(
        "--max-albums",
        type=int,
        default=None,
        help="Limit how many album ratings are collected. Useful for short album-only tests.",
    )
    option(
        "--artist-id",
        default=None,
        help="Limit album and song processing to a single Navidrome artist ID.",
    )

    # MusicBrainz credentials.
    option("--mb-username", default=None, help="MusicBrainz username. Defaults to MB_USERNAME.")
    option("--mb-password", default=None, help="MusicBrainz password. Defaults to MB_PASSWORD.")

    option(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity.",
    )
    # submit order is hard-coded to artist, release-group, recording
    return parser.parse_args()
def load_dotenv_file(path: str = ".env") -> None:
    """Copy KEY=VALUE pairs from the file at *path* into ``os.environ``.

    A missing file is ignored. Blank lines, lines starting with ``#``, lines
    without ``=``, and lines with an empty key are skipped. One pair of matching
    surrounding quotes is removed from the value. Variables that are already set
    in the environment keep their current value.
    """
    if not os.path.exists(path):
        return
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            # Split on the first "=" only, so values may contain "=" themselves.
            name, _, text = entry.partition("=")
            name = name.strip()
            if not name:
                continue
            text = text.strip()
            quoted = len(text) >= 2 and text[0] in {"'", '"'} and text[-1] == text[0]
            if quoted:
                text = text[1:-1]
            os.environ.setdefault(name, text)
def normalize_text(value: str | None) -> str:
    """Lower-case *value* and collapse whitespace runs to single spaces.

    ``None`` and the empty string both yield ``""``.
    """
    if not value:
        return ""
    # split() without arguments also discards leading and trailing whitespace.
    words = value.lower().split()
    return " ".join(words)
def rating_to_musicbrainz(value: int) -> int:
    """Scale a 0-5 rating to 0-100, clamping out-of-range input first."""
    stars = min(max(value, 0), 5)
    scaled = stars * 20
    return int(round(scaled))
def required_arg(value: str | None, env_name: str, label: str) -> str:
    """Return *value*, or the environment variable *env_name* when *value* is empty.

    Logs an error and raises ``SystemExit(1)`` when neither source provides a
    non-empty value. *label* is the option name without the leading dashes.
    """
    resolved = value if value else os.environ.get(env_name)
    if resolved:
        return resolved
    logging.error(f"{LIGHT_RED}Missing required {label}. Set {env_name} or pass --{label}.{RESET}")
    raise SystemExit(1)
def log_blank_line() -> None:
    """Emit an empty INFO message, used as a visual separator in the log."""
    logging.info("")
def log_artist_header(name: str, navidrome_id: str, index: int) -> None:
    """Log an INFO heading with the artist's name, Navidrome ID, and index."""
    heading = f"Artist: {name} ({navidrome_id})[{index}]"
    logging.info(heading)
def log_album_header(
    name: str,
    navidrome_id: str,
    rating: int | None = None,
    has_rated_songs: bool = False,
    release_group_mbid: str | None = None,
) -> None:
    """Log a one-line album heading with its rating and release-group MBID.

    A missing or non-positive rating is shown as ``n/a``; in that case the note
    "contains rated songs" is added when *has_rated_songs* is true. A missing
    release-group MBID is shown as ``n/a`` as well.

    Examples:
    Album: Foo (abc123) | nr:4.0 | mbidRG:45380071-f2f0...
    Album: Bar (def456) | nr:n/a | contains rated songs | mbidRG:n/a
    """
    unrated = rating is None or rating <= 0
    shown_rating = "n/a" if unrated else f"{rating:.1f}"
    segments = [f" | nr:{shown_rating}"]
    if unrated and has_rated_songs:
        segments.append(" | contains rated songs")
    shown_group = release_group_mbid or "n/a"
    segments.append(f" | mbidRG:{shown_group}")
    suffix = "".join(segments)
    logging.info(f" Album: {name} ({navidrome_id}){suffix}")
def log_skip(name: str, rating: int, entity: str = "Item") -> None:
    """Log at INFO level that *name* is skipped, showing its source rating."""
    shown_rating = format_source_rating(rating)
    logging.info(f"{entity}: s:{shown_rating} -> mb:n/a | Skipping: {name}")
def _format_conn_error(exc: Exception, label: str) -> str:
    """Return a compact, human-friendly connection error string.

    The host and port are taken from the ``host='...', port=N`` fragment when
    the exception text contains one. Read timeouts are reduced to
    "Read timed out (Ns)". Any other message loses its leading
    ``...ConnectionPool(host=..., port=...):`` prefix and is cut at the end of
    its first sentence or at 120 characters, whichever comes first.

    Examples:
    MusicBrainz connection error, musicbrainz.org:443; Read timed out (20s)
    Navidrome connection error; Name lookup failed
    """
    text = str(exc)

    # Try to extract host and port from common requests.ConnectionPool formatting
    pool = re.search(r"host='(?P<host>[^']+)'\s*,\s*port=(?P<port>\d+)", text)
    hostport = f"{pool.group('host')}:{pool.group('port')}" if pool else None

    if "Read timed out" in text:
        # Timeouts may be configured as floats, so accept a fractional part.
        timeout = re.search(r"read timeout=?(?P<secs>\d+(?:\.\d+)?)", text)
        if timeout:
            short_msg = f"Read timed out ({timeout.group('secs')}s)"
        else:
            short_msg = "Read timed out"
    else:
        # The pool prefix only repeats the host/port reported separately.
        detail = re.sub(r"^\w*ConnectionPool\(host='[^']*'\s*,\s*port=\d+\):\s*", "", text)
        # A sentence ends at a period followed by whitespace or end of text;
        # splitting on every "." would cut the message inside host names,
        # IP addresses, and URLs.
        short_msg = re.split(r"\.(?:\s|$)", detail, maxsplit=1)[0][:120]

    if hostport:
        return f"{label} connection error, {hostport}; {short_msg}"
    return f"{label} connection error; {short_msg}"
class NavidromeClient:
    """Throttled client for the Navidrome (Subsonic API) endpoints this script uses."""

    def __init__(self, base_url: str, username: str, password: str, client_name: str) -> None:
        """Store connection settings and create the HTTP session.

        Args:
            base_url: Navidrome base URL; trailing slashes are removed.
            username: Subsonic user name, sent as the ``u`` parameter.
            password: Subsonic password, sent as the ``p`` parameter.
            client_name: Client identifier, sent as the ``c`` parameter.
        """
        self.base_url = base_url.rstrip("/")
        self.username = username
        self.password = password
        self.client_name = client_name
        self.session = requests.Session()
        self._next_request_at = 0.0
        # Stats collected while scanning Navidrome library
        self.stats: dict[str, int] = {
            "tracks": 0,
            "found": 0,
            "skipped": 0,
            "not_found": 0,
        }

    def _throttle(self) -> None:
        """Sleep until at least 1.05 s have passed since the previous request was scheduled."""
        now = time.monotonic()
        if now < self._next_request_at:
            time.sleep(self._next_request_at - now)
        self._next_request_at = time.monotonic() + 1.05

    def _bump(self, key: str) -> None:
        """Increment the statistics counter *key*, creating it if it is missing."""
        self.stats[key] = self.stats.get(key, 0) + 1

    @staticmethod
    def _subsonic_error(payload: object) -> object | None:
        """Return the error carried by a decoded Subsonic reply, or None.

        The error is looked up at the top level of *payload* and inside its
        "subsonic-response" wrapper; checking only the top level misses errors
        that the server reports inside the wrapper.
        """
        if not isinstance(payload, dict):
            return None
        if payload.get("error"):
            return payload["error"]
        envelope = payload.get("subsonic-response")
        if isinstance(envelope, dict) and envelope.get("error"):
            return envelope["error"]
        return None

    def _request(self, endpoint: str, params: dict[str, object]) -> dict:
        """Call a Navidrome REST endpoint and return the unwrapped JSON reply.

        Server errors (HTTP 5xx), connection errors, and timeouts are retried up
        to five times with a linearly growing wait (2 s, 4 s, ...). Any other
        request failure stops the retries immediately.

        Args:
            endpoint: Endpoint name appended to ``/rest/``.
            params: Extra query parameters; entries whose value is None are dropped.

        Returns:
            The content of "subsonic-response", or the whole payload when that
            key is absent.

        Raises:
            SystemExit: When the reply carries an API error or the request could
                not be completed.
        """
        max_retries = 5
        query = {
            "u": self.username,
            "p": self.password,
            "v": "1.16.1",
            "c": self.client_name,
            "f": "json",
            **{k: v for k, v in params.items() if v is not None},
        }
        # Ensure all query params are strings for requests and type-checkers
        safe_query = {k: str(v) for k, v in query.items()}
        url = f"{self.base_url}/rest/{endpoint}"

        for attempt in range(max_retries):
            wait_time = (attempt + 1) * 2
            try:
                self._throttle()
                response = self.session.get(url, params=safe_query, timeout=10)
                # Handle server errors with retry
                if response.status_code >= 500:
                    logging.warning(f"{LIGHT_YELLOW}Navidrome server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                payload = response.json()
                error = self._subsonic_error(payload)
                if error:
                    logging.error(f"{LIGHT_RED}Navidrome API error for {endpoint}: {error}{RESET}")
                    raise SystemExit(1)
                return payload.get("subsonic-response", payload)
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                short = _format_conn_error(e, "Navidrome")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}Navidrome request failed: {e}{RESET}")
                break
        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for Navidrome endpoint: {endpoint}{RESET}")
        raise SystemExit(1)

    def get_artists(self) -> list[dict]:
        """Return every artist from ``getArtists``, flattened across the index groups."""
        response = self._request("getArtists", {})
        artists: list[dict] = []
        for index in response.get("artists", {}).get("index", []):
            artists.extend(index.get("artist", []))
        return artists

    def get_album_list_page(self, offset: int, size: int = 500) -> list[dict]:
        """Return one page of ``getAlbumList2`` (alphabetical by name) starting at *offset*."""
        response = self._request(
            "getAlbumList2",
            {"type": "alphabeticalByName", "offset": offset, "size": size},
        )
        return response.get("albumList2", {}).get("album", [])

    def get_album(self, album_id: str) -> dict:
        """Return the "album" object of ``getAlbum`` for *album_id* ({} when absent)."""
        response = self._request("getAlbum", {"id": album_id})
        return response.get("album", {})

    def get_artist(self, artist_id: str) -> dict:
        """Return the "artist" object of ``getArtist`` for *artist_id* ({} when absent)."""
        response = self._request("getArtist", {"id": artist_id})
        return response.get("artist", {})

    def _iter_albums(self, page_size: int = 500):
        """Yield albums page by page; a page is only requested once it is needed."""
        offset = 0
        while True:
            page = self.get_album_list_page(offset, page_size)
            if not page:
                return
            yield from page
            if len(page) < page_size:
                return
            offset += len(page)

    def get_all_albums(self, page_size: int = 500) -> list[dict]:
        """Return all albums, fetched in pages of *page_size*."""
        return list(self._iter_albums(page_size))

    def build_rows(
        self,
        entities: set[str],
        musicbrainz: "MusicBrainzClient",
        max_artists: int | None = None,
        max_albums: int | None = None,
        artist_id: str | None = None,
    ):
        """Yield ``RatingRow`` objects for the rated items found in Navidrome.

        Artist rows come first (when "artist" is requested). Then, for each
        album that is rated or contains rated songs, an "album-boundary" row is
        yielded, followed by the album row and the rows of its rated songs,
        depending on which entity types were requested.

        Args:
            entities: Entity types to collect: "artist", "album", "song".
            musicbrainz: Client used to resolve release MBIDs to release groups.
            max_artists: Stop collecting artists after this many rows.
            max_albums: Stop after yielding an album row once at least this many
                albums have been visited; also used as the album page size.
            artist_id: Restrict collection to this Navidrome artist.
        """
        # When an explicit artist_id is provided we may need the artist twice
        # (for artist rows and for album/song collection). Fetch it once and reuse.
        artist_source: list[dict] | None = None
        if "artist" in entities:
            artist_rows = 0
            # If an artist_id is provided, limit artist collection to that artist only.
            artist_source = [self.get_artist(artist_id)] if artist_id else self.get_artists()
            for artist_index, artist in enumerate(artist_source):
                if not artist:
                    continue
                if artist_index == 0:
                    log_blank_line()
                log_artist_header(artist.get("name", ""), artist.get("id", ""), artist_index)
                rating = int(artist.get("userRating") or 0)
                if rating <= 0:
                    log_skip(artist.get("name", ""), rating, entity="Artist")
                    continue
                mbid = artist.get("musicBrainzId") or ""
                if not mbid:
                    continue
                yield RatingRow(
                    entity_type="artist",
                    navidrome_id=artist.get("id", ""),
                    mbid=mbid,
                    title=artist.get("name", ""),
                    artist=artist.get("name", ""),
                    release_group_mbid=None,
                    release_mbid=None,
                    rating=rating,
                )
                artist_rows += 1
                if max_artists is not None and artist_rows >= max_artists:
                    break

        if "album" not in entities and "song" not in entities:
            return

        album_rows = 0
        page_size = max_albums if max_albums is not None and max_albums > 0 else 500
        # If an explicit artist_id was provided, reuse the previously fetched
        # `artist_source` value instead of calling the API again.
        if artist_id and artist_source:
            # artist_source is a list with one element when artist_id was used
            selected_artist = artist_source[0]
        else:
            selected_artist = self.get_artist(artist_id) if artist_id else None
        if selected_artist:
            album_source = selected_artist.get("album", [])
        else:
            # Lazy paging: breaking out of the loop below also stops further
            # album-list requests.
            album_source = self._iter_albums(page_size)
        # Only print the artist header here when we didn't already collect artist rows
        if selected_artist and "artist" not in entities:
            log_blank_line()
            log_artist_header(selected_artist.get("name", ""), selected_artist.get("id", ""), 0)

        for album in album_source:
            album_rows += 1
            album_id = album.get("id", "")
            album_name = album.get("name", "")
            album_rating = int(album.get("userRating") or 0)
            album_mbid = album.get("musicBrainzId") or ""

            # If we're collecting songs we need the album detail to know whether
            # to print an album header (only print it when there are rated songs
            # or the album itself has a rating).
            songs: list[dict] = []
            if "song" in entities:
                logging.debug(f"Fetching album details for album id {album_id}")
                songs = self.get_album(album_id).get("song", [])
            has_rated_songs = any(int(song.get("userRating") or 0) > 0 for song in songs)

            # The release group is only used by rows of rated albums/songs, so
            # skip the throttled MusicBrainz lookup for everything else.
            album_release_group_mbid = ""
            if album_mbid and (album_rating > 0 or has_rated_songs):
                logging.debug(f"Resolving release-group for album '{album_name}' release:{album_mbid}")
                album_release_group_mbid = resolve_release_group_mbid(musicbrainz, album_mbid)
                logging.debug(f"Resolved release-group for album '{album_name}' -> {album_release_group_mbid or 'NONE'}")

            # Print album header only if the album has a rating or contains rated songs
            if album_rating > 0 or has_rated_songs:
                yield RatingRow(
                    entity_type="album-boundary",
                    navidrome_id=album_id,
                    mbid=album_release_group_mbid or "",
                    title=album_name,
                    artist=album.get("artist", ""),
                    release_group_mbid=album_release_group_mbid or None,
                    release_mbid=album_mbid or None,
                    rating=0,
                )
                log_album_header(
                    album_name,
                    album_id,
                    rating=album_rating,
                    has_rated_songs=has_rated_songs,
                    release_group_mbid=album_release_group_mbid or None,
                )

            if "album" in entities:
                if album_rating > 0:
                    if album_release_group_mbid:
                        yield RatingRow(
                            entity_type="album",
                            navidrome_id=album_id,
                            mbid=album_release_group_mbid,
                            title=album_name,
                            artist=album.get("artist", ""),
                            release_group_mbid=album_release_group_mbid,
                            release_mbid=album_mbid or None,
                            rating=album_rating,
                        )
                        if max_albums is not None and album_rows >= max_albums:
                            break
                    else:
                        logging.warning(f"{LIGHT_YELLOW}Album '{album_name}' has rating {album_rating} but no release-group MBID resolved.{RESET}")
                elif not has_rated_songs:
                    # If the album has no rating and no rated songs, log a compact skip line
                    log_skip(album_name, album_rating, entity="Album")

            if "song" not in entities:
                continue

            for song in songs:
                song_rating = int(song.get("userRating") or 0)
                # Count every track we inspect
                self._bump("tracks")
                if song_rating <= 0:
                    # Skipped due to no rating
                    self._bump("skipped")
                    logging.debug(f"nr:{song_rating:.1f} | Skipping Recording: {song.get('title', '')}")
                    continue
                song_mbid = song.get("musicBrainzId") or ""
                if not song_mbid and not album_release_group_mbid:
                    # No direct MBID and no release-group to fall back to.
                    self._bump("not_found")
                    logging.debug(
                        f"{LIGHT_YELLOW}nr:{song_rating:.1f} | Rated but no MBID for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
                    )
                    continue
                self._bump("found")
                if not song_mbid:
                    # Rated but no direct MBID found. We have a release-group
                    # match, so still yield the row and let the release-group
                    # expansion path submit the rating instead of skipping it.
                    logging.debug(
                        f"{LIGHT_YELLOW}nr:{song_rating:.1f} | No song MBID; using release-group fallback for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
                    )
                yield RatingRow(
                    entity_type="song",
                    navidrome_id=song.get("id", ""),
                    mbid=song_mbid,
                    title=song.get("title", ""),
                    artist=song.get("artist", album.get("artist", "")),
                    release_group_mbid=album_release_group_mbid,
                    release_mbid=album_mbid or None,
                    rating=song_rating,
                )
class MusicBrainzClient:
    """Throttled MusicBrainz web-service client with optional digest authentication."""

    def __init__(self, client: str, username: str | None, password: str | None) -> None:
        """Create the HTTP session.

        Args:
            client: Identifier used as the User-Agent and as the ``client``
                query parameter of submissions.
            username: MusicBrainz user name; digest auth is only configured when
                both *username* and *password* are given.
            password: MusicBrainz password.
        """
        self.client = client
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": client, "Accept": "application/json"})
        if username and password:
            self.session.auth = HTTPDigestAuth(username, password)
        self._next_request_at = 0.0

    def _throttle(self) -> None:
        """Sleep until at least 1.05 s have passed since the previous request was scheduled."""
        now = time.monotonic()
        if now < self._next_request_at:
            time.sleep(self._next_request_at - now)
        self._next_request_at = time.monotonic() + 1.05

    @staticmethod
    def _retry_after_seconds(response, default: int = 5) -> int:
        """Return the wait requested by the ``Retry-After`` header of *response*.

        Falls back to *default* when the header is missing or is not a whole
        number of seconds (the header may also carry an HTTP date), and never
        returns a negative value, which ``time.sleep`` would reject.
        """
        raw = response.headers.get("Retry-After")
        if raw is None:
            return default
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            return default
        return max(0, seconds)

    def get_json(self, path: str, params: dict[str, object], allow_404: bool = False) -> dict:
        """GET *path* from MusicBrainz and return the decoded JSON body.

        HTTP 429, HTTP 5xx, connection errors, and timeouts are retried; at most
        three attempts are made. The wait after a 429 follows ``Retry-After``;
        the other waits grow linearly (2 s, 4 s, ...).

        Args:
            path: Path appended to ``MUSICBRAINZ_BASE_URL``.
            params: Query parameters; entries whose value is None are dropped.
            allow_404: When true, a 404 reply is logged and ``{}`` is returned.

        Raises:
            SystemExit: When the request could not be completed.
        """
        max_retries = 3
        # Ensure params are strings to satisfy requests parameter types
        safe_params = {k: str(v) for k, v in params.items() if v is not None}
        for attempt in range(max_retries):
            wait_time = (attempt + 1) * 2
            try:
                self._throttle()
                response = self.session.get(f"{MUSICBRAINZ_BASE_URL}{path}", params=safe_params, timeout=20)
                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = self._retry_after_seconds(response)
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
                    time.sleep(retry_after)
                    continue
                # Handle server errors with retry
                if response.status_code >= 500:
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue
                if response.status_code == 404 and allow_404:
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz resource not found for {path}. Skipping.{RESET}")
                    return {}
                response.raise_for_status()
                return response.json()
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                short = _format_conn_error(e, "MusicBrainz")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
                break
        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
        raise SystemExit(1)

    def post_xml(self, path: str, xml_body: bytes) -> requests.Response:
        """POST *xml_body* to *path* on MusicBrainz and return the response.

        Retries follow the same rules as ``get_json`` but allow five attempts.
        The client identifier is sent as the ``client`` query parameter.

        Raises:
            SystemExit: When the request could not be completed.
        """
        max_retries = 5
        headers = {"Content-Type": "application/xml; charset=utf-8"}
        for attempt in range(max_retries):
            wait_time = (attempt + 1) * 2
            try:
                self._throttle()
                response = self.session.post(
                    f"{MUSICBRAINZ_BASE_URL}{path}",
                    params={"client": self.client},
                    data=xml_body,
                    headers=headers,
                    timeout=20,
                )
                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = self._retry_after_seconds(response)
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
                    time.sleep(retry_after)
                    continue
                # Handle server errors with retry
                if response.status_code >= 500:
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                return response
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                short = _format_conn_error(e, "MusicBrainz")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
                break
        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
        raise SystemExit(1)
def resolve_release_group_mbid(client: MusicBrainzClient, release_mbid: str) -> str:
    """Return the release-group MBID of *release_mbid*.

    The release is looked up through ``client.get_json`` with 404 replies
    tolerated; ``""`` is returned when the reply names no release group.
    """
    lookup_path = f"/release/{release_mbid}"
    query = {"inc": "release-groups", "fmt": "json"}
    release = client.get_json(lookup_path, query, allow_404=True)
    group = release.get("release-group", {})
    return group.get("id", "")
def _credit_names(entity: dict) -> str:
    """Return the normalized, space-joined "artist-credit" names of a track or recording."""
    return normalize_text(
        " ".join(
            credit.get("name", "")
            for credit in entity.get("artist-credit", [])
            if isinstance(credit, dict)
        )
    )


def _payload_keys(payload: object) -> str:
    """Describe a reply for diagnostics: its sorted keys, or its type when it is not a dict."""
    if isinstance(payload, dict):
        return ",".join(sorted(str(key) for key in payload))
    return str(type(payload))


def _collect_matching_recordings(
    release: dict,
    wanted_title: str,
    wanted_artist: str,
    seen: set[str],
    candidates: list[str],
    rejected_titles: list[str],
) -> None:
    """Append the IDs of the recordings on *release* that match title and artist.

    A track matches when its normalized title equals *wanted_title* (or either
    is empty) and the wanted artist and the track or recording artist credit
    contain one another (an empty credit always matches). Up to ten distinct
    titles rejected by the title check are kept in *rejected_titles* for
    diagnostics.
    """
    for medium in release.get("media", []):
        for track in medium.get("tracks", []):
            recording = track.get("recording") or {}
            recording_id = recording.get("id")
            if not recording_id or recording_id in seen:
                continue
            track_title = normalize_text(track.get("title"))
            if track_title and wanted_title and track_title != wanted_title:
                raw_title = track.get("title") or ""
                if len(rejected_titles) < 10 and raw_title not in rejected_titles:
                    rejected_titles.append(raw_title)
                continue
            track_artist = _credit_names(track)
            recording_artist = _credit_names(recording)
            artist_matches = (
                not wanted_artist
                or wanted_artist in track_artist
                or wanted_artist in recording_artist
                or track_artist in wanted_artist
                or recording_artist in wanted_artist
            )
            if not artist_matches:
                continue
            seen.add(recording_id)
            candidates.append(recording_id)


def _fallback_releases(
    client: MusicBrainzClient,
    release_group_mbid: str,
    release_mbid: str | None,
    browse_payload: object,
) -> list[dict]:
    """Find releases another way after browsing ``/release`` returned none.

    Tries the release-group lookup first and then, when *release_mbid* is
    known, that single release. Returns ``[]`` when both yield nothing.
    """
    # Diagnostic: show what the API returned so we can debug empty results
    logging.info(
        f"{LIGHT_YELLOW}No releases found from /release for RG {release_group_mbid}. Payload keys: {_payload_keys(browse_payload)}{RESET}"
    )
    logging.debug(f"No releases from /release for RG {release_group_mbid}; trying /release-group fallback")
    # NOTE(review): confirm that the release-group lookup accepts the "media"
    # and "recordings" includes; the request is kept exactly as it was.
    fallback = client.get_json(
        f"/release-group/{release_group_mbid}",
        {"inc": "releases+media+recordings+artist-credits", "fmt": "json"},
        allow_404=True,
    )
    releases = fallback.get("releases", []) if isinstance(fallback, dict) else []
    if releases:
        return releases
    logging.info(
        f"{LIGHT_YELLOW}No releases found from /release-group for RG {release_group_mbid}. Payload keys: {_payload_keys(fallback)}{RESET}"
    )
    if not release_mbid:
        return []
    # If we have a specific release MBID (album-level MBID), try
    # fetching that release directly as a last resort.
    logging.debug(f"No releases in RG; trying specific release {release_mbid}")
    release = client.get_json(
        f"/release/{release_mbid}",
        {"inc": "recordings+media+artist-credits", "fmt": "json"},
        allow_404=True,
    )
    return [release]


def release_group_recording_ids(
    client: MusicBrainzClient,
    release_group_mbid: str,
    title: str,
    artist: str,
    release_mbid: str | None = None,
) -> list[str]:
    """Return the recording MBIDs in a release group that match a track.

    Every release of the group is browsed page by page and each track is
    compared with *title* and *artist* after normalization. When the first
    page is empty, the release-group lookup and then the single release
    *release_mbid* are tried instead.

    Args:
        client: MusicBrainz client used for the lookups.
        release_group_mbid: Release group to search; an empty value returns ``[]``.
        title: Track title to match.
        artist: Artist name to match.
        release_mbid: Optional release to fall back to.

    Returns:
        Matching recording MBIDs, without duplicates, in the order found.
    """
    if not release_group_mbid:
        return []
    wanted_title = normalize_text(title)
    wanted_artist = normalize_text(artist)
    candidates: list[str] = []
    rejected_titles: list[str] = []
    seen: set[str] = set()
    offset = 0
    while True:
        payload = client.get_json(
            "/release",
            {
                "release-group": release_group_mbid,
                "inc": "recordings+artist-credits",
                "fmt": "json",
                "limit": 100,
                "offset": offset,
            },
            allow_404=True,
        )
        releases = payload.get("releases", [])
        if not releases:
            # An empty page after the first one is just the end of the list;
            # only an empty first page warrants the fallback lookups.
            if offset == 0:
                for release in _fallback_releases(client, release_group_mbid, release_mbid, payload):
                    _collect_matching_recordings(
                        release, wanted_title, wanted_artist, seen, candidates, rejected_titles
                    )
            break
        for release in releases:
            _collect_matching_recordings(
                release, wanted_title, wanted_artist, seen, candidates, rejected_titles
            )
        offset += len(releases)
        # A page can hold fewer releases than the requested limit while more
        # remain, so the reported total decides when to stop. Without a total,
        # keep paging until an empty page comes back.
        total = payload.get("release-count")
        if isinstance(total, int) and offset >= total:
            break
    # If we found no candidates for an expected title, log a concise
    # diagnostic listing a few candidate track titles to help spot
    # normalization or punctuation mismatches.
    if not candidates and wanted_title:
        sample = ", ".join(rejected_titles) or "(no tracks found)"
        logging.info(
            f"{LIGHT_YELLOW}No recordings matched title '{title}' (normalized '{wanted_title}') in RG {release_group_mbid}. Candidate titles: {sample}{RESET}"
        )
    return candidates
def build_submission(rows: list[tuple[str, str, int]]) -> bytes:
    """Serialize ratings into the XML body of a MusicBrainz rating submission.

    Args:
        rows: ``(entity_type, entity_id, rating)`` tuples. Entity types other
            than artist, recording, release, release-group, and work are ignored.

    Returns:
        UTF-8 encoded XML, including the XML declaration.
    """
    # Element name of the list wrapping each entity type, in output order.
    list_tags = {
        "artist": "artist-list",
        "recording": "recording-list",
        "release": "release-list",
        "release-group": "release-group-list",
        "work": "work-list",
    }
    by_type: dict[str, list[tuple[str, int]]] = {}
    for entity_type, entity_id, rating in rows:
        by_type.setdefault(entity_type, []).append((entity_id, rating))

    root = ET.Element("metadata", {"xmlns": MUSICBRAINZ_XML_NS})
    for entity_type, list_tag in list_tags.items():
        entries = by_type.get(entity_type)
        if not entries:
            continue
        container = ET.SubElement(root, list_tag)
        for entity_id, rating in entries:
            node = ET.SubElement(container, entity_type, {"id": entity_id})
            ET.SubElement(node, "user-rating").text = str(rating)
    return ET.tostring(root, encoding="utf-8", xml_declaration=True)
def prepare_target_row(
    row: RatingRow,
    client: MusicBrainzClient,
    expand_release_groups: bool,
) -> list[PreparedRow]:
    """Turn one rating row into the MusicBrainz entities that receive its rating.

    Args:
        row: Source row; only "song", "album", and "artist" rows produce targets.
        client: MusicBrainz client used when a song is expanded.
        expand_release_groups: When true, a song row with a release-group MBID
            also targets every matching recording in that release group.

    Returns:
        ``(entity_type, entity_id, rating, row)`` tuples without duplicates, in
        the order found; empty when the converted rating is not positive.
    """
    rating = rating_to_musicbrainz(row.rating)
    if rating <= 0:
        return []

    targets: list[tuple[str, str]] = []
    if row.entity_type == "song":
        if row.mbid:
            targets.append(("recording", row.mbid))
        if expand_release_groups and row.release_group_mbid:
            logging.debug(f"Expanding release-group {row.release_group_mbid} for '{row.title}' / '{row.artist}'")
            recordings = release_group_recording_ids(
                client, row.release_group_mbid, row.title, row.artist, row.release_mbid
            )
            logging.debug(f"Expanded release-group {row.release_group_mbid}: {len(recordings)} recordings")
            targets.extend(("recording", recording_id) for recording_id in recordings)
    elif row.entity_type == "album":
        if row.mbid:
            targets.append(("release-group", row.mbid))
        if row.release_group_mbid and row.release_group_mbid != row.mbid:
            targets.append(("release-group", row.release_group_mbid))
    elif row.entity_type == "artist":
        if row.mbid:
            targets.append(("artist", row.mbid))

    # dict.fromkeys keeps the first occurrence of each target, in order.
    unique_targets = dict.fromkeys(target for target in targets if target[1])
    return [(entity_type, entity_id, rating, row) for entity_type, entity_id in unique_targets]
def format_source_rating(value: int | None) -> str:
    """Return the source rating as text, or "n/a" when it is None."""
    return "n/a" if value is None else str(value)
def format_musicbrainz_rating(value: int | None) -> str:
    """Return the MusicBrainz rating as text, or "n/a" when it is None."""
    return "n/a" if value is None else str(value)
def rating_entity_label(entity_type: str) -> str:
    """Return the log label for a row type; unknown types are just capitalized."""
    labels = {
        "artist": "Artist",
        "album": "Album",
        "song": "Recording",
    }
    if entity_type in labels:
        return labels[entity_type]
    return entity_type.capitalize()
def log_rating_result(
    row: RatingRow,
    mb_rating: int,
    status: int | str,
    *,
    color: str = LIGHT_GREEN,
) -> None:
    """Log the outcome of one rating at INFO level.

    Args:
        row: Source row; supplies the label, source rating, title, and artist.
        mb_rating: Rating value on the MusicBrainz scale.
        status: HTTP status code or a text such as "dry-run".
        color: Prefix placed before the message.
    """
    # Fall back to the artist, then the MBID, when the row has no title.
    shown_title = row.title or row.artist or row.mbid
    shown_artist = row.artist or ""
    kind = rating_entity_label(row.entity_type)
    source = format_source_rating(row.rating)
    target = format_musicbrainz_rating(mb_rating)
    logging.info(
        f"{color}{kind}: s:{source} -> mb:{target} | {shown_title} / {shown_artist}: {status}{RESET}"
    )
def submit_ratings(client: MusicBrainzClient, rows: list[PreparedRow], dry_run: bool) -> SubmissionCounts:
    """Submit prepared ratings to MusicBrainz, one request per entity type.

    Batches go out in the order artist, release-group, recording. With
    *dry_run* nothing is posted and each rating is logged with status "dry-run".

    Returns:
        The number of ratings in each batch, keyed by entity type.
    """
    # Keep the originating RatingRow next to each value for logging.
    batches: dict[str, list[tuple[str, int, RatingRow]]] = defaultdict(list)
    for entity_type, entity_id, rating, row in rows:
        batches[entity_type].append((entity_id, rating, row))

    counts = empty_submission_counts()
    for entity_type in ("artist", "release-group", "recording"):
        batch = batches.get(entity_type, [])
        counts[entity_type] = len(batch)
        if not batch:
            continue
        # For release-group batches, emit a concise summary of how many
        # rg_variants are in this submission batch (only for real runs).
        if entity_type == "release-group" and not dry_run:
            logging.info(f"{LIGHT_BLUE}rg_variants in this batch: {len(batch)}{RESET}")
        xml_body = build_submission(
            [(entity_type, entity_id, rating) for entity_id, rating, _ in batch]
        )
        if dry_run:
            status = "dry-run"
        else:
            status = client.post_xml("/rating", xml_body).status_code
        # Per-rating status lines first, then one summary line for the batch.
        for _, mb_rating, row in batch:
            log_rating_result(row, mb_rating, status)
        logging.info(f"{LIGHT_GREEN}Submitted {len(batch)} {entity_type} ratings: {status}{RESET}")
    return counts
def flush_submission_buffer(
    buffer: list[PreparedRow],
    client: MusicBrainzClient,
    dry_run: bool,
) -> SubmissionCounts:
    """Submit the buffered ratings, then empty *buffer*.

    Only the first entry for each (entity type, entity ID) pair is submitted.
    An empty buffer returns all-zero counts without calling the client.
    """
    if not buffer:
        return empty_submission_counts()
    first_by_target: dict[tuple[str, str], PreparedRow] = {}
    for entity_type, entity_id, rating, row in buffer:
        # setdefault keeps the entry already stored for a repeated target.
        first_by_target.setdefault((entity_type, entity_id), (entity_type, entity_id, rating, row))
    counts = submit_ratings(client, list(first_by_target.values()), dry_run)
    buffer.clear()
    return counts
def album_batch_key(row: RatingRow) -> str:
    """Return the first non-empty of release MBID, release-group MBID, and Navidrome ID."""
    if row.release_mbid:
        return row.release_mbid
    if row.release_group_mbid:
        return row.release_group_mbid
    return row.navidrome_id
def log_missing_target(row: RatingRow) -> None:
    """Log at INFO level that no MusicBrainz target was found for *row*."""
    kind = rating_entity_label(row.entity_type)
    source = format_source_rating(row.rating)
    shown_title = row.title or ""
    shown_artist = row.artist or ""
    # The MBID is only shown when the row has one.
    mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
    logging.info(
        f" {LIGHT_YELLOW}{kind}: s:{source} -> mb:0 | (not found) | {shown_title} / {shown_artist} ({row.navidrome_id}){mbid_field}{RESET}"
    )
def log_prepared_targets_debug(row: RatingRow, prepared_rows: list[PreparedRow]) -> None:
    """Log at DEBUG level which recordings a song row was matched to.

    Rows of any other entity type are ignored.
    """
    if row.entity_type != "song":
        return
    recording_ids = [prepared[1] for prepared in prepared_rows if prepared[0] == "recording"]
    recording_list = ", ".join(recording_ids) if recording_ids else "(none)"
    mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
    logging.debug(
        f" {LIGHT_BLUE}Song: {row.title} ({row.navidrome_id}){mbid_field} | matched_recordings:{len(recording_ids)} | recordings:{recording_list}{RESET}"
    )
def flush_and_count(
    buffer: list[PreparedRow],
    client: MusicBrainzClient,
    dry_run: bool,
    submitted_counts: SubmissionCounts,
) -> None:
    """Flush the buffer and fold the resulting counts into the running totals.

    Args:
        buffer: Pending prepared rows; handed to ``flush_submission_buffer``.
        client: MusicBrainz client used for the submission.
        dry_run: Passed through to ``flush_submission_buffer`` unchanged.
        submitted_counts: Running totals, updated via ``add_submission_counts``.
    """
    batch_counts = flush_submission_buffer(buffer, client, dry_run)
    add_submission_counts(submitted_counts, batch_counts)
def main() -> int:
    """Run one Navidrome -> MusicBrainz rating push and return the exit code.

    Steps, in order:

    1. Parse CLI arguments, load the dotenv file and configure console and
       file logging.
    2. Build the Navidrome and MusicBrainz clients from arguments and
       environment variables.
    3. Iterate over the rows produced by ``navidrome.build_rows``, resolve
       each to MusicBrainz targets and submit them. Artist ratings are
       submitted immediately; album and song ratings are buffered and flushed
       whenever the album key changes, an ``"album-boundary"`` row arrives, or
       the buffer reaches the batch size.
    4. Log a one-line run summary.

    Returns:
        ``0`` on every path that returns normally, including the case where
        no target could be resolved.
    """
    args = parse_args()
    load_dotenv_file()

    # Start time for run duration reporting
    start_time = time.time()

    # Console logging: message only, no timestamp.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(SafeAsciiFormatter("%(message)s"))
    logging.basicConfig(level=getattr(logging, args.log_level), handlers=[console_handler])

    # File logging: same messages, prefixed with a timestamp.
    file_handler = logging.FileHandler(LOGFILE, "a", encoding="ascii", errors="backslashreplace")
    file_handler.setFormatter(SafeAsciiFormatter("[%(asctime)s] %(message)s"))
    logging.getLogger().addHandler(file_handler)

    # Ensure colorama auto-reset for console. This stays best-effort, but a
    # failure is now recorded instead of being swallowed silently.
    try:
        init(autoreset=True)
    except Exception as exc:
        logging.debug(f"colorama init(autoreset=True) failed: {exc}")

    logging.info(f"Version: musicbrainz-ratings-helper {SCRIPT_VERSION}")
    if args.dry_run:
        logging.info("Preview mode, no changes will be made.")

    # NOTE(review): required_arg presumably aborts when neither the CLI value
    # nor the named environment variable is set -- confirm against its definition.
    navidrome_base_url = required_arg(
        args.navidrome_base_url,
        "NAVIDROME_BASE_URL",
        "navidrome-base-url",
    )
    navidrome_username = required_arg(
        args.navidrome_username,
        "NAVIDROME_USERNAME",
        "navidrome-username",
    )
    navidrome_password = required_arg(
        args.navidrome_password,
        "NAVIDROME_PASSWORD",
        "navidrome-password",
    )
    # MusicBrainz credentials are optional here; either may end up as None.
    mb_username = args.mb_username or os.environ.get("MB_USERNAME")
    mb_password = args.mb_password or os.environ.get("MB_PASSWORD")
    entities = set(args.entity or ["song", "album", "artist"])

    client = MusicBrainzClient(CLIENT_NAME, mb_username, mb_password)
    navidrome = NavidromeClient(
        navidrome_base_url,
        navidrome_username,
        navidrome_password,
        CLIENT_NAME,
    )

    total_artists: int | None = None
    if args.artist_id:
        total_artists = 1
    else:
        # SystemExit derives from BaseException, not Exception, so it still
        # propagates out of this handler without an explicit re-raise.
        try:
            total_artists = len(navidrome.get_artists())
        except Exception as exc:
            logging.warning(f"Could not determine total artists to process: {exc}")
    if total_artists is not None:
        logging.info(f"Total artists to process: {total_artists}")

    submission_buffer: list[PreparedRow] = []
    submit_batch_size = 100
    resolved_any = False
    submitted_counts = empty_submission_counts()
    release_groups_total: set[str] = set()
    album_count = 0
    current_album_key: str | None = None

    for row in navidrome.build_rows(
        entities,
        client,
        args.max_artists,
        args.max_albums,
        args.artist_id,
    ):
        # Marker rows only delimit albums: flush what is pending and move on.
        if row.entity_type == "album-boundary":
            if submission_buffer:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
            current_album_key = album_batch_key(row)
            continue

        prepared_rows = prepare_target_row(row, client, args.expand_release_groups)
        if not prepared_rows:
            log_missing_target(row)
            continue
        resolved_any = True

        # Artist ratings bypass the buffer; pending album/song ratings are
        # flushed first so submissions keep their source order.
        if row.entity_type == "artist":
            if submission_buffer:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
            current_album_key = None
            add_submission_counts(submitted_counts, submit_ratings(client, prepared_rows, args.dry_run))
            continue

        # Flush whenever the row belongs to a different album than the buffer.
        if row.entity_type in {"album", "song"}:
            next_album_key = album_batch_key(row)
            if current_album_key is None:
                current_album_key = next_album_key
            elif next_album_key != current_album_key:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
                current_album_key = next_album_key

        log_prepared_targets_debug(row, prepared_rows)

        # Track release-groups seen during the run (empty ids are ignored).
        release_groups_total.update(
            entity_id
            for entity_type, entity_id, _, _ in prepared_rows
            if entity_type == "release-group" and entity_id
        )

        # Track album count when a source album produced targets;
        # prepared_rows is known to be non-empty at this point.
        if row.entity_type == "album":
            album_count += 1

        # Always buffer prepared rows; `submit_ratings()` handles formatting
        # so preview mode will match the real-submission output.
        submission_buffer.extend(prepared_rows)
        if len(submission_buffer) >= submit_batch_size:
            flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)

    if submission_buffer:
        flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)

    if not resolved_any:
        logging.info(f"{LIGHT_RED}No MusicBrainz targets could be resolved from the Navidrome ratings.{RESET}")
        return 0

    # Emit a concise run summary (Tracks / Found / Skipped / Not Found / Match% / Time)
    elapsed = time.time() - start_time
    minutes = int(elapsed // 60)
    seconds = int(elapsed % 60)

    # Look the stats mapping up once; fall back to an empty dict when the
    # client does not expose a `stats` attribute.
    stats = getattr(navidrome, "stats", {})
    tracks = stats.get("tracks", 0)
    found = stats.get("found", 0)
    skipped = stats.get("skipped", 0)
    not_found = stats.get("not_found", 0)
    match_pct = (found / tracks * 100.0) if tracks else 0.0
    release_groups_count = len(release_groups_total)
    artists_count = total_artists if total_artists is not None else stats.get("artists", 0)
    submitted_label = "Previewed" if args.dry_run else "Submitted"
    logging.info(
        f"Artists: {artists_count} | Albums: {album_count} | rg_variants: {release_groups_count} | "
        f"Tracks: {tracks} | Found: {found} | Skipped: {skipped} | Not Found: {not_found} | "
        f"Match: {match_pct:.1f}% | "
        f"{submitted_label}: Artists {submitted_counts['artist']}, "
        f"Albums {submitted_counts['release-group']}, "
        f"Recordings {submitted_counts['recording']} | "
        f"Time: {minutes}m {seconds}s"
    )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)