#!/usr/bin/env python3
"""Push Navidrome ratings to MusicBrainz.

This script reads ratings from Navidrome over its Subsonic API and submits them
to the MusicBrainz XML API. It supports song, album, and artist ratings.

Song ratings are pushed to recording MBIDs. If a song belongs to a release group
and multiple releases in that group contain the same track title, all matching
recordings are rated instead of stopping at the first match.
"""

from __future__ import annotations

import argparse
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass
import time
import xml.etree.ElementTree as ET

import requests
from requests.auth import HTTPDigestAuth
from colorama import init, Fore, Style

# Initialize colorama
init(autoreset=False)

MUSICBRAINZ_BASE_URL = "https://musicbrainz.org/ws/2"
MUSICBRAINZ_XML_NS = "http://musicbrainz.org/ns/mmd-2.0#"
CLIENT_NAME = "musicbrainz-ratings-helper-0.1.0"
SCRIPT_VERSION = "v0.1.0"

# Colors for logging
LIGHT_PURPLE = Fore.MAGENTA + Style.BRIGHT
LIGHT_GREEN = Fore.GREEN + Style.BRIGHT
LIGHT_RED = Fore.RED + Style.BRIGHT
LIGHT_BLUE = Fore.BLUE + Style.BRIGHT
LIGHT_CYAN = Fore.CYAN + Style.BRIGHT
LIGHT_YELLOW = Fore.YELLOW + Style.BRIGHT
BOLD = Style.BRIGHT
RESET = Style.RESET_ALL


class SafeAsciiFormatter(logging.Formatter):
    """Logging formatter that strips ANSI escape codes and encodes to ASCII."""

    # Matches CSI escape sequences such as the colour codes defined above.
    ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

    def format(self, record):
        """Return the formatted record without ANSI codes, as pure ASCII.

        Non-ASCII characters are kept as backslash escapes rather than dropped.
        """
        rendered = super().format(record)
        rendered = self.ansi_escape.sub("", rendered)
        return rendered.encode("ascii", "backslashreplace").decode("ascii")


# Setup logs (match sptnr logging behavior)
LOG_DIR = "logs"
# exist_ok avoids the check-then-create race when two runs start together.
os.makedirs(LOG_DIR, exist_ok=True)
LOGFILE = os.path.join(LOG_DIR, f"musicbrainz-ratings-helper_{int(time.time())}.log")


@dataclass(frozen=True)
class RatingRow:
    """One rated Navidrome entity, plus the MusicBrainz IDs known for it."""

    # "artist", "album", "song", or the "album-boundary" marker row.
    entity_type: str
    navidrome_id: str
    # MBID the rating targets; may be empty for songs without a direct MBID.
    mbid: str
    title: str
    artist: str
    release_group_mbid: str | None
    release_mbid: str | None
    # Source rating as read from Navidrome's userRating field.
    rating: int
# (MusicBrainz entity type, MBID, MusicBrainz rating 0-100, originating row)
PreparedRow = tuple[str, str, int, RatingRow]
# Number of submitted ratings keyed by MusicBrainz entity type.
SubmissionCounts = dict[str, int]


def empty_submission_counts() -> SubmissionCounts:
    """Return a fresh counter with one zeroed slot per submitted entity type."""
    return {"artist": 0, "release-group": 0, "recording": 0}


