forked from CopyBot/sptnr
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 63ea596a9f | |||
| b6fd70b05a | |||
| 4581c5ac20 | |||
| fe0cc31b0d | |||
| 23ba6bb4ee | |||
| e3997167e3 | |||
| c1c20adbfb | |||
| 26e22cd385 | |||
| c52a6eeb9e | |||
| 76e2bca65d | |||
| 173f893410 | |||
| 1896c500df | |||
| ba0c066bf8 | |||
| 4599a8c392 | |||
| 8aec18a580 | |||
| ffc501a48d | |||
| e20f9155cb | |||
| 816444e309 | |||
| fce99b39cb | |||
| f3fe277017 |
@@ -3,3 +3,4 @@ NAV_USER=your_navidrome_username
|
||||
NAV_PASS=your_navidrome_password
|
||||
SPOTIFY_CLIENT_ID=your_spotify_client_id
|
||||
SPOTIFY_CLIENT_SECRET=your_spotify_client_secret
|
||||
LASTFM_API_KEY=your_lastfm_api_key
|
||||
|
||||
+3
-1
@@ -1,3 +1,5 @@
|
||||
logs/*.log
|
||||
logs/
|
||||
__pycache__/
|
||||
.venv/
|
||||
.env
|
||||
docker-compose.yml
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2024 Kevin Restaino
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -15,7 +15,7 @@ This script was developed as a solution to repurpose the star ratings in Navidro
|
||||
7. [Examples](#examples)
|
||||
8. [Resuming Interrupted Sessions](#resuming-interrupted-sessions)
|
||||
9. [Managing Docker Containers](#managing-docker-containers)
|
||||
10. [Mapping Spotify Popularity to Navidrome Ratings](#mapping-spotify-popularity-to-navidrome-ratings)
|
||||
10. [Mapping Spotify/Lastfm/MusicBrainz Popularity to Navidrome Ratings](#mapping-provider-scores-to-navidrome-ratings)
|
||||
11. [Estimated Processing Times](#estimated-processing-times)
|
||||
12. [Importance of Accurate Metadata for Track Lookup](#importance-of-accurate-metadata-for-track-lookup)
|
||||
13. [Logs](#logs)
|
||||
@@ -23,8 +23,11 @@ This script was developed as a solution to repurpose the star ratings in Navidro
|
||||
## Features
|
||||
|
||||
- **Spotify Integration**: Connects to Spotify's API to fetch popularity data for tracks.
|
||||
- **Navidrome Integration**: Updates track ratings in Navidrome based on Spotify popularity.
|
||||
- **Last.fm Support**: Can use Last.fm artist top-track position, listeners, and playcounts as an alternate popularity source. (This is a great option from spotify, will pull ratings on most songs.)
|
||||
- **MusicBrainz Support**: Can use MusicBrainz community ratings as a third rating source. (Currently not a lot of ratings available, but worth having as an option for the future.)
|
||||
- **Navidrome Integration**: Updates track ratings in Navidrome based on provider scores.
|
||||
- **Flexible Processing**: Process specific artists, albums, or a range of artists or albums.
|
||||
- **Unrated-Only Mode**: Skip tracks that already have a rating and only update unrated songs.
|
||||
- **Preview Mode**: Run the script in preview mode to see changes without making any actual updates.
|
||||
- **Logging**: Detailed logging of the process, both in the console and to a file.
|
||||
- **Docker Support**: Run the script in a Docker container for consistent environments and ease of use.
|
||||
@@ -32,8 +35,9 @@ This script was developed as a solution to repurpose the star ratings in Navidro
|
||||
## Requirements
|
||||
|
||||
- Python 3.x or Docker
|
||||
- A Spotify Developer account with an API key ([Get your Spotify API key here](https://developer.spotify.com/dashboard/create))
|
||||
- Access to a Navidrome server
|
||||
- Optional: A Spotify Developer account with an API key ([Get your Spotify API key here](https://developer.spotify.com/dashboard/create))
|
||||
- Optional: a Last.fm API key if you want to use Last.fm as the popularity provider
|
||||
|
||||
**Compatibility Note**: While this script was built with Navidrome in mind, it should theoretically work on any Subsonic server. If you successfully use it with other Subsonic servers, please open an issue to let me know, so I can document it and assist others.
|
||||
|
||||
@@ -51,6 +55,8 @@ docker run -t \
|
||||
krestaino/sptnr:latest
|
||||
```
|
||||
|
||||
If you want to use Last.fm instead of Spotify, add `-e LASTFM_API_KEY=your_lastfm_api_key` and run with `--provider lastfm`.
|
||||
|
||||
**Note**: The `-t` flag is used to allocate a pseudo-terminal which assists in displaying colored and bold text in the terminal output, which this script uses.
|
||||
|
||||
### Using Docker Compose
|
||||
@@ -138,6 +144,9 @@ The script supports various options for flexible usage. Below are examples of ho
|
||||
- `-b, --album ALBUM_ID`: Process a specific album. Multiple albums can be specified.
|
||||
- `-s, --start START_INDEX`: Start processing from the artist at the specified index (0-based).
|
||||
- `-l, --limit LIMIT`: Limit the processing to a specific number of artists from the start index.
|
||||
- `-d, --lock-duration DURATION`: Number of days to lock song updates (0 to force update every time).
|
||||
- `--provider spotify|lastfm|musicbrainz`: Choose the source used to derive the rating. Spotify is the default.
|
||||
- `--unrated-only`: Only update songs that do not already have a Navidrome rating.
|
||||
|
||||
### Command Formats
|
||||
|
||||
@@ -187,6 +196,32 @@ The script supports various options for flexible usage. Below are examples of ho
|
||||
- Docker Compose: `docker-compose run sptnr -s 10 -l 5`
|
||||
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest -s 10 -l 5`
|
||||
|
||||
- **Only Update Unrated Songs**:
|
||||
Skip tracks that already have a rating in Navidrome.
|
||||
- Python: `python sptnr.py --unrated-only`
|
||||
- Docker Compose: `docker-compose run sptnr --unrated-only`
|
||||
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest --unrated-only`
|
||||
|
||||
- **Use Last.fm as the Provider**:
|
||||
Use Last.fm artist top-track position, listener, and playcount data instead of Spotify popularity.
|
||||
- Python: `LASTFM_API_KEY=... python sptnr.py --provider lastfm`
|
||||
- Docker Compose: `docker-compose run -e LASTFM_API_KEY=... sptnr --provider lastfm`
|
||||
- Docker Run: `docker run -t -e LASTFM_API_KEY=... [env vars] krestaino/sptnr:latest --provider lastfm`
|
||||
|
||||
Last.fm does not provide a Spotify-style popularity number, so the script derives one from three signals:
|
||||
|
||||
- **Artist top-track position**: where the track appears in Last.fm's `artist.getTopTracks` results for that artist. This is the largest part of the score.
|
||||
- **Listener reach**: the track's unique Last.fm listener count, scaled logarithmically.
|
||||
- **Replay engagement**: plays per listener, used as a small capped bonus.
|
||||
|
||||
This keeps ordinary catalog tracks mostly in the 2-3 star range while letting occasional artist standouts reach 4-5 stars.
|
||||
|
||||
- **Use MusicBrainz as the Provider**:
|
||||
Use MusicBrainz ratings to derive the same 0-5 Navidrome rating.
|
||||
- Python: `python sptnr.py --provider musicbrainz`
|
||||
- Docker Compose: `docker-compose run sptnr --provider musicbrainz`
|
||||
- Docker Run: `docker run -t [env vars] krestaino/sptnr:latest --provider musicbrainz`
|
||||
|
||||
## Resuming Interrupted Sessions
|
||||
|
||||
In cases where your session gets interrupted - for instance, if your machine goes to sleep, you encounter rate limits from Spotify, or for any other reason that causes the script to not complete - you have the option to resume from where you left off.
|
||||
@@ -213,9 +248,18 @@ In this project, `docker-compose run` is used instead of `docker-compose up`. Th
|
||||
docker container prune
|
||||
```
|
||||
|
||||
## Mapping Spotify Popularity to Navidrome Ratings
|
||||
## Mapping Provider Scores to Navidrome Ratings
|
||||
|
||||
The script translates Spotify's popularity metric, which ranges from 0 to 100, into Navidrome's 5-star rating system. This conversion allows you to quickly gauge a track's popularity on Spotify directly within Navidrome. The mapping is as follows:
|
||||
The script translates provider scores into Navidrome's 5-star rating system. Each provider gets its score a little differently:
|
||||
|
||||
- **Spotify**: Uses Spotify's native track `popularity` value, which is already a 0 to 100 score.
|
||||
- **Last.fm**: Derives a 0 to 100 score from three signals:
|
||||
- artist top-track position from Last.fm's `artist.getTopTracks` results
|
||||
- listener reach from the track's unique listener count
|
||||
- replay engagement from plays per listener
|
||||
- **MusicBrainz**: Reads the community recording rating from MusicBrainz's recording lookup endpoint. MusicBrainz ratings are already on a 0 to 5 scale, so they are rounded directly to Navidrome's 0 to 5 rating range instead of using the 0 to 100 mapping below.
|
||||
|
||||
For Spotify and Last.fm, the resulting 0 to 100 score is mapped as follows:
|
||||
|
||||
- **0 to 16**: Mapped to 0 stars in Navidrome (Not popular)
|
||||
- **17 to 33**: Mapped to 1 star (Low popularity)
|
||||
@@ -240,16 +284,16 @@ These estimates are based on the script's performance with my library of 6,481 t
|
||||
|
||||
## Importance of Accurate Metadata for Track Lookup
|
||||
|
||||
The effectiveness of this script heavily relies on the accuracy of the artist, album, and track titles in your music library. For Spotify to successfully recognize and match songs, these metadata details need to be precise.
|
||||
The effectiveness of this script heavily relies on the accuracy of the artist, album, and track titles in your music library. For the supported providers to successfully recognize and match songs, these metadata details need to be precise.
|
||||
|
||||
I personally recommend using **MusicBrainz** to tag your music library. MusicBrainz is a comprehensive music database that provides reliable and standardized music metadata, which significantly enhances the accuracy of track matching with Spotify.
|
||||
I personally recommend using **MusicBrainz** to tag your music library. MusicBrainz is a comprehensive music database that provides reliable and standardized music metadata, which significantly enhances the accuracy of track matching with any of the supported providers.
|
||||
|
||||
However, it's important to acknowledge that even with a perfectly tagged MusicBrainz library, discrepancies can still occur between Spotify and MusicBrainz data. This may result in the script missing some songs during the matching process.
|
||||
|
||||
To give you an idea of the matching accuracy you can expect, here are some statistics from my own library, which is tagged using MusicBrainz:
|
||||
|
||||
- **Total Tracks**: 6,481
|
||||
- **Tracks Found on Spotify**: 6,390
|
||||
- **Tracks Matched**: 6,390
|
||||
- **Tracks Not Found**: 91
|
||||
- **Match Percentage**: 98.6%
|
||||
|
||||
@@ -261,9 +305,9 @@ Logs are stored in the `logs` directory, and each script execution creates a new
|
||||
|
||||
The script logs its actions in a straightforward format, using `p:49 → r:2` to summarize operations:
|
||||
|
||||
- `p:49` indicates the Spotify popularity score, where `49` is the specific score.
|
||||
- `p:49` indicates the provider score, where `49` is the specific score.
|
||||
- `→` symbolizes the mapping performed by the script.
|
||||
- `r:2` shows the Navidrome rating assigned based on the Spotify score.
|
||||
- `r:2` shows the Navidrome rating assigned based on the provider score.
|
||||
|
||||
### Terminal Output Colors
|
||||
|
||||
|
||||
Executable
+19
@@ -0,0 +1,19 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Ensure the script stops if there is an error
|
||||
set -e
|
||||
|
||||
# Read version from the VERSION file
|
||||
VERSION=$(cat VERSION)
|
||||
|
||||
# Set up the builder instance (only needs to be done once, so you can comment this out after the first run)
|
||||
# docker buildx create --name mybuilder --use
|
||||
# docker buildx inspect mybuilder --bootstrap
|
||||
|
||||
# Build and push the Docker image for both arm64 and amd64 platforms with the version tag
|
||||
docker buildx build --platform linux/arm64,linux/amd64 -t krestaino/sptnr:$VERSION . --push
|
||||
|
||||
# Build and push the 'latest' tag as well
|
||||
docker buildx build --platform linux/arm64,linux/amd64 -t krestaino/sptnr:latest . --push
|
||||
|
||||
echo "Docker images tagged and pushed: $VERSION and latest"
|
||||
@@ -4,17 +4,21 @@ with open("VERSION", "r") as file:
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import math
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
import random
|
||||
import tempfile
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
from colorama import init, Fore, Style
|
||||
from tqdm import tqdm
|
||||
from datetime import timedelta
|
||||
|
||||
# Load environment variables from .env file if it exists
|
||||
if os.path.exists(".env"):
|
||||
@@ -23,12 +27,21 @@ if os.path.exists(".env"):
|
||||
# Record the start time
|
||||
start_time = time.time()
|
||||
|
||||
LOCK_DIR = "logs"
|
||||
LOCK_FILENAME = "song_update_lock.json"
|
||||
LOCK_FILE = LOCK_DIR + "/" + LOCK_FILENAME
|
||||
|
||||
# Config
|
||||
NAV_BASE_URL = os.getenv("NAV_BASE_URL")
|
||||
NAV_USER = os.getenv("NAV_USER")
|
||||
NAV_PASS = os.getenv("NAV_PASS")
|
||||
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
|
||||
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
|
||||
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY")
|
||||
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
|
||||
LASTFM_ARTIST_TOP_TRACKS = {}
|
||||
MUSICBRAINZ_API_URL = "https://musicbrainz.org/ws/2/"
|
||||
MUSICBRAINZ_LAST_REQUEST_AT = 0
|
||||
|
||||
# Colors
|
||||
LIGHT_PURPLE = Fore.MAGENTA + Style.BRIGHT
|
||||
@@ -47,46 +60,112 @@ if not os.path.exists(LOG_DIR):
|
||||
|
||||
LOGFILE = os.path.join(LOG_DIR, f"spotify-popularity_{int(time.time())}.log")
|
||||
|
||||
|
||||
class NoColorFormatter(logging.Formatter):
|
||||
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
|
||||
|
||||
def format(self, record):
|
||||
record.msg = self.ansi_escape.sub("", record.msg)
|
||||
return super(NoColorFormatter, self).format(record)
|
||||
|
||||
|
||||
# Set up the stream handler (console logging) without timestamp
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(message)s", handlers=[logging.StreamHandler()]
|
||||
)
|
||||
|
||||
# Set up the file handler (file logging) with timestamp
|
||||
file_handler = logging.FileHandler(LOGFILE, "a")
|
||||
file_handler.setFormatter(NoColorFormatter("[%(asctime)s] %(message)s"))
|
||||
logging.getLogger().addHandler(file_handler)
|
||||
|
||||
# Auth
|
||||
assert NAV_PASS is not None
|
||||
HEX_ENCODED_PASS = NAV_PASS.encode().hex()
|
||||
TOKEN_AUTH = base64.b64encode(
|
||||
f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode()
|
||||
).decode()
|
||||
TOKEN_URL = "https://accounts.spotify.com/api/token"
|
||||
response = requests.post(
|
||||
TOKEN_URL,
|
||||
headers={"Authorization": f"Basic {TOKEN_AUTH}"},
|
||||
data={"grant_type": "client_credentials"},
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
error_info = response.json() # Assuming the error response is in JSON format
|
||||
error_description = error_info.get("error_description", "Unknown error")
|
||||
logging.error(
|
||||
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
|
||||
)
|
||||
sys.exit(1)
|
||||
class SpotifyTokenManager:
|
||||
def __init__(self, client_id, client_secret, token_url):
|
||||
self.client_id = client_id
|
||||
self.client_secret = client_secret
|
||||
self.token_url = token_url
|
||||
self.token = None
|
||||
self.expires_at = 0
|
||||
self._authenticate()
|
||||
|
||||
SPOTIFY_TOKEN = response.json()["access_token"]
|
||||
def _authenticate(self):
|
||||
token_auth = base64.b64encode(
|
||||
f"{self.client_id}:{self.client_secret}".encode()
|
||||
).decode()
|
||||
response = requests.post(
|
||||
self.token_url,
|
||||
headers={"Authorization": f"Basic {token_auth}"},
|
||||
data={"grant_type": "client_credentials"},
|
||||
)
|
||||
if response.status_code != 200:
|
||||
error_info = response.json()
|
||||
error_description = error_info.get("error_description", "Unknown error")
|
||||
logging.error(
|
||||
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
|
||||
)
|
||||
sys.exit(1)
|
||||
token_data = response.json()
|
||||
self.token = token_data["access_token"]
|
||||
self.expires_at = time.time() + token_data["expires_in"] - 60 # refresh 1 min early
|
||||
|
||||
def get_token(self):
|
||||
if time.time() >= self.expires_at:
|
||||
self._authenticate()
|
||||
return self.token
|
||||
|
||||
class SafeAsciiFormatter(logging.Formatter):
|
||||
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
|
||||
|
||||
def format(self, record):
|
||||
rendered = super().format(record)
|
||||
rendered = self.ansi_escape.sub("", rendered)
|
||||
return rendered.encode("ascii", "backslashreplace").decode("ascii")
|
||||
|
||||
|
||||
def load_lock():
|
||||
if not os.path.exists(LOCK_FILE):
|
||||
return {}
|
||||
try:
|
||||
with open(LOCK_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
logging.error(f"{LIGHT_RED}Lock file '{LOCK_FILE}' is corrupt or not valid JSON. Starting with an empty lock.{RESET}")
|
||||
os.rename(LOCK_FILE, LOCK_FILE + ".corrupt")
|
||||
return {}
|
||||
except Exception as e:
|
||||
logging.error(f"{LIGHT_RED}Error loading lock file '{LOCK_FILE}': {e}{RESET}")
|
||||
return {}
|
||||
|
||||
def save_lock(lock):
|
||||
# Write to a temp file first, then atomically replace the lock file
|
||||
dir_name = os.path.dirname(LOCK_FILE) or "."
|
||||
with tempfile.NamedTemporaryFile("w", dir=dir_name, delete=False) as tf:
|
||||
json.dump(lock, tf)
|
||||
tempname = tf.name
|
||||
os.replace(tempname, LOCK_FILE)
|
||||
|
||||
def should_update(song_id):
|
||||
lock_expiry_seconds = get_lock_expiry()
|
||||
if lock_expiry_seconds == 0:
|
||||
return True
|
||||
last_update_ts = LOCK.get(song_id)
|
||||
if not last_update_ts:
|
||||
return True
|
||||
return (time.time() - last_update_ts) > lock_expiry_seconds
|
||||
|
||||
|
||||
def get_lock_expiry():
|
||||
if (BASE_LOCK_DURATION == 0):
|
||||
return 0 # No lock duration, force update every time
|
||||
|
||||
base_expiry = timedelta(days=BASE_LOCK_DURATION)
|
||||
jitter = timedelta(hours=random.uniform(-LOCK_JITTER/2, LOCK_JITTER/2))
|
||||
expiry = base_expiry + jitter
|
||||
# Ensure expiry is at least 1 day
|
||||
expiry = max(expiry, timedelta(days=1))
|
||||
return expiry.total_seconds()
|
||||
|
||||
# Set up the stream handler (console logging) without timestamp
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(SafeAsciiFormatter("%(message)s"))
|
||||
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
|
||||
|
||||
# Set up the file handler (file logging) with timestamp
|
||||
file_handler = logging.FileHandler(LOGFILE, "a", encoding="ascii", errors="backslashreplace")
|
||||
file_handler.setFormatter(SafeAsciiFormatter("[%(asctime)s] %(message)s"))
|
||||
logging.getLogger().addHandler(file_handler)
|
||||
|
||||
# Authentication
|
||||
spotify_token_manager = None
|
||||
SPOTIFY_TOKEN = None
|
||||
|
||||
init(autoreset=True)
|
||||
|
||||
@@ -102,6 +181,7 @@ ARTISTS_PROCESSED = 0
|
||||
TOTAL_TRACKS = 0
|
||||
FOUND_AND_UPDATED = 0
|
||||
NOT_FOUND = 0
|
||||
SKIPPED_RATED = 0
|
||||
UNMATCHED_TRACKS = []
|
||||
|
||||
# Parse arguments
|
||||
@@ -142,6 +222,36 @@ parser.add_argument(
|
||||
type=int,
|
||||
help="limit to processing [NUM] artists from the start index",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--lock-duration",
|
||||
type=int,
|
||||
default=7,
|
||||
help="Number of days to lock song updates (0 to force update every time)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-j",
|
||||
"--lock-jitter",
|
||||
type=int,
|
||||
default=24,
|
||||
help="Number of hours to add random jitter to the lock duration",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--provider",
|
||||
choices=["spotify", "lastfm", "musicbrainz"],
|
||||
default="spotify",
|
||||
help="Popularity provider to use for updates",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--unrated-only",
|
||||
action="store_true",
|
||||
help="Only update songs that do not already have a Navidrome rating",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-v", "--version", action="version", version=f"%(prog)s {__version__}"
|
||||
)
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -149,6 +259,31 @@ ARTIST_IDs = args.artist if args.artist else []
|
||||
ALBUM_IDs = args.album if args.album else []
|
||||
START = args.start
|
||||
LIMIT = args.limit
|
||||
BASE_LOCK_DURATION = args.lock_duration
|
||||
LOCK_JITTER = args.lock_jitter
|
||||
PROVIDER = args.provider
|
||||
UNRATED_ONLY = args.unrated_only
|
||||
|
||||
# Build only the provider-specific client we actually need.
|
||||
if PROVIDER == "spotify":
|
||||
if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET:
|
||||
logging.error(f"{LIGHT_RED}Config Error: SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET are required when using --provider spotify.{RESET}")
|
||||
sys.exit(1)
|
||||
spotify_token_manager = SpotifyTokenManager(
|
||||
SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, TOKEN_URL
|
||||
)
|
||||
assert spotify_token_manager is not None
|
||||
SPOTIFY_TOKEN = spotify_token_manager.get_token()
|
||||
elif PROVIDER == "lastfm":
|
||||
if not LASTFM_API_KEY:
|
||||
logging.error(f"{LIGHT_RED}Config Error: LASTFM_API_KEY is required when using --provider lastfm.{RESET}")
|
||||
sys.exit(1)
|
||||
|
||||
logging.info(f"{BOLD}Version:{RESET} {LIGHT_YELLOW}sptnr v{__version__}{RESET}")
|
||||
|
||||
LOCK = load_lock()
|
||||
|
||||
SHOULD_DELAY = False
|
||||
|
||||
if args.preview:
|
||||
logging.info(f"{LIGHT_YELLOW}Preview mode, no changes will be made.{RESET}")
|
||||
@@ -164,7 +299,7 @@ if ARTIST_IDs and (START != 0 or LIMIT != 0):
|
||||
|
||||
if not args.preview:
|
||||
logging.info(
|
||||
f"{BOLD}Syncing Spotify {LIGHT_CYAN}popularity{RESET}{BOLD} with Navidrome {LIGHT_BLUE}rating{RESET}...{RESET}"
|
||||
f"{BOLD}Syncing {PROVIDER.title()} {LIGHT_CYAN}popularity{RESET}{BOLD} with Navidrome {LIGHT_BLUE}rating{RESET}...{RESET}"
|
||||
)
|
||||
|
||||
|
||||
@@ -186,99 +321,552 @@ def url_encode(string):
|
||||
return urllib.parse.quote_plus(string)
|
||||
|
||||
|
||||
def get_rating_from_popularity(popularity):
|
||||
popularity = float(popularity)
|
||||
if popularity < 16.66:
|
||||
# Convert the popularity-like score into Navidrome's 0-5 rating buckets.
|
||||
def get_rating_from_popularity(provider_popularity):
|
||||
provider_popularity = float(provider_popularity)
|
||||
if provider_popularity < 16.66:
|
||||
return 0
|
||||
elif popularity < 33.33:
|
||||
elif provider_popularity < 33.33:
|
||||
return 1
|
||||
elif popularity < 50:
|
||||
elif provider_popularity < 50:
|
||||
return 2
|
||||
elif popularity < 66.66:
|
||||
elif provider_popularity < 66.66:
|
||||
return 3
|
||||
elif popularity < 83.33:
|
||||
elif provider_popularity < 83.33:
|
||||
return 4
|
||||
else:
|
||||
return 5
|
||||
|
||||
|
||||
# Read the current Navidrome rating for this track so unrated-only mode can skip already-rated songs.
|
||||
def get_existing_rating(track_id):
|
||||
nav_url = f"{NAV_BASE_URL}/rest/getSong?id={track_id}&u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&f=json"
|
||||
try:
|
||||
response = requests.get(nav_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
song = data["subsonic-response"]["song"]
|
||||
rating_value = song.get("rating", song.get("userRating", 0))
|
||||
if rating_value in (None, ""):
|
||||
return 0
|
||||
return int(rating_value)
|
||||
except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
|
||||
logging.warning(f"{LIGHT_YELLOW}Unable to read existing rating for {track_id}: {e}{RESET}")
|
||||
return None
|
||||
|
||||
|
||||
# Process one track end-to-end: skip locked or already-rated songs, look up popularity, then write the Navidrome rating.
|
||||
def process_track(track_id, artist_name, album, track_name):
|
||||
def search_spotify(query):
|
||||
|
||||
# Declare global variables
|
||||
global FOUND_AND_UPDATED, UNMATCHED_TRACKS, NOT_FOUND, TOTAL_TRACKS, SKIPPED_RATED
|
||||
|
||||
# If the user asked for unrated-only, skip anything that already has a Navidrome score.
|
||||
existing_rating = get_existing_rating(track_id)
|
||||
if UNRATED_ONLY and existing_rating not in (None, 0):
|
||||
logging.info(
|
||||
f" {LIGHT_YELLOW}Skipping{RESET} {track_name} (Navidrome Rating: {existing_rating})"
|
||||
)
|
||||
SKIPPED_RATED += 1
|
||||
TOTAL_TRACKS += 1
|
||||
return
|
||||
|
||||
|
||||
if not should_update(track_id):
|
||||
print(f"Skipping {track_name}, recently updated.")
|
||||
return
|
||||
|
||||
def search_spotify(query, max_retries=3):
|
||||
# search_spotify: query Spotify's search API and return the track object.
|
||||
# The returned track includes a 0-100 `popularity` field, so no extra
|
||||
# follow-up lookup is required for Spotify ratings.
|
||||
global SHOULD_DELAY
|
||||
SHOULD_DELAY = True
|
||||
|
||||
assert spotify_token_manager is not None
|
||||
SPOTIFY_TOKEN = spotify_token_manager.get_token()
|
||||
|
||||
spotify_url = f"https://api.spotify.com/v1/search?q={query}&type=track&limit=1"
|
||||
headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"}
|
||||
|
||||
try:
|
||||
response = requests.get(spotify_url, headers=headers)
|
||||
except requests.exceptions.ConnectionError:
|
||||
logging.error(f"{LIGHT_RED}Spotify Error: Unable to reach server.{RESET}")
|
||||
sys.exit(1)
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
response = requests.get(spotify_url, headers=headers, timeout=10)
|
||||
|
||||
if response.status_code != 200:
|
||||
if response.status_code == 429:
|
||||
logging.error(
|
||||
f"{LIGHT_RED}Spotify Error {response.status_code}: Retry after {BOLD}{response.headers.get('Retry-After', 'some time')}s{RESET}"
|
||||
# Handle rate limiting
|
||||
if response.status_code == 429:
|
||||
retry_after = int(response.headers.get('Retry-After', 5))
|
||||
logging.warning(f"Rate limited. Retrying after {retry_after} seconds...")
|
||||
time.sleep(retry_after)
|
||||
continue
|
||||
|
||||
# Handle server errors with retry
|
||||
if response.status_code >= 500:
|
||||
wait_time = (attempt + 1) * 2 # Exponential backoff factor
|
||||
logging.warning(f"Spotify server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request failed: {e}")
|
||||
break
|
||||
|
||||
# If we get here, all retries failed
|
||||
logging.error(f"Failed after {max_retries} attempts for query: {query}")
|
||||
return None
|
||||
|
||||
def search_lastfm(track_artist, track_title, max_retries=3):
|
||||
# search_lastfm: call Last.fm's track.getInfo and return the track info.
|
||||
# Last.fm provides `playcount` and `listeners` in the track info; we blend
|
||||
# those into a 0-100 popularity-like value - no separate rating lookup needed.
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
response = requests.get(
|
||||
LASTFM_API_URL,
|
||||
params={
|
||||
"method": "track.getInfo",
|
||||
"api_key": LASTFM_API_KEY,
|
||||
"artist": track_artist,
|
||||
"track": track_title,
|
||||
"autocorrect": 1,
|
||||
"format": "json",
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
else:
|
||||
logging.error(
|
||||
f"{LIGHT_RED}Spotify Error {response.status_code}: {response.text}{RESET}"
|
||||
data = response.json()
|
||||
|
||||
if data.get("error") == 29:
|
||||
retry_after = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"Last.fm rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {retry_after}s..."
|
||||
)
|
||||
time.sleep(retry_after)
|
||||
continue
|
||||
|
||||
if data.get("error"):
|
||||
logging.warning(
|
||||
f"Last.fm error {data.get('error')}: {data.get('message', 'Unknown error')}"
|
||||
)
|
||||
return None
|
||||
|
||||
return data
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request failed: {e}")
|
||||
break
|
||||
|
||||
logging.error(f"Failed after {max_retries} attempts for query: {track_artist} - {track_title}")
|
||||
return None
|
||||
|
||||
def normalize_lastfm_title(title):
|
||||
normalized = re.sub(r"[^\w\s]", " ", title.casefold())
|
||||
return re.sub(r"\s+", " ", normalized).strip()
|
||||
|
||||
def get_lastfm_artist_top_tracks(track_artist, max_retries=3):
|
||||
if track_artist in LASTFM_ARTIST_TOP_TRACKS:
|
||||
return LASTFM_ARTIST_TOP_TRACKS[track_artist]
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
response = requests.get(
|
||||
LASTFM_API_URL,
|
||||
params={
|
||||
"method": "artist.getTopTracks",
|
||||
"api_key": LASTFM_API_KEY,
|
||||
"artist": track_artist,
|
||||
"autocorrect": 1,
|
||||
"limit": 500,
|
||||
"format": "json",
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
sys.exit(1)
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError as e:
|
||||
logging.error(
|
||||
f"{LIGHT_RED}Spotify Error: Error decoding JSON from Spotify API: {e}{RESET}"
|
||||
)
|
||||
sys.exit(1)
|
||||
data = response.json()
|
||||
|
||||
if data.get("error") == 29:
|
||||
retry_after = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"Last.fm rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {retry_after}s..."
|
||||
)
|
||||
time.sleep(retry_after)
|
||||
continue
|
||||
|
||||
if data.get("error"):
|
||||
logging.warning(
|
||||
f"Last.fm artist top tracks error {data.get('error')}: {data.get('message', 'Unknown error')}"
|
||||
)
|
||||
LASTFM_ARTIST_TOP_TRACKS[track_artist] = {}
|
||||
return {}
|
||||
|
||||
tracks = data.get("toptracks", {}).get("track", [])
|
||||
rank_by_title = {}
|
||||
for index, track in enumerate(tracks):
|
||||
track_name = track.get("name")
|
||||
if not track_name:
|
||||
continue
|
||||
normalized_title = normalize_lastfm_title(track_name)
|
||||
if normalized_title in rank_by_title:
|
||||
continue
|
||||
rank_by_title[normalized_title] = {
|
||||
"rank": index + 1,
|
||||
"name": track_name,
|
||||
"listeners": max(0, int(track.get("listeners", 0))),
|
||||
"playcount": max(0, int(track.get("playcount", 0))),
|
||||
}
|
||||
LASTFM_ARTIST_TOP_TRACKS[track_artist] = rank_by_title
|
||||
return rank_by_title
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(f"Connection error: {e}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request failed: {e}")
|
||||
break
|
||||
|
||||
logging.error(f"Failed after {max_retries} attempts for Last.fm top tracks: {track_artist}")
|
||||
LASTFM_ARTIST_TOP_TRACKS[track_artist] = {}
|
||||
return {}
|
||||
|
||||
# MusicBrainz is using first recording/release here: we search recordings/release-group by artist/title,
|
||||
# optionally narrow the search by album, and use the first recording/release that
|
||||
# comes back for the rating lookup. The album only helps narrow the match;
|
||||
# it does not use release-group's recordings/releases average ratings.
|
||||
# search_musicbrainz: search MusicBrainz recordings and return metadata (including MBID).
|
||||
# Note: the search response does NOT include ratings; use lookup_musicbrainz_rating()
|
||||
# with the returned recording id to fetch the recording's rating value.
|
||||
def search_musicbrainz(track_artist, track_title, track_album=None, max_retries=3):
|
||||
def escape_lucene(value):
|
||||
return value.replace('"', '\\"')
|
||||
|
||||
query_parts = [
|
||||
f'recording:"{escape_lucene(track_title)}"',
|
||||
f'artist:"{escape_lucene(track_artist)}"',
|
||||
]
|
||||
if track_album:
|
||||
query_parts.append(f'release:"{escape_lucene(track_album)}"')
|
||||
|
||||
query = " AND ".join(query_parts)
|
||||
search_context = f"artist={track_artist!r}, title={track_title!r}"
|
||||
if track_album:
|
||||
search_context += f", album={track_album!r}"
|
||||
headers = {
|
||||
"User-Agent": f"sptnr/{__version__} (https://github.com/krestaino/sptnr)",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
global MUSICBRAINZ_LAST_REQUEST_AT
|
||||
elapsed = time.time() - MUSICBRAINZ_LAST_REQUEST_AT
|
||||
if elapsed < 1:
|
||||
time.sleep(1 - elapsed)
|
||||
MUSICBRAINZ_LAST_REQUEST_AT = time.time()
|
||||
|
||||
response = requests.get(
|
||||
f"{MUSICBRAINZ_API_URL}recording",
|
||||
params={"query": query, "fmt": "json", "limit": 1},
|
||||
headers=headers,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
if response.status_code == 429:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"MusicBrainz rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
if response.status_code >= 500:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
recordings = data.get("recordings", [])
|
||||
if recordings:
|
||||
return data
|
||||
return None
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"Connection error while searching MusicBrainz ({search_context}): {e}. "
|
||||
f"Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request failed: {e}")
|
||||
break
|
||||
|
||||
logging.error(
|
||||
f"Failed after {max_retries} attempts for MusicBrainz search ({search_context})"
|
||||
)
|
||||
return None
|
||||
|
||||
# lookup_musicbrainz_rating: fetch the recording/{id}?inc=ratings endpoint to
|
||||
# retrieve the recording's rating (value is 0-5). This is separate from the
|
||||
# search step because MusicBrainz intentionally exposes ratings on the
|
||||
# recording lookup endpoint only.
|
||||
def lookup_musicbrainz_rating(recording_id, max_retries=3):
|
||||
rating_context = f"recording_id={recording_id!r}"
|
||||
headers = {
|
||||
"User-Agent": f"sptnr/{__version__} (https://github.com/krestaino/sptnr)",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
global MUSICBRAINZ_LAST_REQUEST_AT
|
||||
elapsed = time.time() - MUSICBRAINZ_LAST_REQUEST_AT
|
||||
if elapsed < 1:
|
||||
time.sleep(1 - elapsed)
|
||||
MUSICBRAINZ_LAST_REQUEST_AT = time.time()
|
||||
|
||||
response = requests.get(
|
||||
f"{MUSICBRAINZ_API_URL}recording/{recording_id}",
|
||||
params={"inc": "ratings", "fmt": "json"},
|
||||
headers=headers,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
if response.status_code == 429:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"MusicBrainz rate limit exceeded. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
if response.status_code >= 500:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"MusicBrainz server error {response.status_code}. Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
recording_data = response.json()
|
||||
recording = recording_data.get("recording", recording_data)
|
||||
if not isinstance(recording, dict):
|
||||
return None
|
||||
|
||||
for key in ("rating", "user-rating", "user_rating", "userRating"):
|
||||
value = recording.get(key)
|
||||
if isinstance(value, dict):
|
||||
for nested_key in ("value", "rating", "user-rating", "user_rating"):
|
||||
nested_value = value.get(nested_key)
|
||||
if nested_value not in (None, ""):
|
||||
return nested_value
|
||||
elif value not in (None, ""):
|
||||
return value
|
||||
|
||||
return None
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
wait_time = (attempt + 1) * 2
|
||||
logging.warning(
|
||||
f"Connection error while looking up MusicBrainz rating ({rating_context}): {e}. "
|
||||
f"Attempt {attempt + 1}/{max_retries}. Waiting {wait_time}s..."
|
||||
)
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request failed: {e}")
|
||||
break
|
||||
|
||||
logging.error(
|
||||
f"Failed after {max_retries} attempts for MusicBrainz rating lookup ({rating_context})"
|
||||
)
|
||||
return None
|
||||
|
||||
def remove_parentheses_content(s):
|
||||
"""Remove content inside parentheses from a string."""
|
||||
return re.sub(r"\s*\(.*?\)\s*", " ", s).strip()
|
||||
# Only remove parentheses if they do NOT contain important keywords
|
||||
keywords = ["remix", "instrumental", "edit", "version", "mix", "karaoke", "live", "acoustic", "demo"]
|
||||
def replacer(match):
|
||||
content = match.group(1).lower()
|
||||
if any(k in content for k in keywords):
|
||||
return f"({match.group(1)})" # Keep it
|
||||
return ""
|
||||
return re.sub(r"\((.*?)\)", replacer, s).strip()
|
||||
|
||||
encoded_track_name = url_encode(track_name)
|
||||
encoded_artist_name = url_encode(artist_name)
|
||||
encoded_album = url_encode(album)
|
||||
search_attempts = [
|
||||
# Primary attempt with all info
|
||||
lambda: f"{url_encode(track_name)}%20artist:{url_encode(artist_name)}%20album:{url_encode(album)}",
|
||||
|
||||
# Primary Search (with album)
|
||||
spotify_data = search_spotify(
|
||||
f"{encoded_track_name}%20artist:{encoded_artist_name}%20album:{encoded_album}"
|
||||
# Secondary attempt without album
|
||||
lambda: f"{url_encode(remove_parentheses_content(track_name))}%20artist:{url_encode(artist_name)}",
|
||||
|
||||
# Tertiary attempt with modified track name
|
||||
lambda: f"{url_encode(track_name.replace('Part', 'Pt.'))}%20artist:{url_encode(artist_name)}"
|
||||
]
|
||||
|
||||
spotify_popularity = None
|
||||
lastfm_popularity = None
|
||||
musicbrainz_rating = None
|
||||
navidrome_rating = None
|
||||
sp_track_name = track_name
|
||||
|
||||
if PROVIDER == "spotify":
|
||||
# Spotify gives us popularity directly, so we only need to find the best matching track.
|
||||
provider_data = None
|
||||
for attempt in search_attempts:
|
||||
provider_data = search_spotify(attempt())
|
||||
if provider_data and provider_data.get("tracks", {}).get("items"):
|
||||
break
|
||||
|
||||
if provider_data and provider_data.get("tracks", {}).get("items"):
|
||||
track = provider_data["tracks"]["items"][0]
|
||||
spotify_popularity = track.get("popularity", 0)
|
||||
sp_track_name = track["name"]
|
||||
elif PROVIDER == "lastfm":
|
||||
# Last.fm exposes listeners and playcount instead of a Spotify-style popularity score.
|
||||
# Artist top-track position drives the main score, global listener reach keeps scale
|
||||
# across artists, and plays per listener adds a small capped engagement bonus.
|
||||
lastfm_attempts = [
|
||||
(artist_name, track_name),
|
||||
(artist_name, remove_parentheses_content(track_name)),
|
||||
(artist_name, track_name.replace("Part", "Pt.")),
|
||||
]
|
||||
provider_data = None
|
||||
for attempt_artist, attempt_title in lastfm_attempts:
|
||||
provider_data = search_lastfm(attempt_artist, attempt_title)
|
||||
if provider_data and provider_data.get("track"):
|
||||
break
|
||||
|
||||
if provider_data and provider_data.get("track"):
|
||||
track = provider_data["track"]
|
||||
playcount = max(0, int(track.get("playcount", 0)))
|
||||
listeners = max(0, int(track.get("listeners", 0)))
|
||||
top_tracks = get_lastfm_artist_top_tracks(artist_name)
|
||||
matched_title = track.get("name", track_name)
|
||||
top_track = top_tracks.get(normalize_lastfm_title(matched_title))
|
||||
if not top_track:
|
||||
top_track = top_tracks.get(normalize_lastfm_title(track_name))
|
||||
|
||||
top_track_position = None
|
||||
if top_track:
|
||||
top_track_position = top_track["rank"]
|
||||
matched_title = top_track["name"]
|
||||
listeners = max(listeners, top_track["listeners"])
|
||||
playcount = max(playcount, top_track["playcount"])
|
||||
|
||||
top_track_position_score = 0
|
||||
if top_track_position:
|
||||
top_track_position_score = max(0, 100 - (math.log2(top_track_position) * 10))
|
||||
|
||||
if listeners == 0 or playcount == 0:
|
||||
reach_score = 0
|
||||
engagement_bonus = 0
|
||||
else:
|
||||
plays_per_listener = playcount / listeners
|
||||
reach_score = min(90, math.log10(listeners + 1) * 13)
|
||||
engagement_bonus = min(12, math.log2(plays_per_listener) * 4)
|
||||
lastfm_popularity = round(
|
||||
min(
|
||||
100,
|
||||
(top_track_position_score * 0.50)
|
||||
+ (reach_score * 0.40)
|
||||
+ (engagement_bonus * 0.10),
|
||||
)
|
||||
)
|
||||
sp_track_name = matched_title
|
||||
else: # MusicBrainz ratings come from a lookup on the matched recording MBID.
|
||||
mb_attempts = [
|
||||
(artist_name, track_name, album),
|
||||
(artist_name, remove_parentheses_content(track_name), album),
|
||||
(artist_name, track_name.replace("Part", "Pt."), album),
|
||||
(artist_name, track_name, None),
|
||||
]
|
||||
provider_data = None
|
||||
for attempt_artist, attempt_title, attempt_album in mb_attempts:
|
||||
provider_data = search_musicbrainz(attempt_artist, attempt_title, attempt_album)
|
||||
if provider_data and provider_data.get("recordings"):
|
||||
break
|
||||
|
||||
if provider_data and provider_data.get("recordings"):
|
||||
track = provider_data["recordings"][0]
|
||||
musicbrainz_rating = lookup_musicbrainz_rating(track.get("id"))
|
||||
musicbrainz_rating = max(0.0, min(5.0, float(musicbrainz_rating or 0)))
|
||||
navidrome_rating = 0 if musicbrainz_rating == 0 else max(1, int(musicbrainz_rating + 0.5))
|
||||
sp_track_name = track.get("title", track_name)
|
||||
|
||||
provider_value = (
|
||||
spotify_popularity
|
||||
if spotify_popularity is not None
|
||||
else lastfm_popularity
|
||||
if lastfm_popularity is not None
|
||||
else musicbrainz_rating
|
||||
)
|
||||
|
||||
found_track = len(spotify_data.get("tracks", {}).get("items", [])) > 0
|
||||
if provider_value is not None:
|
||||
# MusicBrainz ratings are already 0-5, so use directly; others are 0-100 and need mapping.
|
||||
navidrome_rating = navidrome_rating if PROVIDER == "musicbrainz" else get_rating_from_popularity(provider_value)
|
||||
provider_value_str = f"{provider_value} " if 0 <= provider_value <= 9 else str(provider_value)
|
||||
source_label = "s" if PROVIDER == "musicbrainz" else "p"
|
||||
|
||||
if not found_track:
|
||||
# Secondary Search (without album and parentheses content)
|
||||
sanitized_track_name = url_encode(remove_parentheses_content(track_name))
|
||||
spotify_data = search_spotify(
|
||||
f"{sanitized_track_name}%20artist:{encoded_artist_name}"
|
||||
)
|
||||
found_track = len(spotify_data.get("tracks", {}).get("items", [])) > 0
|
||||
if navidrome_rating == 0:
|
||||
logging.info(f" {source_label}:{LIGHT_CYAN}{provider_value_str}{RESET} | Skipping {track_name} (Navidrome Rating: 0)")
|
||||
else:
|
||||
logging.info(f" {source_label}:{LIGHT_CYAN}{provider_value_str}{RESET} -> r:{LIGHT_BLUE}{navidrome_rating}{RESET} | {LIGHT_GREEN}{track_name} - {sp_track_name}{RESET}")
|
||||
|
||||
if found_track:
|
||||
popularity = spotify_data["tracks"]["items"][0].get("popularity", 0)
|
||||
rating = get_rating_from_popularity(popularity)
|
||||
popularity_str = f"{popularity} " if 0 <= popularity <= 9 else str(popularity)
|
||||
message = f" p:{LIGHT_CYAN}{popularity_str}{RESET} → r:{LIGHT_BLUE}{rating}{RESET} | {LIGHT_GREEN}{track_name}{RESET}"
|
||||
logging.info(message)
|
||||
if PREVIEW != 1:
|
||||
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating={rating}"
|
||||
requests.get(nav_url)
|
||||
global FOUND_AND_UPDATED
|
||||
FOUND_AND_UPDATED += 1
|
||||
if navidrome_rating == 0:
|
||||
pass
|
||||
elif PREVIEW != 1:
|
||||
try:
|
||||
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating={navidrome_rating}"
|
||||
requests.get(nav_url, timeout=5)
|
||||
FOUND_AND_UPDATED += 1
|
||||
LOCK[track_id] = time.time()
|
||||
save_lock(LOCK)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Failed to update rating in Navidrome: {e}")
|
||||
else:
|
||||
logging.info(
|
||||
f" p:{LIGHT_RED}??{RESET} → r:{LIGHT_BLUE}0{RESET} | {LIGHT_RED}(not found) {track_name}{RESET}"
|
||||
)
|
||||
global UNMATCHED_TRACKS
|
||||
logging.info(f" p:{LIGHT_RED}??{RESET} -> r:{LIGHT_BLUE}0{RESET} | {LIGHT_RED}(not found) {track_name}{RESET}")
|
||||
UNMATCHED_TRACKS.append(f"{artist_name} - {album} - {track_name}")
|
||||
global NOT_FOUND
|
||||
NOT_FOUND += 1
|
||||
|
||||
global TOTAL_TRACKS
|
||||
if PREVIEW != 1:
|
||||
try:
|
||||
nav_url = f"{NAV_BASE_URL}/rest/setRating?u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=myapp&id={track_id}&rating=0"
|
||||
requests.get(nav_url, timeout=5)
|
||||
LOCK[track_id] = time.time()
|
||||
save_lock(LOCK)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Failed to update rating in Navidrome: {e}")
|
||||
|
||||
|
||||
TOTAL_TRACKS += 1
|
||||
|
||||
|
||||
def process_album(album_id):
|
||||
|
||||
global SHOULD_DELAY
|
||||
|
||||
if SHOULD_DELAY:
|
||||
# sleep for a short time to avoid hitting rate limits too quickly
|
||||
time.sleep(4)
|
||||
|
||||
SHOULD_DELAY = False
|
||||
nav_url = f"{NAV_BASE_URL}/rest/getAlbum?id={album_id}&u={NAV_USER}&p=enc:{HEX_ENCODED_PASS}&v=1.12.0&c=spotify_sync&f=json"
|
||||
response = requests.get(nav_url).json()
|
||||
|
||||
@@ -401,7 +989,7 @@ else:
|
||||
|
||||
logging.info("")
|
||||
logging.info(
|
||||
f"Artist: {LIGHT_PURPLE}{ARTIST_NAME}{RESET} ({ARTIST_ID})[{index}]"
|
||||
f"Artist: {LIGHT_PURPLE}{ARTIST_NAME}{RESET} ({ARTIST_ID})[{index+args.start}]"
|
||||
)
|
||||
process_artist(ARTIST_ID)
|
||||
|
||||
@@ -410,15 +998,21 @@ else:
|
||||
|
||||
# Display the results
|
||||
logging.info("")
|
||||
MATCH_PERCENTAGE = (FOUND_AND_UPDATED / TOTAL_TRACKS) * 100 if TOTAL_TRACKS != 0 else 0
|
||||
processable_tracks = TOTAL_TRACKS - SKIPPED_RATED if UNRATED_ONLY else TOTAL_TRACKS
|
||||
MATCH_PERCENTAGE = (FOUND_AND_UPDATED / processable_tracks) * 100 if processable_tracks != 0 else 0
|
||||
FORMATTED_MATCH_PERCENTAGE = round(MATCH_PERCENTAGE, 2) # Rounding to 2 decimal places
|
||||
TOTAL_BLOCKS = 20
|
||||
|
||||
color_found = LIGHT_GREEN if FOUND_AND_UPDATED == TOTAL_TRACKS else LIGHT_YELLOW
|
||||
color_found_white = LIGHT_GREEN if FOUND_AND_UPDATED == TOTAL_TRACKS else BOLD
|
||||
color_found = LIGHT_GREEN if FOUND_AND_UPDATED == processable_tracks else LIGHT_YELLOW
|
||||
color_found_white = LIGHT_GREEN if FOUND_AND_UPDATED == processable_tracks else BOLD
|
||||
color_not_found = LIGHT_GREEN if NOT_FOUND == 0 else LIGHT_RED
|
||||
blocks_found = "█" * round(FOUND_AND_UPDATED * TOTAL_BLOCKS / TOTAL_TRACKS)
|
||||
blocks_not_found = "█" * (TOTAL_BLOCKS - len(blocks_found))
|
||||
|
||||
if processable_tracks == 0:
|
||||
blocks_found = ""
|
||||
blocks_not_found = ""
|
||||
else:
|
||||
blocks_found = "#" * round(FOUND_AND_UPDATED * TOTAL_BLOCKS / processable_tracks)
|
||||
blocks_not_found = "-" * (TOTAL_BLOCKS - len(blocks_found))
|
||||
full_blocks_found = f"{color_found_white}{blocks_found}{RESET}"
|
||||
full_blocks_not_found = f"{color_not_found}{blocks_not_found}{RESET}"
|
||||
|
||||
@@ -438,6 +1032,15 @@ if seconds or not parts: # Show seconds if it's the only value, even if it's 0
|
||||
formatted_elapsed_time = " ".join(parts)
|
||||
|
||||
# logging.info(f"Processing completed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}")
|
||||
logging.info(
|
||||
f"Tracks: {LIGHT_PURPLE}{TOTAL_TRACKS}{RESET} | Found: {color_found}{FOUND_AND_UPDATED}{RESET} |{full_blocks_found}{full_blocks_not_found}| Not Found: {color_not_found}{NOT_FOUND}{RESET} | Match: {color_found}{FORMATTED_MATCH_PERCENTAGE}%{RESET} | Time: {LIGHT_PURPLE}{formatted_elapsed_time}{RESET}"
|
||||
summary_line = (
|
||||
f"Tracks: {LIGHT_PURPLE}{TOTAL_TRACKS}{RESET} | "
|
||||
f"Found: {color_found}{FOUND_AND_UPDATED}{RESET}"
|
||||
)
|
||||
if SKIPPED_RATED:
|
||||
summary_line += f" | Skipped: {LIGHT_YELLOW}{SKIPPED_RATED}{RESET}"
|
||||
summary_line += (
|
||||
f" | Not Found: {color_not_found}{NOT_FOUND}{RESET} | "
|
||||
f"Match: {color_found}{FORMATTED_MATCH_PERCENTAGE}%{RESET} | "
|
||||
f"Time: {LIGHT_PURPLE}{formatted_elapsed_time}{RESET}"
|
||||
)
|
||||
logging.info(summary_line)
|
||||
|
||||
Reference in New Issue
Block a user