Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions selector/methods/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import collections
import math
from typing import List, Union

import bitarray
import numpy as np
Expand Down Expand Up @@ -64,24 +65,31 @@ class GridPartition(SelectionBase):
"""

def __init__(
self, nbins_axis: int, bin_method: str = "equisized_independent", random_seed: int = 42
self,
nbins_axis: Union[int, List[int]],
bin_method: str = "equisized_independent",
random_seed: int = 42,
):
"""Initialize class.

Parameters
----------
nbins_axis: int
Number of bins to partition each axis into. The total number of resulting bins is
`numb_bins_axis` raised to the power of the dimensionality of the feature space.
nbins_axis: int or list[int]
Number of bins to partition each axis into. If an integer is provided, the same
number of bins is used for all axes. If a list of integers is provided, each
element specifies the number of bins for the corresponding feature axis. The
length of the list must match the number of features in the data.
bin_method: str, optional
Method used to partition the sample points into bins. Options include:
"equisized_independent", "equisized_dependent", "equifrequent_independent" and
"equifrequent_dependent".
random_seed: int, optional
Seed for random selection of sample points from each bin.
"""
if not isinstance(nbins_axis, int):
raise TypeError(f"Number of bins should be integer, got {type(nbins_axis)}.")
if not isinstance(nbins_axis, (int, list, np.ndarray)):
raise TypeError(
f"Number of bins should be integer or list of integers, got {type(nbins_axis)}."
)
if not isinstance(random_seed, int):
raise TypeError(f"The random seed should be integer, got {type(random_seed)}.")
if not isinstance(bin_method, str):
Expand Down Expand Up @@ -211,35 +219,52 @@ def get_bins_from_method(self, X):
# sample) and the values are the list of sample indices in that bin.
bins = {}

# convert nbins_axis into a per-axis array.
if isinstance(self.nbins_axis, int):
nbins_per_axis = np.full(X.shape[1], self.nbins_axis)
else:
nbins_per_axis = np.array(self.nbins_axis)
if len(nbins_per_axis) != X.shape[1]:
raise ValueError(
f"Length of nbins_axis ({len(nbins_per_axis)}) does not match "
f"number of features ({X.shape[1]})."
)

if self.bin_method == "equisized_independent":
# partition each dimension/feature independently into `num_bins_axis` bins
unique_bin_index, inverse_index = self.partition_points_to_bins_equisized(
X, self.nbins_axis
# partition each dimension/feature independently using `nbins_per_axis` bin counts
bins_features = np.zeros(X.shape, dtype=int)
for index_feature in range(X.shape[1]):
unique_bin_index, inverse_index = self.partition_points_to_bins_equisized(
X[:, index_feature], nbins_per_axis[index_feature]
)
bins_features[:, index_feature] = unique_bin_index[inverse_index].flatten()
unique_bin_index, inverse_index = np.unique(
bins_features, return_inverse=True, axis=0
)
# populate bins dictionary
for i, key in enumerate(unique_bin_index):
bins[tuple(key)] = list(np.where(inverse_index == i)[0])

elif self.bin_method == "equisized_dependent":
# partition the first dimension (1st feature axis) into `num_bins_axis` bins
# partition the first dimension (1st feature axis) into `nbins_per_axis[0]` bins
unique_bin_index, inverse_index = self.partition_points_to_bins_equisized(
X[:, 0], self.nbins_axis
X[:, 0], nbins_per_axis[0]
)
# populate bins dictionary based on the 1st feature
for i, key in enumerate(unique_bin_index):
bins[tuple([key])] = list(np.where(inverse_index == i)[0])

# loop over the remaining dimensions (2nd to last feature axis), and for each axis
# partition the points in each bin of the previous axes into `num_bins_axis` bins
# partition the points in each bin of the previous axes into `nbins_per_axis` bins
# as a result, each iteration adds a new dimension to the bins dictionary
for index_feature in range(1, X.shape[1]):
# make a dictionary to store the bins for the current axis
bins_axis = {}
# divide points in each bin into `num_bins_axis` bins based on the i-th feature
# divide points in each bin into `nbins_per_axis` bins based on the i-th feature
for bin, index_samples in bins.items():
# equisized partition of points in bin along i-th feature
unique_bin_index, inverse_index = self.partition_points_to_bins_equisized(
X[index_samples, index_feature], self.nbins_axis
X[index_samples, index_feature], nbins_per_axis[index_feature]
)
# update the bins_axis to include the new dimension/feature for the current bin
for i, bin_index in enumerate(unique_bin_index):
Expand All @@ -251,11 +276,11 @@ def get_bins_from_method(self, X):
bins = bins_axis

elif self.bin_method == "equifrequent_independent":
# partition each dimension of feature space independently into `num_bins_axis` bins
# partition each dimension of feature space independently into `nbins_per_axis` bins
bins_features = np.zeros(X.shape, dtype=int)
for index_feature in range(0, X.shape[1]):
unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent(
X[:, index_feature], self.nbins_axis
X[:, index_feature], nbins_per_axis[index_feature]
)
bins_features[:, index_feature] = unique_bin_index[inverse_index]
unique_bin_index, inverse_index = np.unique(bins_features, return_inverse=True, axis=0)
Expand All @@ -265,24 +290,24 @@ def get_bins_from_method(self, X):
bins[tuple(key)] = list(np.where(inverse_index == i)[0])

elif self.bin_method == "equifrequent_dependent":
# partition the first dimension (1st feature axis) into `num_bins_axis` bins
# partition the first dimension (1st feature axis) into `nbins_per_axis[0]` bins
unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent(
X[:, 0], self.nbins_axis
X[:, 0], nbins_per_axis[0]
)
# populate bins dictionary based on the 1st feature
for i, key in enumerate(unique_bin_index):
bins[tuple([key])] = list(np.where(inverse_index == i)[0])

# loop over the remaining dimensions (2nd to last feature axis), and for each axis
# partition the points in each bin of the previous axes into `num_bins_axis` bins
# partition the points in each bin of the previous axes into `nbins_per_axis` bins
for index_feature in range(1, X.shape[1]):
# make a dictionary to store the bins for the current axis
bins_axis = {}
# divide points in each bin, based on the i-th feature, into `num_bins_axis` bins
# divide points in each bin, based on the i-th feature, into `nbins_per_axis` bins
for bin, index_samples in bins.items():
# equifrequent partition of points in bin along i-th feature
unique_bin_index, inverse_index = self.partition_points_to_bins_equifrequent(
X[index_samples, index_feature], self.nbins_axis
X[index_samples, index_feature], nbins_per_axis[index_feature]
)
# update the bins_axis to include the new dimension/feature for the current bin
for i, key in enumerate(unique_bin_index):
Expand Down
62 changes: 62 additions & 0 deletions selector/methods/tests/test_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,65 @@ def test_medoid():
selector = Medoid()
selected_ids = selector.select(features, size=2)
assert_equal(selected_ids, [0, 3])


def test_grid_partition_per_axis_bins_initialization():
"""Test initializing GridPartition with per-axis bin counts."""
# Valid list input
selector = GridPartition(nbins_axis=[3, 5])
assert selector.nbins_axis == [3, 5]

# Valid int input
selector = GridPartition(nbins_axis=5)
assert selector.nbins_axis == 5


@pytest.mark.parametrize("method", [
"equisized_independent",
"equisized_dependent",
"equifrequent_independent",
"equifrequent_dependent",
])
def test_grid_partition_per_axis_bins_execution(method):
"""Test execution of all partitioning methods with per-axis bin counts."""
rng = np.random.default_rng(42)
X = rng.standard_normal((100, 2))

nbins_axis = [3, 7]
selector = GridPartition(nbins_axis=nbins_axis, bin_method=method)

# Should not raise any errors
bins = selector.get_bins_from_method(X)

# Every sample must appear in exactly one bin
all_indices = []
for sample_list in bins.values():
all_indices.extend(sample_list)
assert sorted(all_indices) == list(range(100))


def test_grid_partition_per_axis_mismatch():
"""Test that ValueError is raised when nbins_axis length doesn't match features."""
rng = np.random.default_rng(42)
X = rng.standard_normal((100, 2))

# 3 bin counts for 2 features should fail
selector = GridPartition(nbins_axis=[2, 5, 10])
with assert_raises(ValueError):
selector.get_bins_from_method(X)


def test_grid_partition_backward_compatibility():
"""Ensure single integer nbins_axis produces identical results to new list form."""
rng = np.random.default_rng(42)
X = rng.standard_normal((50, 2))

selector_int = GridPartition(nbins_axis=5, bin_method="equifrequent_independent")
selector_list = GridPartition(nbins_axis=[5, 5], bin_method="equifrequent_independent")

bins_int = selector_int.get_bins_from_method(X)
bins_list = selector_list.get_bins_from_method(X)

assert bins_int.keys() == bins_list.keys()
for key in bins_int:
assert bins_int[key] == bins_list[key]
Loading