def add_submission_counts(total: SubmissionCounts, increment: SubmissionCounts) -> None:
    """Add *increment* into *total* in place, entity type by entity type."""
    for key in ("artist", "release-group", "recording"):
        total[key] = total.get(key, 0) + increment.get(key, 0)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script."""
    parser = argparse.ArgumentParser(
        description="Push Navidrome ratings to MusicBrainz."
    )
    parser.add_argument(
        "--navidrome-base-url",
        default=None,
        help="Base URL for Navidrome, for example https://navidrome.example.com.",
    )
    parser.add_argument(
        "--navidrome-username",
        default=None,
        help="Navidrome username for Subsonic API auth.",
    )
    parser.add_argument(
        "--navidrome-password",
        default=None,
        help="Navidrome password for Subsonic API auth.",
    )
    parser.add_argument(
        "--entity",
        action="append",
        choices=["song", "album", "artist"],
        help="Limit export to one or more entity types. Can be repeated.",
    )
    parser.add_argument(
        "--expand-release-groups",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Expand song ratings to all matching recordings in the same release group.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be submitted without posting to MusicBrainz.",
    )
    parser.add_argument(
        "--max-artists",
        type=int,
        default=None,
        help="Limit how many artist ratings are collected. Useful for short artist-only tests.",
    )
    parser.add_argument(
        "--max-albums",
        type=int,
        default=None,
        help="Limit how many album ratings are collected. Useful for short album-only tests.",
    )
    parser.add_argument(
        "--artist-id",
        default=None,
        help="Limit album and song processing to a single Navidrome artist ID.",
    )
    parser.add_argument(
        "--mb-username",
        default=None,
        help="MusicBrainz username. Defaults to MB_USERNAME.",
    )
    parser.add_argument(
        "--mb-password",
        default=None,
        help="MusicBrainz password. Defaults to MB_PASSWORD.",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity.",
    )
    # submit order is hard-coded to artist, release-group, recording
    return parser.parse_args()


def load_dotenv_file(path: str = ".env") -> None:
    """Load simple KEY=VALUE pairs from a local .env file.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. A value
    wrapped in matching single or double quotes is unquoted. Variables that
    are already set in the environment are left untouched.
    """
    if not os.path.exists(path):
        return
    with open(path, "r", encoding="utf-8") as env_file:
        for line in env_file:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip()
            if not key:
                continue
            if (
                len(value) >= 2
                and value[0] == value[-1]
                and value[0] in {"'", '"'}
            ):
                value = value[1:-1]
            os.environ.setdefault(key, value)


def normalize_text(value: str | None) -> str:
    """Lower-case *value* and collapse all whitespace runs to single spaces."""
    return " ".join((value or "").strip().lower().split())


def rating_to_musicbrainz(value: int) -> int:
    """Clamp a 0-5 rating and scale it by 20 to a 0-100 value."""
    value = max(0, min(5, value))
    return int(round(value * 20))


def required_arg(value: str | None, env_name: str, label: str) -> str:
    """Return *value* or the *env_name* environment variable.

    Raises:
        SystemExit: when neither source provides a non-empty value.
    """
    resolved = value or os.environ.get(env_name)
    if not resolved:
        logging.error(f"{LIGHT_RED}Missing required {label}. Set {env_name} or pass --{label}.{RESET}")
        raise SystemExit(1)
    return resolved


def log_blank_line() -> None:
    """Emit an empty info-level log line as a visual separator."""
    logging.info("")


def log_artist_header(name: str, navidrome_id: str, index: int) -> None:
    """Log the header line that introduces an artist."""
    logging.info(f"Artist: {name} ({navidrome_id})[{index}]")


def log_album_header(
    name: str,
    navidrome_id: str,
    rating: int | None = None,
    has_rated_songs: bool = False,
    release_group_mbid: str | None = None,
) -> None:
    """Log an album header with compact rating/skip context and resolved RG MBID.

    Examples:
        Album: Gucci (1MrF6...) | nr:4.0 | mbidRG:45380071-f2f0...
        Album: Foo (abc123) | nr:n/a | contains rated songs | mbidRG:n/a
    """
    unrated = rating is None or rating <= 0
    rating_str = "n/a" if unrated else f"{rating:.1f}"
    extra = f" | nr:{rating_str}"
    if has_rated_songs and unrated:
        extra += " | contains rated songs"
    rg = release_group_mbid or "n/a"
    extra += f" | mbidRG:{rg}"
    logging.info(f" Album: {name} ({navidrome_id}){extra}")


def log_skip(name: str, rating: int, entity: str = "Item") -> None:
    """Log that *name* is skipped because it carries no usable rating."""
    logging.info(f"{entity}: s:{format_source_rating(rating)} -> mb:n/a | Skipping: {name}")


def _format_conn_error(exc: Exception, label: str) -> str:
    """Return a compact, human-friendly connection error string.

    Examples:
        MusicBrainz connection error, musicbrainz.org:443; Read timed out (10s)
        Navidrome connection error, nav.example:443; Connection aborted
    """
    text = str(exc)
    # Try to extract host and port from common requests.ConnectionPool formatting
    m = re.search(r"host='(?P<host>[^']+)'\s*,\s*port=(?P<port>\d+)", text)
    hostport = f"{m.group('host')}:{m.group('port')}" if m else None

    # Short message: prefer 'Read timed out' or the first sentence
    if "Read timed out" in text:
        # try to find timeout seconds
        tm = re.search(r"read timeout=?(?P<secs>\d+)", text)
        short_msg = f"Read timed out ({tm.group('secs')}s)" if tm else "Read timed out"
    else:
        # Skip the "...Pool(host='x.y', port=N): " prefix so the first-sentence
        # cut below does not stop at a dot inside the host name.
        remainder = text[m.end():].lstrip("): ") if m else text
        # take up to the first period or 120 chars
        short_msg = remainder.split(".")[0][:120]

    if hostport:
        return f"{label} connection error, {hostport}; {short_msg}"
    return f"{label} connection error; {short_msg}"


class NavidromeClient:
    """Throttled client for the Subsonic REST API exposed by Navidrome."""

    def __init__(self, base_url: str, username: str, password: str, client_name: str) -> None:
        self.base_url = base_url.rstrip("/")
        self.username = username
        self.password = password
        self.client_name = client_name
        self.session = requests.Session()
        self._next_request_at = 0.0
        # Stats collected while scanning Navidrome library
        self.stats: dict[str, int] = {
            "tracks": 0,
            "found": 0,
            "skipped": 0,
            "not_found": 0,
        }

    def _throttle(self) -> None:
        """Sleep as needed so requests start at least 1.05 s apart."""
        now = time.monotonic()
        if now < self._next_request_at:
            time.sleep(self._next_request_at - now)
        self._next_request_at = time.monotonic() + 1.05

    def _request(self, endpoint: str, params: dict[str, object]) -> dict:
        """Call a Navidrome endpoint and return the ``subsonic-response`` body.

        Server errors (5xx), connection errors and timeouts are retried up to
        five times with a linearly growing wait (2 s, 4 s, ...).

        Raises:
            SystemExit: on an API-level error, a non-retryable request
                failure, or when all retries are exhausted.
        """
        max_retries = 5
        for attempt in range(max_retries):
            try:
                self._throttle()
                query = {
                    "u": self.username,
                    "p": self.password,
                    "v": "1.16.1",
                    "c": self.client_name,
                    "f": "json",
                    **{k: v for k, v in params.items() if v is not None},
                }
                # Ensure all query params are strings for requests and type-checkers
                safe_query = {k: str(v) for k, v in query.items()}
                response = self.session.get(f"{self.base_url}/rest/{endpoint}", params=safe_query, timeout=10)

                # Handle server errors with retry
                if response.status_code >= 500:
                    wait_time = (attempt + 1) * 2
                    logging.warning(f"{LIGHT_YELLOW}Navidrome server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                payload = response.json()
                # Subsonic wraps everything, including failures, in
                # "subsonic-response"; unwrap before looking for an error.
                body = payload.get("subsonic-response", payload)
                error = body.get("error") or payload.get("error")
                if error or body.get("status") == "failed":
                    logging.error(f"{LIGHT_RED}Navidrome API error for {endpoint}: {error or 'status=failed'}{RESET}")
                    raise SystemExit(1)
                return body

            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                wait_time = (attempt + 1) * 2
                short = _format_conn_error(e, "Navidrome")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}Navidrome request failed: {e}{RESET}")
                break

        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for Navidrome endpoint: {endpoint}{RESET}")
        raise SystemExit(1)

    def get_artists(self) -> list[dict]:
        """Return all artists, flattened across the index buckets of getArtists."""
        response = self._request("getArtists", {})
        artists: list[dict] = []
        for index in response.get("artists", {}).get("index", []):
            artists.extend(index.get("artist", []))
        return artists

    def get_album_list_page(self, offset: int, size: int = 500) -> list[dict]:
        """Return one page of albums ordered alphabetically by name."""
        response = self._request(
            "getAlbumList2",
            {"type": "alphabeticalByName", "offset": offset, "size": size},
        )
        return response.get("albumList2", {}).get("album", [])

    def get_album(self, album_id: str) -> dict:
        """Return the album detail (including its ``song`` list) for *album_id*."""
        response = self._request("getAlbum", {"id": album_id})
        return response.get("album", {})

    def get_artist(self, artist_id: str) -> dict:
        """Return the artist detail (including its ``album`` list) for *artist_id*."""
        response = self._request("getArtist", {"id": artist_id})
        return response.get("artist", {})

    def get_all_albums(self, page_size: int = 500) -> list[dict]:
        """Page through getAlbumList2 until an empty or short page is returned."""
        albums: list[dict] = []
        offset = 0
        while True:
            page = self.get_album_list_page(offset, page_size)
            if not page:
                break
            albums.extend(page)
            if len(page) < page_size:
                break
            offset += len(page)
        return albums

    def build_rows(
        self,
        entities: set[str],
        musicbrainz: "MusicBrainzClient",
        max_artists: int | None = None,
        max_albums: int | None = None,
        artist_id: str | None = None,
    ):
        """Yield ``RatingRow`` objects for the requested entity types.

        Artist rows come first. Albums follow; every album that is rated or
        contains rated songs is preceded by an ``album-boundary`` marker row,
        then its optional ``album`` row, then its ``song`` rows.

        Args:
            entities: subset of {"artist", "album", "song"} to collect.
            musicbrainz: client used to resolve release -> release-group MBIDs.
            max_artists: stop after this many artist rows were yielded.
            max_albums: stop once an album row is yielded and at least this
                many albums were inspected; that album's songs are still
                emitted before stopping.
            artist_id: restrict collection to this Navidrome artist.
        """
        # When an explicit artist_id is provided we may need the artist twice
        # (for artist rows and for album/song collection). Fetch it once and reuse.
        artist_source: list[dict] | None = None

        if "artist" in entities:
            artist_rows = 0
            # If an artist_id is provided, limit artist collection to that artist only.
            artist_source = [self.get_artist(artist_id)] if artist_id else self.get_artists()
            for artist_index, artist in enumerate(artist_source):
                if not artist:
                    continue
                if artist_index == 0:
                    log_blank_line()
                log_artist_header(artist.get("name", ""), artist.get("id", ""), artist_index)
                rating = int(artist.get("userRating") or 0)
                if rating <= 0:
                    log_skip(artist.get("name", ""), rating, entity="Artist")
                    continue
                mbid = artist.get("musicBrainzId") or ""
                if not mbid:
                    continue
                yield RatingRow(
                    entity_type="artist",
                    navidrome_id=artist.get("id", ""),
                    mbid=mbid,
                    title=artist.get("name", ""),
                    artist=artist.get("name", ""),
                    release_group_mbid=None,
                    release_mbid=None,
                    rating=rating,
                )
                artist_rows += 1
                if max_artists is not None and artist_rows >= max_artists:
                    break

        if "album" not in entities and "song" not in entities:
            return

        album_rows = 0
        page_size = max_albums if max_albums is not None and max_albums > 0 else 500
        # If an explicit artist_id was provided, reuse the previously fetched
        # `artist_source` value instead of calling the API again.
        if artist_id and artist_source:
            # artist_source is a list with one element when artist_id was used
            selected_artist = artist_source[0]
        else:
            selected_artist = self.get_artist(artist_id) if artist_id else None
        album_source = selected_artist.get("album", []) if selected_artist else self.get_all_albums(page_size=page_size)

        # Only print the artist header here when we didn't already collect artist rows
        if selected_artist and "artist" not in entities:
            log_blank_line()
            log_artist_header(selected_artist.get("name", ""), selected_artist.get("id", ""), 0)

        for album in album_source:
            album_rows += 1
            album_rating = int(album.get("userRating") or 0)
            album_mbid = album.get("musicBrainzId") or ""
            if album_mbid:
                logging.debug(f"Resolving release-group for album '{album.get('name','')}' release:{album_mbid}")
                album_release_group_mbid = resolve_release_group_mbid(musicbrainz, album_mbid)
                logging.debug(f"Resolved release-group for album '{album.get('name','')}' -> {album_release_group_mbid or 'NONE'}")
            else:
                album_release_group_mbid = ""

            # If we're collecting songs we need the album detail to know whether
            # to print an album header (only print it when there are rated songs
            # or the album itself has a rating).
            album_detail = None
            has_rated_songs = False
            if "song" in entities:
                logging.debug(f"Fetching album details for album id {album.get('id', '')}")
                album_detail = self.get_album(album.get("id", ""))
                has_rated_songs = any(
                    int(song.get("userRating") or 0) > 0
                    for song in album_detail.get("song", [])
                )

            # Print album header only if the album has a rating or contains rated songs
            if album_rating > 0 or has_rated_songs:
                yield RatingRow(
                    entity_type="album-boundary",
                    navidrome_id=album.get("id", ""),
                    mbid=album_release_group_mbid or "",
                    title=album.get("name", ""),
                    artist=album.get("artist", ""),
                    release_group_mbid=album_release_group_mbid or None,
                    release_mbid=album_mbid or None,
                    rating=0,
                )
                log_album_header(
                    album.get("name", ""),
                    album.get("id", ""),
                    rating=album_rating,
                    has_rated_songs=has_rated_songs,
                    release_group_mbid=album_release_group_mbid or None,
                )

            limit_reached = False
            if "album" in entities:
                if album_rating > 0:
                    if album_release_group_mbid:
                        yield RatingRow(
                            entity_type="album",
                            navidrome_id=album.get("id", ""),
                            mbid=album_release_group_mbid,
                            title=album.get("name", ""),
                            artist=album.get("artist", ""),
                            release_group_mbid=album_release_group_mbid,
                            release_mbid=album_mbid or None,
                            rating=album_rating,
                        )
                        # Defer the actual stop until this album's songs were
                        # emitted; breaking here would silently drop them.
                        limit_reached = max_albums is not None and album_rows >= max_albums
                    else:
                        logging.warning(f"{LIGHT_YELLOW}Album '{album.get('name', '')}' has rating {album_rating} but no release-group MBID resolved.{RESET}")
                elif not has_rated_songs:
                    # If the album has no rating and no rated songs, log a compact skip line
                    log_skip(album.get("name", ""), album_rating, entity="Album")

            if "song" in entities:
                # album_detail may already be fetched above
                if album_detail is None:
                    album_detail = self.get_album(album.get("id", ""))
                yield from self._song_rows(album, album_detail, album_mbid, album_release_group_mbid)

            if limit_reached:
                break

    def _song_rows(self, album: dict, album_detail: dict, album_mbid: str, album_release_group_mbid: str):
        """Yield a ``song`` row for every rated track of one album, updating ``self.stats``."""
        for song in album_detail.get("song", []):
            song_rating = int(song.get("userRating") or 0)
            # Count every track we inspect
            self.stats["tracks"] += 1

            if song_rating <= 0:
                # Skipped due to no rating
                self.stats["skipped"] += 1
                logging.debug(f"nr:{song_rating:.1f} | Skipping Recording: {song.get('title', '')}")
                continue

            song_mbid = song.get("musicBrainzId") or ""
            if not song_mbid and not album_release_group_mbid:
                # No direct MBID and no release-group to fall back to.
                self.stats["not_found"] += 1
                logging.debug(
                    f"{LIGHT_YELLOW}nr:{song_rating:.1f} | Rated but no MBID for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
                )
                continue

            self.stats["found"] += 1
            if not song_mbid:
                # Rated but no direct MBID found. We have a release-group
                # match, so still yield the row so the release-group expansion
                # path can submit the rating instead of skipping it.
                logging.debug(
                    f"{LIGHT_YELLOW}nr:{song_rating:.1f} | No song MBID; using release-group fallback for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
                )
            yield RatingRow(
                entity_type="song",
                navidrome_id=song.get("id", ""),
                mbid=song_mbid,
                title=song.get("title", ""),
                artist=song.get("artist", album.get("artist", "")),
                release_group_mbid=album_release_group_mbid,
                release_mbid=album_mbid or None,
                rating=song_rating,
            )


class MusicBrainzClient:
    """Throttled client for the MusicBrainz web service (JSON reads, XML rating posts)."""

    def __init__(self, client: str, username: str | None, password: str | None) -> None:
        self.client = client
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": client, "Accept": "application/json"})
        # Digest auth is only attached when both credentials are present.
        if username and password:
            self.session.auth = HTTPDigestAuth(username, password)
        self._next_request_at = 0.0

    def _throttle(self) -> None:
        """Sleep as needed so requests start at least 1.05 s apart."""
        now = time.monotonic()
        if now < self._next_request_at:
            time.sleep(self._next_request_at - now)
        self._next_request_at = time.monotonic() + 1.05

    @staticmethod
    def _retry_after_seconds(response, default: int = 5) -> int:
        """Return the Retry-After delay in seconds, or *default* when absent or not an integer."""
        try:
            return max(0, int(response.headers.get("Retry-After", default)))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back rather than crash.
            return default

    def get_json(self, path: str, params: dict[str, object], allow_404: bool = False) -> dict:
        """GET *path* and return the decoded JSON body.

        Rate-limit (429) responses wait for Retry-After; server errors,
        connection errors and timeouts wait 2 s, 4 s, ... Up to three attempts
        are made in total.

        Args:
            path: path below the web-service base URL, starting with "/".
            params: query parameters; ``None`` values are dropped.
            allow_404: return ``{}`` on HTTP 404 instead of failing.

        Raises:
            SystemExit: on a non-retryable failure or when attempts run out.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                self._throttle()
                # Ensure params are strings to satisfy requests parameter types
                safe_params = {k: str(v) for k, v in params.items() if v is not None}
                response = self.session.get(f"{MUSICBRAINZ_BASE_URL}{path}", params=safe_params, timeout=20)

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = self._retry_after_seconds(response)
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
                    time.sleep(retry_after)
                    continue

                # Handle server errors with retry
                if response.status_code >= 500:
                    wait_time = (attempt + 1) * 2
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue

                if response.status_code == 404 and allow_404:
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz resource not found for {path}. Skipping.{RESET}")
                    return {}

                response.raise_for_status()
                return response.json()

            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                wait_time = (attempt + 1) * 2
                short = _format_conn_error(e, "MusicBrainz")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
                break

        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
        raise SystemExit(1)

    def post_xml(self, path: str, xml_body: bytes) -> requests.Response:
        """POST an XML document to *path* and return the successful response.

        Uses the same retry policy as ``get_json`` with up to five attempts.

        Raises:
            SystemExit: on a non-retryable failure or when attempts run out.
        """
        max_retries = 5
        for attempt in range(max_retries):
            try:
                self._throttle()
                headers = {"Content-Type": "application/xml; charset=utf-8"}
                response = self.session.post(
                    f"{MUSICBRAINZ_BASE_URL}{path}",
                    params={"client": self.client},
                    data=xml_body,
                    headers=headers,
                    timeout=20,
                )

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = self._retry_after_seconds(response)
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
                    time.sleep(retry_after)
                    continue

                # Handle server errors with retry
                if response.status_code >= 500:
                    wait_time = (attempt + 1) * 2
                    logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                return response

            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                wait_time = (attempt + 1) * 2
                short = _format_conn_error(e, "MusicBrainz")
                logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
                time.sleep(wait_time)
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
                break

        # If we get here, all retries failed
        logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
        raise SystemExit(1)


