Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 150 additions & 38 deletions beetsplug/lastgenre/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
from __future__ import annotations

import os
import re
from collections import defaultdict
from functools import singledispatchmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any

import confuse
import yaml

from beets import config, library, plugins, ui
from beets.library import Album, Item
from beets.util import plurality, unique_list
from beetsplug.lastgenre.utils import drop_ignored_genres, is_ignored

from .client import LastFmClient

Expand All @@ -44,6 +48,8 @@
from beets.importer import ImportSession, ImportTask
from beets.library import LibModel

from .utils import GenreIgnorePatterns

Whitelist = set[str]
"""Set of valid genre names (lowercase). Empty set means all genres allowed."""

Expand Down Expand Up @@ -130,6 +136,7 @@ def __init__(self) -> None:
"prefer_specific": False,
"title_case": True,
"pretend": False,
"ignorelist": {},
}
)
self.setup()
Expand All @@ -139,12 +146,13 @@ def setup(self) -> None:
if self.config["auto"]:
self.import_stages = [self.imported]

self.client = LastFmClient(
self._log, self.config["min_weight"].get(int)
)
self.whitelist: Whitelist = self._load_whitelist()
self.c14n_branches: CanonTree
self.c14n_branches, self.canonicalize = self._load_c14n_tree()
self.ignore_patterns: GenreIgnorePatterns = self._load_ignorelist()
self.client = LastFmClient(
self._log, self.config["min_weight"].get(int), self.ignore_patterns
)

def _load_whitelist(self) -> Whitelist:
"""Load the whitelist from a text file.
Expand Down Expand Up @@ -187,6 +195,57 @@ def _load_c14n_tree(self) -> tuple[CanonTree, bool]:
flatten_tree(genres_tree, [], c14n_branches)
return c14n_branches, canonicalize

def _load_ignorelist(self) -> GenreIgnorePatterns:
r"""Load patterns from configuration and compile them.

Mapping of artist names to regex or literal patterns. Use the
quoted ``'*'`` key to define globally ignored genres::

lastgenre:
ignorelist:
'*':
- spoken word
- comedy
Artist Name:
- .*rock.*
- .*metal.*

