Implement token refreshing

This commit is contained in:
EffakT
2025-05-27 08:55:45 +12:00
committed by GitHub
parent 8aec18a580
commit 4599a8c392
+46 -20
View File
@@ -47,6 +47,45 @@ if not os.path.exists(LOG_DIR):
LOGFILE = os.path.join(LOG_DIR, f"spotify-popularity_{int(time.time())}.log")
HEX_ENCODED_PASS = NAV_PASS.encode().hex()
TOKEN_AUTH = base64.b64encode(
f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode()
).decode()
TOKEN_URL = "https://accounts.spotify.com/api/token"
class SpotifyTokenManager:
def __init__(self, client_id, client_secret, token_url):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.token = None
self.expires_at = 0
self._authenticate()
def _authenticate(self):
token_auth = base64.b64encode(
f"{self.client_id}:{self.client_secret}".encode()
).decode()
response = requests.post(
self.token_url,
headers={"Authorization": f"Basic {token_auth}"},
data={"grant_type": "client_credentials"},
)
if response.status_code != 200:
error_info = response.json()
error_description = error_info.get("error_description", "Unknown error")
logging.error(
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
)
sys.exit(1)
token_data = response.json()
self.token = token_data["access_token"]
self.expires_at = time.time() + token_data["expires_in"] - 60 # refresh 1 min early
def get_token(self):
if time.time() >= self.expires_at:
self._authenticate()
return self.token
class NoColorFormatter(logging.Formatter):
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
@@ -66,27 +105,11 @@ file_handler = logging.FileHandler(LOGFILE, "a")
file_handler.setFormatter(NoColorFormatter("[%(asctime)s] %(message)s"))
logging.getLogger().addHandler(file_handler)
# Auth
HEX_ENCODED_PASS = NAV_PASS.encode().hex()
TOKEN_AUTH = base64.b64encode(
f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode()
).decode()
TOKEN_URL = "https://accounts.spotify.com/api/token"
response = requests.post(
TOKEN_URL,
headers={"Authorization": f"Basic {TOKEN_AUTH}"},
data={"grant_type": "client_credentials"},
# Authentication
spotify_token_manager = SpotifyTokenManager(
SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, TOKEN_URL
)
if response.status_code != 200:
error_info = response.json() # Assuming the error response is in JSON format
error_description = error_info.get("error_description", "Unknown error")
logging.error(
f"{LIGHT_RED}Spotify Authentication Error: {error_description}{RESET}"
)
sys.exit(1)
SPOTIFY_TOKEN = response.json()["access_token"]
SPOTIFY_TOKEN = spotify_token_manager.get_token()
init(autoreset=True)
@@ -215,6 +238,9 @@ def process_track(track_id, artist_name, album, track_name):
global FOUND_AND_UPDATED, UNMATCHED_TRACKS, NOT_FOUND, TOTAL_TRACKS
def search_spotify(query, max_retries=3):
SPOTIFY_TOKEN = spotify_token_manager.get_token()
spotify_url = f"https://api.spotify.com/v1/search?q={query}&type=track&limit=1"
headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"}