def resolve_release_group_mbid(client: MusicBrainzClient, release_mbid: str) -> str:
    """Return the release-group MBID of *release_mbid*, or "" when it is unknown."""
    payload = client.get_json(
        f"/release/{release_mbid}",
        {"inc": "release-groups", "fmt": "json"},
        allow_404=True,
    )
    return payload.get("release-group", {}).get("id", "")


def _credit_names(entity: dict) -> str:
    """Return the normalized, space-joined artist-credit names of a track or recording."""
    return normalize_text(
        " ".join(
            credit.get("name", "")
            for credit in entity.get("artist-credit", [])
            if isinstance(credit, dict)
        )
    )


def _payload_keys(payload: object) -> str:
    """Describe a payload for diagnostics: its sorted keys, or its type if not a dict."""
    if isinstance(payload, dict):
        return ",".join(sorted(str(key) for key in payload))
    return str(type(payload))


def _collect_matching_recordings(
    release: dict,
    wanted_title: str,
    wanted_artist: str,
    seen: set[str],
    candidates: list[str],
    inspected_titles: list[str],
) -> None:
    """Append to *candidates* the recordings of *release* matching title and artist.

    A track matches when its normalized title equals *wanted_title* (if both
    are non-empty) and *wanted_artist* overlaps, by substring in either
    direction, with the track or recording artist credit. *seen* prevents
    duplicates; *inspected_titles* keeps up to ten distinct titles of
    inspected tracks for diagnostics.
    """
    for medium in release.get("media", []):
        for track in medium.get("tracks", []):
            recording = track.get("recording") or {}
            recording_id = recording.get("id")
            if not recording_id or recording_id in seen:
                continue

            raw_title = track.get("title") or ""
            if raw_title and len(inspected_titles) < 10 and raw_title not in inspected_titles:
                inspected_titles.append(raw_title)

            track_title = normalize_text(raw_title)
            if track_title and wanted_title and track_title != wanted_title:
                continue

            track_artist = _credit_names(track)
            recording_artist = _credit_names(recording)
            artist_matches = (
                not wanted_artist
                or wanted_artist in track_artist
                or wanted_artist in recording_artist
                or track_artist in wanted_artist
                or recording_artist in wanted_artist
            )
            if not artist_matches:
                continue

            seen.add(recording_id)
            candidates.append(recording_id)


