17 Commits

Author SHA1 Message Date
sickprodigy 63ea596a9f fix: add .venv to .gitignore to exclude virtual environment files 2026-06-07 12:24:26 -04:00
sickprodigy b6fd70b05a feat: enhance README with support for Last.fm and MusicBrainz, add unrated-only mode and clarify provider scoring 2026-05-30 17:43:52 -04:00
sickprodigy 4581c5ac20 feat: add support for multiple popularity providers and unrated-only mode in track processing 2026-05-30 17:43:52 -04:00
sickprodigy fe0cc31b0d fix: update .gitignore to include __pycache__ and logs directory 2026-05-30 17:43:51 -04:00
sickprodigy 23ba6bb4ee fix: add LASTFM_API_KEY to environment example 2026-05-30 17:43:51 -04:00
EffakT e3997167e3 fix: ZeroDivisionError when Total_Tracks is 0 2025-06-05 09:04:56 +12:00
EffakT c1c20adbfb Move lock file to logs dir to fix docker support 2025-06-02 10:47:04 +12:00
EffakT 26e22cd385 Switch lock file to atomic, to prevent corruption, More ratelimit delay to run on album instead of song.
Don't remove song variations form parentheses (remix, instrumental, etc)
2025-06-01 09:03:27 +12:00
EffakT c52a6eeb9e handle corrupted lock files 2025-05-28 08:58:22 +12:00
EffakT 76e2bca65d refactor: should be called lock, not cache, Adding lock jitter to prevent a lock stampede, unless duration is set to 0. Lock even if track is not found. 2025-05-28 08:48:07 +12:00
EffakT 173f893410 Add documentation for --cache-duration argument 2025-05-27 12:41:01 +12:00
EffakT 1896c500df Add per-song update cache with configurable duration, and force-update option. 2025-05-27 09:15:17 +12:00
EffakT ba0c066bf8 fix: output index is incorrect when start arg is supplied. 2025-05-27 09:00:34 +12:00
EffakT 4599a8c392 Implement token refreshing 2025-05-27 08:55:45 +12:00
EffakT 8aec18a580 Implement retry functionality, with a sleep to help with ratelimiting 2025-05-27 08:42:48 +12:00
Kevin Restaino ffc501a48d feat: arm64 and amd64 builds 2024-01-09 13:59:10 -05:00
Kevin Restaino e20f9155cb chore: create LICENSE 2024-01-08 18:15:10 -05:00
7 changed files with 788 additions and 133 deletions
+2 -1
View File
@@ -2,4 +2,5 @@ NAV_BASE_URL=your_navidrome_server_url
NAV_USER=your_navidrome_username
NAV_PASS=your_navidrome_password
SPOTIFY_CLIENT_ID=your_spotify_client_id
SPOTIFY_CLIENT_SECRET=your_spotify_client_secret
SPOTIFY_CLIENT_SECRET=your_spotify_client_secret
LASTFM_API_KEY=your_lastfm_api_key
+3 -1
View File
@@ -1,3 +1,5 @@
logs/*.log
logs/
__pycache__/
.venv/
.env
docker-compose.yml
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Kevin Restaino
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+54 -10
View File
@@ -15,7 +15,7 @@ This script was developed as a solution to repurpose the star ratings in Navidro
7. [Examples](#examples)
8. [Resuming Interrupted Sessions](#resuming-interrupted-sessions)
9. [Managing Docker Containers](#managing-docker-containers)
10. [Mapping Spotify Popularity to Navidrome Ratings](#mapping-spotify-popularity-to-navidrome-ratings)
10. [Mapping Spotify/Lastfm/MusicBrainz Popularity to Navidrome Ratings](#mapping-provider-scores-to-navidrome-ratings)
11. [Estimated Processing Times](#estimated-processing-times)
12. [Importance of Accurate Metadata for Track Lookup](#importance-of-accurate-metadata-for-track-lookup)
13. [Logs](#logs)
@@ -23,8 +23,11 @@ This script was developed as a solution to repurpose the star ratings in Navidro
## Features
- **Spotify Integration**: Connects to Spotify's API to fetch popularity data for tracks.
- **Navidrome Integration**: Updates track ratings in Navidrome based on Spotify popularity.
- **Last.fm Support**: Can use Last.fm artist top-track position, listeners, and playcounts as an alternate popularity source. (This is a great option from spotify, will pull ratings on most songs.)
- **MusicBrainz Support**: Can use MusicBrainz community ratings as a third rating source. (Currently not a lot of ratings available, but worth having as an option for the future.)
- **Navidrome Integration**: Updates track ratings in Navidrome based on provider scores.
- **Flexible Processing**: Process specific artists, albums, or a range of artists or albums.
- **Unrated-Only Mode**: Skip tracks that already have a rating and only update unrated songs.
- **Preview Mode**: Run the script in preview mode to see changes without making any actual updates.
- **Logging**: Detailed logging of the process, both in the console and to a file.
- **Docker Support**: Run the script in a Docker container for consistent environments and ease of use.
@@ -32,8 +35,9 @@ This script was developed as a solution to repurpose the star ratings in Navidro
## Requirements
- Python 3.x or Docker
- A Spotify Developer account with an API key ([Get your Spotify API key here](https://developer.spotify.com/dashboard/create))
- Access to a Navidrome server
- Optional: A Spotify Developer account with an API key ([Get your Spotify API key here](https://developer.spotify.com/dashboard/create))
- Optional: a Last.fm API key if you want to use Last.fm as the popularity provider
**Compatibility Note**: While this script was built with Navidrome in mind, it should theoretically work on any Subsonic server. If you successfully use it with other Subsonic servers, please open an issue to let me know, so I can document it and assist others.
@@ -51,6 +55,8 @@ docker run -t \
krestaino/sptnr:latest
```
If you want to use Last.fm instead of Spotify, add `-e LASTFM_API_KEY=your_lastfm_api_key` and run with `--provider lastfm`.
**Note**: The `-t` flag is used to allocate a pseudo-terminal which assists in displaying colored and bold text in the terminal output, which this script uses.
### Using Docker Compose
@@ -138,6 +144,9 @@ The script supports various options for flexible usage. Below are examples of ho
- `-b, --album ALBUM_ID`: Process a specific album. Multiple albums can be specified.
- `-s, --start START_INDEX`: Start processing from the artist at the specified index (0-based).
- `-l, --limit LIMIT`: Limit the processing to a specific number of artists from the start index.
- `-d, --lock-duration DURATION`: Number of days to lock song updates (0 to force update every time).
- `--provider spotify|lastfm|musicbrainz`: Choose the source used to derive the rating. Spotify is the default.
- `--unrated-only`: Only update songs that do not already have a Navidrome rating.
### Command Formats
@@ -187,6 +196,32 @@ The script supports various options for flexible usage. Below are examples of ho
- Docker Compose: `docker-compose run sptnr -s 10 -l 5`
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest -s 10 -l 5`
- **Only Update Unrated Songs**:
Skip tracks that already have a rating in Navidrome.
- Python: `python sptnr.py --unrated-only`
- Docker Compose: `docker-compose run sptnr --unrated-only`
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest --unrated-only`
- **Use Last.fm as the Provider**:
Use Last.fm artist top-track position, listener, and playcount data instead of Spotify popularity.
- Python: `LASTFM_API_KEY=... python sptnr.py --provider lastfm`
- Docker Compose: `docker-compose run -e LASTFM_API_KEY=... sptnr --provider lastfm`
- Docker Run: `docker run -t -e LASTFM_API_KEY=... [env vars] krestaino/sptnr:latest --provider lastfm`
Last.fm does not provide a Spotify-style popularity number, so the script derives one from three signals:
- **Artist top-track position**: where the track appears in Last.fm's `artist.getTopTracks` results for that artist. This is the largest part of the score.
- **Listener reach**: the track's unique Last.fm listener count, scaled logarithmically.
- **Replay engagement**: plays per listener, used as a small capped bonus.
This keeps ordinary catalog tracks mostly in the 2-3 star range while letting occasional artist standouts reach 4-5 stars.
- **Use MusicBrainz as the Provider**:
Use MusicBrainz ratings to derive the same 0-5 Navidrome rating.
- Python: `python sptnr.py --provider musicbrainz`
- Docker Compose: `docker-compose run sptnr --provider musicbrainz`
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest --provider musicbrainz`
## Resuming Interrupted Sessions
In cases where your session gets interrupted - for instance, if your machine goes to sleep, you encounter rate limits from Spotify, or for any other reason that causes the script to not complete - you have the option to resume from where you left off.
@@ -213,9 +248,18 @@ In this project, `docker-compose run` is used instead of `docker-compose up`. Th
docker container prune
```
## Mapping Spotify Popularity to Navidrome Ratings
## Mapping Provider Scores to Navidrome Ratings
The script translates Spotify's popularity metric, which ranges from 0 to 100, into Navidrome's 5-star rating system. This conversion allows you to quickly gauge a track's popularity on Spotify directly within Navidrome. The mapping is as follows:
The script translates provider scores into Navidrome's 5-star rating system. Each provider gets its score a little differently:
- **Spotify**: Uses Spotify's native track `popularity` value, which is already a 0 to 100 score.
- **Last.fm**: Derives a 0 to 100 score from three signals:
- artist top-track position from Last.fm's `artist.getTopTracks` results
- listener reach from the track's unique listener count
- replay engagement from plays per listener
- **MusicBrainz**: Reads the community recording rating from MusicBrainz's recording lookup endpoint. MusicBrainz ratings are already on a 0 to 5 scale, so they are rounded directly to Navidrome's 0 to 5 rating range instead of using the 0 to 100 mapping below.
For Spotify and Last.fm, the resulting 0 to 100 score is mapped as follows:
- **0 to 16**: Mapped to 0 stars in Navidrome (Not popular)
- **17 to 33**: Mapped to 1 star (Low popularity)
@@ -240,16 +284,16 @@ These estimates are based on the script's performance with my library of 6,481 t
## Importance of Accurate Metadata for Track Lookup
The effectiveness of this script heavily relies on the accuracy of the artist, album, and track titles in your music library. For Spotify to successfully recognize and match songs, these metadata details need to be precise.
The effectiveness of this script heavily relies on the accuracy of the artist, album, and track titles in your music library. For the supported providers to successfully recognize and match songs, these metadata details need to be precise.
I personally recommend using **MusicBrainz** to tag your music library. MusicBrainz is a comprehensive music database that provides reliable and standardized music metadata, which significantly enhances the accuracy of track matching with Spotify.
I personally recommend using **MusicBrainz** to tag your music library. MusicBrainz is a comprehensive music database that provides reliable and standardized music metadata, which significantly enhances the accuracy of track matching with any of the supported providers.
However, it's important to acknowledge that even with a perfectly tagged MusicBrainz library, discrepancies can still occur between Spotify and MusicBrainz data. This may result in the script missing some songs during the matching process.
To give you an idea of the matching accuracy you can expect, here are some statistics from my own library, which is tagged using MusicBrainz:
- **Total Tracks**: 6,481
- **Tracks Found on Spotify**: 6,390
- **Tracks Matched**: 6,390
- **Tracks Not Found**: 91
- **Match Percentage**: 98.6%
@@ -261,9 +305,9 @@ Logs are stored in the `logs` directory, and each script execution creates a new
The script logs its actions in a straightforward format, using `p:49 → r:2` to summarize operations:
- `p:49` indicates the Spotify popularity score, where `49` is the specific score.
- `p:49` indicates the provider score, where `49` is the specific score.
- `` symbolizes the mapping performed by the script.
- `r:2` shows the Navidrome rating assigned based on the Spotify score.
- `r:2` shows the Navidrome rating assigned based on the provider score.
### Terminal Output Colors
+1 -1
View File
@@ -1 +1 @@
1.2.0
1.3.0
+7 -7
View File
@@ -6,14 +6,14 @@ set -e
# Read version from the VERSION file
VERSION=$(cat VERSION)
# Build the Docker image with the version tag
docker build -t krestaino/sptnr:$VERSION .
# Set up the builder instance (only needs to be done once, so you can comment this out after the first run)
# docker buildx create --name mybuilder --use
# docker buildx inspect mybuilder --bootstrap
# Tag the built image as latest
docker tag krestaino/sptnr:$VERSION krestaino/sptnr:latest
# Build and push the Docker image for both arm64 and amd64 platforms with the version tag
docker buildx build --platform linux/arm64,linux/amd64 -t krestaino/sptnr:$VERSION . --push
# Push both tags to the Docker registry
docker push krestaino/sptnr:$VERSION
docker push krestaino/sptnr:latest
# Build and push the 'latest' tag as well
docker buildx build --platform linux/arm64,linux/amd64 -t krestaino/sptnr:latest . --push
echo "Docker images tagged and pushed: $VERSION and latest"
+700 -113
View File
@@ -4,17 +4,21 @@ with open("VERSION", "r") as file:
import argparse
import base64
import json
import math
import logging
import os
import re
import sys
import time
import urllib.parse
import random
import tempfile
from dotenv import load_dotenv
import requests
from colorama import init, Fore, Style
from tqdm import tqdm
from datetime import timedelta
# Load environment variables from .env file if it exists
if os.path.exists(".env"):
@@ -23,12 +27,21 @@ if os.path.exists(".env"):
# Record the start time
start_time = time.time()
LOCK_DIR = "logs"
LOCK_FILENAME = "song_update_lock.json"
LOCK_FILE = LOCK_DIR + "/" + LOCK_FILENAME
# Config
NAV_BASE_URL = os.getenv("NAV_BASE_URL")
NAV_USER = os.getenv("NAV_USER")
NAV_PASS = os.getenv("NAV_PASS")
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY")
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
LASTFM_ARTIST_TOP_TRACKS = {}
MUSICBRAINZ_API_URL = "https://musicbrainz.org/ws/2/"
MUSICBRAINZ_LAST_REQUEST_AT = 0
# Colors
LIGHT_PURPLE = Fore.MAGENTA + Style.BRIGHT
@@ -47,46 +60,112 @@ if not os.path.exists(LOG_DIR):
LOGFILE = os.path.join(LOG_DIR, f"spotify-popularity_{int(time.time())}.log")
class NoColorFormatter(logging.Formatter):
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
def format(self, record):
record.msg = self.ansi_escape.sub("", record.msg)
return super(NoColorFormatter, self).format(record)
# Set up the stream handler (console logging) without timestamp
logging.basicConfig(
level=logging.INFO, format="%(message)s", handlers=[logging.StreamHandler()]
)
# Set up the file handler (file logging) with timestamp
file_handler = logging.FileHandler(LOGFILE, "a")
file_handler.setFormatter(NoColorFormatter("[%(asctime)s] %(message)s"))
logging.getLogger().addHandler(file_handler)
# Auth
assert NAV_PASS is not None
HEX_ENCODED_PASS = NAV_PASS.encode().hex()
TOKEN_AUTH = base64.b64encode(
f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode()
).decode()
TOKEN_URL = "https://accounts.spotify.com/api/token"
response = requests.post(
TOKEN_URL,
headers={"Authorization": f"Basic {TOKEN_AUTH}"},
data={"grant_type": "client_credentials"},
)
if response.status_code != 200:
error_info = response.json() # Assuming the error response is in JSON format
error_description = error_info.get("error_description", "Unknown error")
logging.error(
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
)
sys.exit(1)
class SpotifyTokenManager:
def __init__(self, client_id, client_secret, token_url):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.token = None
self.expires_at = 0
self._authenticate()
SPOTIFY_TOKEN = response.json()["access_token"]
def _authenticate(self):
token_auth = base64.b64encode(
f"{self.client_id}:{self.client_secret}".encode()
).decode()
response = requests.post(
self.token_url,
headers={"Authorization": f"Basic {token_auth}"},
data={"grant_type": "client_credentials"},
)
if response.status_code != 200:
error_info = response.json()
error_description = error_info.get("error_description", "Unknown error")
logging.error(
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
)
sys.exit(1)
token_data = response.json()
self.token = token_data["access_token"]
self.expires_at = time.time() + token_data["expires_in"] - 60 # refresh 1 min early
def get_token(self):
if time.time() >= self.expires_at:
self._authenticate()
return self.token
class SafeAsciiFormatter(logging.Formatter):
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
def format(self, record):
rendered = super().format(record)
rendered = self.ansi_escape.sub("", rendered)
return rendered.encode("ascii", "backslashreplace").decode("ascii")
def load_lock():
if not os.path.exists(LOCK_FILE):
return {}
try:
with open(LOCK_FILE, "r") as f:
return json.load(f)
except json.JSONDecodeError:
logging.error(f"{LIGHT_RED}Lock file '{LOCK_FILE}' is corrupt or not valid JSON. Starting with an empty lock.{RESET}")
os.rename(LOCK_FILE, LOCK_FILE + ".corrupt")
return {}
except Exception as e:
logging.error(f"{LIGHT_RED}Error loading lock file '{LOCK_FILE}': {e}{RESET}")
return {}
def save_lock(lock):
# Write to a temp file first, then atomically replace the lock file
dir_name = os.path.dirname(LOCK_FILE) or "."
with tempfile.NamedTemporaryFile("w", dir=dir_name, delete=False) as tf:
json.dump(lock, tf)
tempname = tf.name
os.replace(tempname, LOCK_FILE)
def should_update(song_id):
lock_expiry_seconds = get_lock_expiry()
if lock_expiry_seconds == 0:
return True
last_update_ts = LOCK.get(song_id)
if not last_update_ts:
return True
return (time.time() - last_update_ts) > lock_expiry_seconds
def get_lock_expiry():
if (BASE_LOCK_DURATION == 0):
return 0 # No lock duration, force update every time
base_expiry = timedelta(days=BASE_LOCK_DURATION)
jitter = timedelta(hours=random.uniform(-LOCK_JITTER/2, LOCK_JITTER/2))
expiry = base_expiry + jitter
# Ensure expiry is at least 1 day
expiry = max(expiry, timedelta(days=1))
return expiry.total_seconds()
# Set up the stream handler (console logging) without timestamp
console_handler = logging.StreamHandler()
console_handler.setFormatter(SafeAsciiFormatter("%(message)s"))
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
# Set up the file handler (file logging) with timestamp
file_handler = logging.FileHandler(LOGFILE, "a", encoding="ascii", errors="backslashreplace")
file_handler.setFormatter(SafeAsciiFormatter("[%(asctime)s] %(message)s"))
logging.getLogger().addHandler(file_handler)
# Authentication
spotify_token_manager = None
SPOTIFY_TOKEN = None
init(autoreset=True)
@@ -102,6 +181,7 @@ ARTISTS_PROCESSED = 0
TOTAL_TRACKS = 0
FOUND_AND_UPDATED = 0
NOT_FOUND = 0
SKIPPED_RATED = 0
UNMATCHED_TRACKS = []
# Parse arguments
@@ -142,6 +222,31 @@ parser.add_argument(
type=int,
help="limit to processing [NUM] artists from the start index",
)
parser.add_argument(
"-d",
"--lock-duration",
type=int,
default=7,
help="Number of days to lock song updates (0 to force update every time)",
)
parser.add_argument(
"-j",
"--lock-jitter",
type=int,
default=24,
help="Number of hours to add random jitter to the lock duration",
)
parser.add_argument(
"--provider",
choices=["spotify", "lastfm", "musicbrainz"],
default="spotify",
help="Popularity provider to use for updates",
)
parser.add_argument(
"--unrated-only",
action="store_true",
help="Only update songs that do not already have a Navidrome rating",
)
parser.add_argument(
"-v", "--version", action="version", version=f"%(prog)s {__version__}"
@@ -154,9 +259,32 @@ ARTIST_IDs = args.artist if args.artist else []
ALBUM_IDs = args.album if args.album else []
START = args.start
LIMIT = args.limit
BASE_LOCK_DURATION = args.lock_duration
LOCK_JITTER = args.lock_jitter
PROVIDER = args.provider
UNRATED_ONLY = args.unrated_only
# Build only the provider-specific client we actually need.
if PROVIDER == "spotify":
if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET:
logging.error(f"{LIGHT_RED}Config Error: SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET are required when using --provider spotify.{RESET}")
sys.exit(1)
spotify_token_manager = SpotifyTokenManager(
SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, TOKEN_URL
)
assert spotify_token_manager is not None
SPOTIFY_TOKEN = spotify_token_manager.get_token()
elif PROVIDER == "lastfm":
if not LASTFM_API_KEY:
logging.error(f"{LIGHT_RED}Config Error: LASTFM_API_KEY is required when using --provider lastfm.{RESET}")
sys.exit(1)
logging.info(f"{BOLD}Version:{RESET} {LIGHT_YELLOW}sptnr v{__version__}{RESET}")
LOCK = load_lock()
SHOULD_DELAY = False
if args.preview:
logging.info(f"{LIGHT_YELLOW}Preview mode, no changes will be made.{RESET}")
PREVIEW = 1
@@ -171,7 +299,7 @@ if ARTIST_IDs and (START != 0 or LIMIT != 0):
if not args.preview:
logging.info(
f"{BOLD}Syncing Spotify {LIGHT_CYAN}popularity{RESET}{BOLD} with Navidrome {LIGHT_BLUE}rating{RESET}...{RESET}"
f"{BOLD}Syncing {PROVIDER.title()} {LIGHT_CYAN}popularity{RESET}{BOLD} with Navidrome {LIGHT_BLUE}rating{RESET}...{RESET}"
)
@@ -193,108 +321,552 @@ def url_encode(string):
return urllib.parse.quote_plus(string)
def get_rating_from_popularity(popularity):
popularity = float(popularity)
if popularity < 16.66:
# Convert the popularity-like score into Navidrome's 0-5 rating buckets.
def get_rating_from_popularity(provider_popularity):
provider_popularity = float(provider_popularity)
if provider_popularity < 16.66:
return 0
elif popularity < 33.33:
elif provider_popularity < 33.33:
return 1
elif popularity < 50:
elif provider_popularity < 50:
return 2
elif popularity < 66.66:
elif provider_popularity < 66.66:
return 3
elif popularity < 83.33:
elif provider_popularity < 83.33:
return 4
else:
return 5
# Read the current Navidrome rating for this track so unrated-only mode can skip already-rated songs.
def get_existing_rating(track_id):
nav_url = f"{NAV_BASE_URL}/rest/getSong?id={track_id}&u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&f=json"
try:
response = requests.get(nav_url, timeout=5)
response.raise_for_status()
data = response.json()
song = data["subsonic-response"]["song"]
rating_value = song.get("rating", song.get("userRating", 0))
if rating_value in (None, ""):
return 0
return int(rating_value)
except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
logging.warning(f"{LIGHT_YELLOW}Unable to read existing rating for {track_id}: {e}{RESET}")
return None
# Process one track end-to-end: skip locked or already-rated songs, look up popularity, then write the Navidrome rating.
def process_track(track_id, artist_name, album, track_name):
def search_spotify(query):
# Declare global variables
global FOUND_AND_UPDATED, UNMATCHED_TRACKS, NOT_FOUND, TOTAL_TRACKS, SKIPPED_RATED
# If the user asked for unrated-only, skip anything that already has a Navidrome score.
existing_rating = get_existing_rating(track_id)
if UNRATED_ONLY and existing_rating not in (None, 0):
logging.info(
f" {LIGHT_YELLOW}Skipping{RESET} {track_name} (Navidrome Rating: {existing_rating})"
)
SKIPPED_RATED += 1
TOTAL_TRACKS += 1
return
if not should_update(track_id):
print(f"Skipping {track_name}, recently updated.")
return
def search_spotify(query, max_retries=3):
# search_spotify: query Spotify's search API and return the track object.
# The returned track includes a 0-100 `popularity` field, so no extra
# follow-up lookup is required for Spotify ratings.
global SHOULD_DELAY
SHOULD_DELAY = True
assert spotify_token_manager is not None
SPOTIFY_TOKEN = spotify_token_manager.get_token()
spotify_url = f"https://api.spotify.com/v1/search?q={query}&type=track&limit=1"
headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"}
try:
response = requests.get(spotify_url, headers=headers)
except requests.exceptions.ConnectionError:
logging.error(f"{LIGHT_RED}Spotify Error: Unable to reach server.{RESET}")
sys.exit(1)
for attempt in range(max_retries):
try:
response = requests.get(spotify_url, headers=headers, timeout=10)
# Handle rate limiting
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 5))
logging.warning(f"Rate limited. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
continue
# Handle server errors with retry
if response.status_code >= 500:
wait_time = (attempt + 1) * 2 # Exponential backoff factor
logging.warning(f"Spotify server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
response.raise_for_status()
return response.json()
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
wait_time = (attempt + 1) * 2
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
logging.error(f"Request failed: {e}")
break
# If we get here, all retries failed
logging.error(f"Failed after {max_retries} attempts for query: {query}")
return None
if response.status_code != 200:
if response.status_code == 429:
logging.error(
f"{LIGHT_RED}Spotify Error {response.status_code}: Retry after {BOLD}{response.headers.get('Retry-After', 'some time')}s{RESET}"
def search_lastfm(track_artist, track_title, max_retries=3):
# search_lastfm: call Last.fm's track.getInfo and return the track info.
# Last.fm provides `playcount` and `listeners` in the track info; we blend
# those into a 0-100 popularity-like value - no separate rating lookup needed.
for attempt in range(max_retries):
try:
response = requests.get(
LASTFM_API_URL,
params={
"method": "track.getInfo",
"api_key": LASTFM_API_KEY,
"artist": track_artist,
"track": track_title,
"autocorrect": 1,
"format": "json",
},
timeout=10,
)
else:
logging.error(
f"{LIGHT_RED}Spotify Error {response.status_code}: {response.text}{RESET}"
data = response.json()
if data.get("error") == 29:
retry_after = (attempt + 1) * 2
logging.warning(
f"Last.fm rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {retry_after}s..."
)
time.sleep(retry_after)
continue
if data.get("error"):
logging.warning(
f"Last.fm error {data.get('error')}: {data.get('message', 'Unknown error')}"
)
return None
return data
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
wait_time = (attempt + 1) * 2
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
logging.error(f"Request failed: {e}")
break
logging.error(f"Failed after {max_retries} attempts for query: {track_artist} - {track_title}")
return None
def normalize_lastfm_title(title):
normalized = re.sub(r"[^\w\s]", " ", title.casefold())
return re.sub(r"\s+", " ", normalized).strip()
def get_lastfm_artist_top_tracks(track_artist, max_retries=3):
if track_artist in LASTFM_ARTIST_TOP_TRACKS:
return LASTFM_ARTIST_TOP_TRACKS[track_artist]
for attempt in range(max_retries):
try:
response = requests.get(
LASTFM_API_URL,
params={
"method": "artist.getTopTracks",
"api_key": LASTFM_API_KEY,
"artist": track_artist,
"autocorrect": 1,
"limit": 500,
"format": "json",
},
timeout=10,
)
sys.exit(1)
try:
return response.json()
except ValueError as e:
logging.error(
f"{LIGHT_RED}Spotify Error: Error decoding JSON from Spotify API: {e}{RESET}"
)
sys.exit(1)
data = response.json()
if data.get("error") == 29:
retry_after = (attempt + 1) * 2
logging.warning(
f"Last.fm rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {retry_after}s..."
)
time.sleep(retry_after)
continue
if data.get("error"):
logging.warning(
f"Last.fm artist top tracks error {data.get('error')}: {data.get('message', 'Unknown error')}"
)
LASTFM_ARTIST_TOP_TRACKS[track_artist] = {}
return {}
tracks = data.get("toptracks", {}).get("track", [])
rank_by_title = {}
for index, track in enumerate(tracks):
track_name = track.get("name")
if not track_name:
continue
normalized_title = normalize_lastfm_title(track_name)
if normalized_title in rank_by_title:
continue
rank_by_title[normalized_title] = {
"rank": index + 1,
"name": track_name,
"listeners": max(0, int(track.get("listeners", 0))),
"playcount": max(0, int(track.get("playcount", 0))),
}
LASTFM_ARTIST_TOP_TRACKS[track_artist] = rank_by_title
return rank_by_title
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
wait_time = (attempt + 1) * 2
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
logging.error(f"Request failed: {e}")
break
logging.error(f"Failed after {max_retries} attempts for Last.fm top tracks: {track_artist}")
LASTFM_ARTIST_TOP_TRACKS[track_artist] = {}
return {}
# MusicBrainz is using first recording/release here: we search recordings/release-group by artist/title,
# optionally narrow the search by album, and use the first recording/release that
# comes back for the rating lookup. The album only helps narrow the match;
# it does not use release-group's recordings/releases average ratings.
# search_musicbrainz: search MusicBrainz recordings and return metadata (including MBID).
# Note: the search response does NOT include ratings; use lookup_musicbrainz_rating()
# with the returned recording id to fetch the recording's rating value.
def search_musicbrainz(track_artist, track_title, track_album=None, max_retries=3):
def escape_lucene(value):
return value.replace('"', '\\"')
query_parts = [
f'recording:"{escape_lucene(track_title)}"',
f'artist:"{escape_lucene(track_artist)}"',
]
if track_album:
query_parts.append(f'release:"{escape_lucene(track_album)}"')
query = " AND ".join(query_parts)
search_context = f"artist={track_artist!r}, title={track_title!r}"
if track_album:
search_context += f", album={track_album!r}"
headers = {
"User-Agent": f"sptnr/{__version__} (https://github.com/krestaino/sptnr)",
"Accept": "application/json",
}
for attempt in range(max_retries):
try:
global MUSICBRAINZ_LAST_REQUEST_AT
elapsed = time.time() - MUSICBRAINZ_LAST_REQUEST_AT
if elapsed < 1:
time.sleep(1 - elapsed)
MUSICBRAINZ_LAST_REQUEST_AT = time.time()
response = requests.get(
f"{MUSICBRAINZ_API_URL}recording",
params={"query": query, "fmt": "json", "limit": 1},
headers=headers,
timeout=10,
)
if response.status_code == 429:
wait_time = (attempt + 1) * 2
logging.warning(
f"MusicBrainz rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
if response.status_code >= 500:
wait_time = (attempt + 1) * 2
logging.warning(
f"MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
response.raise_for_status()
data = response.json()
recordings = data.get("recordings", [])
if recordings:
return data
return None
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
wait_time = (attempt + 1) * 2
logging.warning(
f"Connection error while searching MusicBrainz ({search_context}): {e}. "
f"Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
logging.error(f"Request failed: {e}")
break
logging.error(
f"Failed after {max_retries} attempts for MusicBrainz search ({search_context})"
)
return None
# lookup_musicbrainz_rating: fetch the recording/{id}?inc=ratings endpoint to
# retrieve the recording's rating (value is 0-5). This is separate from the
# search step because MusicBrainz intentionally exposes ratings on the
# recording lookup endpoint only.
def lookup_musicbrainz_rating(recording_id, max_retries=3):
rating_context = f"recording_id={recording_id!r}"
headers = {
"User-Agent": f"sptnr/{__version__} (https://github.com/krestaino/sptnr)",
"Accept": "application/json",
}
for attempt in range(max_retries):
try:
global MUSICBRAINZ_LAST_REQUEST_AT
elapsed = time.time() - MUSICBRAINZ_LAST_REQUEST_AT
if elapsed < 1:
time.sleep(1 - elapsed)
MUSICBRAINZ_LAST_REQUEST_AT = time.time()
response = requests.get(
f"{MUSICBRAINZ_API_URL}recording/{recording_id}",
params={"inc": "ratings", "fmt": "json"},
headers=headers,
timeout=10,
)
if response.status_code == 429:
wait_time = (attempt + 1) * 2
logging.warning(
f"MusicBrainz rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
if response.status_code >= 500:
wait_time = (attempt + 1) * 2
logging.warning(
f"MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
response.raise_for_status()
recording_data = response.json()
recording = recording_data.get("recording", recording_data)
if not isinstance(recording, dict):
return None
for key in ("rating", "user-rating", "user_rating", "userRating"):
value = recording.get(key)
if isinstance(value, dict):
for nested_key in ("value", "rating", "user-rating", "user_rating"):
nested_value = value.get(nested_key)
if nested_value not in (None, ""):
return nested_value
elif value not in (None, ""):
return value
return None
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
wait_time = (attempt + 1) * 2
logging.warning(
f"Connection error while looking up MusicBrainz rating ({rating_context}): {e}. "
f"Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
)
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
logging.error(f"Request failed: {e}")
break
logging.error(
f"Failed after {max_retries} attempts for MusicBrainz rating lookup ({rating_context})"
)
return None
def remove_parentheses_content(s):
"""Remove content inside parentheses from a string."""
return re.sub(r"\s*\(.*?\)\s*", " ", s).strip()
# Only remove parentheses if they do NOT contain important keywords
keywords = ["remix", "instrumental", "edit", "version", "mix", "karaoke", "live", "acoustic", "demo"]
def replacer(match):
content = match.group(1).lower()
if any(k in content for k in keywords):
return f"({match.group(1)})" # Keep it
return ""
return re.sub(r"\((.*?)\)", replacer, s).strip()
encoded_track_name = url_encode(track_name)
encoded_artist_name = url_encode(artist_name)
encoded_album = url_encode(album)
search_attempts = [
# Primary attempt with all info
lambda: f"{url_encode(track_name)}%20artist:{url_encode(artist_name)}%20album:{url_encode(album)}",
# Secondary attempt without album
lambda: f"{url_encode(remove_parentheses_content(track_name))}%20artist:{url_encode(artist_name)}",
# Tertiary attempt with modified track name
lambda: f"{url_encode(track_name.replace('Part', 'Pt.'))}%20artist:{url_encode(artist_name)}"
]
# Primary Search (with album)
spotify_data = search_spotify(
f"{encoded_track_name}%20artist:{encoded_artist_name}%20album:{encoded_album}"
spotify_popularity = None
lastfm_popularity = None
musicbrainz_rating = None
navidrome_rating = None
sp_track_name = track_name
if PROVIDER == "spotify":
# Spotify gives us popularity directly, so we only need to find the best matching track.
provider_data = None
for attempt in search_attempts:
provider_data = search_spotify(attempt())
if provider_data and provider_data.get("tracks", {}).get("items"):
break
if provider_data and provider_data.get("tracks", {}).get("items"):
track = provider_data["tracks"]["items"][0]
spotify_popularity = track.get("popularity", 0)
sp_track_name = track["name"]
elif PROVIDER == "lastfm":
# Last.fm exposes listeners and playcount instead of a Spotify-style popularity score.
# Artist top-track position drives the main score, global listener reach keeps scale
# across artists, and plays per listener adds a small capped engagement bonus.
lastfm_attempts = [
(artist_name, track_name),
(artist_name, remove_parentheses_content(track_name)),
(artist_name, track_name.replace("Part", "Pt.")),
]
provider_data = None
for attempt_artist, attempt_title in lastfm_attempts:
provider_data = search_lastfm(attempt_artist, attempt_title)
if provider_data and provider_data.get("track"):
break
if provider_data and provider_data.get("track"):
track = provider_data["track"]
playcount = max(0, int(track.get("playcount", 0)))
listeners = max(0, int(track.get("listeners", 0)))
top_tracks = get_lastfm_artist_top_tracks(artist_name)
matched_title = track.get("name", track_name)
top_track = top_tracks.get(normalize_lastfm_title(matched_title))
if not top_track:
top_track = top_tracks.get(normalize_lastfm_title(track_name))
top_track_position = None
if top_track:
top_track_position = top_track["rank"]
matched_title = top_track["name"]
listeners = max(listeners, top_track["listeners"])
playcount = max(playcount, top_track["playcount"])
top_track_position_score = 0
if top_track_position:
top_track_position_score = max(0, 100 - (math.log2(top_track_position) * 10))
if listeners == 0 or playcount == 0:
reach_score = 0
engagement_bonus = 0
else:
plays_per_listener = playcount / listeners
reach_score = min(90, math.log10(listeners + 1) * 13)
engagement_bonus = min(12, math.log2(plays_per_listener) * 4)
lastfm_popularity = round(
min(
100,
(top_track_position_score * 0.50)
+ (reach_score * 0.40)
+ (engagement_bonus * 0.10),
)
)
sp_track_name = matched_title
else: # MusicBrainz ratings come from a lookup on the matched recording MBID.
mb_attempts = [
(artist_name, track_name, album),
(artist_name, remove_parentheses_content(track_name), album),
(artist_name, track_name.replace("Part", "Pt."), album),
(artist_name, track_name, None),
]
provider_data = None
for attempt_artist, attempt_title, attempt_album in mb_attempts:
provider_data = search_musicbrainz(attempt_artist, attempt_title, attempt_album)
if provider_data and provider_data.get("recordings"):
break
if provider_data and provider_data.get("recordings"):
track = provider_data["recordings"][0]
musicbrainz_rating = lookup_musicbrainz_rating(track.get("id"))
musicbrainz_rating = max(0.0, min(5.0, float(musicbrainz_rating or 0)))
navidrome_rating = 0 if musicbrainz_rating == 0 else max(1, int(musicbrainz_rating + 0.5))
sp_track_name = track.get("title", track_name)
provider_value = (
spotify_popularity
if spotify_popularity is not None
else lastfm_popularity
if lastfm_popularity is not None
else musicbrainz_rating
)
found_track = len(spotify_data.get("tracks", {}).get("items", [])) > 0
if provider_value is not None:
# MusicBrainz ratings are already 0-5, so use directly; others are 0-100 and need mapping.
navidrome_rating = navidrome_rating if PROVIDER == "musicbrainz" else get_rating_from_popularity(provider_value)
provider_value_str = f"{provider_value} " if 0 <= provider_value <= 9 else str(provider_value)
source_label = "s" if PROVIDER == "musicbrainz" else "p"
if not found_track:
# Secondary Search (without album and parentheses content)
sanitized_track_name = url_encode(remove_parentheses_content(track_name))
spotify_data = search_spotify(
f"{sanitized_track_name}%20artist:{encoded_artist_name}"
)
found_track = len(spotify_data.get("tracks", {}).get("items", [])) > 0
if navidrome_rating == 0:
logging.info(f" {source_label}:{LIGHT_CYAN}{provider_value_str}{RESET} | Skipping {track_name} (Navidrome Rating: 0)")
else:
logging.info(f" {source_label}:{LIGHT_CYAN}{provider_value_str}{RESET} -> r:{LIGHT_BLUE}{navidrome_rating}{RESET} | {LIGHT_GREEN}{track_name} - {sp_track_name}{RESET}")
if not found_track:
# Tertiary Search (replace 'Part' with 'Pt.')
modified_track_name = track_name.replace("Part", "Pt.")
encoded_modified_track_name = url_encode(modified_track_name)
spotify_data = search_spotify(
f"{encoded_modified_track_name}%20artist:{encoded_artist_name}"
)
found_track = len(spotify_data.get("tracks", {}).get("items", []))
if found_track:
popularity = spotify_data["tracks"]["items"][0].get("popularity", 0)
rating = get_rating_from_popularity(popularity)
popularity_str = f"{popularity} " if 0 <= popularity <= 9 else str(popularity)
message = f" p:{LIGHT_CYAN}{popularity_str}{RESET} → r:{LIGHT_BLUE}{rating}{RESET} | {LIGHT_GREEN}{track_name}{RESET}"
logging.info(message)
if PREVIEW != 1:
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating={rating}"
requests.get(nav_url)
global FOUND_AND_UPDATED
FOUND_AND_UPDATED += 1
if navidrome_rating == 0:
pass
elif PREVIEW != 1:
try:
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating={navidrome_rating}"
requests.get(nav_url, timeout=5)
FOUND_AND_UPDATED += 1
LOCK[track_id] = time.time()
save_lock(LOCK)
except requests.exceptions.RequestException as e:
logging.error(f"Failed to update rating in Navidrome: {e}")
else:
logging.info(
f" p:{LIGHT_RED}??{RESET} → r:{LIGHT_BLUE}0{RESET} | {LIGHT_RED}(not found) {track_name}{RESET}"
)
global UNMATCHED_TRACKS
logging.info(f" p:{LIGHT_RED}??{RESET} -> r:{LIGHT_BLUE}0{RESET} | {LIGHT_RED}(not found) {track_name}{RESET}")
UNMATCHED_TRACKS.append(f"{artist_name} - {album} - {track_name}")
global NOT_FOUND
NOT_FOUND += 1
global TOTAL_TRACKS
if PREVIEW != 1:
try:
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating=0"
requests.get(nav_url, timeout=5)
LOCK[track_id] = time.time()
save_lock(LOCK)
except requests.exceptions.RequestException as e:
logging.error(f"Failed to update rating in Navidrome: {e}")
TOTAL_TRACKS += 1
def process_album(album_id):
global SHOULD_DELAY
if SHOULD_DELAY:
# sleep for a short time to avoid hitting rate limits too quickly
time.sleep(4)
SHOULD_DELAY = False
nav_url = f"{NAV_BASE_URL}/rest/getAlbum?id={album_id}&u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=spotify_sync&f=json"
response = requests.get(nav_url).json()
@@ -417,7 +989,7 @@ else:
logging.info("")
logging.info(
f"Artist: {LIGHT_PURPLE}{ARTIST_NAME}{RESET} ({ARTIST_ID})[{index}]"
f"Artist: {LIGHT_PURPLE}{ARTIST_NAME}{RESET} ({ARTIST_ID})[{index+args.start}]"
)
process_artist(ARTIST_ID)
@@ -426,15 +998,21 @@ else:
# Display the results
logging.info("")
MATCH_PERCENTAGE = (FOUND_AND_UPDATED / TOTAL_TRACKS) * 100 if TOTAL_TRACKS != 0 else 0
processable_tracks = TOTAL_TRACKS - SKIPPED_RATED if UNRATED_ONLY else TOTAL_TRACKS
MATCH_PERCENTAGE = (FOUND_AND_UPDATED / processable_tracks) * 100 if processable_tracks != 0 else 0
FORMATTED_MATCH_PERCENTAGE = round(MATCH_PERCENTAGE, 2) # Rounding to 2 decimal places
TOTAL_BLOCKS = 20
color_found = LIGHT_GREEN if FOUND_AND_UPDATED == TOTAL_TRACKS else LIGHT_YELLOW
color_found_white = LIGHT_GREEN if FOUND_AND_UPDATED == TOTAL_TRACKS else BOLD
color_found = LIGHT_GREEN if FOUND_AND_UPDATED == processable_tracks else LIGHT_YELLOW
color_found_white = LIGHT_GREEN if FOUND_AND_UPDATED == processable_tracks else BOLD
color_not_found = LIGHT_GREEN if NOT_FOUND == 0 else LIGHT_RED
blocks_found = "" * round(FOUND_AND_UPDATED * TOTAL_BLOCKS / TOTAL_TRACKS)
blocks_not_found = "" * (TOTAL_BLOCKS - len(blocks_found))
if processable_tracks == 0:
blocks_found = ""
blocks_not_found = ""
else:
blocks_found = "#" * round(FOUND_AND_UPDATED * TOTAL_BLOCKS / processable_tracks)
blocks_not_found = "-" * (TOTAL_BLOCKS - len(blocks_found))
full_blocks_found = f"{color_found_white}{blocks_found}{RESET}"
full_blocks_not_found = f"{color_not_found}{blocks_not_found}{RESET}"
@@ -454,6 +1032,15 @@ if seconds or not parts: # Show seconds if it's the only value, even if it's 0
formatted_elapsed_time = " ".join(parts)
# logging.info(f"Processing completed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}")
logging.info(
f"Tracks: {LIGHT_PURPLE}{TOTAL_TRACKS}{RESET} | Found: {color_found}{FOUND_AND_UPDATED}{RESET} |{full_blocks_found}{full_blocks_not_found}| Not Found: {color_not_found}{NOT_FOUND}{RESET} | Match: {color_found}{FORMATTED_MATCH_PERCENTAGE}%{RESET} | Time: {LIGHT_PURPLE}{formatted_elapsed_time}{RESET}"
summary_line = (
f"Tracks: {LIGHT_PURPLE}{TOTAL_TRACKS}{RESET} | "
f"Found: {color_found}{FOUND_AND_UPDATED}{RESET}"
)
if SKIPPED_RATED:
summary_line += f" | Skipped: {LIGHT_YELLOW}{SKIPPED_RATED}{RESET}"
summary_line += (
f" | Not Found: {color_not_found}{NOT_FOUND}{RESET} | "
f"Match: {color_found}{FORMATTED_MATCH_PERCENTAGE}%{RESET} | "
f"Time: {LIGHT_PURPLE}{formatted_elapsed_time}{RESET}"
)
logging.info(summary_line)