1212 lines
48 KiB
Python
1212 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""Push Navidrome ratings to MusicBrainz.
|
|
|
|
This script reads ratings from Navidrome over its Subsonic API and submits them
|
|
to the MusicBrainz XML API. It supports song, album, and artist ratings.
|
|
|
|
Song ratings are pushed to recording MBIDs. If a song belongs to a release group
|
|
and multiple releases in that group contain the same track title, all matching
|
|
recordings are rated instead of stopping at the first match.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import re
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import requests
|
|
from requests.auth import HTTPDigestAuth
|
|
from colorama import init, Fore, Style
|
|
|
|
# Initialize colorama
|
|
init(autoreset=False)
|
|
|
|
|
|
MUSICBRAINZ_BASE_URL = "https://musicbrainz.org/ws/2"
|
|
MUSICBRAINZ_XML_NS = "http://musicbrainz.org/ns/mmd-2.0#"
|
|
CLIENT_NAME = "musicbrainz-ratings-helper-0.1.0"
|
|
SCRIPT_VERSION = "v0.1.0"
|
|
|
|
# Colors for logging
|
|
LIGHT_PURPLE = Fore.MAGENTA + Style.BRIGHT
|
|
LIGHT_GREEN = Fore.GREEN + Style.BRIGHT
|
|
LIGHT_RED = Fore.RED + Style.BRIGHT
|
|
LIGHT_BLUE = Fore.BLUE + Style.BRIGHT
|
|
LIGHT_CYAN = Fore.CYAN + Style.BRIGHT
|
|
LIGHT_YELLOW = Fore.YELLOW + Style.BRIGHT
|
|
BOLD = Style.BRIGHT
|
|
RESET = Style.RESET_ALL
|
|
|
|
|
|
class SafeAsciiFormatter(logging.Formatter):
|
|
"""Logging formatter that strips ANSI escape codes and encodes to ASCII."""
|
|
|
|
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
|
|
|
|
def format(self, record):
|
|
rendered = super().format(record)
|
|
rendered = self.ansi_escape.sub("", rendered)
|
|
return rendered.encode("ascii", "backslashreplace").decode("ascii")
|
|
|
|
|
|
# Setup logs (match sptnr logging behavior)
|
|
LOG_DIR = "logs"
|
|
if not os.path.exists(LOG_DIR):
|
|
os.makedirs(LOG_DIR)
|
|
|
|
LOGFILE = os.path.join(LOG_DIR, f"musicbrainz-ratings-helper_{int(time.time())}.log")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RatingRow:
|
|
entity_type: str
|
|
navidrome_id: str
|
|
mbid: str
|
|
title: str
|
|
artist: str
|
|
release_group_mbid: str | None
|
|
release_mbid: str | None
|
|
rating: int
|
|
|
|
|
|
PreparedRow = tuple[str, str, int, RatingRow]
|
|
SubmissionCounts = dict[str, int]
|
|
|
|
|
|
def empty_submission_counts() -> SubmissionCounts:
|
|
return {"artist": 0, "release-group": 0, "recording": 0}
|
|
|
|
|
|
def add_submission_counts(total: SubmissionCounts, increment: SubmissionCounts) -> None:
|
|
for key in ("artist", "release-group", "recording"):
|
|
total[key] = total.get(key, 0) + increment.get(key, 0)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Push Navidrome ratings to MusicBrainz."
|
|
)
|
|
parser.add_argument(
|
|
"--navidrome-base-url",
|
|
default=None,
|
|
help="Base URL for Navidrome, for example https://navidrome.example.com.",
|
|
)
|
|
parser.add_argument(
|
|
"--navidrome-username",
|
|
default=None,
|
|
help="Navidrome username for Subsonic API auth.",
|
|
)
|
|
parser.add_argument(
|
|
"--navidrome-password",
|
|
default=None,
|
|
help="Navidrome password for Subsonic API auth.",
|
|
)
|
|
parser.add_argument(
|
|
"--entity",
|
|
action="append",
|
|
choices=["song", "album", "artist"],
|
|
help="Limit export to one or more entity types. Can be repeated.",
|
|
)
|
|
parser.add_argument(
|
|
"--expand-release-groups",
|
|
action=argparse.BooleanOptionalAction,
|
|
default=True,
|
|
help="Expand song ratings to all matching recordings in the same release group.",
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="Show what would be submitted without posting to MusicBrainz.",
|
|
)
|
|
parser.add_argument(
|
|
"--max-artists",
|
|
type=int,
|
|
default=None,
|
|
help="Limit how many artist ratings are collected. Useful for short artist-only tests.",
|
|
)
|
|
parser.add_argument(
|
|
"--max-albums",
|
|
type=int,
|
|
default=None,
|
|
help="Limit how many album ratings are collected. Useful for short album-only tests.",
|
|
)
|
|
parser.add_argument(
|
|
"--artist-id",
|
|
default=None,
|
|
help="Limit album and song processing to a single Navidrome artist ID.",
|
|
)
|
|
parser.add_argument(
|
|
"--mb-username",
|
|
default=None,
|
|
help="MusicBrainz username. Defaults to MB_USERNAME.",
|
|
)
|
|
parser.add_argument(
|
|
"--mb-password",
|
|
default=None,
|
|
help="MusicBrainz password. Defaults to MB_PASSWORD.",
|
|
)
|
|
parser.add_argument(
|
|
"--log-level",
|
|
default="INFO",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
|
|
help="Logging verbosity.",
|
|
)
|
|
# submit order is hard-coded to artist, release-group, recording
|
|
return parser.parse_args()
|
|
|
|
|
|
def normalize_text(value: str | None) -> str:
|
|
return " ".join((value or "").strip().lower().split())
|
|
|
|
|
|
def rating_to_musicbrainz(value: int) -> int:
|
|
value = max(0, min(5, value))
|
|
return int(round(value * 20))
|
|
|
|
|
|
def required_arg(value: str | None, env_name: str, label: str) -> str:
|
|
resolved = value or os.environ.get(env_name)
|
|
if not resolved:
|
|
logging.error(f"{LIGHT_RED}Missing required {label}. Set {env_name} or pass --{label}.{RESET}")
|
|
raise SystemExit(1)
|
|
return resolved
|
|
|
|
|
|
def log_blank_line() -> None:
|
|
logging.info("")
|
|
|
|
|
|
def log_artist_header(name: str, navidrome_id: str, index: int) -> None:
|
|
logging.info(f"Artist: {name} ({navidrome_id})[{index}]")
|
|
|
|
|
|
def log_album_header(
|
|
name: str,
|
|
navidrome_id: str,
|
|
rating: int | None = None,
|
|
has_rated_songs: bool = False,
|
|
release_group_mbid: str | None = None,
|
|
) -> None:
|
|
"""Log an album header with compact rating/skip context and resolved RG MBID.
|
|
|
|
Examples:
|
|
Album: Gucci (1MrF6...) | nr:20.0 | mbidRG:45380071-f2f0...
|
|
Album: Foo (abc123) | nr:n/a | contains rated songs | mbidRG:n/a
|
|
"""
|
|
if rating is None or rating <= 0:
|
|
rating_str = "n/a"
|
|
else:
|
|
rating_str = f"{rating:.1f}"
|
|
|
|
extra = f" | nr:{rating_str}"
|
|
if has_rated_songs and (rating is None or rating <= 0):
|
|
extra += " | contains rated songs"
|
|
|
|
rg = release_group_mbid or "n/a"
|
|
extra += f" | mbidRG:{rg}"
|
|
|
|
logging.info(f" Album: {name} ({navidrome_id}){extra}")
|
|
|
|
|
|
def log_skip(name: str, rating: int, entity: str = "Item") -> None:
|
|
logging.info(f"{entity}: s:{format_source_rating(rating)} -> mb:n/a | Skipping: {name}")
|
|
|
|
|
|
def _format_conn_error(exc: Exception, label: str) -> str:
|
|
"""Return a compact, human-friendly connection error string.
|
|
|
|
Examples:
|
|
MusicBrainz connection error, musicbrainz.org:443; Read timed out (10s)
|
|
Navidrome connection error, nav.example:443; Connection aborted
|
|
"""
|
|
text = str(exc)
|
|
# Try to extract host and port from common requests.ConnectionPool formatting
|
|
m = re.search(r"host='(?P<host>[^']+)'\s*,\s*port=(?P<port>\d+)", text)
|
|
hostport = None
|
|
if m:
|
|
host = m.group("host")
|
|
port = m.group("port")
|
|
hostport = f"{host}:{port}"
|
|
|
|
# Short message: prefer 'Read timed out' or the first sentence
|
|
short_msg = None
|
|
if "Read timed out" in text:
|
|
# try to find timeout seconds
|
|
tm = re.search(r"read timeout=?(?P<secs>\d+)", text)
|
|
if tm:
|
|
short_msg = f"Read timed out ({tm.group('secs')}s)"
|
|
else:
|
|
short_msg = "Read timed out"
|
|
else:
|
|
# take up to the first period or 120 chars
|
|
short_msg = text.split(".")[0][:120]
|
|
|
|
if hostport:
|
|
return f"{label} connection error, {hostport}; {short_msg}"
|
|
return f"{label} connection error; {short_msg}"
|
|
|
|
|
|
class NavidromeClient:
|
|
def __init__(self, base_url: str, username: str, password: str, client_name: str) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.username = username
|
|
self.password = password
|
|
self.client_name = client_name
|
|
self.session = requests.Session()
|
|
self._next_request_at = 0.0
|
|
# Stats collected while scanning Navidrome library
|
|
self.stats: dict[str, int] = {
|
|
"tracks": 0,
|
|
"found": 0,
|
|
"skipped": 0,
|
|
"not_found": 0,
|
|
}
|
|
|
|
def _throttle(self) -> None:
|
|
now = time.monotonic()
|
|
if now < self._next_request_at:
|
|
time.sleep(self._next_request_at - now)
|
|
self._next_request_at = time.monotonic() + 1.05
|
|
|
|
def _request(self, endpoint: str, params: dict[str, object]) -> dict:
|
|
"""Make a Navidrome API request with retry logic and exponential backoff."""
|
|
max_retries = 5
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
self._throttle()
|
|
query = {
|
|
"u": self.username,
|
|
"p": self.password,
|
|
"v": "1.16.1",
|
|
"c": self.client_name,
|
|
"f": "json",
|
|
**{k: v for k, v in params.items() if v is not None},
|
|
}
|
|
# Ensure all query params are strings for requests and type-checkers
|
|
safe_query = {k: str(v) for k, v in query.items()}
|
|
response = self.session.get(f"{self.base_url}/rest/{endpoint}", params=safe_query, timeout=10)
|
|
|
|
# Handle server errors with retry
|
|
if response.status_code >= 500:
|
|
wait_time = (attempt + 1) * 2
|
|
logging.warning(f"{LIGHT_YELLOW}Navidrome server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
|
|
if payload.get("error"):
|
|
logging.error(f"{LIGHT_RED}Navidrome API error for {endpoint}: {payload['error']}{RESET}")
|
|
raise SystemExit(1)
|
|
|
|
return payload.get("subsonic-response", payload)
|
|
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
|
wait_time = (attempt + 1) * 2
|
|
short = _format_conn_error(e, "Navidrome")
|
|
logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"{LIGHT_RED}Navidrome request failed: {e}{RESET}")
|
|
break
|
|
|
|
# If we get here, all retries failed
|
|
logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for Navidrome endpoint: {endpoint}{RESET}")
|
|
raise SystemExit(1)
|
|
|
|
def get_artists(self) -> list[dict]:
|
|
response = self._request("getArtists", {})
|
|
artists: list[dict] = []
|
|
for index in response.get("artists", {}).get("index", []):
|
|
artists.extend(index.get("artist", []))
|
|
return artists
|
|
|
|
def get_album_list_page(self, offset: int, size: int = 500) -> list[dict]:
|
|
response = self._request(
|
|
"getAlbumList2",
|
|
{"type": "alphabeticalByName", "offset": offset, "size": size},
|
|
)
|
|
return response.get("albumList2", {}).get("album", [])
|
|
|
|
def get_album(self, album_id: str) -> dict:
|
|
response = self._request("getAlbum", {"id": album_id})
|
|
return response.get("album", {})
|
|
|
|
def get_artist(self, artist_id: str) -> dict:
|
|
response = self._request("getArtist", {"id": artist_id})
|
|
return response.get("artist", {})
|
|
|
|
def get_all_albums(self, page_size: int = 500) -> list[dict]:
|
|
albums: list[dict] = []
|
|
offset = 0
|
|
while True:
|
|
page = self.get_album_list_page(offset, page_size)
|
|
if not page:
|
|
break
|
|
albums.extend(page)
|
|
if len(page) < page_size:
|
|
break
|
|
offset += len(page)
|
|
return albums
|
|
|
|
def build_rows(
|
|
self,
|
|
entities: set[str],
|
|
musicbrainz: "MusicBrainzClient",
|
|
max_artists: int | None = None,
|
|
max_albums: int | None = None,
|
|
artist_id: str | None = None,
|
|
):
|
|
# When an explicit artist_id is provided we may need the artist twice
|
|
# (for artist rows and for album/song collection). Fetch it once and reuse.
|
|
artist_source: list[dict] | None = None
|
|
if "artist" in entities:
|
|
artist_rows = 0
|
|
# If an artist_id is provided, limit artist collection to that artist only.
|
|
artist_source = ([self.get_artist(artist_id)] if artist_id else self.get_artists())
|
|
for artist_index, artist in enumerate(artist_source):
|
|
if not artist:
|
|
continue
|
|
if artist_index == 0:
|
|
log_blank_line()
|
|
log_artist_header(artist.get("name", ""), artist.get("id", ""), artist_index)
|
|
rating = int(artist.get("userRating") or 0)
|
|
if rating <= 0:
|
|
log_skip(artist.get("name", ""), rating, entity="Artist")
|
|
continue
|
|
mbid = artist.get("musicBrainzId") or ""
|
|
if not mbid:
|
|
continue
|
|
yield RatingRow(
|
|
entity_type="artist",
|
|
navidrome_id=artist.get("id", ""),
|
|
mbid=mbid,
|
|
title=artist.get("name", ""),
|
|
artist=artist.get("name", ""),
|
|
release_group_mbid=None,
|
|
release_mbid=None,
|
|
rating=rating,
|
|
)
|
|
artist_rows += 1
|
|
if max_artists is not None and artist_rows >= max_artists:
|
|
break
|
|
|
|
if "album" in entities or "song" in entities:
|
|
album_rows = 0
|
|
page_size = max_albums if max_albums is not None and max_albums > 0 else 500
|
|
# If an explicit artist_id was provided, reuse the previously fetched
|
|
# `artist_source` value instead of calling the API again.
|
|
selected_artist = None
|
|
if artist_id and artist_source:
|
|
# artist_source is a list with one element when artist_id was used
|
|
selected_artist = artist_source[0]
|
|
else:
|
|
selected_artist = self.get_artist(artist_id) if artist_id else None
|
|
|
|
album_source = selected_artist.get("album", []) if selected_artist else self.get_all_albums(page_size=page_size)
|
|
# Only print the artist header here when we didn't already collect artist rows
|
|
if selected_artist and "artist" not in entities:
|
|
log_blank_line()
|
|
log_artist_header(selected_artist.get("name", ""), selected_artist.get("id", ""), 0)
|
|
for album in album_source:
|
|
album_rows += 1
|
|
album_rating = int(album.get("userRating") or 0)
|
|
album_mbid = album.get("musicBrainzId") or ""
|
|
if album_mbid:
|
|
logging.debug(f"Resolving release-group for album '{album.get('name','')}' release:{album_mbid}")
|
|
album_release_group_mbid = resolve_release_group_mbid(musicbrainz, album_mbid)
|
|
logging.debug(f"Resolved release-group for album '{album.get('name','')}' -> {album_release_group_mbid or 'NONE'}")
|
|
else:
|
|
album_release_group_mbid = ""
|
|
|
|
# If we're collecting songs we need the album detail to know whether
|
|
# to print an album header (only print it when there are rated songs
|
|
# or the album itself has a rating).
|
|
album_detail = None
|
|
has_rated_songs = False
|
|
if "song" in entities:
|
|
logging.debug(f"Fetching album details for album id {album.get('id', '')}")
|
|
album_detail = self.get_album(album.get("id", ""))
|
|
for song in album_detail.get("song", []):
|
|
if int(song.get("userRating") or 0) > 0:
|
|
has_rated_songs = True
|
|
break
|
|
|
|
# Print album header only if the album has a rating or contains rated songs
|
|
if album_rating > 0 or has_rated_songs:
|
|
yield RatingRow(
|
|
entity_type="album-boundary",
|
|
navidrome_id=album.get("id", ""),
|
|
mbid=album_release_group_mbid or "",
|
|
title=album.get("name", ""),
|
|
artist=album.get("artist", ""),
|
|
release_group_mbid=album_release_group_mbid or None,
|
|
release_mbid=album_mbid or None,
|
|
rating=0,
|
|
)
|
|
log_album_header(
|
|
album.get("name", ""),
|
|
album.get("id", ""),
|
|
rating=album_rating,
|
|
has_rated_songs=has_rated_songs,
|
|
release_group_mbid=album_release_group_mbid or None,
|
|
)
|
|
|
|
if "album" in entities:
|
|
if album_rating > 0:
|
|
if album_release_group_mbid:
|
|
yield RatingRow(
|
|
entity_type="album",
|
|
navidrome_id=album.get("id", ""),
|
|
mbid=album_release_group_mbid,
|
|
title=album.get("name", ""),
|
|
artist=album.get("artist", ""),
|
|
release_group_mbid=album_release_group_mbid,
|
|
release_mbid=album_mbid or None,
|
|
rating=album_rating,
|
|
)
|
|
if max_albums is not None and album_rows >= max_albums:
|
|
break
|
|
else:
|
|
logging.warning(f"{LIGHT_YELLOW}Album '{album.get('name', '')}' has rating {album_rating} but no release-group MBID resolved.{RESET}")
|
|
else:
|
|
# If the album has no rating and no rated songs, log a compact skip line
|
|
if not has_rated_songs:
|
|
log_skip(album.get("name", ""), album_rating, entity="Album")
|
|
|
|
if "song" not in entities:
|
|
continue
|
|
|
|
# album_detail may already be fetched above
|
|
if album_detail is None:
|
|
album_detail = self.get_album(album.get("id", ""))
|
|
|
|
for song in album_detail.get("song", []):
|
|
song_rating = int(song.get("userRating") or 0)
|
|
# Count every track we inspect
|
|
try:
|
|
self.stats["tracks"] += 1
|
|
except Exception:
|
|
pass
|
|
|
|
if song_rating <= 0:
|
|
# Skipped due to no rating
|
|
try:
|
|
self.stats["skipped"] += 1
|
|
except Exception:
|
|
pass
|
|
logging.debug(f"nr:{song_rating:.1f} | Skipping Recording: {song.get('title', '')}")
|
|
continue
|
|
song_mbid = song.get("musicBrainzId") or ""
|
|
if not song_mbid:
|
|
# Rated but no direct MBID found. If we have a release-group
|
|
# match, still yield the row so the release-group expansion
|
|
# path can submit the rating instead of skipping it.
|
|
if album_release_group_mbid:
|
|
try:
|
|
self.stats["found"] += 1
|
|
except Exception:
|
|
pass
|
|
logging.debug(
|
|
f"{LIGHT_YELLOW}nr:{song_rating:.1f} | No song MBID; using release-group fallback for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
|
|
)
|
|
yield RatingRow(
|
|
entity_type="song",
|
|
navidrome_id=song.get("id", ""),
|
|
mbid="",
|
|
title=song.get("title", ""),
|
|
artist=song.get("artist", album.get("artist", "")),
|
|
release_group_mbid=album_release_group_mbid,
|
|
release_mbid=album_mbid or None,
|
|
rating=song_rating,
|
|
)
|
|
continue
|
|
|
|
# No direct MBID and no release-group to fall back to.
|
|
try:
|
|
self.stats["not_found"] += 1
|
|
except Exception:
|
|
pass
|
|
logging.debug(
|
|
f"{LIGHT_YELLOW}nr:{song_rating:.1f} | Rated but no MBID for Recording: {song.get('title','')} ({song.get('id','')}){RESET}"
|
|
)
|
|
continue
|
|
|
|
# We will yield a rated track that maps directly to a recording
|
|
try:
|
|
self.stats["found"] += 1
|
|
except Exception:
|
|
pass
|
|
|
|
yield RatingRow(
|
|
entity_type="song",
|
|
navidrome_id=song.get("id", ""),
|
|
mbid=song_mbid,
|
|
title=song.get("title", ""),
|
|
artist=song.get("artist", album.get("artist", "")),
|
|
release_group_mbid=album_release_group_mbid,
|
|
release_mbid=album_mbid or None,
|
|
rating=song_rating,
|
|
)
|
|
|
|
|
|
class MusicBrainzClient:
|
|
def __init__(self, client: str, username: str | None, password: str | None) -> None:
|
|
self.client = client
|
|
self.session = requests.Session()
|
|
self.session.headers.update({"User-Agent": client, "Accept": "application/json"})
|
|
if username and password:
|
|
self.session.auth = HTTPDigestAuth(username, password)
|
|
self._next_request_at = 0.0
|
|
|
|
def _throttle(self) -> None:
|
|
now = time.monotonic()
|
|
if now < self._next_request_at:
|
|
time.sleep(self._next_request_at - now)
|
|
self._next_request_at = time.monotonic() + 1.05
|
|
|
|
def get_json(self, path: str, params: dict[str, object], allow_404: bool = False) -> dict:
|
|
"""Make a MusicBrainz API GET request with retry logic and exponential backoff."""
|
|
max_retries = 3
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
self._throttle()
|
|
# Ensure params are strings to satisfy requests parameter types
|
|
safe_params = {k: str(v) for k, v in params.items() if v is not None}
|
|
response = self.session.get(f"{MUSICBRAINZ_BASE_URL}{path}", params=safe_params, timeout=20)
|
|
|
|
# Handle rate limiting
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get('Retry-After', 5))
|
|
logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
|
|
time.sleep(retry_after)
|
|
continue
|
|
|
|
# Handle server errors with retry
|
|
if response.status_code >= 500:
|
|
wait_time = (attempt + 1) * 2
|
|
logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
if response.status_code == 404 and allow_404:
|
|
logging.warning(f"{LIGHT_YELLOW}MusicBrainz resource not found for {path}. Skipping.{RESET}")
|
|
return {}
|
|
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
|
wait_time = (attempt + 1) * 2
|
|
short = _format_conn_error(e, "MusicBrainz")
|
|
logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
|
|
break
|
|
|
|
# If we get here, all retries failed
|
|
logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
|
|
raise SystemExit(1)
|
|
|
|
def post_xml(self, path: str, xml_body: bytes) -> requests.Response:
|
|
"""Make a MusicBrainz API POST request with retry logic and exponential backoff."""
|
|
max_retries = 5
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
self._throttle()
|
|
headers = {"Content-Type": "application/xml; charset=utf-8"}
|
|
response = self.session.post(
|
|
f"{MUSICBRAINZ_BASE_URL}{path}",
|
|
params={"client": self.client},
|
|
data=xml_body,
|
|
headers=headers,
|
|
timeout=20,
|
|
)
|
|
|
|
# Handle rate limiting
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get('Retry-After', 5))
|
|
logging.warning(f"{LIGHT_YELLOW}MusicBrainz rate limited. Retrying after {retry_after} seconds...{RESET}")
|
|
time.sleep(retry_after)
|
|
continue
|
|
|
|
# Handle server errors with retry
|
|
if response.status_code >= 500:
|
|
wait_time = (attempt + 1) * 2
|
|
logging.warning(f"{LIGHT_YELLOW}MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
|
wait_time = (attempt + 1) * 2
|
|
short = _format_conn_error(e, "MusicBrainz")
|
|
logging.warning(f"{LIGHT_YELLOW}{short}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...{RESET}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"{LIGHT_RED}MusicBrainz request failed: {e}{RESET}")
|
|
break
|
|
|
|
# If we get here, all retries failed
|
|
logging.error(f"{LIGHT_RED}Failed after {max_retries} attempts for MusicBrainz path: {path}{RESET}")
|
|
raise SystemExit(1)
|
|
|
|
|
|
def resolve_release_group_mbid(client: MusicBrainzClient, release_mbid: str) -> str:
|
|
payload = client.get_json(
|
|
f"/release/{release_mbid}",
|
|
{"inc": "release-groups", "fmt": "json"},
|
|
allow_404=True,
|
|
)
|
|
return payload.get("release-group", {}).get("id", "")
|
|
|
|
|
|
def release_group_recording_ids(
|
|
client: MusicBrainzClient,
|
|
release_group_mbid: str,
|
|
title: str,
|
|
artist: str,
|
|
release_mbid: str | None = None,
|
|
) -> list[str]:
|
|
if not release_group_mbid:
|
|
return []
|
|
|
|
wanted_title = normalize_text(title)
|
|
wanted_artist = normalize_text(artist)
|
|
candidates: list[str] = []
|
|
candidate_titles: list[str] = []
|
|
seen: set[str] = set()
|
|
|
|
offset = 0
|
|
while True:
|
|
payload = client.get_json(
|
|
"/release",
|
|
{
|
|
"release-group": release_group_mbid,
|
|
"inc": "recordings+artist-credits",
|
|
"fmt": "json",
|
|
"limit": 100,
|
|
"offset": offset,
|
|
},
|
|
allow_404=True,
|
|
)
|
|
releases = payload.get("releases", [])
|
|
if not releases:
|
|
# Diagnostic: show what the API returned so we can debug empty results
|
|
try:
|
|
keys = ",".join(sorted(payload.keys())) if isinstance(payload, dict) else str(type(payload))
|
|
except Exception:
|
|
keys = "(unprintable)"
|
|
logging.info(
|
|
f"{LIGHT_YELLOW}No releases found from /release for RG {release_group_mbid}. Payload keys: {keys}{RESET}"
|
|
)
|
|
if not releases:
|
|
# Try a fallback to the release-group endpoint if /release
|
|
# returned no releases (sometimes the API shape differs).
|
|
logging.debug(f"No releases from /release for RG {release_group_mbid}; trying /release-group fallback")
|
|
fallback = client.get_json(
|
|
f"/release-group/{release_group_mbid}",
|
|
{"inc": "releases+media+recordings+artist-credits", "fmt": "json"},
|
|
allow_404=True,
|
|
)
|
|
releases = fallback.get("releases", []) if isinstance(fallback, dict) else []
|
|
if not releases:
|
|
try:
|
|
keys = ",".join(sorted(fallback.keys())) if isinstance(fallback, dict) else str(type(fallback))
|
|
except Exception:
|
|
keys = "(unprintable)"
|
|
logging.info(
|
|
f"{LIGHT_YELLOW}No releases found from /release-group for RG {release_group_mbid}. Payload keys: {keys}{RESET}"
|
|
)
|
|
if not releases:
|
|
# If we have a specific release MBID (album-level MBID), try
|
|
# fetching that release directly as a last resort.
|
|
if release_mbid:
|
|
logging.debug(f"No releases in RG; trying specific release {release_mbid}")
|
|
rel_payload = client.get_json(
|
|
f"/release/{release_mbid}",
|
|
{"inc": "recordings+media+artist-credits", "fmt": "json"},
|
|
allow_404=True,
|
|
)
|
|
# Extract tracks from release media
|
|
for medium in rel_payload.get("media", []):
|
|
for track in medium.get("tracks", []):
|
|
recording = track.get("recording") or {}
|
|
recording_id = recording.get("id")
|
|
if not recording_id or recording_id in seen:
|
|
continue
|
|
track_title = normalize_text(track.get("title"))
|
|
track_artist = normalize_text(
|
|
" ".join(
|
|
credit.get("name", "")
|
|
for credit in track.get("artist-credit", [])
|
|
if isinstance(credit, dict)
|
|
)
|
|
)
|
|
recording_artist = normalize_text(
|
|
" ".join(
|
|
credit.get("name", "")
|
|
for credit in recording.get("artist-credit", [])
|
|
if isinstance(credit, dict)
|
|
)
|
|
)
|
|
if wanted_title and track_title and track_title != wanted_title:
|
|
continue
|
|
if wanted_artist and track_artist and track_artist != wanted_artist and recording_artist and recording_artist != wanted_artist:
|
|
artist_matches = (
|
|
not wanted_artist
|
|
or wanted_artist in track_artist
|
|
or wanted_artist in recording_artist
|
|
or track_artist in wanted_artist
|
|
or recording_artist in wanted_artist
|
|
)
|
|
if not artist_matches:
|
|
continue
|
|
seen.add(recording_id)
|
|
candidates.append(recording_id)
|
|
candidate_titles.append(track.get('title',''))
|
|
if candidates:
|
|
break
|
|
break
|
|
|
|
for release in releases:
|
|
for medium in release.get("media", []):
|
|
for track in medium.get("tracks", []):
|
|
recording = track.get("recording") or {}
|
|
recording_id = recording.get("id")
|
|
if not recording_id or recording_id in seen:
|
|
continue
|
|
|
|
track_title = normalize_text(track.get("title"))
|
|
if track_title and wanted_title and track_title != wanted_title:
|
|
continue
|
|
|
|
track_artist = normalize_text(
|
|
" ".join(
|
|
credit.get("name", "")
|
|
for credit in track.get("artist-credit", [])
|
|
if isinstance(credit, dict)
|
|
)
|
|
)
|
|
recording_artist = normalize_text(
|
|
" ".join(
|
|
credit.get("name", "")
|
|
for credit in recording.get("artist-credit", [])
|
|
if isinstance(credit, dict)
|
|
)
|
|
)
|
|
|
|
artist_matches = (
|
|
not wanted_artist
|
|
or wanted_artist in track_artist
|
|
or wanted_artist in recording_artist
|
|
or track_artist in wanted_artist
|
|
or recording_artist in wanted_artist
|
|
)
|
|
if not artist_matches:
|
|
continue
|
|
|
|
seen.add(recording_id)
|
|
candidates.append(recording_id)
|
|
candidate_titles.append(track.get('title',''))
|
|
|
|
if len(releases) < 100:
|
|
break
|
|
offset += len(releases)
|
|
|
|
# If we found no candidates for an expected title, log a concise
|
|
# diagnostic listing a few candidate track titles to help spot
|
|
# normalization or punctuation mismatches.
|
|
if not candidates and wanted_title:
|
|
sample = ", ".join([t for t in candidate_titles[:10]]) or "(no tracks found)"
|
|
logging.info(
|
|
f"{LIGHT_YELLOW}No recordings matched title '{title}' (normalized '{wanted_title}') in RG {release_group_mbid}. Candidate titles: {sample}{RESET}"
|
|
)
|
|
|
|
return candidates
|
|
|
|
|
|
def build_submission(rows: list[tuple[str, str, int]]) -> bytes:
|
|
root = ET.Element("metadata", {"xmlns": MUSICBRAINZ_XML_NS})
|
|
grouped: dict[str, list[tuple[str, int]]] = defaultdict(list)
|
|
for entity_type, entity_id, rating in rows:
|
|
grouped[entity_type].append((entity_id, rating))
|
|
|
|
entity_tags = {
|
|
"artist": "artist-list",
|
|
"recording": "recording-list",
|
|
"release": "release-list",
|
|
"release-group": "release-group-list",
|
|
"work": "work-list",
|
|
}
|
|
|
|
for entity_type in ["artist", "recording", "release", "release-group", "work"]:
|
|
values = grouped.get(entity_type, [])
|
|
if not values:
|
|
continue
|
|
entity_list = ET.SubElement(root, entity_tags[entity_type])
|
|
for entity_id, rating in values:
|
|
entity = ET.SubElement(entity_list, entity_type, {"id": entity_id})
|
|
user_rating = ET.SubElement(entity, "user-rating")
|
|
user_rating.text = str(rating)
|
|
|
|
return ET.tostring(root, encoding="utf-8", xml_declaration=True)
|
|
|
|
|
|
def prepare_target_row(
|
|
row: RatingRow,
|
|
client: MusicBrainzClient,
|
|
expand_release_groups: bool,
|
|
) -> list[PreparedRow]:
|
|
prepared: list[PreparedRow] = []
|
|
rating = rating_to_musicbrainz(row.rating)
|
|
if rating <= 0:
|
|
return prepared
|
|
|
|
targets: list[tuple[str, str]] = []
|
|
|
|
if row.entity_type == "song":
|
|
if row.mbid:
|
|
targets.append(("recording", row.mbid))
|
|
if expand_release_groups and row.release_group_mbid:
|
|
logging.debug(f"Expanding release-group {row.release_group_mbid} for '{row.title}' / '{row.artist}'")
|
|
recordings = release_group_recording_ids(
|
|
client, row.release_group_mbid, row.title, row.artist, getattr(row, "release_mbid", None)
|
|
)
|
|
logging.debug(f"Expanded release-group {row.release_group_mbid}: {len(recordings)} recordings")
|
|
for recording_id in recordings:
|
|
targets.append(("recording", recording_id))
|
|
if not targets and row.mbid:
|
|
targets.append(("recording", row.mbid))
|
|
|
|
elif row.entity_type == "album":
|
|
if row.mbid:
|
|
targets.append(("release-group", row.mbid))
|
|
if row.release_group_mbid and row.release_group_mbid != row.mbid:
|
|
targets.append(("release-group", row.release_group_mbid))
|
|
|
|
elif row.entity_type == "artist":
|
|
if row.mbid:
|
|
targets.append(("artist", row.mbid))
|
|
|
|
seen_targets: set[tuple[str, str]] = set()
|
|
for entity_type, entity_id in targets:
|
|
if not entity_id:
|
|
continue
|
|
key = (entity_type, entity_id)
|
|
if key in seen_targets:
|
|
continue
|
|
seen_targets.add(key)
|
|
prepared.append((entity_type, entity_id, rating, row))
|
|
|
|
return prepared
|
|
|
|
|
|
def format_source_rating(value: int | None) -> str:
|
|
if value is None:
|
|
return "n/a"
|
|
return str(value)
|
|
|
|
|
|
def format_musicbrainz_rating(value: int | None) -> str:
|
|
if value is None:
|
|
return "n/a"
|
|
return str(value)
|
|
|
|
|
|
def rating_entity_label(entity_type: str) -> str:
|
|
return {
|
|
"artist": "Artist",
|
|
"album": "Album",
|
|
"song": "Recording",
|
|
}.get(entity_type, entity_type.capitalize())
|
|
|
|
|
|
def log_rating_result(
|
|
row: RatingRow,
|
|
mb_rating: int,
|
|
status: int | str,
|
|
*,
|
|
color: str = LIGHT_GREEN,
|
|
) -> None:
|
|
title = row.title or row.artist or row.mbid
|
|
artist = row.artist or ""
|
|
label = rating_entity_label(row.entity_type)
|
|
logging.info(
|
|
f"{color}{label}: s:{format_source_rating(row.rating)} -> mb:{format_musicbrainz_rating(mb_rating)} | {title} / {artist}: {status}{RESET}"
|
|
)
|
|
|
|
|
|
def submit_ratings(client: MusicBrainzClient, rows: list[PreparedRow], dry_run: bool) -> SubmissionCounts:
|
|
# Group values by entity type but keep the original RatingRow for logging
|
|
grouped_by_type: dict[str, list[tuple[str, int, RatingRow]]] = defaultdict(list)
|
|
for entity_type, entity_id, rating, row in rows:
|
|
grouped_by_type[entity_type].append((entity_id, rating, row))
|
|
|
|
# Collect counts per entity type to return to the caller
|
|
counts = empty_submission_counts()
|
|
|
|
# Submit artist ratings first, then release-groups (albums), then recordings.
|
|
for entity_type in ["artist", "release-group", "recording"]:
|
|
values = grouped_by_type.get(entity_type, [])
|
|
counts[entity_type] = len(values)
|
|
if not values:
|
|
continue
|
|
|
|
# For release-group batches, emit a concise summary of how many
|
|
# rg_variants are in this submission batch (only for real runs).
|
|
if entity_type == "release-group" and not dry_run:
|
|
logging.info(f"{LIGHT_BLUE}rg_variants in this batch: {len(values)}{RESET}")
|
|
|
|
submission_rows = [(entity_type, entity_id, rating) for entity_id, rating, _ in values]
|
|
xml_body = build_submission(submission_rows)
|
|
|
|
if dry_run:
|
|
for _, mb_rating, row in values:
|
|
log_rating_result(row, mb_rating, "dry-run")
|
|
logging.info(f"{LIGHT_GREEN}Submitted {len(values)} {entity_type} ratings: dry-run{RESET}")
|
|
continue
|
|
|
|
response = client.post_xml("/rating", xml_body)
|
|
|
|
# After batch submit, log per-rating status lines and a summary
|
|
for entity_id, mb_rating, row in values:
|
|
log_rating_result(row, mb_rating, response.status_code)
|
|
|
|
logging.info(f"{LIGHT_GREEN}Submitted {len(values)} {entity_type} ratings: {response.status_code}{RESET}")
|
|
|
|
return counts
|
|
|
|
|
|
def flush_submission_buffer(
|
|
buffer: list[PreparedRow],
|
|
client: MusicBrainzClient,
|
|
dry_run: bool,
|
|
) -> SubmissionCounts:
|
|
if not buffer:
|
|
return empty_submission_counts()
|
|
# Deduplicate buffer by (entity_type, entity_id), keeping first occurrence.
|
|
deduped: list[PreparedRow] = []
|
|
seen: set[tuple[str, str]] = set()
|
|
for entity_type, entity_id, rating, row in buffer:
|
|
key = (entity_type, entity_id)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
deduped.append((entity_type, entity_id, rating, row))
|
|
|
|
counts = submit_ratings(client, deduped, dry_run)
|
|
buffer.clear()
|
|
return counts
|
|
|
|
|
|
def album_batch_key(row: RatingRow) -> str:
|
|
return row.release_mbid or row.release_group_mbid or row.navidrome_id
|
|
|
|
|
|
def log_missing_target(row: RatingRow) -> None:
|
|
title = row.title or ""
|
|
artist = row.artist or ""
|
|
mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
|
|
logging.info(
|
|
f" {LIGHT_YELLOW}{rating_entity_label(row.entity_type)}: s:{format_source_rating(row.rating)} -> mb:0 | (not found) | {title} / {artist} ({row.navidrome_id}){mbid_field}{RESET}"
|
|
)
|
|
|
|
|
|
def log_prepared_targets_debug(row: RatingRow, prepared_rows: list[PreparedRow]) -> None:
|
|
if row.entity_type != "song":
|
|
return
|
|
recording_ids = [
|
|
entity_id
|
|
for entity_type, entity_id, _, _ in prepared_rows
|
|
if entity_type == "recording"
|
|
]
|
|
recording_list = ", ".join(recording_ids) if recording_ids else "(none)"
|
|
mbid_field = f" | mbid:{row.mbid}" if row.mbid else ""
|
|
logging.debug(
|
|
f" {LIGHT_BLUE}Song: {row.title} ({row.navidrome_id}){mbid_field} | matched_recordings:{len(recording_ids)} | recordings:{recording_list}{RESET}"
|
|
)
|
|
|
|
|
|
def flush_and_count(
|
|
buffer: list[PreparedRow],
|
|
client: MusicBrainzClient,
|
|
dry_run: bool,
|
|
submitted_counts: SubmissionCounts,
|
|
) -> None:
|
|
add_submission_counts(
|
|
submitted_counts,
|
|
flush_submission_buffer(buffer, client, dry_run),
|
|
)
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
|
|
# Start time for run duration reporting
|
|
start_time = time.time()
|
|
|
|
# Set up the stream handler (console logging) without timestamp
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(SafeAsciiFormatter("%(message)s"))
|
|
logging.basicConfig(level=getattr(logging, args.log_level), handlers=[console_handler])
|
|
|
|
# Set up the file handler (file logging) with timestamp, matching sptnr.py
|
|
file_handler = logging.FileHandler(LOGFILE, "a", encoding="ascii", errors="backslashreplace")
|
|
file_handler.setFormatter(SafeAsciiFormatter("[%(asctime)s] %(message)s"))
|
|
logging.getLogger().addHandler(file_handler)
|
|
# Ensure colorama auto-reset for console
|
|
try:
|
|
init(autoreset=True)
|
|
except Exception:
|
|
pass
|
|
|
|
logging.info(f"Version: musicbrainz-ratings-helper {SCRIPT_VERSION}")
|
|
if args.dry_run:
|
|
logging.info("Preview mode, no changes will be made.")
|
|
|
|
navidrome_base_url = required_arg(
|
|
args.navidrome_base_url,
|
|
"NAVIDROME_BASE_URL",
|
|
"navidrome-base-url",
|
|
)
|
|
navidrome_username = required_arg(
|
|
args.navidrome_username,
|
|
"NAVIDROME_USERNAME",
|
|
"navidrome-username",
|
|
)
|
|
navidrome_password = required_arg(
|
|
args.navidrome_password,
|
|
"NAVIDROME_PASSWORD",
|
|
"navidrome-password",
|
|
)
|
|
mb_username = args.mb_username or os.environ.get("MB_USERNAME")
|
|
mb_password = args.mb_password or os.environ.get("MB_PASSWORD")
|
|
|
|
entities = set(args.entity or ["song", "album", "artist"])
|
|
|
|
client = MusicBrainzClient(CLIENT_NAME, mb_username, mb_password)
|
|
navidrome = NavidromeClient(
|
|
navidrome_base_url,
|
|
navidrome_username,
|
|
navidrome_password,
|
|
CLIENT_NAME,
|
|
)
|
|
|
|
total_artists: int | None = None
|
|
if args.artist_id:
|
|
total_artists = 1
|
|
else:
|
|
try:
|
|
total_artists = len(navidrome.get_artists())
|
|
except SystemExit:
|
|
raise
|
|
except Exception as exc:
|
|
logging.warning(f"Could not determine total artists to process: {exc}")
|
|
if total_artists is not None:
|
|
logging.info(f"Total artists to process: {total_artists}")
|
|
|
|
submission_buffer: list[PreparedRow] = []
|
|
submit_batch_size = 100
|
|
resolved_any = False
|
|
submitted_counts = empty_submission_counts()
|
|
release_groups_total: set[str] = set()
|
|
album_count = 0
|
|
current_album_key: str | None = None
|
|
|
|
for row in navidrome.build_rows(
|
|
entities,
|
|
client,
|
|
args.max_artists,
|
|
args.max_albums,
|
|
args.artist_id,
|
|
):
|
|
if row.entity_type == "album-boundary":
|
|
if submission_buffer:
|
|
flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
|
|
current_album_key = album_batch_key(row)
|
|
continue
|
|
|
|
prepared_rows = prepare_target_row(row, client, args.expand_release_groups)
|
|
if not prepared_rows:
|
|
log_missing_target(row)
|
|
continue
|
|
resolved_any = True
|
|
|
|
if row.entity_type == "artist":
|
|
if submission_buffer:
|
|
flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
|
|
current_album_key = None
|
|
add_submission_counts(submitted_counts, submit_ratings(client, prepared_rows, args.dry_run))
|
|
continue
|
|
|
|
if row.entity_type in {"album", "song"}:
|
|
next_album_key = album_batch_key(row)
|
|
if current_album_key is None:
|
|
current_album_key = next_album_key
|
|
elif next_album_key != current_album_key:
|
|
flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
|
|
current_album_key = next_album_key
|
|
|
|
rg_ids = {entity_id for entity_type, entity_id, _, _ in prepared_rows if entity_type == "release-group"}
|
|
log_prepared_targets_debug(row, prepared_rows)
|
|
|
|
# Track release-groups seen during the run
|
|
for rg in rg_ids:
|
|
if rg:
|
|
release_groups_total.add(rg)
|
|
# Track album count when a source album produced targets
|
|
if row.entity_type == "album" and prepared_rows:
|
|
album_count += 1
|
|
|
|
# Always buffer prepared rows; `submit_ratings()` handles formatting
|
|
# so preview mode will match the real-submission output.
|
|
submission_buffer.extend(prepared_rows)
|
|
if len(submission_buffer) >= submit_batch_size:
|
|
flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
|
|
|
|
if submission_buffer:
|
|
flush_and_count(submission_buffer, client, args.dry_run, submitted_counts)
|
|
|
|
if not resolved_any:
|
|
logging.info(f"{LIGHT_RED}No MusicBrainz targets could be resolved from the Navidrome ratings.{RESET}")
|
|
return 0
|
|
|
|
# Emit a concise run summary (Tracks / Found / Skipped / Not Found / Match% / Time)
|
|
elapsed = time.time() - start_time
|
|
minutes = int(elapsed // 60)
|
|
seconds = int(elapsed % 60)
|
|
tracks = getattr(navidrome, "stats", {}).get("tracks", 0)
|
|
found = getattr(navidrome, "stats", {}).get("found", 0)
|
|
skipped = getattr(navidrome, "stats", {}).get("skipped", 0)
|
|
not_found = getattr(navidrome, "stats", {}).get("not_found", 0)
|
|
match_pct = (found / tracks * 100.0) if tracks else 0.0
|
|
release_groups_count = len(release_groups_total)
|
|
artists_count = total_artists if total_artists is not None else getattr(navidrome, "stats", {}).get("artists", 0)
|
|
submitted_label = "Previewed" if args.dry_run else "Submitted"
|
|
logging.info(
|
|
f"Artists: {artists_count} | Albums: {album_count} | rg_variants: {release_groups_count} | Tracks: {tracks} | Found: {found} | Skipped: {skipped} | Not Found: {not_found} | Match: {match_pct:.1f}% | {submitted_label}: Artists {submitted_counts['artist']}, Albums {submitted_counts['release-group']}, Recordings {submitted_counts['recording']} | Time: {minutes}m {seconds}s"
|
|
)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|