def release_group_recording_ids(
    client: MusicBrainzClient,
    release_group_mbid: str,
    title: str,
    artist: str,
    release_mbid: str | None = None,
) -> list[str]:
    """Return the recording MBIDs in a release group that match *title* / *artist*.

    Releases are browsed page by page. If the first browse page is empty, the
    release-group lookup is tried, and finally the single release
    *release_mbid* when given.

    Returns:
        Distinct recording MBIDs in discovery order; empty when
        *release_group_mbid* is empty or nothing matched.
    """
    if not release_group_mbid:
        return []

    wanted_title = normalize_text(title)
    wanted_artist = normalize_text(artist)
    candidates: list[str] = []
    inspected_titles: list[str] = []
    seen: set[str] = set()
    offset = 0

    while True:
        payload = client.get_json(
            "/release",
            {
                "release-group": release_group_mbid,
                "inc": "recordings+artist-credits",
                "fmt": "json",
                "limit": 100,
                "offset": offset,
            },
            allow_404=True,
        )
        releases = payload.get("releases", [])

        if not releases and offset > 0:
            # An empty page after the first one simply means paging is done.
            break

        if not releases:
            # Diagnostic: show what the API returned so we can debug empty results
            logging.info(
                f"{LIGHT_YELLOW}No releases found from /release for RG {release_group_mbid}. Payload keys: {_payload_keys(payload)}{RESET}"
            )
            # Try a fallback to the release-group endpoint if /release
            # returned no releases (sometimes the API shape differs).
            logging.debug(f"No releases from /release for RG {release_group_mbid}; trying /release-group fallback")
            fallback = client.get_json(
                f"/release-group/{release_group_mbid}",
                {"inc": "releases+media+recordings+artist-credits", "fmt": "json"},
                allow_404=True,
            )
            releases = fallback.get("releases", []) if isinstance(fallback, dict) else []
            if not releases:
                logging.info(
                    f"{LIGHT_YELLOW}No releases found from /release-group for RG {release_group_mbid}. Payload keys: {_payload_keys(fallback)}{RESET}"
                )

        if not releases:
            # If we have a specific release MBID (album-level MBID), try
            # fetching that release directly as a last resort.
            if release_mbid:
                logging.debug(f"No releases in RG; trying specific release {release_mbid}")
                rel_payload = client.get_json(
                    f"/release/{release_mbid}",
                    {"inc": "recordings+media+artist-credits", "fmt": "json"},
                    allow_404=True,
                )
                _collect_matching_recordings(
                    rel_payload, wanted_title, wanted_artist, seen, candidates, inspected_titles
                )
            break

        for release in releases:
            _collect_matching_recordings(
                release, wanted_title, wanted_artist, seen, candidates, inspected_titles
            )

        offset += len(releases)
        total = payload.get("release-count")
        if isinstance(total, int):
            # A browse page that includes recordings can hold fewer releases
            # than the requested limit, so a short page is not the end of the
            # list; compare against the reported total instead.
            if offset >= total:
                break
        elif len(releases) < 100:
            break

    # If we found no candidates for an expected title, log a concise
    # diagnostic listing a few candidate track titles to help spot
    # normalization or punctuation mismatches.
    if not candidates and wanted_title:
        sample = ", ".join(inspected_titles[:10]) or "(no tracks found)"
        logging.info(
            f"{LIGHT_YELLOW}No recordings matched title '{title}' (normalized '{wanted_title}') in RG {release_group_mbid}. Candidate titles: {sample}{RESET}"
        )

    return candidates


