From b0f7aac09421719544e946aa01470ce66114372c Mon Sep 17 00:00:00 2001 From: jvaquet Date: Thu, 15 May 2025 18:52:17 +0200 Subject: [PATCH 01/14] Added first version SAMkNN Classifier --- river/neighbors/__init__.py | 2 + river/neighbors/samknn_classifier.py | 401 +++++++++++++++++++++++++++ 2 files changed, 403 insertions(+) create mode 100644 river/neighbors/samknn_classifier.py diff --git a/river/neighbors/__init__.py b/river/neighbors/__init__.py index 8d8211d9e9..8bf0921514 100644 --- a/river/neighbors/__init__.py +++ b/river/neighbors/__init__.py @@ -11,10 +11,12 @@ from .knn_classifier import KNNClassifier from .knn_regressor import KNNRegressor from .lazy import LazySearch +from .samknn_classifier import SAMkNNClassifier __all__ = [ "LazySearch", "KNNClassifier", + "SAMkNNClassifier", "KNNRegressor", "SWINN", ] diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py new file mode 100644 index 0000000000..5446e08a90 --- /dev/null +++ b/river/neighbors/samknn_classifier.py @@ -0,0 +1,401 @@ +from __future__ import annotations + +import collections +import functools +import operator +import typing + +from sklearn.cluster import KMeans + +from river import base, utils + +from .base import FunctionWrapper, DistanceFunc + + + +class SAMkNNClassifier(base.Classifier): + """Self Adjusting Memory k-Nearest Neighbors (SAMkNN) for classification. + + High level description. + + Parameters + ---------- + n_neighbors. + Number of neighbors to use for the underlying k nearest neighbor + classifier. + max_mem_size. + Maximum size of the Short and Long Term Memory combined. + max_ltm_size. + Maximum size of the Long Term Memory. If LTM reaches this size, it is + compressed. + min_stm_size. + Minimum size of the Short Term Memory. Smaller sizes will not be + considered while calculating optimal STM size. + weighted. + Use distance weighted kNN. If turned off majority voting is used. + softmax. + Apply softmax on the output probabilities. + dist_func. + Distance function to use for the k nearest neighbor classifier. + recalculate_stm_error. + Disables a heuristic that incrementally computes the interleaved-test- + then-train accuracy for the optimal STM size estimation. Activating this + increases runtime but may result in slightly better model performance. + + Notes + ----- + As the LTM compression mechanism uses kmeans, SAM-kNN only works with + nummerical features and every datapoint is required to have a value for + every feature. + + Examples + -------- + >>> from river import evaluate, metrics + >>> from river.datasets import Bananas + >>> from river.neighbors import SAMkNNClassifier + + >>> samknn = SAMkNNClassifier() + >>> dataset = Bananas() + + >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) + """ + + def __init__(self, + n_neighbors: int = 5, + max_mem_size: int = 100, + max_ltm_size: int = 50, + min_stm_size: int = 10, + weighted: bool = True, + softmax: bool = False, + dist_func: DistanceFunc | FunctionWrapper | None = None, + recalculate_stm_error: bool = False): + + self.n_neighbors = n_neighbors + self.max_mem_size = max_mem_size + self.max_ltm_size = max_ltm_size + self.min_stm_size = min_stm_size + self.weighted = weighted + self.softmax = softmax + + self.classes: set[base.typing.ClfTarget] = set() + self.weights: typing.Dict[str, int] = { + 'stm': 0, + 'ltm': 0, + 'cm': 0 + } + + if dist_func is None: + dist_func = functools.partial(utils.math.minkowski_distance, p=2) + if not isinstance(dist_func, FunctionWrapper): + dist_func = FunctionWrapper(dist_func) + + self.stm = SAMkNNShortTermMemory(n_neighbors=self.n_neighbors, + dist_func=dist_func, + min_stm_size=self.min_stm_size, + weighted=self.weighted, + recalculate_stm_error=recalculate_stm_error) + self.ltm = SAMkNNLongTermMemory(self.n_neighbors, + dist_func=dist_func) + + @property + def _multiclass(self): + return True + + def learn_one(self, x, y, **kwargs): + self.classes.add(y) + + # Update memory weights + for memory in self.weights.keys(): + self.weights[memory] += (self.predict_one(x, memory=memory) == y) + + # Append (x, y) to STM + self.stm.append((x, y)) + + # Check if max memory size is exceeded + if self.stm.size() + self.ltm.size() > self.max_mem_size: + + # Transfer items from STM to LTM and compress LTM + n_items_to_transfer = self.max_ltm_size - self.ltm.size() + for item in self.stm.pop_n(n_items_to_transfer): + self.ltm.append(item) + self.ltm.compress() + + # Clean LTM with (x, y) + clean_dist = self.stm.get_clean_distance((x, y)) + self.ltm.clean((x, y), clean_dist) + + # Determine optimal STM size + optimal_stm_size = self.stm.optimial_size() + if optimal_stm_size != self.stm.size(): + + # Transfer items to LTM to achieve optimal STM size + n_items_to_transfer = self.stm.size() - optimal_stm_size + new_ltm_items = [] + for item in self.stm.pop_n(n_items_to_transfer): + new_ltm_items.append(item) + + # Clean new LTM samples before appending + cleaned_new_ltm_items = [new_ltm_item for + new_ltm_item in new_ltm_items + if all([ + clean_dist == 0 + or new_ltm_item[1] != stm_item[1] + or self.ltm.dist_func(new_ltm_item, stm_item) > clean_dist + for stm_item, clean_dist in zip(self.stm, map(self.stm.get_clean_distance, self.stm))])] + + self.ltm.append(cleaned_new_ltm_items) + + def predict_proba_one(self, x, memory=None, **kwargs): + + # Select memory by weight, if none is specified + if memory is None: + memory = max(self.weights, key=self.weights.get) + + # Make predictions using the selected memory + if memory == 'stm': + nearest = self.stm.search((x, None)) + elif memory == 'ltm': + nearest = self.ltm.search((x, None)) + else: + nearest_stm = self.stm.search((x, None)) + nearest_ltm = self.ltm.search((x, None)) + nearest = sorted(nearest_stm + nearest_ltm, key=operator.itemgetter(1))[:self.n_neighbors] + + # Create probability for each known class + probas = {c: 0.0 for c in self.classes} + + # If no neighbors are found, return a uniform distribution + if not nearest: + return {cls: 1 / len(self.classes) for cls in self.classes} + + # Add up unnormalized probas + for item, dist in nearest: + probas[item[1]] += 1/dist if self.weighted else 1 + + # If softmax is enabled, return softmax probas + if self.softmax: + return utils.math.softmax(probas) + + # Return normalized probas + return {cls: proba/sum(probas.values()) for cls, proba in probas.items()} + + +class SAMkNNMemory: + + def __init__(self, + n_neighbors: int, + dist_func: FunctionWrapper): + self.n_neighbors = n_neighbors + self.dist_func = dist_func + + self.items: typing.List[typing.Tuple[dict, base.typing.ClfTarget]] = [] + self.last_search_item: typing.Tuple[dict, base.typing.ClfTarget] = None + + def append(self, + item: typing.List[typing.Tuple[dict, base.typing.ClfTarget]] | + typing.Tuple[dict, base.typing.ClfTarget]): + if isinstance(item, list): + self.items += item + else: + self.items.append(item) + + self.last_search_item = None + + def size(self): + return len(self.items) + + def search(self, + item: typing.Tuple[dict, base.typing.ClfTarget], + n_neighbors: int | None = None): + + # If search result is cached, return it + if self.last_search_item is not None and self.last_search_item == item: + return self.last_search_result + + if n_neighbors is None: + n_neighbors = self.n_neighbors + + # Find nearest neighbors + items_distances = ((p, self.dist_func(item, p)) for p in self.items) + search_result = sorted(items_distances, + key=operator.itemgetter(1))[:n_neighbors] + + # Cache this search result + self.last_search_item = item + self.last_search_result = search_result + + return search_result + + +class SAMkNNShortTermMemory(SAMkNNMemory): + + def __init__(self, + n_neighbors: int, + dist_func: FunctionWrapper, + min_stm_size: int, + weighted: bool, + recalculate_stm_error: bool): + super().__init__(n_neighbors, dist_func) + self.min_stm_size = min_stm_size + self.weighted = weighted + self.recalculate_stm_error = recalculate_stm_error + + self.prediction_histories: typing.List[bool] = {} + + def pop_n(self, n: int): + + # Invalidate cache and prediction histories as items are changed + self.last_search_item = None + self.prediction_histories = {} + + for _ in range(n): + yield self.items.pop(0) + + def __iter__(self): + yield from self.items + + def get_clean_distance(self, + item: typing.Tuple[dict, base.typing.ClfTarget]): + + # As item itself is included in window, + # search for self.n_neighbors+1 neighbors + nearest = self.search(item, n_neighbors=self.n_neighbors+1) + furthest_distance_same_label = max([item_dist[1] + for item_dist in nearest + if item_dist[0][1] == item[1]]) + + return furthest_distance_same_label + + def partial_interleaved_test_train_error(self, size: int): + + start_idx = len(self.items)-size + + if start_idx in self.prediction_histories.keys(): + + # Make new prediction and append to prediction history + item = self.items[-1] + items_distances = ((p, self.dist_func(item, p)) + for p in self.items[start_idx:-1]) + nearest = sorted(items_distances, + key=operator.itemgetter(1))[:self.n_neighbors] + + probas = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1/dist if self.weighted else 1 + prediction = max(probas, key=probas.get) + self.prediction_histories[start_idx].append(prediction == item[1]) + + elif start_idx-1 in self.prediction_histories.keys() \ + and not self.recalculate_stm_error: + + # Use prediction history with start shifted by 1 + self.prediction_histories[start_idx] = self.prediction_histories[start_idx-1] + del self.prediction_histories[start_idx-1] + self.prediction_histories[start_idx].pop(0) + + # Make new prediction and append to prediction history + item = self.items[-1] + items_distances = ((p, self.dist_func(item, p)) + for p in self.items[start_idx:-1]) + nearest = sorted(items_distances, + key=operator.itemgetter(1))[:self.n_neighbors] + + probas = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1/dist if self.weighted else 1 + prediction = max(probas, key=probas.get) + self.prediction_histories[start_idx].append(prediction == item[1]) + + else: + # Generate new Prediction history from scratch + self.prediction_histories[start_idx] = [] + for cur_idx in range(start_idx+1, len(self.items)): + item = self.items[cur_idx] + items_distances = ((p, self.dist_func(item, p)) + for p in self.items[start_idx:cur_idx]) + nearest = sorted(items_distances, + key=operator.itemgetter(1))[:self.n_neighbors] + probas = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1/dist if self.weighted else 1 + + prediction = max(probas, key=probas.get) + self.prediction_histories[start_idx].append(prediction == item[1]) + + # Return interleaved-test-then-train accuracy + return sum(self.prediction_histories[start_idx]) / len(self.prediction_histories[start_idx]) + + def optimial_size(self): + + # Generate candidate sizes using repeated halving + candidate_sizes = [] + cur_candidate_size = len(self.items) + while cur_candidate_size > self.min_stm_size: + candidate_sizes.append(cur_candidate_size) + cur_candidate_size //= 2 + + # If no alternative candidate sizes exist, return the current size + if len(candidate_sizes) <= 1: + return self.size() + + # Score all candidate sizes + candidate_sizes_scores = {size: self.partial_interleaved_test_train_error(size) + for size in candidate_sizes} + + # Delete unused prediction histories if necessary + if self.recalculate_stm_error: + for start_idx in list(self.prediction_histories.keys()): + if len(self.items) - start_idx not in candidate_sizes: + del self.prediction_histories[start_idx] + + best_size = max(candidate_sizes_scores, key=candidate_sizes_scores.get) + return best_size + + +class SAMkNNLongTermMemory(SAMkNNMemory): + + def compress(self): + + # Invalidate search cache, as items are compressed + self.last_search_item = None + + # Class-wise, generate compressed items using clustering + compressed_items = [] + classes = collections.Counter(sample[1] for sample in self.items) + for cls, cls_count in classes.items(): + + # Convert dict to lists + fields, values = zip(*[tuple(zip(*item[0].items())) + for item in self.items + if item[1] == cls]) + + # Ensure that all items have the same features + fields = set(fields) + assert len(fields) == 1, "Not all datapoints have the same fields. Can not compress LTM!" + fields = fields.pop() + + # Generate and add compressed data + kmeans = KMeans(n_clusters=max(1, cls_count//2), random_state=0) + kmeans.fit(values) + compressed_items += [({cur_field: cur_value for cur_field, cur_value in zip(fields, cur_values)}, cls) + for cur_values in kmeans.cluster_centers_] + + # Overwrite items with compressed items + self.items = compressed_items + + def clean(self, + item: typing.Tuple[dict, base.typing.ClfTarget], + clean_dist: float): + + # If the clean distance is 0, nothing needs to be done + if clean_dist == 0: + return + + # Clean items + self.items = [cur_item for cur_item in self.items + if cur_item[1] != item[1] + or self.dist_func(cur_item, item) > clean_dist] + + # Invalidate search cache as the items are changed + self.last_search_item = None + \ No newline at end of file From 8958a11e1984561ac429fb5698e1e62e8f723a2b Mon Sep 17 00:00:00 2001 From: jvaquet Date: Thu, 15 May 2025 18:56:24 +0200 Subject: [PATCH 02/14] Automatic formatting --- river/neighbors/samknn_classifier.py | 258 +++++++++++++-------------- 1 file changed, 121 insertions(+), 137 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index 5446e08a90..e53442385d 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -3,14 +3,12 @@ import collections import functools import operator -import typing from sklearn.cluster import KMeans from river import base, utils -from .base import FunctionWrapper, DistanceFunc - +from .base import DistanceFunc, FunctionWrapper class SAMkNNClassifier(base.Classifier): @@ -21,7 +19,7 @@ class SAMkNNClassifier(base.Classifier): Parameters ---------- n_neighbors. - Number of neighbors to use for the underlying k nearest neighbor + Number of neighbors to use for the underlying k nearest neighbor classifier. max_mem_size. Maximum size of the Short and Long Term Memory combined. @@ -29,8 +27,8 @@ class SAMkNNClassifier(base.Classifier): Maximum size of the Long Term Memory. If LTM reaches this size, it is compressed. min_stm_size. - Minimum size of the Short Term Memory. Smaller sizes will not be - considered while calculating optimal STM size. + Minimum size of the Short Term Memory. Smaller sizes will not be + considered while calculating optimal STM size. weighted. Use distance weighted kNN. If turned off majority voting is used. softmax. @@ -44,8 +42,8 @@ class SAMkNNClassifier(base.Classifier): Notes ----- - As the LTM compression mechanism uses kmeans, SAM-kNN only works with - nummerical features and every datapoint is required to have a value for + As the LTM compression mechanism uses kmeans, SAM-kNN only works with + nummerical features and every datapoint is required to have a value for every feature. Examples @@ -60,42 +58,40 @@ class SAMkNNClassifier(base.Classifier): >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) """ - def __init__(self, - n_neighbors: int = 5, - max_mem_size: int = 100, - max_ltm_size: int = 50, - min_stm_size: int = 10, - weighted: bool = True, - softmax: bool = False, - dist_func: DistanceFunc | FunctionWrapper | None = None, - recalculate_stm_error: bool = False): - + def __init__( + self, + n_neighbors: int = 5, + max_mem_size: int = 100, + max_ltm_size: int = 50, + min_stm_size: int = 10, + weighted: bool = True, + softmax: bool = False, + dist_func: DistanceFunc | FunctionWrapper | None = None, + recalculate_stm_error: bool = False, + ): self.n_neighbors = n_neighbors self.max_mem_size = max_mem_size self.max_ltm_size = max_ltm_size self.min_stm_size = min_stm_size self.weighted = weighted self.softmax = softmax - + self.classes: set[base.typing.ClfTarget] = set() - self.weights: typing.Dict[str, int] = { - 'stm': 0, - 'ltm': 0, - 'cm': 0 - } + self.weights: dict[str, int] = {"stm": 0, "ltm": 0, "cm": 0} if dist_func is None: dist_func = functools.partial(utils.math.minkowski_distance, p=2) if not isinstance(dist_func, FunctionWrapper): dist_func = FunctionWrapper(dist_func) - - self.stm = SAMkNNShortTermMemory(n_neighbors=self.n_neighbors, - dist_func=dist_func, - min_stm_size=self.min_stm_size, - weighted=self.weighted, - recalculate_stm_error=recalculate_stm_error) - self.ltm = SAMkNNLongTermMemory(self.n_neighbors, - dist_func=dist_func) + + self.stm = SAMkNNShortTermMemory( + n_neighbors=self.n_neighbors, + dist_func=dist_func, + min_stm_size=self.min_stm_size, + weighted=self.weighted, + recalculate_stm_error=recalculate_stm_error, + ) + self.ltm = SAMkNNLongTermMemory(self.n_neighbors, dist_func=dist_func) @property def _multiclass(self): @@ -106,20 +102,19 @@ def learn_one(self, x, y, **kwargs): # Update memory weights for memory in self.weights.keys(): - self.weights[memory] += (self.predict_one(x, memory=memory) == y) + self.weights[memory] += self.predict_one(x, memory=memory) == y # Append (x, y) to STM self.stm.append((x, y)) # Check if max memory size is exceeded if self.stm.size() + self.ltm.size() > self.max_mem_size: - # Transfer items from STM to LTM and compress LTM n_items_to_transfer = self.max_ltm_size - self.ltm.size() for item in self.stm.pop_n(n_items_to_transfer): self.ltm.append(item) self.ltm.compress() - + # Clean LTM with (x, y) clean_dist = self.stm.get_clean_distance((x, y)) self.ltm.clean((x, y), clean_dist) @@ -127,7 +122,6 @@ def learn_one(self, x, y, **kwargs): # Determine optimal STM size optimal_stm_size = self.stm.optimial_size() if optimal_stm_size != self.stm.size(): - # Transfer items to LTM to achieve optimal STM size n_items_to_transfer = self.stm.size() - optimal_stm_size new_ltm_items = [] @@ -135,32 +129,40 @@ def learn_one(self, x, y, **kwargs): new_ltm_items.append(item) # Clean new LTM samples before appending - cleaned_new_ltm_items = [new_ltm_item for - new_ltm_item in new_ltm_items - if all([ - clean_dist == 0 - or new_ltm_item[1] != stm_item[1] - or self.ltm.dist_func(new_ltm_item, stm_item) > clean_dist - for stm_item, clean_dist in zip(self.stm, map(self.stm.get_clean_distance, self.stm))])] - + cleaned_new_ltm_items = [ + new_ltm_item + for new_ltm_item in new_ltm_items + if all( + [ + clean_dist == 0 + or new_ltm_item[1] != stm_item[1] + or self.ltm.dist_func(new_ltm_item, stm_item) > clean_dist + for stm_item, clean_dist in zip( + self.stm, map(self.stm.get_clean_distance, self.stm) + ) + ] + ) + ] + self.ltm.append(cleaned_new_ltm_items) def predict_proba_one(self, x, memory=None, **kwargs): - # Select memory by weight, if none is specified - if memory is None: + if memory is None: memory = max(self.weights, key=self.weights.get) # Make predictions using the selected memory - if memory == 'stm': + if memory == "stm": nearest = self.stm.search((x, None)) - elif memory == 'ltm': + elif memory == "ltm": nearest = self.ltm.search((x, None)) else: nearest_stm = self.stm.search((x, None)) nearest_ltm = self.ltm.search((x, None)) - nearest = sorted(nearest_stm + nearest_ltm, key=operator.itemgetter(1))[:self.n_neighbors] - + nearest = sorted(nearest_stm + nearest_ltm, key=operator.itemgetter(1))[ + : self.n_neighbors + ] + # Create probability for each known class probas = {c: 0.0 for c in self.classes} @@ -170,80 +172,73 @@ def predict_proba_one(self, x, memory=None, **kwargs): # Add up unnormalized probas for item, dist in nearest: - probas[item[1]] += 1/dist if self.weighted else 1 + probas[item[1]] += 1 / dist if self.weighted else 1 # If softmax is enabled, return softmax probas if self.softmax: return utils.math.softmax(probas) # Return normalized probas - return {cls: proba/sum(probas.values()) for cls, proba in probas.items()} + return {cls: proba / sum(probas.values()) for cls, proba in probas.items()} class SAMkNNMemory: - - def __init__(self, - n_neighbors: int, - dist_func: FunctionWrapper): + def __init__(self, n_neighbors: int, dist_func: FunctionWrapper): self.n_neighbors = n_neighbors self.dist_func = dist_func - self.items: typing.List[typing.Tuple[dict, base.typing.ClfTarget]] = [] - self.last_search_item: typing.Tuple[dict, base.typing.ClfTarget] = None + self.items: list[tuple[dict, base.typing.ClfTarget]] = [] + self.last_search_item: tuple[dict, base.typing.ClfTarget] = None - def append(self, - item: typing.List[typing.Tuple[dict, base.typing.ClfTarget]] | - typing.Tuple[dict, base.typing.ClfTarget]): + def append( + self, item: list[tuple[dict, base.typing.ClfTarget]] | tuple[dict, base.typing.ClfTarget] + ): if isinstance(item, list): self.items += item else: self.items.append(item) - + self.last_search_item = None def size(self): return len(self.items) - def search(self, - item: typing.Tuple[dict, base.typing.ClfTarget], - n_neighbors: int | None = None): - + def search(self, item: tuple[dict, base.typing.ClfTarget], n_neighbors: int | None = None): # If search result is cached, return it if self.last_search_item is not None and self.last_search_item == item: return self.last_search_result - + if n_neighbors is None: n_neighbors = self.n_neighbors # Find nearest neighbors items_distances = ((p, self.dist_func(item, p)) for p in self.items) - search_result = sorted(items_distances, - key=operator.itemgetter(1))[:n_neighbors] + search_result = sorted(items_distances, key=operator.itemgetter(1))[:n_neighbors] # Cache this search result self.last_search_item = item self.last_search_result = search_result - return search_result + return search_result class SAMkNNShortTermMemory(SAMkNNMemory): - - def __init__(self, - n_neighbors: int, - dist_func: FunctionWrapper, - min_stm_size: int, - weighted: bool, - recalculate_stm_error: bool): + def __init__( + self, + n_neighbors: int, + dist_func: FunctionWrapper, + min_stm_size: int, + weighted: bool, + recalculate_stm_error: bool, + ): super().__init__(n_neighbors, dist_func) self.min_stm_size = min_stm_size self.weighted = weighted self.recalculate_stm_error = recalculate_stm_error - self.prediction_histories: typing.List[bool] = {} + self.prediction_histories: list[bool] = {} def pop_n(self, n: int): - # Invalidate cache and prediction histories as items are changed self.last_search_item = None self.prediction_histories = {} @@ -254,70 +249,60 @@ def pop_n(self, n: int): def __iter__(self): yield from self.items - def get_clean_distance(self, - item: typing.Tuple[dict, base.typing.ClfTarget]): - - # As item itself is included in window, + def get_clean_distance(self, item: tuple[dict, base.typing.ClfTarget]): + # As item itself is included in window, # search for self.n_neighbors+1 neighbors - nearest = self.search(item, n_neighbors=self.n_neighbors+1) - furthest_distance_same_label = max([item_dist[1] - for item_dist in nearest - if item_dist[0][1] == item[1]]) - + nearest = self.search(item, n_neighbors=self.n_neighbors + 1) + furthest_distance_same_label = max( + [item_dist[1] for item_dist in nearest if item_dist[0][1] == item[1]] + ) + return furthest_distance_same_label - - def partial_interleaved_test_train_error(self, size: int): - start_idx = len(self.items)-size + def partial_interleaved_test_train_error(self, size: int): + start_idx = len(self.items) - size if start_idx in self.prediction_histories.keys(): - # Make new prediction and append to prediction history item = self.items[-1] - items_distances = ((p, self.dist_func(item, p)) - for p in self.items[start_idx:-1]) - nearest = sorted(items_distances, - key=operator.itemgetter(1))[:self.n_neighbors] + items_distances = ((p, self.dist_func(item, p)) for p in self.items[start_idx:-1]) + nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] probas = collections.defaultdict(lambda: 0) for item, dist in nearest: - probas[item[1]] += 1/dist if self.weighted else 1 + probas[item[1]] += 1 / dist if self.weighted else 1 prediction = max(probas, key=probas.get) self.prediction_histories[start_idx].append(prediction == item[1]) - elif start_idx-1 in self.prediction_histories.keys() \ - and not self.recalculate_stm_error: - + elif start_idx - 1 in self.prediction_histories.keys() and not self.recalculate_stm_error: # Use prediction history with start shifted by 1 - self.prediction_histories[start_idx] = self.prediction_histories[start_idx-1] - del self.prediction_histories[start_idx-1] + self.prediction_histories[start_idx] = self.prediction_histories[start_idx - 1] + del self.prediction_histories[start_idx - 1] self.prediction_histories[start_idx].pop(0) # Make new prediction and append to prediction history item = self.items[-1] - items_distances = ((p, self.dist_func(item, p)) - for p in self.items[start_idx:-1]) - nearest = sorted(items_distances, - key=operator.itemgetter(1))[:self.n_neighbors] + items_distances = ((p, self.dist_func(item, p)) for p in self.items[start_idx:-1]) + nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] probas = collections.defaultdict(lambda: 0) for item, dist in nearest: - probas[item[1]] += 1/dist if self.weighted else 1 + probas[item[1]] += 1 / dist if self.weighted else 1 prediction = max(probas, key=probas.get) self.prediction_histories[start_idx].append(prediction == item[1]) else: # Generate new Prediction history from scratch self.prediction_histories[start_idx] = [] - for cur_idx in range(start_idx+1, len(self.items)): + for cur_idx in range(start_idx + 1, len(self.items)): item = self.items[cur_idx] - items_distances = ((p, self.dist_func(item, p)) - for p in self.items[start_idx:cur_idx]) - nearest = sorted(items_distances, - key=operator.itemgetter(1))[:self.n_neighbors] + items_distances = ( + (p, self.dist_func(item, p)) for p in self.items[start_idx:cur_idx] + ) + nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] probas = collections.defaultdict(lambda: 0) for item, dist in nearest: - probas[item[1]] += 1/dist if self.weighted else 1 + probas[item[1]] += 1 / dist if self.weighted else 1 prediction = max(probas, key=probas.get) self.prediction_histories[start_idx].append(prediction == item[1]) @@ -326,21 +311,21 @@ def partial_interleaved_test_train_error(self, size: int): return sum(self.prediction_histories[start_idx]) / len(self.prediction_histories[start_idx]) def optimial_size(self): - # Generate candidate sizes using repeated halving candidate_sizes = [] cur_candidate_size = len(self.items) while cur_candidate_size > self.min_stm_size: candidate_sizes.append(cur_candidate_size) cur_candidate_size //= 2 - + # If no alternative candidate sizes exist, return the current size if len(candidate_sizes) <= 1: return self.size() - + # Score all candidate sizes - candidate_sizes_scores = {size: self.partial_interleaved_test_train_error(size) - for size in candidate_sizes} + candidate_sizes_scores = { + size: self.partial_interleaved_test_train_error(size) for size in candidate_sizes + } # Delete unused prediction histories if necessary if self.recalculate_stm_error: @@ -353,9 +338,7 @@ def optimial_size(self): class SAMkNNLongTermMemory(SAMkNNMemory): - def compress(self): - # Invalidate search cache, as items are compressed self.last_search_item = None @@ -363,39 +346,40 @@ def compress(self): compressed_items = [] classes = collections.Counter(sample[1] for sample in self.items) for cls, cls_count in classes.items(): - # Convert dict to lists - fields, values = zip(*[tuple(zip(*item[0].items())) - for item in self.items - if item[1] == cls]) + fields, values = zip( + *[tuple(zip(*item[0].items())) for item in self.items if item[1] == cls] + ) # Ensure that all items have the same features fields = set(fields) - assert len(fields) == 1, "Not all datapoints have the same fields. Can not compress LTM!" + assert ( + len(fields) == 1 + ), "Not all datapoints have the same fields. Can not compress LTM!" fields = fields.pop() # Generate and add compressed data - kmeans = KMeans(n_clusters=max(1, cls_count//2), random_state=0) + kmeans = KMeans(n_clusters=max(1, cls_count // 2), random_state=0) kmeans.fit(values) - compressed_items += [({cur_field: cur_value for cur_field, cur_value in zip(fields, cur_values)}, cls) - for cur_values in kmeans.cluster_centers_] + compressed_items += [ + ({cur_field: cur_value for cur_field, cur_value in zip(fields, cur_values)}, cls) + for cur_values in kmeans.cluster_centers_ + ] # Overwrite items with compressed items self.items = compressed_items - def clean(self, - item: typing.Tuple[dict, base.typing.ClfTarget], - clean_dist: float): - + def clean(self, item: tuple[dict, base.typing.ClfTarget], clean_dist: float): # If the clean distance is 0, nothing needs to be done if clean_dist == 0: return - + # Clean items - self.items = [cur_item for cur_item in self.items - if cur_item[1] != item[1] - or self.dist_func(cur_item, item) > clean_dist] + self.items = [ + cur_item + for cur_item in self.items + if cur_item[1] != item[1] or self.dist_func(cur_item, item) > clean_dist + ] # Invalidate search cache as the items are changed self.last_search_item = None - \ No newline at end of file From f127c1fc8b91a2dcef6930c90979e6e8f237d5af Mon Sep 17 00:00:00 2001 From: jvaquet Date: Thu, 15 May 2025 19:05:04 +0200 Subject: [PATCH 03/14] Fixed linter errors --- river/neighbors/samknn_classifier.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index e53442385d..c12826e0ff 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -188,7 +188,10 @@ def __init__(self, n_neighbors: int, dist_func: FunctionWrapper): self.dist_func = dist_func self.items: list[tuple[dict, base.typing.ClfTarget]] = [] - self.last_search_item: tuple[dict, base.typing.ClfTarget] = None + self.last_search_item: tuple[dict, base.typing.ClfTarget] | None = None + self.last_search_result: list[tuple[tuple[dict, base.typing.ClfTarget], float]] | None = ( + None + ) def append( self, item: list[tuple[dict, base.typing.ClfTarget]] | tuple[dict, base.typing.ClfTarget] @@ -236,7 +239,7 @@ def __init__( self.weighted = weighted self.recalculate_stm_error = recalculate_stm_error - self.prediction_histories: list[bool] = {} + self.prediction_histories: dict[int, list[bool]] = {} def pop_n(self, n: int): # Invalidate cache and prediction histories as items are changed @@ -268,10 +271,10 @@ def partial_interleaved_test_train_error(self, size: int): items_distances = ((p, self.dist_func(item, p)) for p in self.items[start_idx:-1]) nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] - probas = collections.defaultdict(lambda: 0) + probas: dict[base.typing.ClfTarget, float] = collections.defaultdict(lambda: 0) for item, dist in nearest: probas[item[1]] += 1 / dist if self.weighted else 1 - prediction = max(probas, key=probas.get) + prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) elif start_idx - 1 in self.prediction_histories.keys() and not self.recalculate_stm_error: @@ -288,7 +291,7 @@ def partial_interleaved_test_train_error(self, size: int): probas = collections.defaultdict(lambda: 0) for item, dist in nearest: probas[item[1]] += 1 / dist if self.weighted else 1 - prediction = max(probas, key=probas.get) + prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) else: @@ -304,7 +307,7 @@ def partial_interleaved_test_train_error(self, size: int): for item, dist in nearest: probas[item[1]] += 1 / dist if self.weighted else 1 - prediction = max(probas, key=probas.get) + prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) # Return interleaved-test-then-train accuracy From ad3c91b7d27298e4a27dd7de1e70d87a90986ed9 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:26:16 +0200 Subject: [PATCH 04/14] Added dist_func and recalculate_stm_error to classifier attirbutes --- river/neighbors/samknn_classifier.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index c12826e0ff..c1a92e0ade 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -75,23 +75,25 @@ def __init__( self.min_stm_size = min_stm_size self.weighted = weighted self.softmax = softmax + self.dist_func = dist_func + self.recalculate_stm_error = recalculate_stm_error self.classes: set[base.typing.ClfTarget] = set() self.weights: dict[str, int] = {"stm": 0, "ltm": 0, "cm": 0} - if dist_func is None: - dist_func = functools.partial(utils.math.minkowski_distance, p=2) - if not isinstance(dist_func, FunctionWrapper): - dist_func = FunctionWrapper(dist_func) + if self.dist_func is None: + self.dist_func = functools.partial(utils.math.minkowski_distance, p=2) + if not isinstance(self.dist_func, FunctionWrapper): + self.dist_func = FunctionWrapper(self.dist_func) self.stm = SAMkNNShortTermMemory( n_neighbors=self.n_neighbors, - dist_func=dist_func, + dist_func=self.dist_func, min_stm_size=self.min_stm_size, weighted=self.weighted, - recalculate_stm_error=recalculate_stm_error, + recalculate_stm_error=self.recalculate_stm_error, ) - self.ltm = SAMkNNLongTermMemory(self.n_neighbors, dist_func=dist_func) + self.ltm = SAMkNNLongTermMemory(self.n_neighbors, dist_func=self.dist_func) @property def _multiclass(self): From 46f51ea66c8bcddcd22cccd0981007a67f367150 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:26:59 +0200 Subject: [PATCH 05/14] using self.dist_func instead of self.ltm.dist_func in SAMkNNClassifier --- river/neighbors/samknn_classifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index c1a92e0ade..b1926439f4 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -138,7 +138,7 @@ def learn_one(self, x, y, **kwargs): [ clean_dist == 0 or new_ltm_item[1] != stm_item[1] - or self.ltm.dist_func(new_ltm_item, stm_item) > clean_dist + or self.dist_func(new_ltm_item, stm_item) > clean_dist for stm_item, clean_dist in zip( self.stm, map(self.stm.get_clean_distance, self.stm) ) From 9144f1616eb81579bd7b36bb2d32c763d0a8b048 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:40:46 +0200 Subject: [PATCH 06/14] Avoid dividing by 0 --- river/neighbors/samknn_classifier.py | 35 +++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index b1926439f4..0ab2813eee 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -171,6 +171,10 @@ def predict_proba_one(self, x, memory=None, **kwargs): # If no neighbors are found, return a uniform distribution if not nearest: return {cls: 1 / len(self.classes) for cls in self.classes} + + # If closest neighbor is exact match, assign it a probability of 1 + if nearest[0][1] == 0: + return {cls: 1 if cls == nearest[0][0][1] else 0 for cls in self.classes} # Add up unnormalized probas for item, dist in nearest: @@ -273,10 +277,13 @@ def partial_interleaved_test_train_error(self, size: int): items_distances = ((p, self.dist_func(item, p)) for p in self.items[start_idx:-1]) nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] - probas: dict[base.typing.ClfTarget, float] = collections.defaultdict(lambda: 0) - for item, dist in nearest: - probas[item[1]] += 1 / dist if self.weighted else 1 - prediction = max(probas, key=probas.__getitem__) + if nearest[0][1] == 0: + prediction = nearest[0][0][1] + else: + probas: dict[base.typing.ClfTarget, float] = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1 / dist if self.weighted else 1 + prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) elif start_idx - 1 in self.prediction_histories.keys() and not self.recalculate_stm_error: @@ -290,10 +297,13 @@ def partial_interleaved_test_train_error(self, size: int): items_distances = ((p, self.dist_func(item, p)) for p in self.items[start_idx:-1]) nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] - probas = collections.defaultdict(lambda: 0) - for item, dist in nearest: - probas[item[1]] += 1 / dist if self.weighted else 1 - prediction = max(probas, key=probas.__getitem__) + if nearest[0][1] == 0: + prediction = nearest[0][0][1] + else: + probas = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1 / dist if self.weighted else 1 + prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) else: @@ -305,9 +315,12 @@ def partial_interleaved_test_train_error(self, size: int): (p, self.dist_func(item, p)) for p in self.items[start_idx:cur_idx] ) nearest = sorted(items_distances, key=operator.itemgetter(1))[: self.n_neighbors] - probas = collections.defaultdict(lambda: 0) - for item, dist in nearest: - probas[item[1]] += 1 / dist if self.weighted else 1 + if nearest[0][1] == 0: + prediction = nearest[0][0][1] + else: + probas = collections.defaultdict(lambda: 0) + for item, dist in nearest: + probas[item[1]] += 1 / dist if self.weighted else 1 prediction = max(probas, key=probas.__getitem__) self.prediction_histories[start_idx].append(prediction == item[1]) From 05d2ea216d05bacdb22a707c599ed08c1d76d1ff Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:50:37 +0200 Subject: [PATCH 07/14] Added unit test skips for emerging and disappearing features --- river/neighbors/samknn_classifier.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index 0ab2813eee..b5c4e101a0 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -43,8 +43,7 @@ class SAMkNNClassifier(base.Classifier): Notes ----- As the LTM compression mechanism uses kmeans, SAM-kNN only works with - nummerical features and every datapoint is required to have a value for - every feature. + nummerical features and all datapoints are required to have the same features. Examples -------- @@ -95,6 +94,9 @@ def __init__( ) self.ltm = SAMkNNLongTermMemory(self.n_neighbors, dist_func=self.dist_func) + def _unit_test_skips(self): + return {"check_emerging_features", "check_disappearing_features"} + @property def _multiclass(self): return True @@ -373,7 +375,7 @@ def compress(self): fields = set(fields) assert ( len(fields) == 1 - ), "Not all datapoints have the same fields. Can not compress LTM!" + ), "Not all datapoints have the same features. Can not compress LTM!" fields = fields.pop() # Generate and add compressed data From ac2c6cc10a6fc5c627e8923e3ea7959bf8c2e796 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:53:00 +0200 Subject: [PATCH 08/14] Added result to docstring example --- river/neighbors/samknn_classifier.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index b5c4e101a0..8e825f19d9 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -55,6 +55,8 @@ class SAMkNNClassifier(base.Classifier): >>> dataset = Bananas() >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) + + Accuracy: 77.18% """ def __init__( From 663e56de2b40753963446326c8e0a01438576f03 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 14:56:47 +0200 Subject: [PATCH 09/14] Corrected docstring formatting --- river/neighbors/samknn_classifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index 8e825f19d9..e80bfd1ea9 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -55,8 +55,8 @@ class SAMkNNClassifier(base.Classifier): >>> dataset = Bananas() >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) - Accuracy: 77.18% + """ def __init__( From 982071dbf8ac6e72efa55442d874a8542d5c9bbd Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 15:29:38 +0200 Subject: [PATCH 10/14] Added release notes --- docs/releases/unreleased.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index 179c249086..34d6b3ae3c 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -5,3 +5,7 @@ - The `tags` and `more_tags` properties of `base.Estimator` are now both a set of strings. - The `base` module is now fully type-annotated. Some type hints have changed, but this does not impact the behaviour of the code. For instance, the regression target is now indicated as a float instead of a Number. - `base.Ensemble`, `base.Wrapper`, and `base.WrapperEnsemble` became generic with regard to the type they encapsulate. + +## neighbors + +- Added `neighbors.SAMkNNClassifier` implementing the SAM-kNN Classifier From 5f65cbc48fa79a297a5b7382d586fdbacb9b5d5b Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 15:03:36 +0200 Subject: [PATCH 11/14] Autoremove leading spaces --- river/neighbors/samknn_classifier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index e80bfd1ea9..fdd1453b9f 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -56,7 +56,7 @@ class SAMkNNClassifier(base.Classifier): >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) Accuracy: 77.18% - + """ def __init__( @@ -175,7 +175,7 @@ def predict_proba_one(self, x, memory=None, **kwargs): # If no neighbors are found, return a uniform distribution if not nearest: return {cls: 1 / len(self.classes) for cls in self.classes} - + # If closest neighbor is exact match, assign it a probability of 1 if nearest[0][1] == 0: return {cls: 1 if cls == nearest[0][0][1] else 0 for cls in self.classes} From 74f0d9b6e341fb6dca54a4e814d05e71b9f120aa Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 15:22:15 +0200 Subject: [PATCH 12/14] Added docstring description --- river/neighbors/samknn_classifier.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index fdd1453b9f..02cc570ffe 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -12,9 +12,14 @@ class SAMkNNClassifier(base.Classifier): - """Self Adjusting Memory k-Nearest Neighbors (SAMkNN) for classification. + """Self Adjusting Memory k-Nearest Neighbors (SAMkNN) [^1] for classification. - High level description. + SAM-kNN is a neighbors based online classifier designed to handle + heterogeneous concept drift. To do so, it splits up its memory into Short + Term Memory (STM) and Long Term Memory (LTM). The STM tracks the currently + active concept and is continually resized to best represent it. Observations + discarded from the STM are transferred to LTM. To limit the memory size + without the need to discard observations, the LTM is regularly compressed. Parameters ---------- @@ -57,6 +62,10 @@ class SAMkNNClassifier(base.Classifier): >>> evaluate.progressive_val_score(dataset, samknn, metrics.Accuracy()) Accuracy: 77.18% + References + ---------- + [^1]: [KNN Classifier with Self Adjusting Memory for Heterogeneous Concept Drift — V. Losing, B. Hammer and H. Wersing](https://doi.org/10.1109/ICDM.2016.0040) + """ def __init__( From c4ecf668b8a84ee020b017fae38c101cc6d7eee3 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 15:22:49 +0200 Subject: [PATCH 13/14] removed trailing whitespaces from docstring --- river/neighbors/samknn_classifier.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index 02cc570ffe..dbcc3abb0f 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -14,11 +14,11 @@ class SAMkNNClassifier(base.Classifier): """Self Adjusting Memory k-Nearest Neighbors (SAMkNN) [^1] for classification. - SAM-kNN is a neighbors based online classifier designed to handle - heterogeneous concept drift. To do so, it splits up its memory into Short + SAM-kNN is a neighbors based online classifier designed to handle + heterogeneous concept drift. To do so, it splits up its memory into Short Term Memory (STM) and Long Term Memory (LTM). The STM tracks the currently active concept and is continually resized to best represent it. Observations - discarded from the STM are transferred to LTM. To limit the memory size + discarded from the STM are transferred to LTM. To limit the memory size without the need to discard observations, the LTM is regularly compressed. Parameters From 96a5618401c4f2002a6b6327158dffc263d77404 Mon Sep 17 00:00:00 2001 From: jvaquet Date: Fri, 16 May 2025 16:03:53 +0200 Subject: [PATCH 14/14] Excluded unit test radically disappearing features --- river/neighbors/samknn_classifier.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/river/neighbors/samknn_classifier.py b/river/neighbors/samknn_classifier.py index dbcc3abb0f..17dd954b23 100644 --- a/river/neighbors/samknn_classifier.py +++ b/river/neighbors/samknn_classifier.py @@ -106,7 +106,11 @@ def __init__( self.ltm = SAMkNNLongTermMemory(self.n_neighbors, dist_func=self.dist_func) def _unit_test_skips(self): - return {"check_emerging_features", "check_disappearing_features"} + return { + "check_emerging_features", + "check_disappearing_features", + "check_radically_disappearing_features", + } @property def _multiclass(self):