diff --git a/selector/methods/partition.py b/selector/methods/partition.py index 39dc6cae..5baef1be 100644 --- a/selector/methods/partition.py +++ b/selector/methods/partition.py @@ -25,6 +25,7 @@ import collections import math +from typing import List, Union import bitarray import numpy as np @@ -64,15 +65,20 @@ class GridPartition(SelectionBase): """ def __init__( - self, nbins_axis: int, bin_method: str = "equisized_independent", random_seed: int = 42 + self, + nbins_axis: Union[int, List[int]], + bin_method: str = "equisized_independent", + random_seed: int = 42, ): """Initialize class. Parameters ---------- - nbins_axis: int - Number of bins to partition each axis into. The total number of resulting bins is - `numb_bins_axis` raised to the power of the dimensionality of the feature space. + nbins_axis: int or list[int] + Number of bins to partition each axis into. If an integer is provided, the same + number of bins is used for all axes. If a list of integers is provided, each + element specifies the number of bins for the corresponding feature axis. The + length of the list must match the number of features in the data. bin_method: str, optional Method used to partition the sample points into bins. Options include: "equisized_independent", "equisized_dependent", "equifrequent_independent" and @@ -80,8 +86,10 @@ def __init__( random_seed: int, optional Seed for random selection of sample points from each bin. """ - if not isinstance(nbins_axis, int): - raise TypeError(f"Number of bins should be integer, got {type(nbins_axis)}.") + if not isinstance(nbins_axis, (int, list, np.ndarray)): + raise TypeError( + f"Number of bins should be integer or list of integers, got {type(nbins_axis)}." + ) if not isinstance(random_seed, int): raise TypeError(f"The random seed should be integer, got {type(random_seed)}.") if not isinstance(bin_method, str): @@ -211,35 +219,52 @@ def get_bins_from_method(self, X): # sample) and the values are the list of sample indices in that bin. bins = {} + # convert nbins_axis into a per-axis array. + if isinstance(self.nbins_axis, int): + nbins_per_axis = np.full(X.shape[1], self.nbins_axis) + else: + nbins_per_axis = np.array(self.nbins_axis) + if len(nbins_per_axis) != X.shape[1]: + raise ValueError( + f"Length of nbins_axis ({len(nbins_per_axis)}) does not match " + f"number of features ({X.shape[1]})." + ) + if self.bin_method == "equisized_independent": - # partition each dimension/feature independently into `num_bins_axis` bins - unique_bin_index, inverse_index = self.partition_points_to_bins_equisized( - X, self.nbins_axis + # partition each dimension/feature independently using `nbins_per_axis` bin counts + bins_features = np.zeros(X.shape, dtype=int) + for index_feature in range(X.shape[1]): + unique_bin_index, inverse_index = self.partition_points_to_bins_equisized( + X[:, index_feature], nbins_per_axis[index_feature] + ) + bins_features[:, index_feature] = unique_bin_index[inverse_index].flatten() + unique_bin_index, inverse_index = np.unique( + bins_features, return_inverse=True, axis=0 ) # populate bins dictionary for i, key in enumerate(unique_bin_index): bins[tuple(key)] = list(np.where(inverse_index == i)[0]) elif self.bin_method == "equisized_dependent": - # partition the first dimension (1st feature axis) into `num_bins_axis` bins + # partition the first dimension (1st feature axis) into `nbins_per_axis[0]` bins unique_bin_index, inverse_index = self.partition_points_to_bins_equisized( - X[:, 0], self.nbins_axis + X[:, 0], nbins_per_axis[0] ) # populate bins dictionary based on the 1st feature for i, key in enumerate(unique_bin_index): bins[tuple([key])] = list(np.where(inverse_index == i)[0]) # loop over the remaining dimensions (2nd to last feature axis), and for each axis - # partition the points in each bin of the previous axes into `num_bins_axis` bins + # partition the points in each bin of the previous axes into `nbins_per_axis` bins # as a result, each iteration adds a new dimension to the bins dictionary for index_feature in range(1, X.shape[1]): # make a dictionary to store the bins for the current axis bins_axis = {} - # divide points in each bin into `num_bins_axis` bins based on the i-th feature + # divide points in each bin into `nbins_per_axis` bins based on the i-th feature for bin, index_samples in bins.items(): # equisized partition of points in bin along i-th feature unique_bin_index, inverse_index = self.partition_points_to_bins_equisized( - X[index_samples, index_feature], self.nbins_axis + X[index_samples, index_feature], nbins_per_axis[index_feature] ) # update the bins_axis to include the new dimension/feature for the current bin for i, bin_index in enumerate(unique_bin_index): @@ -251,11 +276,11 @@ def get_bins_from_method(self, X): bins = bins_axis elif self.bin_method == "equifrequent_independent": - # partition each dimension of feature space independently into `num_bins_axis` bins + # partition each dimension of feature space independently into `nbins_per_axis` bins bins_features = np.zeros(X.shape, dtype=int) for index_feature in range(0, X.shape[1]): unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent( - X[:, index_feature], self.nbins_axis + X[:, index_feature], nbins_per_axis[index_feature] ) bins_features[:, index_feature] = unique_bin_index[inverse_index] unique_bin_index, inverse_index = np.unique(bins_features, return_inverse=True, axis=0) @@ -265,24 +290,24 @@ def get_bins_from_method(self, X): bins[tuple(key)] = list(np.where(inverse_index == i)[0]) elif self.bin_method == "equifrequent_dependent": - # partition the first dimension (1st feature axis) into `num_bins_axis` bins + # partition the first dimension (1st feature axis) into `nbins_per_axis[0]` bins unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent( - X[:, 0], self.nbins_axis + X[:, 0], nbins_per_axis[0] ) # populate bins dictionary based on the 1st feature for i, key in enumerate(unique_bin_index): bins[tuple([key])] = list(np.where(inverse_index == i)[0]) # loop over the remaining dimensions (2nd to last feature axis), and for each axis - # partition the points in each bin of the previous axes into `num_bins_axis` bins + # partition the points in each bin of the previous axes into `nbins_per_axis` bins for index_feature in range(1, X.shape[1]): # make a dictionary to store the bins for the current axis bins_axis = {} - # divide points in each bin, based on the i-th feature, into `num_bins_axis` bins + # divide points in each bin, based on the i-th feature, into `nbins_per_axis` bins for bin, index_samples in bins.items(): # equifrequent partition of points in bin along i-th feature unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent( - X[index_samples, index_feature], self.nbins_axis + X[index_samples, index_feature], nbins_per_axis[index_feature] ) # update the bins_axis to include the new dimension/feature for the current bin for i, key in enumerate(unique_bin_index): diff --git a/selector/methods/tests/test_partition.py b/selector/methods/tests/test_partition.py index 3e613e2d..8d84eee5 100644 --- a/selector/methods/tests/test_partition.py +++ b/selector/methods/tests/test_partition.py @@ -183,3 +183,65 @@ def test_medoid(): selector = Medoid() selected_ids = selector.select(features, size=2) assert_equal(selected_ids, [0, 3]) + + +def test_grid_partition_per_axis_bins_initialization(): + """Test initializing GridPartition with per-axis bin counts.""" + # Valid list input + selector = GridPartition(nbins_axis=[3, 5]) + assert selector.nbins_axis == [3, 5] + + # Valid int input + selector = GridPartition(nbins_axis=5) + assert selector.nbins_axis == 5 + + +@pytest.mark.parametrize("method", [ + "equisized_independent", + "equisized_dependent", + "equifrequent_independent", + "equifrequent_dependent", +]) +def test_grid_partition_per_axis_bins_execution(method): + """Test execution of all partitioning methods with per-axis bin counts.""" + rng = np.random.default_rng(42) + X = rng.standard_normal((100, 2)) + + nbins_axis = [3, 7] + selector = GridPartition(nbins_axis=nbins_axis, bin_method=method) + + # Should not raise any errors + bins = selector.get_bins_from_method(X) + + # Every sample must appear in exactly one bin + all_indices = [] + for sample_list in bins.values(): + all_indices.extend(sample_list) + assert sorted(all_indices) == list(range(100)) + + +def test_grid_partition_per_axis_mismatch(): + """Test that ValueError is raised when nbins_axis length doesn't match features.""" + rng = np.random.default_rng(42) + X = rng.standard_normal((100, 2)) + + # 3 bin counts for 2 features should fail + selector = GridPartition(nbins_axis=[2, 5, 10]) + with assert_raises(ValueError): + selector.get_bins_from_method(X) + + +def test_grid_partition_backward_compatibility(): + """Ensure single integer nbins_axis produces identical results to new list form.""" + rng = np.random.default_rng(42) + X = rng.standard_normal((50, 2)) + + selector_int = GridPartition(nbins_axis=5, bin_method="equifrequent_independent") + selector_list = GridPartition(nbins_axis=[5, 5], bin_method="equifrequent_independent") + + bins_int = selector_int.get_bins_from_method(X) + bins_list = selector_list.get_bins_from_method(X) + + assert bins_int.keys() == bins_list.keys() + for key in bins_int: + assert bins_int[key] == bins_list[key]