def build_submission(rows: list[tuple[str, str, int]]) -> bytes:
    """Serialize (entity type, MBID, rating) triples into a rating-submission XML document.

    Entities are grouped into one ``<...-list>`` element per type; types
    without rows are omitted.
    """
    root = ET.Element("metadata", {"xmlns": MUSICBRAINZ_XML_NS})
    grouped: dict[str, list[tuple[str, int]]] = defaultdict(list)
    for entity_type, entity_id, rating in rows:
        grouped[entity_type].append((entity_id, rating))

    entity_tags = {
        "artist": "artist-list",
        "recording": "recording-list",
        "release": "release-list",
        "release-group": "release-group-list",
        "work": "work-list",
    }

    for entity_type in ["artist", "recording", "release", "release-group", "work"]:
        values = grouped.get(entity_type, [])
        if not values:
            continue
        entity_list = ET.SubElement(root, entity_tags[entity_type])
        for entity_id, rating in values:
            entity = ET.SubElement(entity_list, entity_type, {"id": entity_id})
            user_rating = ET.SubElement(entity, "user-rating")
            user_rating.text = str(rating)

    return ET.tostring(root, encoding="utf-8", xml_declaration=True)


def prepare_target_row(
    row: RatingRow,
    client: MusicBrainzClient,
    expand_release_groups: bool,
) -> list[PreparedRow]:
    """Resolve *row* into the distinct MusicBrainz targets its rating applies to.

    Songs target their own recording plus, when *expand_release_groups* is
    set, every matching recording in the release group. Albums target their
    release group and artists their artist MBID. Returns an empty list when
    the scaled rating is not positive or no target is known.
    """
    prepared: list[PreparedRow] = []
    rating = rating_to_musicbrainz(row.rating)
    if rating <= 0:
        return prepared

    targets: list[tuple[str, str]] = []
    if row.entity_type == "song":
        if row.mbid:
            targets.append(("recording", row.mbid))
        if expand_release_groups and row.release_group_mbid:
            logging.debug(f"Expanding release-group {row.release_group_mbid} for '{row.title}' / '{row.artist}'")
            recordings = release_group_recording_ids(
                client, row.release_group_mbid, row.title, row.artist, row.release_mbid
            )
            logging.debug(f"Expanded release-group {row.release_group_mbid}: {len(recordings)} recordings")
            targets.extend(("recording", recording_id) for recording_id in recordings)
    elif row.entity_type == "album":
        if row.mbid:
            targets.append(("release-group", row.mbid))
        if row.release_group_mbid and row.release_group_mbid != row.mbid:
            targets.append(("release-group", row.release_group_mbid))
    elif row.entity_type == "artist":
        if row.mbid:
            targets.append(("artist", row.mbid))

    # Drop empty and repeated targets while keeping first-seen order.
    seen_targets: set[tuple[str, str]] = set()
    for entity_type, entity_id in targets:
        if not entity_id:
            continue
        key = (entity_type, entity_id)
        if key in seen_targets:
            continue
        seen_targets.add(key)
        prepared.append((entity_type, entity_id, rating, row))

    return prepared


