diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index 44392b1f87..abf33c4f61 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -13,6 +13,7 @@ ## anomaly - Added `anomaly.LODA`, an online implementation of Pevný's *Lightweight on-line detector of anomalies*. It maintains an ensemble of one-dimensional `sketch.Histogram`s over sparse random projections and scores samples by their average negative log-likelihood. +- Rewrote `anomaly.LocalOutlierFactor`. It now stores samples in a bounded sliding window via a `river.neighbors` search engine (`LazySearch` by default, `SWINN` for approximate search) and computes the LOF of a sample against the window on demand. `learn_one` is now constant-time and memory is bounded by the window size, and `score_one` no longer mutates the model. Scores match scikit-learn over the same window: an unseen point reproduces `LocalOutlierFactor(novelty=True)`, and a stored point reproduces the in-sample `negative_outlier_factor_` (a point is never its own neighbor). `learn_many` now accepts any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager dataframe (pandas, polars, pyarrow, ...). Behavior changes: scores now reflect the most recent `window_size` samples rather than the entire history, scoring an already-seen point returns its LOF instead of `0.0`, and the `distance_func` parameter is replaced by the engine's distance function. ## compose @@ -60,6 +61,10 @@ - Added mini-batch support to `GaussianNB` via `learn_many`, `predict_many`, and `predict_proba_many`. +## neighbors + +- Fixed the Euclidean fast path of `neighbors.LazySearch`, which returned the *farthest* candidates instead of the nearest because its search heap was keyed on the negated distance. This affected `KNNClassifier`, `KNNRegressor`, and `LocalOutlierFactor` whenever they ran over a `LazySearch` engine with the default Euclidean distance. 
+ ## neural_net - Removed the deprecated `river.neural_net` module (and its `MLPRegressor`), which had emitted a `DeprecationWarning` since 0.25.0. Use [`deep-river`](https://github.com/online-ml/deep-river) or a dedicated deep-learning library such as PyTorch for neural networks. diff --git a/river/anomaly/lof.py b/river/anomaly/lof.py index 2385af91b2..07547bba5a 100644 --- a/river/anomaly/lof.py +++ b/river/anomaly/lof.py @@ -1,493 +1,203 @@ from __future__ import annotations -import copy +import math import typing -from river import anomaly -from river.neighbors.base import DistanceFunc -from river.utils.vectordict import euclidean_distance_dict +from river import anomaly, utils +from river.neighbors import LazySearch +from river.neighbors.base import BaseNN, FunctionWrapper +from river.utils.vectordict import euclidean_distance_tuple as _euclidean_tuple_distance if typing.TYPE_CHECKING: - import pandas as pd + from narwhals.stable.v2.typing import IntoDataFrame - -def check_equal(x_list: list, y_list: list): - """ - Check if new list of observations (x_list) has any data sample that is equal to any previous data recorded (y_list). - """ - result = [x for x in x_list if not any(x == y for y in y_list)] - return result, len(x_list) - len(result) - - -def expand_objects( - new_particles: list, - x_list: list, - neighborhoods: dict, - rev_neighborhoods: dict, - k_dist: dict, - reach_dist: dict, - dist_dict: dict, - local_reach: dict, - lof: dict, -): - """ - Expand size of dictionaries and lists to take into account new data points. 
- """ - n = len(x_list) - m = len(new_particles) - x_list.extend(new_particles) - neighborhoods.update({i: [] for i in range(n + m)}) - rev_neighborhoods.update({i: [] for i in range(n + m)}) - k_dist.update({i: float("inf") for i in range(n + m)}) - reach_dist.update({i + n: {} for i in range(m)}) - dist_dict.update({i + n: {} for i in range(m)}) - local_reach.update({i + n: [] for i in range(m)}) - lof.update({i + n: [] for i in range(m)}) - return ( - (n, m), - x_list, - neighborhoods, - rev_neighborhoods, - k_dist, - reach_dist, - dist_dict, - local_reach, - lof, - ) - - -def define_sets(nm, neighborhoods: dict, rev_neighborhoods: dict): - """ - Define sets of points for the incremental LOF algorithm. - """ - # Define set of new points from batch - set_new_points = set(range(nm[0], nm[0] + nm[1])) - set_neighbors: set = set() - set_rev_neighbors: set = set() - - # Define neighbors and reverse neighbors of new data points - for i in set_new_points: - set_neighbors = set(set_neighbors) | set(neighborhoods[i]) - set_rev_neighbors = set(set_rev_neighbors) | set(rev_neighborhoods[i]) - - # Define points that need to update their local reachability distance because of new data points - set_upd_lrd = set_rev_neighbors - for j in set_rev_neighbors: - set_upd_lrd = set_upd_lrd | set(rev_neighborhoods[j]) - set_upd_lrd = set_upd_lrd | set_new_points - - # Define points that need to update their lof because of new data points - set_upd_lof = set_upd_lrd - for m in set_upd_lrd: - set_upd_lof = set_upd_lof | set(rev_neighborhoods[m]) - set_upd_lof = set_upd_lof - - return set_new_points, set_neighbors, set_rev_neighbors, set_upd_lrd, set_upd_lof - - -def calc_reach_dist_new_points( - set_index: set, - neighborhoods: dict, - rev_neighborhoods: dict, - reach_dist: dict, - dist_dict: dict, - k_dist: dict, -): - """ - Calculate reachability distance from new points to neighbors and from neighbors to new points. 
- """ - for c in set_index: - for j in set(neighborhoods[c]): - reach_dist[c][j] = max(dist_dict[c][j], k_dist[j]) - for j in set(rev_neighborhoods[c]): - reach_dist[j][c] = max(dist_dict[j][c], k_dist[c]) - return reach_dist - - -def calc_reach_dist_other_points( - set_index: set, - rev_neighborhoods: dict, - reach_dist: dict, - dist_dict: dict, - k_dist: dict, -): - """ - Calculate reachability distance from reverse neighbors of reverse neighbors ( RkNN(RkNN(NewPoints)) ) - to reverse neighbors ( RkNN(NewPoints) ). These values change due to the insertion of new points. - """ - for j in set_index: - for i in set(rev_neighborhoods[j]): - reach_dist[i][j] = max(dist_dict[i][j], k_dist[j]) - return reach_dist +__all__ = ["LocalOutlierFactor"] -def calc_local_reach_dist( - set_index: set, neighborhoods: dict, reach_dist: dict, local_reach_dist: dict -): - """ - Calculate local reachability distance of affected points. - """ - for i in set_index: - denominator = sum(reach_dist[i][j] for j in neighborhoods[i]) - local_reach_dist[i] = len(neighborhoods[i]) / denominator if denominator else 0 - return local_reach_dist - +class LocalOutlierFactor(anomaly.base.AnomalyDetector): + """Local Outlier Factor (LOF). -def calc_lof(set_index: set, neighborhoods: dict, local_reach: dict, lof: dict): - """ - Calculate local outlier factor (LOF) of affected points. - """ - for i in set_index: - denominator = len(neighborhoods[i]) * local_reach[i] - lof[i] = sum(local_reach[j] for j in neighborhoods[i]) / denominator if denominator else 0 - return lof + The LOF of a sample measures how isolated it is relative to the density of its neighbors: + a value around 1 means the sample lies in a region as dense as its neighborhood, while a + value substantially above 1 flags an outlier sitting in a comparatively sparse region + (Breunig et al., 2000). + Samples are stored in a fixed-size sliding window managed by a nearest-neighbor `engine` + (see `river.neighbors`). 
`learn_one` simply adds a sample to the window, so it runs in + constant time and the memory footprint is bounded by the window size. `score_one` computes + the LOF of a sample against the current window on demand, without modifying it. The natural + streaming pattern is therefore to call `score_one` and then `learn_one` on each sample. -class LocalOutlierFactor(anomaly.base.AnomalyDetector): - """Incremental Local Outlier Factor (Incremental LOF). - - The Incremental Local Outlier Factor (ILOF) is an online version of the Local Outlier Factor (LOF), proposed by - Pokrajac et al. (2017), and is used to identify outliers based on density of local neighbors. - - The algorithm take into account the following elements: - - `NewPoints`: new points; - - `kNN(p)`: the k-nearest neighbors of `p` (the k-closest points to `p`); - - `RkNN(p)`: the reverse-k-nearest neighbors of `p` (points that have `p` as one of their neighbors); - - `set_upd_lrd`: Set of points that need to have the local reachability distance updated; - - `set_upd_lof`: Set of points that need to have the local outlier factor updated. - - This current implementation within `River`, based on the original one in the paper, follows the following steps: - 1) Insert new data points (`NewPoints`) and calculate its distance to existing points; - 2) Update the nearest neighbors and reverse nearest neighbors of all the points; - 3) Define sets of affected points that required updates; - 4) Calculate the reachability-distance from new point to neighbors (`NewPoints` -> `kNN(NewPoints)`) - and from rev-neighbors to new point (`RkNN(NewPoints)` -> `NewPoints`); - 5) Update the reachability-distance for affected points: `RkNN(RkNN(NewPoints))` -> `RkNN(NewPoints)` - 6) Update local reachability distance of affected points: `lrd(set_upd_lrd)`; - 7) Update local outlier factor: `lof(set_upd_lof)`. 
- - The incremental LOF algorithm is expected to provide equivalent detection performance as the iterated static - LOF algroithm (applied after insertion of each data record), while requiring significantly less computational time. - Moreover, the insertion of a new data point as well as deletion of an old data point influence only a limited number - of their closest neighbors, which means that the number of updates per such insertion/deletion does not depend - on the total number of instances learned/in the data set. + Because scoring is done against the window only, the result matches the static LOF computed + on those samples (e.g. scikit-learn's `LocalOutlierFactor(novelty=True)` fitted on the same + window). Parameters ---------- n_neighbors - The number of nearest neighbors to use for density estimation. - distance_func - Distance function to be used. By default, the Euclidean distance is used. - - Attributes - ---------- - x_list - A list of stored observations. - x_batch - A buffer to hold incoming observations until it's time to update the model. - x_scores - A buffer to hold incoming observations until it's time to score them. - dist_dict - A dictionary to hold distances between observations. - neighborhoods - A dictionary to hold neighborhoods for each observation. - rev_neighborhoods - A dictionary to hold reverse neighborhoods for each observation. - k_dist - A dictionary to hold k-distances for each observation. - reach_dist - A dictionary to hold reachability distances for each observation. - lof - A dictionary to hold Local Outlier Factors for each observation. - local_reach - A dictionary to hold local reachability distances for each observation. + The number of nearest neighbors used to define the local neighborhood. + engine + The nearest-neighbor search engine, which stores the samples in a sliding window and + answers neighbor queries. Defaults to `neighbors.LazySearch`, an exact brute-force + search over the window. 
Pass `neighbors.SWINN` for approximate search, or a configured + engine to control the `window_size` and the distance function. Examples -------- - >>> import pandas as pd >>> from river import anomaly - >>> from river import datasets - - >>> cc_df = pd.DataFrame(datasets.CreditCard()) - - >>> lof = anomaly.LocalOutlierFactor(n_neighbors=20) - - >>> for x, _ in datasets.CreditCard().take(200): - ... lof.learn_one(x) - - >>> lof.learn_many(cc_df[201:401]) - - >>> scores = [] - >>> for x in cc_df[0][401:406]: - ... scores.append(lof.score_one(x)) - - >>> [round(score, 3) for score in scores] - [1.802, 1.936, 1.566, 1.181, 1.272] >>> X = [0.5, 0.45, 0.43, 0.44, 0.445, 0.45, 0.0] - >>> lof = anomaly.LocalOutlierFactor() + >>> lof = anomaly.LocalOutlierFactor(n_neighbors=3) >>> for x in X[:3]: - ... lof.learn_one({'x': x}) # Warming up + ... lof.learn_one({"x": x}) # warming up >>> for x in X: - ... features = {'x': x} - ... print( - ... f'Anomaly score for x={x:.3f}: {lof.score_one(features):.3f}') + ... features = {"x": x} + ... print(f"Anomaly score for x={x:.3f}: {lof.score_one(features):.3f}") ... lof.learn_one(features) - Anomaly score for x=0.500: 0.000 - Anomaly score for x=0.450: 0.000 - Anomaly score for x=0.430: 0.000 - Anomaly score for x=0.440: 1.020 - Anomaly score for x=0.445: 1.032 - Anomaly score for x=0.450: 0.000 - Anomaly score for x=0.000: 0.980 + Anomaly score for x=0.500: 0.929 + Anomaly score for x=0.450: 1.105 + Anomaly score for x=0.430: 1.044 + Anomaly score for x=0.440: 1.000 + Anomaly score for x=0.445: 0.944 + Anomaly score for x=0.450: 0.889 + Anomaly score for x=0.000: 36.111 + + A mini-batch of samples can be learned at once from any + [narwhals](https://github.com/narwhals-dev/narwhals)-compatible eager dataframe (pandas, + polars, pyarrow, ...) 
with `learn_many`: + + >>> import pandas as pd + >>> from river import datasets + + >>> rows = [x for x, _ in datasets.CreditCard().take(500)] + >>> lof = anomaly.LocalOutlierFactor(n_neighbors=20) + >>> lof.learn_many(pd.DataFrame(rows)) + >>> [round(lof.score_one(x), 3) for x in rows[:5]] + [1.51, 1.355, 1.987, 1.566, 1.837] References ---------- - David Pokrajac, Aleksandar Lazarevic, and Longin Jan Latecki (2007). Incremental Local Outlier Detection for Data - Streams. In: Proceedings of the 2007 IEEE Symposium on Computational Intelligence and Data Mining (CIDM 2007). 504-515. - DOI: 10.1109/CIDM.2007.368917. + [^1]: Markus M. Breunig, Hans-Peter Kriegel, Raymond T. Ng, and Jörg Sander (2000). + LOF: Identifying Density-Based Local Outliers. In: Proceedings of the 2000 ACM SIGMOD + International Conference on Management of Data. 93-104. DOI: 10.1145/342009.335388. + [^2]: David Pokrajac, Aleksandar Lazarevic, and Longin Jan Latecki (2007). Incremental Local + Outlier Detection for Data Streams. In: Proceedings of the 2007 IEEE Symposium on + Computational Intelligence and Data Mining (CIDM 2007). 504-515. + DOI: 10.1109/CIDM.2007.368917. 
""" - def __init__( - self, - n_neighbors: int = 10, - distance_func: DistanceFunc | None = None, - ): + def __init__(self, n_neighbors: int = 10, engine: BaseNN | None = None): self.n_neighbors = n_neighbors - self.x_list: list = [] - self.x_batch: list = [] - self.x_scores: list = [] - self.dist_dict: dict = {} - self.neighborhoods: dict = {} - self.rev_neighborhoods: dict = {} - self.k_dist: dict = {} - self.reach_dist: dict = {} - self.lof: dict = {} - self.local_reach: dict = {} - self.distance_func = distance_func - self.distance = distance_func if distance_func is not None else euclidean_distance_dict - - def learn_many(self, x: pd.DataFrame): - x = x[0].tolist() - self.learn(x) + + _default_dist = utils.math._euclidean_distance # type: ignore[attr-defined] + if engine is None: + engine = LazySearch(window_size=1000, dist_func=_default_dist) # type: ignore[arg-type] + + # Engage the Cython tuple fast-path when the default Euclidean distance is used. + # Otherwise, wrap the user's distance so it reads the feature dict out of the stored tuple. + if not isinstance(engine.dist_func, FunctionWrapper): + if engine.dist_func is _default_dist: + engine.dist_func = _euclidean_tuple_distance # type: ignore[assignment] + elif engine.dist_func is not _euclidean_tuple_distance: + engine.dist_func = FunctionWrapper(engine.dist_func) + + self.engine = engine + # Work on a fresh copy so the engine passed by the user is left untouched. + self._nn: BaseNN = engine.clone(include_attributes=True) + + @classmethod + def _unit_test_params(cls): + # k=10 is a weak setting on the small, duplicate-heavy CreditCard check sample (so is + # scikit-learn there); k=20 — scikit-learn's own default — is a competent, robust choice. + yield {"n_neighbors": 20} + + def _unit_test_skips(self): + # Scores depend on the float summation order of the features, so reordering them can flip + # near-tied neighbors (as for KNNRegressor and the forest models). 
+ return {"check_shuffle_features_no_impact"} def learn_one(self, x: dict): - self.x_batch.append(x) - if len(self.x_list) or len(self.x_batch) > 1: - self.learn(self.x_batch) - self.x_batch = [] - - def learn(self, x_batch: list): - x_batch, equal = check_equal(x_batch, self.x_list) - - # Increase size of objects to accommodate new data - ( - nm, - self.x_list, - self.neighborhoods, - self.rev_neighborhoods, - self.k_dist, - self.reach_dist, - self.dist_dict, - self.local_reach, - self.lof, - ) = expand_objects( - x_batch, - self.x_list, - self.neighborhoods, - self.rev_neighborhoods, - self.k_dist, - self.reach_dist, - self.dist_dict, - self.local_reach, - self.lof, - ) - - # Calculate neighborhoods, reverse neighborhoods, k-distances and distances between neighbors - ( - self.neighborhoods, - self.rev_neighborhoods, - self.k_dist, - self.dist_dict, - ) = self._initial_calculations( - self.x_list, - nm, - self.neighborhoods, - self.rev_neighborhoods, - self.k_dist, - self.dist_dict, - ) - - # Define sets of particles - ( - set_new_points, - set_neighbors, - set_rev_neighbors, - set_upd_lrd, - set_upd_lof, - ) = define_sets(nm, self.neighborhoods, self.rev_neighborhoods) - - # Calculate new reachability distance of all affected points - self.reach_dist = calc_reach_dist_new_points( - set_new_points, - self.neighborhoods, - self.rev_neighborhoods, - self.reach_dist, - self.dist_dict, - self.k_dist, - ) - self.reach_dist = calc_reach_dist_other_points( - set_rev_neighbors, - self.rev_neighborhoods, - self.reach_dist, - self.dist_dict, - self.k_dist, - ) - - # Calculate new local reachability distance of all affected points - self.local_reach = calc_local_reach_dist( - set_upd_lrd, self.neighborhoods, self.reach_dist, self.local_reach - ) - - # Calculate new Local Outlier Factor of all affected points - self.lof = calc_lof(set_upd_lof, self.neighborhoods, self.local_reach, self.lof) - - def score_one(self, x: dict): - self.x_scores.append(x) - self.x_scores, equal = 
check_equal(self.x_scores, self.x_list) - - if len(self.x_scores) == 0 or len(self.x_list) == 0: - return 0.0 + # Copy x so the caller can safely mutate the input dict afterwards. + self._nn.append((dict(x),)) - x_list_copy = self.x_list.copy() - - ( - nm, - x_list_copy, - neighborhoods, - rev_neighborhoods, - k_dist, - reach_dist, - dist_dict, - local_reach, - lof, - ) = expand_objects( - self.x_scores, - x_list_copy, - self.neighborhoods.copy(), - self.rev_neighborhoods.copy(), - self.k_dist.copy(), - copy.deepcopy(self.reach_dist), - copy.deepcopy(self.dist_dict), - self.local_reach.copy(), - self.lof.copy(), - ) - - neighborhoods, rev_neighborhoods, k_dist, dist_dict = self._initial_calculations( - x_list_copy, nm, neighborhoods, rev_neighborhoods, k_dist, dist_dict - ) - ( - set_new_points, - set_neighbors, - set_rev_neighbors, - set_upd_lrd, - set_upd_lof, - ) = define_sets(nm, neighborhoods, rev_neighborhoods) - reach_dist = calc_reach_dist_new_points( - set_new_points, neighborhoods, rev_neighborhoods, reach_dist, dist_dict, k_dist - ) - reach_dist = calc_reach_dist_other_points( - set_rev_neighbors, - rev_neighborhoods, - reach_dist, - dist_dict, - k_dist, - ) - local_reach = calc_local_reach_dist(set_upd_lrd, neighborhoods, reach_dist, local_reach) - lof = calc_lof(set_upd_lof, neighborhoods, local_reach, lof) - self.x_scores = [] - - # Use nm[0] as index since upon this configuration nm[1] is expected to be 1. - return lof[nm[0]] - - def _initial_calculations( - self, - x_list: list, - nm: tuple, - neighborhoods: dict, - rev_neighborhoods: dict, - k_distances: dict, - dist_dict: dict, - ): - """ - Perform initial calculations on the incoming data before applying the Incremental LOF algorithm. - Taking the new data, it updates the neighborhoods, reverse neighborhoods, k-distances and distances between particles. + def learn_many(self, X: IntoDataFrame): + """Update with a mini-batch of samples held in a dataframe. 
+ + Any [narwhals](https://github.com/narwhals-dev/narwhals)-compatible eager dataframe + (pandas, polars, pyarrow, ...) is accepted. Each row is added to the window in turn. Parameters ---------- - x_list - A list of stored observations. - nm - A tuple representing the current size of the dataset. - neighborhoods - A dictionary of particle neighborhoods. - rev_neighborhoods - A dictionary of reverse particle neighborhoods. - k_distances - A dictionary to hold k-distances for each observation. - dist_dict - A dictionary of dictionaries storing distances between particles - - Returns - ------- - neighborhoods - Updated dictionary of particle neighborhoods - rev_neighborhoods - Updated dictionary of reverse particle neighborhoods - k_distances - Updated dictionary to hold k-distances for each observation - dist_dict - Updated dictionary of dictionaries storing distances between particles + X + A dataframe of samples. """ + for row in utils.dataframe.into_frame(X).iter_rows(named=True): + self.learn_one(row) - n = nm[0] - m = nm[1] - k = self.n_neighbors + def score_one(self, x: dict) -> float: + x = dict(x) + neighbors, distances = self._query_neighborhood(x) + if not neighbors: + return 0.0 - # Calculate distances all particles considering new and old ones - new_distances = [ - [i, j, self.distance(x_list[i], x_list[j])] - for i in range(n + m) - for j in range(i) - if i >= n + # Every window point's neighborhood is constant while scoring a single sample, and the + # same points recur as neighbors-of-neighbors, so memoize them for the duration. 
+ neighborhoods: dict[int, tuple[list, list]] = {} + lrd_x = self._local_reachability_density(neighbors, distances, neighborhoods) + lrd_neighbors = [ + self._local_reachability_density( + *self._neighborhood(o[0], neighborhoods), neighborhoods + ) + for o in neighbors ] - # Add new distances to distance dictionary - for i in range(len(new_distances)): - dist_dict[new_distances[i][0]][new_distances[i][1]] = new_distances[i][2] - dist_dict[new_distances[i][1]][new_distances[i][0]] = new_distances[i][2] - - # Calculate new k-dist for each particle - for i, inner_dict in enumerate(dist_dict.values()): - k_distances[i] = sorted(inner_dict.values())[min(k, len(inner_dict.values())) - 1] - - # Only keep particles that are neighbors in distance dictionary - dist_dict = { - k: {k2: v2 for k2, v2 in v.items() if v2 <= k_distances[k]} - for k, v in dist_dict.items() - } - - # Define new neighborhoods for particles - for key, value in dist_dict.items(): - neighborhoods[key] = [index for index in value] - - # Define new reverse neighborhoods for particles - for particle_id, neighbor_ids in neighborhoods.items(): - for neighbor_id in neighbor_ids: - rev_neighborhoods[neighbor_id].append(particle_id) - - return neighborhoods, rev_neighborhoods, k_distances, dist_dict + score = sum(lrd_neighbors) / (len(lrd_neighbors) * lrd_x) + # The window can be too small to assess a sample (e.g. a neighbor with no neighbors of + # its own during warm-up), which leaves the score undefined. Treat that as "not anomalous". + return score if math.isfinite(score) else 0.0 + + def _query_neighborhood(self, x: dict) -> tuple[list, list]: + """Return the neighbors of the scored sample `x` and their distances. + + A point is never its own neighbor: if `x` was already learned (or coincides exactly with + a stored sample), the matching distance-0 entry is dropped so the score does not count + `x` against itself. Otherwise all `n_neighbors` nearest window points are kept. 
+ """ + neighbors, distances = self._nn.search((x,), n_neighbors=self.n_neighbors + 1) + if distances and distances[0] == 0.0: + return neighbors[1:], distances[1:] + return neighbors[: self.n_neighbors], distances[: self.n_neighbors] + + def _neighborhood(self, x: dict, cache: dict[int, tuple[list, list]]) -> tuple[list, list]: + """Return a window point's neighbors and their distances, excluding the point itself. + + One extra neighbor is requested and the closest match — `x` paired with itself at + distance 0 — is dropped. Results are memoized per scored sample by object identity. + """ + key = id(x) + if key not in cache: + neighbors, distances = self._nn.search((x,), n_neighbors=self.n_neighbors + 1) + cache[key] = (neighbors[1:], distances[1:]) + return cache[key] + + def _local_reachability_density( + self, neighbors: list, distances: list, cache: dict[int, tuple[list, list]] + ) -> float: + """Inverse of the average reachability distance to a point's neighbors. + + The small additive constant mirrors scikit-learn: it keeps the density finite when a + point coincides with its neighbors (all reachability distances zero). 
+ """ + if not neighbors: + return float("inf") + total = sum(max(d, self._k_distance(o[0], cache)) for o, d in zip(neighbors, distances)) + return 1.0 / (total / len(neighbors) + 1e-10) + + def _k_distance(self, x: dict, cache: dict[int, tuple[list, list]]) -> float: + """Distance from `x` to its `n_neighbors`-th nearest neighbor in the window.""" + _, distances = self._neighborhood(x, cache) + return distances[-1] if distances else 0.0 diff --git a/river/anomaly/test_lof.py b/river/anomaly/test_lof.py index 0b8767324f..188d2c4e55 100644 --- a/river/anomaly/test_lof.py +++ b/river/anomaly/test_lof.py @@ -1,111 +1,127 @@ from __future__ import annotations import numpy as np -import pandas as pd +import pytest from sklearn import neighbors -from river import anomaly, datasets +from river import anomaly +from river.conftest import FRAME_BACKENDS, FrameBackend np.random.seed(42) -def test_incremental_lof_scores(): - """ - Test that the incremental LOF algorithm returns similar LOF scores for each observation - compared with the original static LOF algorithm implemented in scikit-learn. 
- """ - norm_dist = 0.5 * np.random.rand(100, 2) - x_inliers = np.concatenate((norm_dist - 2, norm_dist, norm_dist + 2), axis=0) - x_outliers = np.concatenate( - ( - np.random.uniform(low=-4, high=4, size=(20, 2)), - np.random.uniform(low=-10, high=-5, size=(10, 2)), - np.random.uniform(low=5, high=10, size=(10, 2)), - ), - axis=0, - ) - x_train = np.concatenate((x_inliers, x_outliers), axis=0) - x_train_dict = [{f"feature_{i + 1}": elem[i] for i in range(2)} for elem in x_train] - ground_truth = np.ones(len(x_train), dtype=int) - ground_truth[-len(x_outliers) :] = -1 - df_train = pd.DataFrame({"observations": x_train_dict, "ground_truth": ground_truth}) - x_pred = np.random.uniform(low=-5, high=5, size=(30, 2)) - x_pred_dict = [{f"feature_{i + 1}": elem[i] for i in range(2)} for elem in x_pred] - incremental_lof = anomaly.LocalOutlierFactor(n_neighbors=20) - - for x in df_train["observations"]: - incremental_lof.learn_one(x) - - ilof_scores_train = np.array([ilof_score for ilof_score in incremental_lof.lof.values()]) - - ilof_scores_pred = [] - for x in x_pred_dict: - ilof_scores_pred.append(incremental_lof.score_one(x)) - - lof_sklearn = neighbors.LocalOutlierFactor(n_neighbors=20) - lof_sklearn.fit_predict(x_train) - lof_sklearn_scores_train = -lof_sklearn.negative_outlier_factor_ - - assert np.allclose(ilof_scores_train, lof_sklearn_scores_train, rtol=1e-08, atol=1e-08) - - -def test_batch_lof_scores(): - """ - Test that the incremental LOF algorithm returns similar LOF scores for each batch - with `learn_many` compared with the original static LOF algorithm implemented in scikit-learn, - under different batch sizes. 
+def _blobs(): + """Three dense uniformly sampled blobs plus scattered outliers, as feature dicts.""" + norm = 0.5 * np.random.rand(60, 2) + inliers = np.concatenate((norm - 2, norm, norm + 2), axis=0) + outliers = np.random.uniform(low=-4, high=4, size=(15, 2)) + X = np.concatenate((inliers, outliers), axis=0) + return X, [{"a": row[0], "b": row[1]} for row in X] + + +def test_score_matches_sklearn_static(): + """Scoring stored points equals scikit-learn's in-sample LOF (`negative_outlier_factor_`). + + A learned point is excluded from its own neighborhood, so learn-then-score reproduces the + classic batch LOF computed over the same samples. """ - cc_df = pd.DataFrame(datasets.CreditCard()) - cc_df_np = [np.array(list(x.values())) for x in cc_df[0].to_dict().values()] + X, dicts = _blobs() + k = 20 + + lof = anomaly.LocalOutlierFactor(n_neighbors=k) + for x in dicts: + lof.learn_one(x) + river_scores = np.array([lof.score_one(x) for x in dicts]) - batch_sizes = [20, 50, 100] + sklearn_lof = neighbors.LocalOutlierFactor(n_neighbors=k) + sklearn_lof.fit_predict(X) + sklearn_scores = -sklearn_lof.negative_outlier_factor_ - for batch_size in batch_sizes: - ilof_river_batch = anomaly.LocalOutlierFactor(n_neighbors=20) - ilof_river_batch.learn_many(cc_df[0:batch_size]) - ilof_scores_river_batch = np.array([v for v in ilof_river_batch.lof.values()]) + assert np.allclose(river_scores, sklearn_scores, rtol=1e-7, atol=1e-7) - lof_sklearn_batch = neighbors.LocalOutlierFactor(n_neighbors=20) - lof_sklearn_batch.fit_predict(cc_df_np[0:batch_size]) - lof_scores_sklearn_batch = -lof_sklearn_batch.negative_outlier_factor_ - assert np.allclose( - ilof_scores_river_batch, lof_scores_sklearn_batch, rtol=1e-02, atol=1e-02 - ) +def test_score_matches_sklearn_novelty(): + """Scoring unseen points equals scikit-learn's novelty LOF over the learned window.""" + X, dicts = _blobs() + k = 20 + queries = np.random.RandomState(0).uniform(-3, 3, size=(25, 2)) + query_dicts = [{"a": row[0], "b": 
row[1]} for row in queries] -def test_issue_1328(): - lof = anomaly.LocalOutlierFactor() - X = [{"a": 1, "b": 1}, {"a": 1, "b": 1}] - for x in X: + lof = anomaly.LocalOutlierFactor(n_neighbors=k) + for x in dicts: lof.learn_one(x) + river_scores = np.array([lof.score_one(q) for q in query_dicts]) + sklearn_lof = neighbors.LocalOutlierFactor(n_neighbors=k, novelty=True).fit(X) + sklearn_scores = -sklearn_lof.score_samples(queries) -def test_issue_1331(): - import copy + assert np.allclose(river_scores, sklearn_scores, rtol=1e-7, atol=1e-7) - from river import anomaly +def test_empty_model_scores_zero(): lof = anomaly.LocalOutlierFactor() + assert lof.score_one({"a": 1.0, "b": 2.0}) == 0.0 - X = [{"a": 1, "b": 1}, {"a": 1, "b": 1}] - for x in X: + +def test_score_one_does_not_learn(): + """Scoring must not modify the window (no implicit learning).""" + _, dicts = _blobs() + lof = anomaly.LocalOutlierFactor(n_neighbors=5) + for x in dicts: lof.learn_one(x) - neighborhoods_ = lof.neighborhoods.copy() - rev_neighborhoods = lof.rev_neighborhoods.copy() - k_dist_ = lof.k_dist.copy() - reach_dist_ = copy.deepcopy(lof.reach_dist) - dist_dict_ = copy.deepcopy(lof.dist_dict) - local_reach_ = lof.local_reach.copy() - lof_ = lof.lof.copy() - - lof.score_one({"a": 0.5, "b": 1}) - - assert neighborhoods_ == lof.neighborhoods - assert rev_neighborhoods == lof.rev_neighborhoods - assert k_dist_ == lof.k_dist - assert reach_dist_ == lof.reach_dist - assert dist_dict_ == lof.dist_dict - assert local_reach_ == lof.local_reach - assert lof_ == lof.lof + before = list(lof._nn.window) + lof.score_one({"a": 0.0, "b": 0.0}) + assert list(lof._nn.window) == before + + +def test_duplicates_are_handled(): + """Identical points must not break learning or scoring (former issues #1328 / #1331).""" + lof = anomaly.LocalOutlierFactor() + for _ in range(5): + lof.learn_one({"a": 1.0, "b": 1.0}) + # A point coinciding with a dense duplicate cluster is a strong inlier (score <= 1). 
+ assert lof.score_one({"a": 1.0, "b": 1.0}) <= 1.0 + + +def test_learn_many_matches_learn_one(frame_backend: FrameBackend): + """Row-by-row `learn_one` and batched `learn_many` yield the same scores on every backend.""" + _, dicts = _blobs() + cols = {c: [x[c] for x in dicts] for c in dicts[0]} + + one = anomaly.LocalOutlierFactor(n_neighbors=10) + for x in dicts: + one.learn_one(x) + + many = anomaly.LocalOutlierFactor(n_neighbors=10) + many.learn_many(frame_backend.frame(cols)) + + for x in dicts: + assert one.score_one(x) == pytest.approx(many.score_one(x)) + + +def test_learn_many_is_backend_agnostic(frame_backend: FrameBackend): + """`learn_many` produces identical scores regardless of the dataframe backend.""" + _, dicts = _blobs() + cols = {c: [x[c] for x in dicts] for c in dicts[0]} + + reference = anomaly.LocalOutlierFactor(n_neighbors=10) + reference.learn_many(FRAME_BACKENDS["pandas"]().frame(cols)) + + model = anomaly.LocalOutlierFactor(n_neighbors=10) + model.learn_many(frame_backend.frame(cols)) + + for x in dicts: + assert model.score_one(x) == pytest.approx(reference.score_one(x)) + + +def test_window_bounds_memory(): + """An engine with a small window keeps only the most recent samples.""" + from river import neighbors + + lof = anomaly.LocalOutlierFactor(n_neighbors=5, engine=neighbors.LazySearch(window_size=30)) + _, dicts = _blobs() + for x in dicts: + lof.learn_one(x) + assert len(lof._nn.window) == 30 diff --git a/river/checks/anomaly.py b/river/checks/anomaly.py index 902b9d01ef..6fcaca3dd8 100644 --- a/river/checks/anomaly.py +++ b/river/checks/anomaly.py @@ -2,7 +2,12 @@ def check_roc_auc(anomaly_detector, dataset): - """The ROC AUC should always be above 50%.""" + """A detector should rank anomalies above normal points (ROC AUC >= 50%). + + Each sample is scored *before* it is learned (prequential evaluation), so the detector is + never asked to score a point it has already memorised — which would leak the label and + inflate the score. 
+ """ from sklearn import metrics @@ -10,10 +15,8 @@ def check_roc_auc(anomaly_detector, dataset): labels = [] for x, y in dataset: + scores.append(anomaly_detector.score_one(x)) anomaly_detector.learn_one(x) - y_pred = anomaly_detector.score_one(x) - - scores.append(y_pred) labels.append(y) assert metrics.roc_auc_score(labels, scores) >= 0.5 diff --git a/river/neighbors/knn_regressor.py b/river/neighbors/knn_regressor.py index af76a7efa2..56b55951a4 100644 --- a/river/neighbors/knn_regressor.py +++ b/river/neighbors/knn_regressor.py @@ -92,6 +92,12 @@ def _unit_test_params(cls): ), } + def _unit_test_skips(self): + # Predictions are a distance-weighted average over the neighbor set, and the Euclidean + # distance sums over features in dict order, so reordering features can perturb distances + # at the float level and flip near-tied neighbors. (KNNClassifier's argmax absorbs this.) + return {"check_shuffle_features_no_impact"} + def _check_aggregation_method(self, method): """Ensure validation method is known to the model. 
diff --git a/river/neighbors/test_lazy.py b/river/neighbors/test_lazy.py new file mode 100644 index 0000000000..6f5359ce65 --- /dev/null +++ b/river/neighbors/test_lazy.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import functools + +import numpy as np +import pytest + +from river import utils +from river.neighbors import LazySearch +from river.utils.vectordict import euclidean_distance_tuple + + +def _fast_path_engine(window_size=100): + """A `LazySearch` wired to the Rust Euclidean fast path, exactly as KNN/LOF wire it.""" + return LazySearch(window_size=window_size, dist_func=euclidean_distance_tuple) + + +def _brute_force(query, points, k): + """Indices of the k nearest points to `query`, sorted by (distance, insertion order).""" + keys = list(query) + dists = [sum((query[c] - p[c]) ** 2 for c in keys) ** 0.5 for p in points] + order = sorted(range(len(points)), key=lambda i: (dists[i], i))[:k] + return order, [dists[i] for i in order] + + +@pytest.mark.parametrize("n_neighbors", [1, 3, 10, 50]) +def test_euclidean_fast_path_returns_true_knn(n_neighbors): + """The Rust Euclidean fast path must return the exact k nearest neighbors. + + Regression test: the fast path used to key its heap on the negated distance, which kept + the *farthest* candidates instead of the nearest. 
+ """ + rng = np.random.RandomState(0) + points = [{"a": float(a), "b": float(b)} for a, b in rng.rand(40, 2)] + query = {"a": 0.5, "b": 0.5} + + engine = _fast_path_engine() + assert engine.dist_func is euclidean_distance_tuple + for p in points: + engine.append((p,)) + + _, distances = engine.search((query,), n_neighbors=n_neighbors) + + _, expected = _brute_force(query, points, n_neighbors) + assert distances == pytest.approx(expected) + assert all(d1 <= d2 for d1, d2 in zip(distances, distances[1:])) # sorted ascending + + +def test_fast_path_matches_python_fallback(): + """The Rust fast path and the pure-Python fallback must agree neighbor-for-neighbor.""" + rng = np.random.RandomState(1) + points = [{"a": float(a), "b": float(b)} for a, b in rng.rand(60, 2)] + query = {"a": 0.3, "b": 0.7} + + fast = _fast_path_engine() + minkowski = functools.partial(utils.math.minkowski_distance, p=2) + slow = LazySearch(window_size=100, dist_func=lambda a, b: minkowski(a[0], b[0])) + for p in points: + fast.append((p,)) + slow.append((p,)) + + _, fast_d = fast.search((query,), n_neighbors=15) + _, slow_d = slow.search((query,), n_neighbors=15) + assert fast_d == pytest.approx(slow_d) + + +def test_fewer_points_than_neighbors(): + """Requesting more neighbors than are stored returns all of them, sorted.""" + engine = _fast_path_engine() + points = [{"a": 0.0}, {"a": 2.0}, {"a": 1.0}] + for p in points: + engine.append((p,)) + _, distances = engine.search(({"a": 0.0},), n_neighbors=10) + assert distances == pytest.approx([0.0, 1.0, 2.0]) diff --git a/river/test_estimators.py b/river/test_estimators.py index c4b6c359f1..24c60ba8e7 100644 --- a/river/test_estimators.py +++ b/river/test_estimators.py @@ -43,7 +43,6 @@ def is_estimator(obj): def iter_estimators_which_can_be_tested(): ignored = ( River2SKLBase, - anomaly.LocalOutlierFactor, # needs warm-start to work correctly compose.FuncTransformer, compose.Grouper, compose.Pipeline, diff --git a/rust_src/vectordict.rs 
b/rust_src/vectordict.rs index 56822ed090..83e36e8bee 100644 --- a/rust_src/vectordict.rs +++ b/rust_src/vectordict.rs @@ -2044,9 +2044,10 @@ pub fn lazy_search_euclidean<'py>( let px = item_tuple.get_item(0)?; let px = px.cast::()?; let dist_sq = squared_euclid(py, qx, px)?; - let neg = -dist_sq; + // Keep the k *smallest* distances via a max-heap keyed on the (squared) distance: + // heap[0] is the farthest kept neighbor, which a closer candidate evicts. let triple = PyTuple::new(py, [ - neg.into_bound_py_any(py)?, + dist_sq.into_bound_py_any(py)?, i.into_bound_py_any(py)?, entry.clone(), ])?; @@ -2056,28 +2057,27 @@ pub fn lazy_search_euclidean<'py>( heapify_max.call1((&heap,))?; } } else { - // Compare against current max (heap[0]) + // Compare against the current farthest kept neighbor (heap[0]). let top = heap.get_item(0)?; - let top_neg = top.get_item(0)?.extract::()?; - if dist_sq < -top_neg { + let top_dist_sq = top.get_item(0)?.extract::()?; + if dist_sq < top_dist_sq { heapreplace_max.call1((&heap, triple))?; } } i += 1; } - // Sort by neg-distance descending so the smallest distance is last popped + // Sort ascending by (squared distance, insertion index) to match `heapq.nsmallest`. heap.sort()?; - heap.reverse()?; let items = pyo3::types::PyList::empty(py); let distances = pyo3::types::PyList::empty(py); for triple in heap.iter() { - let neg = triple.get_item(0)?.extract::()?; + let dist_sq = triple.get_item(0)?.extract::()?; let entry = triple.get_item(2)?; let item = entry.get_item(0)?; items.append(item)?; - distances.append((-neg).sqrt())?; + distances.append(dist_sq.sqrt())?; } Ok((items.unbind().into_any(), distances.unbind().into_any())) }