Matching is case-insensitive and full-match. Because patterns are
parsed as plain YAML scalars, backslashes (e.g. ``\w``) should
not be double-escaped. Quotes are primarily needed for special
YAML characters (e.g., ``*`` or ``[``); prefer single-quotes.

Raises:
Several confuse.ConfigError's that tell the user about the expected
format when the config is invalid.
"""
if not self.config["ignorelist"].get():
return {}

raw_ignorelist = self.config["ignorelist"].get(
confuse.MappingValues(confuse.Sequence(str))
)

compiled_ignorelist: GenreIgnorePatterns = defaultdict(list)
for artist, patterns in raw_ignorelist.items():
artist_patterns = []
for pattern in patterns:
try:
artist_patterns.append(re.compile(pattern, re.IGNORECASE))
except re.error:
artist_patterns.append(
re.compile(re.escape(pattern), re.IGNORECASE)
)
self._log.extra_debug(
"ignore for {}: {}",
artist,
[p.pattern for p in artist_patterns],
)

compiled_ignorelist[artist] = artist_patterns

return compiled_ignorelist

@property
def sources(self) -> tuple[str, ...]:
"""A tuple of allowed genre sources. May contain 'track',
Expand All @@ -202,7 +261,9 @@ def sources(self) -> tuple[str, ...]:

# Genre list processing.

def _resolve_genres(self, tags: list[str]) -> list[str]:
def _resolve_genres(
self, tags: list[str], artist: str | None = None
) -> list[str]:
"""Canonicalize, sort and filter a list of genres.

- Returns an empty list if the input tags list is empty.
Expand All @@ -217,6 +278,9 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:
by the specificity (depth in the canonicalization tree) of the genres.
- Finally applies whitelist filtering to ensure that only valid
genres are kept. (This may result in no genres at all being retained).
- Ignorelist is applied at each stage: ignored input tags skip ancestry
entirely, ignored ancestor tags are dropped, and ignored tags are
removed in the final filter.
- Returns the filtered list of genres, limited to the configured count.
"""
if not tags:
Expand All @@ -229,14 +293,29 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:
# Extend the list to consider tags parents in the c14n tree
tags_all = []
for tag in tags:
# Add parents that are in the whitelist, or add the oldest
# ancestor if no whitelist
# Skip ignored tags entirely — don't walk their ancestry.
if is_ignored(self._log, self.ignore_patterns, tag, artist):
continue

# Add parents that pass whitelist (and are not ignored, which
# is checked in _filter_valid). With whitelist, we may include
# multiple parents
if self.whitelist:
parents = self._filter_valid(
find_parents(tag, self.c14n_branches)
find_parents(tag, self.c14n_branches),
artist=artist,
)
else:
parents = [find_parents(tag, self.c14n_branches)[-1]]
# No whitelist: take only the oldest ancestor, skipping it
# if it is in the ignorelist
oldest = find_parents(tag, self.c14n_branches)[-1]
parents = (
[]
if is_ignored(
self._log, self.ignore_patterns, oldest, artist
)
else [oldest]
)

tags_all += parents
# Stop if we have enough tags already, unless we need to find
Expand All @@ -254,24 +333,34 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:
if self.config["prefer_specific"]:
tags = sort_by_depth(tags, self.c14n_branches)

# c14n only adds allowed genres but we may have had forbidden genres in
# the original tags list
valid_tags = self._filter_valid(tags)
# Final filter: applies when c14n is disabled, or when c14n ran without
# whitelist filtering in the loop (no-whitelist path).
valid_tags = self._filter_valid(tags, artist=artist)
return valid_tags[:count]

def _filter_valid(self, genres: Iterable[str]) -> list[str]:
"""Filter genres based on whitelist.
def _filter_valid(
self, genres: Iterable[str], artist: str | None = None
) -> list[str]:
"""Filter genres through whitelist and ignorelist.

Returns all genres if no whitelist is configured, otherwise returns
only genres that are in the whitelist.
Drops empty/whitespace-only strings, then applies whitelist and
ignorelist checks. Returns all genres if neither is configured.
Whitelist is checked first for performance reasons (ignorelist regex
matching is more expensive and for some call sites ignored genres were
already filtered).
"""
# First, drop any falsy or whitespace-only genre strings to avoid
# retaining empty tags from multi-valued fields.
cleaned = [g for g in genres if g and g.strip()]
if not self.whitelist:
if not self.whitelist and not self.ignore_patterns:
return cleaned

return [g for g in cleaned if g.lower() in self.whitelist]
whitelisted = [
g
for g in cleaned
if not self.whitelist or g.lower() in self.whitelist
]
return drop_ignored_genres(
self._log, self.ignore_patterns, whitelisted, artist
)

# Genre resolution pipeline.

Expand All @@ -282,6 +371,14 @@ def _format_genres(self, tags: list[str]) -> list[str]:
else:
return tags

def _artist_for_filter(self, obj: LibModel) -> str | None:
"""Return the representative artist for genre resolution and filtering."""
return (
obj.artist
if isinstance(obj, library.Item)
else obj.albumartist or obj.artist
)

def _get_existing_genres(self, obj: LibModel) -> list[str]:
"""Return a list of genres for this Item or Album."""
if isinstance(obj, library.Item):
Expand All @@ -292,13 +389,13 @@ def _get_existing_genres(self, obj: LibModel) -> list[str]:
return genres_list

def _combine_resolve_and_log(
self, old: list[str], new: list[str]
self, old: list[str], new: list[str], artist: str | None = None
) -> list[str]:
"""Combine old and new genres and process via _resolve_genres."""
self._log.debug("raw last.fm tags: {}", new)
self._log.debug("existing genres taken into account: {}", old)
combined = old + new
return self._resolve_genres(combined)
return self._resolve_genres(combined, artist=artist)

def _get_genre(self, obj: LibModel) -> tuple[list[str], str]:
"""Get the final genre list for an Album or Item object.
Expand All @@ -320,12 +417,21 @@ def _get_genre(self, obj: LibModel) -> tuple[list[str], str]:
and the whitelist feature was disabled.
"""

def _fallback_stage() -> tuple[list[str], str]:
"""Return the fallback genre and label."""
if fallback := self.config["fallback"].get():
return [fallback], "fallback"
return [], "fallback unconfigured"

def _try_resolve_stage(
stage_label: str, keep_genres: list[str], new_genres: list[str]
stage_label: str,
keep_genres: list[str],
new_genres: list[str],
artist: str | None = None,
) -> tuple[list[str], str] | None:
"""Try to resolve genres for a given stage and log the result."""
resolved_genres = self._combine_resolve_and_log(
keep_genres, new_genres
keep_genres, new_genres, artist=artist
)
if resolved_genres:
suffix = "whitelist" if self.whitelist else "any"
Expand All @@ -345,11 +451,15 @@ def _try_resolve_stage(
# If none are found, we use the fallback (if set).
if self.config["cleanup_existing"]:
keep_genres = [g.lower() for g in genres]
if result := _try_resolve_stage("cleanup", keep_genres, []):
if result := _try_resolve_stage(
"cleanup",
keep_genres,
[],
artist=self._artist_for_filter(obj),
):
return result

# Return fallback string (None if not set).
return self.config["fallback"].get(), "fallback"
return _fallback_stage()

# If cleanup_existing is not set, the pre-populated tags are
# returned as-is.
Expand All @@ -368,7 +478,7 @@ def _try_resolve_stage(
obj.artist, obj.title
):
if result := _try_resolve_stage(
"track", keep_genres, new_genres
"track", keep_genres, new_genres, artist=obj.artist
):
return result

Expand All @@ -377,18 +487,21 @@ def _try_resolve_stage(
obj.albumartist, obj.album
):
if result := _try_resolve_stage(
"album", keep_genres, new_genres
"album", keep_genres, new_genres, artist=obj.albumartist
):
return result

if "artist" in self.sources:
new_genres = []
stage_artist: str | None = None
if isinstance(obj, library.Item):
new_genres = self.client.fetch_artist_genre(obj.artist)
stage_label = "artist"
stage_artist = obj.artist
elif obj.albumartist != config["va_name"].as_str():
new_genres = self.client.fetch_artist_genre(obj.albumartist)
stage_label = "album artist"
stage_artist = obj.albumartist
if not new_genres:
self._log.extra_debug(
'No album artist genre found for "{}", '
Expand All @@ -405,6 +518,9 @@ def _try_resolve_stage(
)
if new_genres:
stage_label = "multi-valued album artist"
stage_artist = (
None # Already filtered per-artist in client
)
else:
# For "Various Artists", pick the most popular track genre.
item_genres = []
Expand All @@ -431,27 +547,23 @@ def _try_resolve_stage(

if new_genres:
if result := _try_resolve_stage(
stage_label, keep_genres, new_genres
stage_label, keep_genres, new_genres, artist=stage_artist
):
return result

# Nothing found, leave original if configured and valid.
if genres and self.config["keep_existing"]:
if valid_genres := self._filter_valid(genres):
if genres and self.config["keep_existing"].get():
artist = self._artist_for_filter(obj)
if valid_genres := self._filter_valid(genres, artist=artist):
return valid_genres, "original fallback"
# If the original genre doesn't match a whitelisted genre, check
# if we can canonicalize it to find a matching, whitelisted genre!
if result := _try_resolve_stage(
"original fallback", keep_genres, []
"original fallback", keep_genres, [], artist=artist
):
return result

# Return fallback as a list.
if fallback := self.config["fallback"].get():
return [fallback], "fallback"

# No fallback configured.
return [], "fallback unconfigured"
return _fallback_stage()

# Beets plugin hooks and CLI.

Expand Down
Loading
Loading