def format_source_rating(value: int | None) -> str:
    """Render a source rating for log output ("n/a" for None)."""
    if value is None:
        return "n/a"
    return str(value)


def format_musicbrainz_rating(value: int | None) -> str:
    """Render a MusicBrainz rating for log output ("n/a" for None)."""
    if value is None:
        return "n/a"
    return str(value)


def rating_entity_label(entity_type: str) -> str:
    """Map a row entity type to its log label; unknown types are capitalized."""
    return {
        "artist": "Artist",
        "album": "Album",
        "song": "Recording",
    }.get(entity_type, entity_type.capitalize())


def log_rating_result(
    row: RatingRow,
    mb_rating: int,
    status: int | str,
    *,
    color: str = LIGHT_GREEN,
) -> None:
    """Log one rating line: source rating, MusicBrainz rating, title/artist and status."""
    title = row.title or row.artist or row.mbid
    artist = row.artist or ""
    label = rating_entity_label(row.entity_type)
    logging.info(
        f"{color}{label}: s:{format_source_rating(row.rating)} -> mb:{format_musicbrainz_rating(mb_rating)} | {title} / {artist}: {status}{RESET}"
    )


def submit_ratings(client: MusicBrainzClient, rows: list[PreparedRow], dry_run: bool) -> SubmissionCounts:
    """Submit *rows* in one POST per entity type and return per-type counts.

    With *dry_run* nothing is posted, but the same per-rating and summary
    lines are logged with the status "dry-run".
    """
    # Group values by entity type but keep the original RatingRow for logging
    grouped_by_type: dict[str, list[tuple[str, int, RatingRow]]] = defaultdict(list)
    for entity_type, entity_id, rating, row in rows:
        grouped_by_type[entity_type].append((entity_id, rating, row))

    # Collect counts per entity type to return to the caller
    counts = empty_submission_counts()

    # Submit artist ratings first, then release-groups (albums), then recordings.
    for entity_type in ["artist", "release-group", "recording"]:
        values = grouped_by_type.get(entity_type, [])
        counts[entity_type] = len(values)
        if not values:
            continue

        # For release-group batches, emit a concise summary of how many
        # rg_variants are in this submission batch (only for real runs).
        if entity_type == "release-group" and not dry_run:
            logging.info(f"{LIGHT_BLUE}rg_variants in this batch: {len(values)}{RESET}")

        submission_rows = [(entity_type, entity_id, rating) for entity_id, rating, _ in values]
        xml_body = build_submission(submission_rows)

        if dry_run:
            for _, mb_rating, row in values:
                log_rating_result(row, mb_rating, "dry-run")
            logging.info(f"{LIGHT_GREEN}Submitted {len(values)} {entity_type} ratings: dry-run{RESET}")
            continue

        response = client.post_xml("/rating", xml_body)
        # After batch submit, log per-rating status lines and a summary
        for _, mb_rating, row in values:
            log_rating_result(row, mb_rating, response.status_code)
        logging.info(f"{LIGHT_GREEN}Submitted {len(values)} {entity_type} ratings: {response.status_code}{RESET}")

    return counts


