Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 82 additions & 40 deletions beetsplug/duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

"""List duplicate tracks or albums."""

from __future__ import annotations

import os
import shlex
from typing import TYPE_CHECKING, Any, TypeAlias, cast

from beets.library import Album, Item
from beets.plugins import BeetsPlugin
Expand All @@ -28,8 +31,23 @@
subprocess,
)

if TYPE_CHECKING:
import optparse
from collections import defaultdict
from collections.abc import Iterator

from beets.dbcore.db import Results
from beets.library.library import Library

PLUGIN = "duplicates"

# Value of `tiebreak` config item.
# Really the key is "item" or "album".
Tiebreak: TypeAlias = dict[str, list[str]]
# Attribute values; these are formed by taking the values of `keys` (a list of
# strings) on the items. These are used as dict keys in a bunch of places.
KeyValues: TypeAlias = tuple[Any]


class DuplicatesPlugin(BeetsPlugin):
"""List duplicate tracks or albums"""
Expand Down Expand Up @@ -57,7 +75,9 @@
}
)

self._command = Subcommand("duplicates", help=__doc__, aliases=["dup"])
self._command = Subcommand(
"duplicates", help=cast(str, __doc__), aliases=["dup"]
)
self._command.parser.add_option(
"-c",
"--count",
Expand Down Expand Up @@ -142,23 +162,23 @@
self._command.parser.add_all_common_options()

def commands(self):
def _dup(lib, opts, args):
def _dup(lib: Library, opts: optparse.Values, args: list[str]):
self.config.set_args(opts)
album = self.config["album"].get(bool)
checksum = self.config["checksum"].get(str)
copy = bytestring_path(self.config["copy"].as_str())
count = self.config["count"].get(bool)
delete = self.config["delete"].get(bool)
remove = self.config["remove"].get(bool)
fmt_tmpl = self.config["format"].get(str)
full = self.config["full"].get(bool)
keys = self.config["keys"].as_str_seq()
merge = self.config["merge"].get(bool)
move = bytestring_path(self.config["move"].as_str())
path = self.config["path"].get(bool)
tiebreak = self.config["tiebreak"].get(dict)
strict = self.config["strict"].get(bool)
tag = self.config["tag"].get(str)
album: bool = self.config["album"].get(bool) # type: ignore
checksum: str = self.config["checksum"].get(str) # type: ignore
copy: bytes = bytestring_path(self.config["copy"].as_str()) # type: ignore
count: bool = self.config["count"].get(bool) # type: ignore
delete: bool = self.config["delete"].get(bool) # type: ignore
remove: bool = self.config["remove"].get(bool) # type: ignore
fmt_tmpl: str = self.config["format"].get(str) # type: ignore
full: bool = self.config["full"].get(bool) # type: ignore
keys: list[str] = self.config["keys"].as_str_seq() # type: ignore
merge: bool = self.config["merge"].get(bool) # type: ignore
move: bytes = bytestring_path(self.config["move"].as_str()) # type: ignore
path: bool = self.config["path"].get(bool) # type: ignore
tiebreak: Tiebreak = self.config["tiebreak"].get(dict) # type: ignore
strict: bool = self.config["strict"].get(bool) # type: ignore
tag: str = self.config["tag"].get(str) # type: ignore

if album:
if not keys:
Expand All @@ -167,7 +187,7 @@
else:
if not keys:
keys = ["mb_trackid", "mb_albumid"]
items = lib.items(args)

Check failure on line 190 in beetsplug/duplicates.py

View workflow job for this annotation

GitHub Actions / Check types with mypy

Incompatible types in assignment (expression has type "Results[Item]", variable has type "Results[Album]")

# If there's nothing to do, return early. The code below assumes
# `items` to be non-empty.
Expand All @@ -185,9 +205,11 @@
fmt_tmpl = "$albumartist - $album - $title"

if checksum:
k = None
for i in items:
k, _ = self._checksum(i, checksum)
keys = [k]
if k is not None:
keys = [k]

for obj_id, obj_count, objs in self._duplicates(
items,
Expand All @@ -214,13 +236,13 @@

def _process_item(
self,
item,
copy=False,
move=False,
delete=False,
tag=False,
fmt="",
remove=False,
item: Item | Album,
copy: bytes | None = None,
move: bytes | None = None,
delete: bool = False,
tag: str | None = None,
fmt: str = "",
remove: bool = False,
):
"""Process Item `item`."""
print_(format(item, fmt))
Expand All @@ -242,16 +264,17 @@
setattr(item, k, v)
item.store()

def _checksum(self, item, prog):
def _checksum(
self, item: Item | Album, prog: str
) -> tuple[str, bytes | None]:
"""Run external `prog` on file path associated with `item`, cache
output as flexattr on a key that is the name of the program, and
return the key, checksum tuple.
"""
args = [
p.format(file=os.fsdecode(item.path)) for p in shlex.split(prog)
]
path = os.fsdecode(item.path)
args = [p.format(file=path) for p in shlex.split(prog)]
key = args[0]
checksum = getattr(item, key, False)
checksum = cast(bytes | None, getattr(item, key, None))
if not checksum:
self._log.debug(
"key {} on item {.filepath} not cached:computing checksum",
Expand All @@ -275,7 +298,14 @@
)
return key, checksum

def _group_by(self, objs, keys, strict):
def _group_by(
self,
objs: Results[Album] | Results[Item],
keys: list[str],
strict: bool,
) -> (
defaultdict[KeyValues, list[Album]] | defaultdict[KeyValues, list[Item]]
):
"""Return a dictionary with keys arbitrary concatenations of attributes
and values lists of objects (Albums or Items) with those keys.

Expand Down Expand Up @@ -303,9 +333,13 @@
key = tuple(values)
counts[key].append(obj)

return counts

Check failure on line 336 in beetsplug/duplicates.py

View workflow job for this annotation

GitHub Actions / Check types with mypy

Incompatible return value type (got "defaultdict[tuple[Any | None, ...], list[Album | Item]]", expected "defaultdict[tuple[Any], list[Album]] | defaultdict[tuple[Any], list[Item]]")

def _order(self, objs, tiebreak=None):
def _order(
self,
objs: list[Album] | list[Item],
tiebreak: dict[str, list[str]] | None = None,
) -> list[Album] | list[Item]:
"""Return the objects (Items or Albums) sorted by descending
order of priority.

Expand Down Expand Up @@ -340,7 +374,7 @@
def key(x):
return len(x.items())

return sorted(objs, key=key, reverse=True)
return sorted(objs, key=key, reverse=True) # type: ignore

def _merge_items(self, objs):
"""Merge Item objs by copying missing fields from items in the tail to
Expand All @@ -366,7 +400,7 @@
break
return objs

def _merge_albums(self, objs):
def _merge_albums(self, objs: list[Album]) -> list[Album]:
"""Merge Album objs by copying missing items from albums in the tail
to the head album.

Expand Down Expand Up @@ -400,12 +434,20 @@
objs = self._merge_albums(objs)
return objs

def _duplicates(self, objs, keys, full, strict, tiebreak, merge):
def _duplicates(
self,
objs: Results[Album] | Results[Item],
keys: list[str],
full: bool,
strict: bool,
tiebreak: dict[str, list[str]],
merge: bool,
) -> Iterator[tuple[KeyValues, int, list[Album] | list[Item]]]:
"""Generate triples of keys, duplicate counts, and constituent objects."""
offset = 0 if full else 1
for k, objs in self._group_by(objs, keys, strict).items():
if len(objs) > 1:
objs = self._order(objs, tiebreak)
for k, grouped_objs in self._group_by(objs, keys, strict).items():
if len(grouped_objs) > 1:
ordered_objs = self._order(grouped_objs, tiebreak)
if merge:
objs = self._merge(objs)
yield (k, len(objs) - offset, objs[offset:])
ordered_objs = self._merge(ordered_objs)
yield (k, len(ordered_objs) - offset, ordered_objs[offset:])
Loading