def flush_submission_buffer(
    buffer: list[PreparedRow],
    client: MusicBrainzClient,
    dry_run: bool,
) -> SubmissionCounts:
    """Submit the buffered rows, empty *buffer* in place, and return the counts."""
    if not buffer:
        return empty_submission_counts()

    # Deduplicate buffer by (entity_type, entity_id), keeping first occurrence.
    deduped: list[PreparedRow] = []
    seen: set[tuple[str, str]] = set()
    for entity_type, entity_id, rating, row in buffer:
        key = (entity_type, entity_id)
        if key in seen:
            continue
        seen.add(key)
        deduped.append((entity_type, entity_id, rating, row))

    counts = submit_ratings(client, deduped, dry_run)
    buffer.clear()
    return counts


def album_batch_key(row: RatingRow) -> str:
    """Return the key that groups rows of one album: release, else release group, else Navidrome ID."""
    return row.release_mbid or row.release_group_mbid or row.navidrome_id


def log_missing_target(row: RatingRow) -> None:
    """Log that no MusicBrainz target could be prepared for *row*."""
    title = row.title or ""
    artist = row.artist or ""
    mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
    logging.info(
        f" {LIGHT_YELLOW}{rating_entity_label(row.entity_type)}: s:{format_source_rating(row.rating)} -> mb:0 | (not found) | {title} / {artist} ({row.navidrome_id}){mbid_field}{RESET}"
    )


def log_prepared_targets_debug(row: RatingRow, prepared_rows: list[PreparedRow]) -> None:
    """Debug-log the recording MBIDs prepared for a song row; other rows are ignored."""
    if row.entity_type != "song":
        return
    recording_ids = [
        entity_id
        for entity_type, entity_id, _, _ in prepared_rows
        if entity_type == "recording"
    ]
    recording_list = ", ".join(recording_ids) if recording_ids else "(none)"
    mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
    logging.debug(
        f" {LIGHT_BLUE}Song: {row.title} ({row.navidrome_id}){mbid_field} | matched_recordings:{len(recording_ids)} | recordings:{recording_list}{RESET}"
    )


def flush_and_count(
    buffer: list[PreparedRow],
    client: MusicBrainzClient,
    dry_run: bool,
    submitted_counts: SubmissionCounts,
) -> None:
    """Flush *buffer* and add the resulting counts into *submitted_counts*."""
    add_submission_counts(
        submitted_counts,
        flush_submission_buffer(buffer, client, dry_run),
    )


def main() -> int:
    """Configure logging and credentials, then stream Navidrome rows into submission batches."""
    args = parse_args()
    load_dotenv_file()

    # Start time for run duration reporting
    start_time = time.time()

    # Set up the stream handler (console logging) without timestamp
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(SafeAsciiFormatter("%(message)s"))
    logging.basicConfig(level=getattr(logging, args.log_level), handlers=[console_handler])

    # Set up the file handler (file logging) with timestamp, matching sptnr.py
    file_handler = logging.FileHandler(LOGFILE, "a", encoding="ascii", errors="backslashreplace")
    file_handler.setFormatter(SafeAsciiFormatter("[%(asctime)s] %(message)s"))
    logging.getLogger().addHandler(file_handler)

    # Ensure colorama auto-reset for console
    try:
        init(autoreset=True)
    except Exception:
        pass

    logging.info(f"Version: musicbrainz-ratings-helper {SCRIPT_VERSION}")
    if args.dry_run:
        logging.info("Preview mode, no changes will be made.")

    navidrome_base_url = required_arg(
        args.navidrome_base_url,
        "NAVIDROME_BASE_URL",
        "navidrome-base-url",
    )
    navidrome_username = required_arg(
        args.navidrome_username,
        "NAVIDROME_USERNAME",
        "navidrome-username",
    )
    navidrome_password = required_arg(
        args.navidrome_password,
        "NAVIDROME_PASSWORD",
        "navidrome-password",
    )
    mb_username = args.mb_username or os.environ.get("MB_USERNAME")
    mb_password = args.mb_password or os.environ.get("MB_PASSWORD")

    entities = set(args.entity or ["song", "album", "artist"])
    client = MusicBrainzClient(CLIENT_NAME, mb_username, mb_password)
    navidrome = NavidromeClient(
        navidrome_base_url,
        navidrome_username,
        navidrome_password,
        CLIENT_NAME,
    )

    total_artists: int | None = None
    if args.artist_id:
        total_artists = 1
    else:
        try:
            total_artists = len(navidrome.get_artists())
        except SystemExit:
            raise
        except Exception as exc:
            logging.warning(f"Could not determine total artists to process: {exc}")
    if total_artists is not None:
        logging.info(f"Total artists to process: {total_artists}")

    submission_buffer: list[PreparedRow] = []
    submit_batch_size = 100
    resolved_any = False
    submitted_counts = empty_submission_counts()
    release_groups_total: set[str] = set()
    album_count = 0
    current_album_key: str | None = None

    for row in navidrome.build_rows(
        entities,
        client,
        args.max_artists,
        args.max_albums,
        args.artist_id,
    ):
        if row.entity_type == "album-boundary":
            if submission_buffer:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
            current_album_key = album_batch_key(row)
            continue

        prepared_rows = prepare_target_row(row, client, args.expand_release_groups)
        if not prepared_rows:
            log_missing_target(row)
            continue

        resolved_any = True
        if row.entity_type == "artist":
            if submission_buffer:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
            current_album_key = None
            add_submission_counts(submitted_counts, submit_ratings(client, prepared_rows, args.dry_run))
            continue

        if row.entity_type in {"album", "song"}:
            next_album_key = album_batch_key(row)
            if current_album_key is None:
                current_album_key = next_album_key
            elif next_album_key != current_album_key:
                flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
                current_album_key = next_album_key

        rg_ids = {entity_id for entity_type, entity_id, _, _ in prepared_rows if entity_type == "release-group"}
        log_prepared_targets_debug(row, prepared_rows)

        # Track release-groups seen during the run
        for rg in rg_ids:
            if rg:
                release_groups_total.add(rg)

        # Track album count when a source album produced targets
        if row.entity_type == "album" and prepared_rows:
            album_count += 1

        # Always buffer prepared rows; `submit_ratings()` handles formatting
        # so preview mode will match the real-submission output.
submission_buffer.extend(prepared_rows) if len(submission_buffer) >= submit_batch_size: flush_and_count(submission_buffer, client, args.dry_run, submitted_counts) if submission_buffer: flush_and_count(submission_buffer, client, args.dry_run, submitted_counts) if not resolved_any: logging.info(f"{LIGHT_RED}No MusicBrainz targets could be resolved from the Navidrome ratings.{RESET}") return 0 # Emit a concise run summary (Tracks / Found / Skipped / Not Found / Match% / Time) elapsed = time.time() - start_time minutes = int(elapsed // 60) seconds = int(elapsed % 60) tracks = getattr(navidrome, "stats", {}).get("tracks", 0) found = getattr(navidrome, "stats", {}).get("found", 0) skipped = getattr(navidrome, "stats", {}).get("skipped", 0) not_found = getattr(navidrome, "stats", {}).get("not_found", 0) match_pct = (found / tracks * 100.0) if tracks else 0.0 release_groups_count = len(release_groups_total) artists_count = total_artists if total_artists is not None else getattr(navidrome, "stats", {}).get("artists", 0) submitted_label = "Previewed" if args.dry_run else "Submitted" logging.info( f"Artists: {artists_count} | Albums: {album_count} | rg_variants: {release_groups_count} | Tracks: {tracks} | Found: {found} | Skipped: {skipped} | Not Found: {not_found} | Match: {match_pct:.1f}% | {submitted_label}: Artists {submitted_counts['artist']}, Albums {submitted_counts['release-group']}, Recordings {submitted_counts['recording']} | Time: {minutes}m {seconds}s" ) return 0 if __name__ == "__main__": raise SystemExit(main())