diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index ee0b08745b..afbd846ee3 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -15,6 +15,9 @@ ## covariance +- Added `EwaCovariance`, `LedoitWolfCovariance`, `OASCovariance`, and `ShrunkCovariance`: online covariance estimators for non-stationary streams (exponentially weighted, recency-biased) and high-dimensional / few-sample regimes (shrinkage towards a well-conditioned target). They are dict-native like `EmpiricalCovariance` and support mini-batches via `update_many` on any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend. +- Added `EwaPrecision`, an exponentially weighted precision (inverse covariance) matrix maintained online via a forgetting-factor Sherman-Morrison update. The recency-weighted counterpart of `EmpiricalPrecision`, useful for tracking Mahalanobis distances and Gaussian likelihoods on non-stationary streams. +- `EmpiricalCovariance.update_many` and `EmpiricalPrecision.update_many` now accept any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager dataframe (pandas, polars, pyarrow, ...) instead of pandas only. Outputs are unchanged for the pandas path. - Added weighted sample support to `EmpiricalCovariance.update` and `EmpiricalCovariance.revert` by accepting an optional `w` parameter and propagating it to the underlying `stats.Cov` and `stats.Var` statistics. - Sped up `EmpiricalCovariance.update`/`revert` (~40% faster at 30 features) by caching the sorted feature list and pair iteration in the hot path. No semantic change. - Restructured `EmpiricalPrecision` around NumPy-backed dense state, removing the per-update dict ↔ numpy marshalling. ~7× faster on 2000 × 20 sample streams. @@ -24,6 +27,7 @@ - Added `datasets.CriteoAds`, a 100,000-row sample of the Criteo Display Advertising Challenge (binary click prediction with 13 integer and 26 high-cardinality categorical features). A natural fit for one-hot models such as `linear_model.AdPredictor`. - Added `datasets.Shuttle`, the UCI Statlog (Shuttle) dataset cast as a binary anomaly-detection task following the ODDS benchmark (49,097 observations, 9 numerical features, ~7% anomalies). Ships bundled with River. +- Added `datasets.SP500Stocks`, daily returns (1,257 trading days, 2013-2018) for ten large-cap S&P 500 stocks across diverse sectors. A natural fit for the online covariance estimators in `river.covariance`. ## facto @@ -91,6 +95,7 @@ ## stats +- Added `stats.EWCov`, an exponentially weighted covariance between two variables (the bivariate counterpart of `stats.EWVar`). - Added `stats.ChiSquared`, a streaming Chi-squared statistic between two categorical variables. Wrap it with `utils.Rolling` for a rolling version. ## stream diff --git a/river/covariance/__init__.py b/river/covariance/__init__.py index a026e17807..9413f778f8 100644 --- a/river/covariance/__init__.py +++ b/river/covariance/__init__.py @@ -1,7 +1,34 @@ -"""Online estimation of covariance and precision matrices.""" +"""Online estimation of covariance and precision matrices. + +A covariance matrix summarises how a set of variables move together. It is the engine behind +portfolio risk, anomaly detection (via the Mahalanobis distance), Gaussian models, and many +dimensionality-reduction methods. This module estimates it (and its inverse, the precision +matrix) incrementally from a stream, without storing the data. See each estimator's docstring for +what it does and when to reach for it. + +The estimators are dict-native: `update(x)` takes a mapping and the `matrix` is a dict of pairwise +values. Most also expose an `update_many` method for mini-batches of any narwhals-compatible +dataframe. + +""" from __future__ import annotations from .emp import EmpiricalCovariance, EmpiricalPrecision +from .ewa import ( + EwaCovariance, + EwaPrecision, + LedoitWolfCovariance, + OASCovariance, + ShrunkCovariance, +) -__all__ = ["EmpiricalCovariance", "EmpiricalPrecision"] +__all__ = [ + "EmpiricalCovariance", + "EmpiricalPrecision", + "EwaCovariance", + "EwaPrecision", + "LedoitWolfCovariance", + "OASCovariance", + "ShrunkCovariance", +] diff --git a/river/covariance/emp.py b/river/covariance/emp.py index e442be4a23..13c9bc2a51 100644 --- a/river/covariance/emp.py +++ b/river/covariance/emp.py @@ -9,7 +9,7 @@ from river import stats, utils if typing.TYPE_CHECKING: - import pandas as pd + from narwhals.stable.v2.typing import IntoDataFrame class SymmetricMatrix(abc.ABC): @@ -33,6 +33,8 @@ def __getitem__(self, key): def __repr__(self): names = sorted({i for i, _ in self.matrix}) + if not names: + return f"{type(self).__name__} (empty)" headers = [""] + list(map(str, names)) columns = [headers[1:]] @@ -177,9 +179,12 @@ def revert(self, x: dict, w: float = 1.0): for i in keys: cov_dict[i, i].revert(x[i], w) - def update_many(self, X: pd.DataFrame): + def update_many(self, X: IntoDataFrame): """Update with a dataframe of samples. + Any [narwhals](https://github.com/narwhals-dev/narwhals)-compatible eager dataframe + (pandas, polars, pyarrow, ...) is accepted. + Parameters ---------- X @@ -187,15 +192,17 @@ def update_many(self, X: pd.DataFrame): """ - X_arr = X.values + frame = utils.dataframe.into_frame(X) + columns = list(frame.columns) + X_arr = utils.dataframe.to_numpy(frame) mean_arr = X_arr.mean(axis=0) cov_arr = np.cov(X_arr.T, ddof=self.ddof) - n = len(X) - mean = dict(zip(X.columns, mean_arr)) + n = len(frame) + mean = dict(zip(columns, mean_arr)) cov = { (i, j): cov_arr[r, c] - for (r, i), (c, j) in itertools.combinations_with_replacement(enumerate(X.columns), r=2) + for (r, i), (c, j) in itertools.combinations_with_replacement(enumerate(columns), r=2) } self._update_from_state(n=n, mean=mean, cov=cov) @@ -215,6 +222,7 @@ def _update_from_state(self, n: int, mean: dict, cov: float | dict): Raises ---------- KeyError: If an element in `mean` or `cov` is missing. + """ for i, j in itertools.combinations(sorted(mean.keys()), r=2): try: @@ -264,6 +272,7 @@ def _from_state(cls, n: int, mean: dict, cov: float | dict, *, ddof=1): Returns ---------- cls: A new instance of the class with updated covariance matrix. + """ new = cls(ddof=ddof) new._update_from_state(n=n, mean=mean, cov=cov) @@ -405,17 +414,21 @@ def update(self, x): self._w_arr[ids] = w self._inv_cov_mat[ix] = 0.5 * (block + block.T) - def update_many(self, X: pd.DataFrame): + def update_many(self, X: IntoDataFrame): """Update with a dataframe of samples. + Any [narwhals](https://github.com/narwhals-dev/narwhals)-compatible eager dataframe + (pandas, polars, pyarrow, ...) is accepted. + Parameters ---------- X A dataframe of samples. """ - ids = self._ensure_features(X.columns) - X_arr = np.asarray(X.values, dtype=np.float64) + frame = utils.dataframe.into_frame(X) + ids = self._ensure_features(frame.columns) + X_arr = utils.dataframe.to_numpy(frame) loc = self._loc_arr[ids].copy() w = self._w_arr[ids].copy() @@ -423,7 +436,7 @@ def update_many(self, X: pd.DataFrame): inv_cov = np.asfortranarray(self._inv_cov_mat[ix]) / np.maximum(w, 1) # update formulas - n_batch = len(X) + n_batch = len(frame) diff = X_arr - loc loc = (w * loc + n_batch * X_arr.mean(axis=0)) / (w + n_batch) w += n_batch diff --git a/river/covariance/ewa.py b/river/covariance/ewa.py new file mode 100644 index 0000000000..011e208760 --- /dev/null +++ b/river/covariance/ewa.py @@ -0,0 +1,560 @@ +from __future__ import annotations + +import typing + +import numpy as np + +from river.utils import dataframe as dataframe_utils + +from .emp import SymmetricMatrix + +if typing.TYPE_CHECKING: + from narwhals.stable.v2.typing import IntoDataFrame + +__all__ = [ + "EwaCovariance", + "EwaPrecision", + "LedoitWolfCovariance", + "OASCovariance", + "ShrunkCovariance", +] + +_EPS = 1e-12 + + +class _EWMatrix(SymmetricMatrix): + """Array-backed exponentially weighted matrix estimator with a dict-native interface. + + Shared engine for the exponentially weighted covariance and precision estimators. It keeps a + running exponentially weighted mean vector (using the same convention as `stats.EWMean`) plus a + matrix-valued state, stored as dense numpy arrays with a feature to index map (the same pattern + as `covariance.EmpiricalPrecision`). A consistent set of features is assumed across + observations, mirroring the usual covariance-estimation setting; new features seen mid-stream + grow the matrix. + """ + + def __init__(self, fading_factor: float = 0.5): + if not 0 <= fading_factor <= 1: + raise ValueError("fading_factor is not comprised between 0 and 1") + self.fading_factor = float(fading_factor) + self._features: list = [] + self._idx: dict = {} + self._mean = np.zeros(0, dtype=np.float64) + self._initialized = np.zeros(0, dtype=bool) + self._n = 0 + self._matrix_cache: np.ndarray | None = None + + # --------------------------------------------------------------- internals + def _on_grow(self, old_dim: int, new_dim: int) -> None: + """Resize the subclass's matrix state when new features appear.""" + raise NotImplementedError + + def _grow(self, new_keys: list) -> None: + old_dim = len(self._features) + for k in new_keys: + self._idx[k] = len(self._features) + self._features.append(k) + new_dim = len(self._features) + mean = np.zeros(new_dim, dtype=np.float64) + mean[:old_dim] = self._mean + init = np.zeros(new_dim, dtype=bool) + init[:old_dim] = self._initialized + self._mean, self._initialized = mean, init + self._on_grow(old_dim, new_dim) + + def _vector(self, x: dict) -> np.ndarray: + new_keys = [k for k in x if k not in self._idx] + if new_keys: + self._grow(new_keys) + try: + return np.array([x[f] for f in self._features], dtype=np.float64) + except KeyError as e: + raise ValueError( + f"observation is missing feature {e.args[0]!r}; the exponentially weighted " + "estimators assume a consistent set of features" + ) from e + + def _fresh(self) -> np.ndarray: + return ~self._initialized + + def _blend_mean(self, v: np.ndarray, fresh: np.ndarray) -> np.ndarray: + f = self.fading_factor + # Seed freshly-seen features with the observed value (like stats.EWMean's first step). + if fresh.any(): + return np.where(fresh, v, (1 - f) * self._mean + f * v) + return (1 - f) * self._mean + f * v + + def _learn_vector(self, v: np.ndarray) -> None: + raise NotImplementedError + + def _to_matrix_array(self) -> np.ndarray: + raise NotImplementedError + + def _matrix_array(self) -> np.ndarray: + if self._matrix_cache is None: + self._matrix_cache = self._to_matrix_array() + return self._matrix_cache + + # -------------------------------------------------------------- public API + def update(self, x: dict): + """Update with a single sample. + + Parameters + ---------- + x + A sample. + + """ + self._learn_vector(self._vector(x)) + + def update_many(self, X: IntoDataFrame): + """Update with a dataframe of samples. + + Any [narwhals](https://github.com/narwhals-dev/narwhals)-compatible eager dataframe + (pandas, polars, pyarrow, ...) is accepted. The result is identical to feeding the rows + one at a time with `update`, in row order. + + Parameters + ---------- + X + A dataframe of samples. + + """ + frame = dataframe_utils.into_frame(X) + columns = list(frame.columns) + new_keys = [k for k in columns if k not in self._idx] + if new_keys: + self._grow(new_keys) + ids = [self._idx[c] for c in columns] + # Lay each row out in the estimator's feature order before the sequential update. + arr = dataframe_utils.to_numpy(frame) + layout = np.empty((arr.shape[0], len(self._features)), dtype=np.float64) + layout[:] = self._mean # carry forward known features absent from this batch + layout[:, ids] = arr + for row in layout: + self._learn_vector(row) + + @property + def matrix(self) -> dict: + if not self._features: + return {} + arr = self._matrix_array() + out = {} + for ai, fa in enumerate(self._features): + for bi in range(ai, len(self._features)): + fb = self._features[bi] + out[min((fa, fb), (fb, fa))] = float(arr[ai, bi]) + return out + + def __getitem__(self, key): + i, j = key + ai = self._idx.get(i) + bi = self._idx.get(j) + if ai is None or bi is None: + raise KeyError(key) + return float(self._matrix_array()[ai, bi]) + + +class _EWCovariance(_EWMatrix): + """Exponentially weighted covariance engine. + + Tracks an exponentially weighted mean vector and second-moment matrix and exposes the + covariance ``Σ = E[xxᵀ] - E[x]E[x]ᵀ``. The diagonal matches `stats.EWVar` and the off-diagonals + match `stats.EWCov`. Shrinkage subclasses override `_to_matrix_array`. + """ + + def __init__(self, fading_factor: float = 0.5): + super().__init__(fading_factor) + self._M2 = np.zeros((0, 0), dtype=np.float64) + + def _on_grow(self, old_dim: int, new_dim: int) -> None: + M2 = np.zeros((new_dim, new_dim), dtype=np.float64) + M2[:old_dim, :old_dim] = self._M2 + self._M2 = M2 + + def _before_update(self, v: np.ndarray) -> None: + """Hook called with the new observation before the state is updated (mean/M2 are old).""" + + def _learn_vector(self, v: np.ndarray) -> None: + self._before_update(v) + f = self.fading_factor + fresh = self._fresh() + outer = np.outer(v, v) + self._mean = self._blend_mean(v, fresh) + if fresh.any(): + fresh_pair = fresh[:, None] | fresh[None, :] + self._M2 = np.where(fresh_pair, outer, (1 - f) * self._M2 + f * outer) + else: + self._M2 = (1 - f) * self._M2 + f * outer + self._initialized[fresh] = True + self._n += 1 + self._matrix_cache = None + + def _raw_cov(self) -> np.ndarray: + return self._M2 - np.outer(self._mean, self._mean) + + def _to_matrix_array(self) -> np.ndarray: + return self._raw_cov() + + +class EwaCovariance(_EWCovariance): + """Exponentially weighted covariance matrix. + + A recency-weighted estimate of the covariance: each new observation is blended into the + estimate with weight ``fading_factor``, so the influence of past observations decays + geometrically. This is the streaming analogue of the RiskMetrics covariance and the matrix + counterpart of `stats.EWVar` / `stats.EWCov` (the diagonal is exactly `stats.EWVar` and each + off-diagonal is exactly `stats.EWCov`). + + **When to use it.** Reach for this over `EmpiricalCovariance` when the relationships between + your variables change over time. The textbook example is asset returns, whose volatilities and + correlations move with the market regime: a plain empirical covariance weights a return from + years ago the same as yesterday's, whereas an exponentially weighted one forgets the distant + past so the risk estimate tracks current conditions. Larger `fading_factor` reacts faster + (shorter memory); smaller is smoother. + + Parameters + ---------- + fading_factor + The closer `fading_factor` is to 1 the more weight recent observations carry. The + effective memory is roughly ``1 / fading_factor`` observations. + + Examples + -------- + + We estimate the covariance of daily returns (in percent) for a few stocks from the + `datasets.SP500Stocks` dataset. + + >>> from river import covariance, datasets + + >>> tickers = ["AAPL", "JPM", "XOM"] + >>> cov = covariance.EwaCovariance(fading_factor=0.02) + >>> for x, _ in datasets.SP500Stocks(): + ... cov.update({t: x[t] for t in tickers}) + >>> cov + AAPL JPM XOM + AAPL 1.944 0.766 0.760 + JPM 0.766 1.492 0.934 + XOM 0.760 0.934 1.705 + + There is also an `update_many` method to process mini-batches. It accepts any + narwhals-compatible dataframe and yields the same result as feeding the rows one by one. + + >>> import pandas as pd + >>> returns = pd.DataFrame(x for x, _ in datasets.SP500Stocks())[tickers] + >>> cov = covariance.EwaCovariance(fading_factor=0.02) + >>> cov.update_many(returns) + >>> cov + AAPL JPM XOM + AAPL 1.944 0.766 0.760 + JPM 0.766 1.492 0.934 + XOM 0.760 0.934 1.705 + + Individual entries are accessible by key: + + >>> cov["AAPL", "JPM"] + 0.766... + + References + ---------- + [^1]: [RiskMetrics Technical Document (J.P. Morgan, 1996)](https://www.msci.com/documents/10199/5915b101-4206-4ba0-aee2-3449d5c7e95a) + + """ + + +class LedoitWolfCovariance(_EWCovariance): + """Online Ledoit-Wolf shrinkage covariance. + + Shrinks the exponentially weighted sample covariance towards a scaled identity (a sphere) by a + data-driven intensity, following Ledoit & Wolf (2004). The shrinkage intensity is estimated + online from the dispersion of the per-observation scatter around the running covariance, and is + recomputed on read; the per-step update stays O(d^2) with no stored window. + + **When to use it.** The raw sample covariance is a poor estimate when the number of variables + is large relative to the (effective) number of observations: it is noisy and often + ill-conditioned or singular, which wrecks anything that inverts it (portfolio optimisation, + Mahalanobis distances, Gaussian likelihoods). Shrinkage trades a little bias for a large + reduction in variance, producing a well-conditioned, invertible matrix. Ledoit-Wolf picks the + shrinkage intensity for you, so it is a strong default when there are many assets relative to + clean history. + + Parameters + ---------- + fading_factor + Recency weight of the underlying exponentially weighted covariance. The effective sample + size is roughly ``1 / fading_factor``. + + Examples + -------- + + On the ten stocks of `datasets.SP500Stocks` with a short effective memory, the raw covariance is + poorly conditioned; Ledoit-Wolf shrinkage tames it. + + >>> import numpy as np + >>> from river import covariance, datasets + + >>> ewa = covariance.EwaCovariance(fading_factor=0.05) + >>> lw = covariance.LedoitWolfCovariance(fading_factor=0.05) + >>> for x, _ in datasets.SP500Stocks(): + ... ewa.update(x) + ... lw.update(x) + + >>> def condition_number(cov): + ... names = sorted({i for i, _ in cov.matrix}) + ... M = np.array([[cov[i, j] for j in names] for i in names]) + ... return np.linalg.cond(M) + + >>> condition_number(ewa) > condition_number(lw) + np.True_ + + References + ---------- + [^1]: [Ledoit, O. and Wolf, M., 2004. A well-conditioned estimator for large-dimensional covariance matrices. Journal of Multivariate Analysis, 88(2), pp.365-411.](https://www.ledoit.net/honey.pdf) + + """ + + def __init__(self, fading_factor: float = 0.5): + super().__init__(fading_factor) + self._pi_bar = 0.0 # running dispersion of the per-sample scatter about the covariance + + def _before_update(self, v: np.ndarray) -> None: + d = len(v) + if d == 0: + return + dev = v - self._mean + scatter = np.outer(dev, dev) + cov_old = self._raw_cov() + q = float(((scatter - cov_old) ** 2).sum() / d) + f = self.fading_factor + self._pi_bar = (1 - f) * self._pi_bar + f * q + + def _to_matrix_array(self) -> np.ndarray: + S = self._raw_cov() + d = S.shape[0] + mu = np.trace(S) / d + identity = np.eye(d) + disp = float(((S - mu * identity) ** 2).sum() / d) + if disp <= _EPS or self._pi_bar <= 0: + return S + # Effective sample size of the exponentially weighted estimator is ~ 1 / fading_factor. + b2 = min(self._pi_bar * self.fading_factor, disp) + intensity = float(np.clip(b2 / disp, 0.0, 1.0)) + return (1 - intensity) * S + intensity * mu * identity + + +class OASCovariance(_EWCovariance): + """Online Oracle Approximating Shrinkage (OAS) covariance. + + Like `LedoitWolfCovariance`, OAS shrinks the exponentially weighted sample covariance towards a + scaled identity, but uses the Chen, Wiesel, Eldar & Hero (2010) shrinkage intensity, which is + often better conditioned for approximately Gaussian data. The intensity is a closed form in the + traces of the running covariance, applied on read; the per-step update is a plain O(d^2) + exponentially weighted covariance update. + + **When to use it.** The same high-dimensional / few-sample situations as + `LedoitWolfCovariance`. OAS tends to shrink slightly more aggressively and is a good choice when + the data are close to Gaussian; otherwise the two are interchangeable and worth comparing. + + Parameters + ---------- + fading_factor + Recency weight of the underlying exponentially weighted covariance. The effective sample + size is roughly ``1 / fading_factor``. + + Examples + -------- + + >>> import numpy as np + >>> from river import covariance, datasets + + >>> oas = covariance.OASCovariance(fading_factor=0.05) + >>> for x, _ in datasets.SP500Stocks(): + ... oas.update(x) + + The shrinkage keeps the matrix positive-definite and invertible: + + >>> names = sorted({i for i, _ in oas.matrix}) + >>> M = np.array([[oas[i, j] for j in names] for i in names]) + >>> bool(np.all(np.linalg.eigvalsh(M) > 0)) + True + + References + ---------- + [^1]: [Chen, Y., Wiesel, A., Eldar, Y.C. and Hero, A.O., 2010. Shrinkage algorithms for MMSE covariance estimation. IEEE Transactions on Signal Processing, 58(10), pp.5016-5029.](https://arxiv.org/abs/0907.4698) + + """ + + def _to_matrix_array(self) -> np.ndarray: + S = self._raw_cov() + d = S.shape[0] + n = max(1.0 / self.fading_factor, 2.0) + tr = np.trace(S) + tr2 = np.trace(S @ S) + mu = tr / d + num = (1.0 - 2.0 / d) * tr2 + tr * tr + den = (n + 1.0 - 2.0 / d) * (tr2 - tr * tr / d) + rho = 1.0 if den <= 0 else min(max(num / den, 0.0), 1.0) + return (1.0 - rho) * S + rho * mu * np.eye(d) + + +class ShrunkCovariance(_EWCovariance): + """Fixed-intensity shrinkage covariance, towards a constant-correlation or identity target. + + Where `LedoitWolfCovariance` and `OASCovariance` estimate the shrinkage intensity from the + data, `ShrunkCovariance` uses a fixed `delta` (transparent and predictable, mirroring + `sklearn.covariance.ShrunkCovariance`) and offers a choice of target. + + **When to use it.** When you want explicit, reproducible control over how much regularisation is + applied rather than letting the data decide. The **constant-correlation** target (every pair + shrunk towards the average sample correlation) is the finance-relevant default: assets share a + positive baseline correlation, so pulling towards that is more sensible than pulling towards the + zero-correlation identity target. + + Parameters + ---------- + fading_factor + Recency weight of the underlying exponentially weighted covariance. + delta + Shrinkage intensity in [0, 1] (0 = the raw covariance, 1 = the target). + target + Either ``"constant_correlation"`` (default) or ``"identity"``. + + Examples + -------- + + >>> from river import covariance, datasets + + >>> tickers = ["AAPL", "JPM", "XOM"] + >>> cov = covariance.ShrunkCovariance(fading_factor=0.02, delta=0.3) + >>> for x, _ in datasets.SP500Stocks(): + ... cov.update({t: x[t] for t in tickers}) + >>> cov + AAPL JPM XOM + AAPL 1.944 0.784 0.797 + JPM 0.784 1.492 0.885 + XOM 0.797 0.885 1.705 + + With ``delta=0`` the estimator reduces to a plain exponentially weighted covariance, and with + ``delta=1`` it returns the target exactly. + + References + ---------- + [^1]: [Ledoit, O. and Wolf, M., 2003. Improved estimation of the covariance matrix of stock returns with an application to portfolio selection. Journal of Empirical Finance, 10(5), pp.603-621.](https://www.ledoit.net/ole2.pdf) + + """ + + def __init__( + self, + fading_factor: float = 0.5, + delta: float = 0.1, + target: str = "constant_correlation", + ): + if target not in ("constant_correlation", "identity"): + raise ValueError("target must be 'constant_correlation' or 'identity'") + super().__init__(fading_factor) + self.delta = delta + self.target = target + + def _to_matrix_array(self) -> np.ndarray: + S = self._raw_cov() + d = S.shape[0] + if self.target == "identity": + target = (np.trace(S) / d) * np.eye(d) + else: # constant_correlation + sd = np.sqrt(np.maximum(np.diag(S), _EPS)) + corr = S / np.outer(sd, sd) + off = corr[~np.eye(d, dtype=bool)] + rbar = float(np.mean(off)) if off.size else 0.0 + target = rbar * np.outer(sd, sd) + np.fill_diagonal(target, np.diag(S)) # preserve the variances + return (1.0 - self.delta) * S + self.delta * target + + +class EwaPrecision(_EWMatrix): + """Exponentially weighted precision (inverse covariance) matrix. + + The recency-weighted analogue of `EmpiricalPrecision`. It maintains the inverse of an + exponentially weighted second-moment matrix online via a forgetting-factor Sherman-Morrison + update (the recursive-least-squares trick), and applies the mean centering on read. It is + genuinely online: the per-step cost is O(d^2) and the matrix is never explicitly inverted. + + **When to use it.** Several methods need the *precision* matrix rather than the covariance: + the Mahalanobis distance (anomaly detection), the Gaussian log-likelihood, and the weights of a + Gaussian graphical model. Use this when those quantities must track a non-stationary stream, + where inverting a stale covariance would lag. Like `EmpiricalPrecision`, the result is not + guaranteed identical to inverting the matching covariance (there is a decaying identity prior), + but the difference shrinks as observations accumulate. Requires ``0 < fading_factor < 1``. + + Parameters + ---------- + fading_factor + Recency weight of the most recent observation. The effective memory is roughly + ``1 / fading_factor`` observations. + + Examples + -------- + + >>> from river import covariance, datasets + + >>> tickers = ["AAPL", "JPM", "XOM"] + >>> prec = covariance.EwaPrecision(fading_factor=0.02) + >>> for x, _ in datasets.SP500Stocks(): + ... prec.update({t: x[t] for t in tickers}) + >>> prec + AAPL JPM XOM + AAPL 0.676 -0.241 -0.169 + JPM -0.241 1.105 -0.498 + XOM -0.169 -0.498 0.934 + + Up to the decaying prior, this approximates the inverse of the matching `EwaCovariance`: + + >>> import numpy as np + >>> cov = covariance.EwaCovariance(fading_factor=0.02) + >>> for x, _ in datasets.SP500Stocks(): + ... cov.update({t: x[t] for t in tickers}) + >>> S = np.array([[cov[i, j] for j in tickers] for i in tickers]) + >>> P = np.array([[prec[i, j] for j in tickers] for i in tickers]) + >>> bool(np.allclose(P @ S, np.eye(3), atol=1e-6)) + True + + References + ---------- + [^1]: [Online Estimation of the Inverse Covariance Matrix - Markus Thill](https://markusthill.github.io/math/stats/ml/online-estimation-of-the-inverse-covariance-matrix/) + [^2]: [Recursive least squares filter](https://en.wikipedia.org/wiki/Recursive_least_squares_filter) + + """ + + def __init__(self, fading_factor: float = 0.5): + if not 0 < fading_factor < 1: + raise ValueError("fading_factor must be strictly between 0 and 1") + super().__init__(fading_factor) + self._Pm = np.zeros((0, 0), dtype=np.float64) # inverse of the EW second-moment matrix + + def _on_grow(self, old_dim: int, new_dim: int) -> None: + # New features start from an identity prior on the second-moment matrix. + Pm = np.eye(new_dim, dtype=np.float64) + Pm[:old_dim, :old_dim] = self._Pm + self._Pm = Pm + + def _learn_vector(self, v: np.ndarray) -> None: + f = self.fading_factor + fresh = self._fresh() + # Forgetting-factor Sherman-Morrison update of inv(M2), where M2 <- (1-f) M2 + f v vᵀ. + Pm = self._Pm / (1 - f) + Pv = Pm @ v + denom = 1.0 + f * float(v @ Pv) + Pm = Pm - f * np.outer(Pv, Pv) / denom + self._Pm = 0.5 * (Pm + Pm.T) + self._mean = self._blend_mean(v, fresh) + self._initialized[fresh] = True + self._n += 1 + self._matrix_cache = None + + def _to_matrix_array(self) -> np.ndarray: + # precision = inv(M2 - mean meanᵀ), via a rank-one Sherman-Morrison downdate of inv(M2). + Pm = self._Pm + m = self._mean + Pm_m = Pm @ m + denom = 1.0 - float(m @ Pm_m) + prec = Pm if denom <= _EPS else Pm + np.outer(Pm_m, Pm_m) / denom + return 0.5 * (prec + prec.T) diff --git a/river/covariance/test_emp.py b/river/covariance/test_emp.py index be826625e4..fd34dba7ce 100644 --- a/river/covariance/test_emp.py +++ b/river/covariance/test_emp.py @@ -164,6 +164,37 @@ def test_covariance_update_many_sampled(): assert math.isclose(cov[i, j].get(), pd_cov.loc[i, j]) +def test_covariance_update_many_backend_agnostic(frame_backend): + """`update_many` yields identical covariances across every dataframe backend.""" + np.random.seed(0) + data = {col: np.random.random(40).tolist() for col in ["a", "b", "c"]} + + reference = covariance.EmpiricalCovariance() + reference.update_many(pd.DataFrame(data)) + + cov = covariance.EmpiricalCovariance() + cov.update_many(frame_backend.frame(data)) + + assert cov.matrix.keys() == reference.matrix.keys() + for key in reference.matrix: + assert math.isclose(cov[key].get(), reference[key].get(), rel_tol=1e-9, abs_tol=1e-12) + + +def test_precision_update_many_backend_agnostic(frame_backend): + """`update_many` yields identical precisions across every dataframe backend.""" + np.random.seed(0) + data = {col: np.random.random(60).tolist() for col in ["a", "b", "c"]} + + reference = covariance.EmpiricalPrecision() + reference.update_many(pd.DataFrame(data)) + + prec = covariance.EmpiricalPrecision() + prec.update_many(frame_backend.frame(data)) + + for i, j in reference.matrix: + assert math.isclose(prec[i, j], reference[i, j], rel_tol=1e-9, abs_tol=1e-12) + + def test_precision_update_shuffled(): C1 = covariance.EmpiricalPrecision() C2 = covariance.EmpiricalPrecision() diff --git a/river/covariance/test_ewa.py b/river/covariance/test_ewa.py new file mode 100644 index 0000000000..6d84114d34 --- /dev/null +++ b/river/covariance/test_ewa.py @@ -0,0 +1,342 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from river import covariance, stats, stream + + +def _value(entry): + # EmpiricalCovariance stores stats.Cov/Var objects; EWA stores plain floats. + return entry.get() if isinstance(entry, stats.base.Statistic) else entry + + +def _dense(cov): + """Materialize a SymmetricMatrix into a dense numpy array (public-API only).""" + names = sorted({i for i, _ in cov.matrix}) + return np.array([[_value(cov[i, j]) for j in names] for i in names], dtype=float) + + +def _ewa_reference(X, fading_factor): + """Independent exponentially weighted covariance, matching the stats.EWMean convention.""" + f = fading_factor + mean = M2 = None + for v in X: + v = np.asarray(v, dtype=float) + if mean is None: + mean, M2 = v.copy(), np.outer(v, v) + else: + mean = (1 - f) * mean + f * v + M2 = (1 - f) * M2 + f * np.outer(v, v) + return M2 - np.outer(mean, mean) + + +def _precision_reference(X, fading_factor): + """Independent EW precision: inverse of the EW second-moment matrix (identity prior).""" + f = fading_factor + d = X.shape[1] + M2 = np.eye(d) + mean = None + for v in X: + v = np.asarray(v, dtype=float) + M2 = (1 - f) * M2 + f * np.outer(v, v) + mean = v.copy() if mean is None else (1 - f) * mean + f * v + return np.linalg.inv(M2 - np.outer(mean, mean)) + + +@pytest.fixture +def returns(): + rng = np.random.default_rng(0) + true_cov = [[1.0, 0.5, 0.3], [0.5, 1.0, 0.4], [0.3, 0.4, 1.0]] + return rng.multivariate_normal(mean=[0, 0, 0], cov=true_cov, size=300) + + +# --------------------------------------------------------------------- EwaCovariance core + + +def test_ewa_matches_numpy_reference(returns): + cov = covariance.EwaCovariance(fading_factor=0.05) + for x, _ in stream.iter_array(returns): + cov.update(x) + np.testing.assert_allclose(_dense(cov), _ewa_reference(returns, 0.05)) + + +def test_ewa_diagonal_matches_ewvar_and_offdiag_ewcov(returns): + f = 0.05 + cov = covariance.EwaCovariance(fading_factor=f) + ewvars = [stats.EWVar(fading_factor=f) for _ in range(3)] + ewcov_01 = stats.EWCov(fading_factor=f) + for x, _ in stream.iter_array(returns): + cov.update(x) + for i in range(3): + ewvars[i].update(x[i]) + ewcov_01.update(x[0], x[1]) + + for i in range(3): + assert cov[i, i] == pytest.approx(ewvars[i].get()) + assert cov[0, 1] == pytest.approx(ewcov_01.get()) + + +@pytest.mark.parametrize( + "estimator", + [ + covariance.EwaCovariance, + covariance.EwaPrecision, + covariance.LedoitWolfCovariance, + covariance.OASCovariance, + covariance.ShrunkCovariance, + ], +) +def test_single_equals_minibatch(returns, estimator): + one_by_one = estimator() + for x, _ in stream.iter_array(returns): + one_by_one.update(x) + + batched = estimator() + batched.update_many(pd.DataFrame(returns, columns=[0, 1, 2])) + + np.testing.assert_allclose(_dense(one_by_one), _dense(batched)) + + +def test_minibatch_backend_agnostic(returns): + pl = pytest.importorskip("polars") + df = pd.DataFrame(returns, columns=["a", "b", "c"]) + + pandas_cov = covariance.EwaCovariance(fading_factor=0.05) + pandas_cov.update_many(df) + + polars_cov = covariance.EwaCovariance(fading_factor=0.05) + polars_cov.update_many(pl.from_pandas(df)) + + np.testing.assert_allclose(_dense(pandas_cov), _dense(polars_cov)) + + +@pytest.mark.parametrize( + "estimator", + [ + covariance.EmpiricalCovariance, + covariance.EmpiricalPrecision, + covariance.EwaCovariance, + covariance.EwaPrecision, + covariance.ShrunkCovariance, + ], +) +def test_empty_repr_does_not_crash(estimator): + assert repr(estimator()) == f"{estimator.__name__} (empty)" + + +def test_missing_feature_raises(): + cov = covariance.EwaCovariance() + cov.update({"a": 1.0, "b": 2.0}) + with pytest.raises(ValueError, match="missing feature"): + cov.update({"a": 1.0}) + + +# --------------------------------------------------------------------- EwaPrecision + + +def test_ewa_precision_matches_reference(returns): + prec = covariance.EwaPrecision(fading_factor=0.05) + for x, _ in stream.iter_array(returns): + prec.update(x) + np.testing.assert_allclose(_dense(prec), _precision_reference(returns, 0.05)) + + +def test_ewa_precision_inverts_covariance(): + # Enough samples that the decaying identity prior (~ (1 - f)^n) is numerically negligible. + rng = np.random.default_rng(2) + X = rng.multivariate_normal( + mean=[0, 0, 0], cov=[[1.0, 0.5, 0.3], [0.5, 1.0, 0.4], [0.3, 0.4, 1.0]], size=2000 + ) + f = 0.05 + cov = covariance.EwaCovariance(fading_factor=f) + prec = covariance.EwaPrecision(fading_factor=f) + for x, _ in stream.iter_array(X): + cov.update(x) + prec.update(x) + S, P = _dense(cov), _dense(prec) + np.testing.assert_allclose(P @ S, np.eye(S.shape[0]), atol=1e-9) + + +def test_ewa_precision_requires_strict_fading_factor(): + for bad in (0.0, 1.0): + with pytest.raises(ValueError, match="strictly between 0 and 1"): + covariance.EwaPrecision(fading_factor=bad) + + +# --------------------------------------------------------------------- shrinkage estimators + + +@pytest.mark.parametrize( + "estimator", + [ + covariance.LedoitWolfCovariance, + covariance.OASCovariance, + covariance.EwaPrecision, + lambda: covariance.ShrunkCovariance(target="identity", delta=0.2), + lambda: covariance.ShrunkCovariance(target="constant_correlation", delta=0.2), + ], +) +def test_matrix_is_symmetric(returns, estimator): + cov = estimator() + for x, _ in stream.iter_array(returns): + cov.update(x) + M = _dense(cov) + np.testing.assert_allclose(M, M.T) + + +@pytest.mark.parametrize( + "estimator", + [ + covariance.LedoitWolfCovariance, + covariance.OASCovariance, + lambda: covariance.ShrunkCovariance(target="identity", delta=0.2), + ], +) +def test_shrinkage_towards_identity_is_psd(returns, estimator): + cov = estimator() + for x, _ in stream.iter_array(returns): + cov.update(x) + eigvals = np.linalg.eigvalsh(_dense(cov)) + assert np.all(eigvals > -1e-9) + + +def test_shrunk_identity_matches_sklearn(returns): + from sklearn.covariance import shrunk_covariance + + f, delta = 0.05, 0.3 + raw = covariance.EwaCovariance(fading_factor=f) + shrunk = covariance.ShrunkCovariance(fading_factor=f, delta=delta, target="identity") + for x, _ in stream.iter_array(returns): + raw.update(x) + shrunk.update(x) + + expected = shrunk_covariance(_dense(raw), shrinkage=delta) + np.testing.assert_allclose(_dense(shrunk), expected) + + +def test_shrunk_delta_bounds(returns): + f = 0.05 + raw = covariance.EwaCovariance(fading_factor=f) + no_shrink = covariance.ShrunkCovariance(fading_factor=f, delta=0.0, target="identity") + full_shrink = covariance.ShrunkCovariance(fading_factor=f, delta=1.0, target="identity") + for x, _ in stream.iter_array(returns): + raw.update(x) + no_shrink.update(x) + full_shrink.update(x) + + S = _dense(raw) + np.testing.assert_allclose(_dense(no_shrink), S) + mu = np.trace(S) / S.shape[0] + np.testing.assert_allclose(_dense(full_shrink), mu * np.eye(S.shape[0]), atol=1e-9) + + +def test_ledoitwolf_is_convex_combination_towards_identity(returns): + f = 0.05 + raw = covariance.EwaCovariance(fading_factor=f) + lw = covariance.LedoitWolfCovariance(fading_factor=f) + for x, _ in stream.iter_array(returns): + raw.update(x) + lw.update(x) + + S, L = _dense(raw), _dense(lw) + mu = np.trace(S) / S.shape[0] + # Recover the intensity from an off-diagonal entry (the identity target is 0 there). + delta = 1 - L[0, 1] / S[0, 1] + assert 0.0 <= delta <= 1.0 + np.testing.assert_allclose(L, (1 - delta) * S + delta * mu * np.eye(S.shape[0]), atol=1e-9) + + +def test_oas_matches_formula(returns): + f = 0.05 + raw = covariance.EwaCovariance(fading_factor=f) + oas = covariance.OASCovariance(fading_factor=f) + for x, _ in stream.iter_array(returns): + raw.update(x) + oas.update(x) + + S = _dense(raw) + d = S.shape[0] + n = max(1.0 / f, 2.0) + tr, tr2 = np.trace(S), np.trace(S @ S) + mu = tr / d + num = (1 - 2 / d) * tr2 + tr * tr + den = (n + 1 - 2 / d) * (tr2 - tr * tr / d) + rho = 1.0 if den <= 0 else min(max(num / den, 0.0), 1.0) + np.testing.assert_allclose(_dense(oas), (1 - rho) * S + rho * mu * np.eye(d)) + + +def test_shrinkage_improves_conditioning(): + # Many variables, short effective memory: the raw covariance is ill-conditioned. + rng = np.random.default_rng(1) + true_cov = 0.7 * np.ones((12, 12)) + 0.3 * np.eye(12) + X = rng.multivariate_normal(mean=np.zeros(12), cov=true_cov, size=200) + + raw = covariance.EwaCovariance(fading_factor=0.05) + lw = covariance.LedoitWolfCovariance(fading_factor=0.05) + oas = covariance.OASCovariance(fading_factor=0.05) + for x, _ in stream.iter_array(X): + raw.update(x) + lw.update(x) + oas.update(x) + + raw_cond = np.linalg.cond(_dense(raw)) + assert np.linalg.cond(_dense(lw)) < raw_cond + assert np.linalg.cond(_dense(oas)) < raw_cond + + +@pytest.mark.parametrize( + "estimator", + [ + covariance.EwaCovariance, + covariance.EwaPrecision, + covariance.LedoitWolfCovariance, + covariance.OASCovariance, + covariance.ShrunkCovariance, + ], +) +def test_pickle_round_trip(returns, estimator): + import pickle + + cov = estimator() + for x, _ in stream.iter_array(returns): + cov.update(x) + restored = pickle.loads(pickle.dumps(cov)) + np.testing.assert_allclose(_dense(restored), _dense(cov)) + + +# --------------------------------------------------------------------- EmpiricalCovariance + + +def test_empirical_covariance_matches_sklearn(returns): + from sklearn.covariance import EmpiricalCovariance as SklearnEmpiricalCovariance + + # sklearn uses the maximum-likelihood estimate, i.e. ddof=0. + cov = covariance.EmpiricalCovariance(ddof=0) + for x, _ in stream.iter_array(returns): + cov.update(x) + + expected = SklearnEmpiricalCovariance().fit(returns).covariance_ + np.testing.assert_allclose(_dense(cov), expected) + + +# --------------------------------------------------------------------- stats.EWCov + + +def test_ewcov_matches_pandas(): + # pandas computes the EW covariance independently of the E[xy] - E[x]E[y] identity used by + # stats.EWCov. adjust=False / bias=True match its recursive, uncorrected convention. + f = 0.3 + x = [1.0, 3.0, 5.0, 4.0, 6.0] + y = [2.0, 4.0, 3.0, 6.0, 5.0] + ewcov = stats.EWCov(fading_factor=f) + + values = [] + for xi, yi in zip(x, y): + ewcov.update(xi, yi) + values.append(ewcov.get()) + + df = pd.DataFrame({"x": x, "y": y}) + expected = df["x"].ewm(alpha=f, adjust=False).cov(df["y"], bias=True) + np.testing.assert_allclose(values, expected.to_numpy()) diff --git a/river/datasets/__init__.py b/river/datasets/__init__.py index c6c7f83a36..118a5ff4d2 100644 --- a/river/datasets/__init__.py +++ b/river/datasets/__init__.py @@ -32,6 +32,7 @@ from .sms_spam import SMSSpam from .smtp import SMTP from .solar_flare import SolarFlare +from .sp500 import SP500Stocks from .taxis import Taxis from .trec07 import TREC07 from .trump_approval import TrumpApproval @@ -63,6 +64,7 @@ "SMSSpam", "SMTP", "SolarFlare", + "SP500Stocks", "synth", "Taxis", "TREC07", diff --git a/river/datasets/sp500.csv.gz b/river/datasets/sp500.csv.gz new file mode 100644 index 0000000000..b769a2c3bc Binary files /dev/null and b/river/datasets/sp500.csv.gz differ diff --git a/river/datasets/sp500.py b/river/datasets/sp500.py new file mode 100644 index 0000000000..8de635ad4b --- /dev/null +++ b/river/datasets/sp500.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from river import stream + +from . import base + +_TICKERS = ["AAPL", "AMZN", "IBM", "INTC", "JNJ", "JPM", "KO", "MSFT", "WMT", "XOM"] + + +class SP500Stocks(base.FileDataset): + """Daily returns of ten S&P 500 stocks. + + Daily simple returns of ten large-capitalisation S&P 500 stocks, spanning a diverse set of + sectors (technology, retail, finance, energy, healthcare, consumer goods): Apple (`AAPL`), + Amazon (`AMZN`), IBM (`IBM`), Intel (`INTC`), Johnson & Johnson (`JNJ`), JPMorgan (`JPM`), + Coca-Cola (`KO`), Microsoft (`MSFT`), Walmart (`WMT`) and ExxonMobil (`XOM`). The returns are + computed from split-adjusted closing prices over February 2013 to February 2018 (1,257 trading + days), a subset of the popular "S&P 500" stock dataset, and are expressed as percentages + (a return of 1.0 means +1%). + + Each observation is one trading day: the features are that day's returns for the ten stocks, + and the target is the *next* trading day's return of an equal-weighted portfolio of the ten + (a simple market-direction forecasting target). The features on their own are a natural fit + for the online covariance estimators in `river.covariance`. + + References + ---------- + [^1]: [S&P 500 stock data (Kaggle)](https://www.kaggle.com/datasets/camnugent/sandp500) + + """ + + def __init__(self): + super().__init__( + filename="sp500.csv.gz", + task=base.REG, + n_features=10, + n_samples=1_257, + ) + + def __iter__(self): + return stream.iter_csv( + self.path, + target="next_day_return", + drop=["date"], + converters={ticker: float for ticker in _TICKERS} | {"next_day_return": float}, + ) diff --git a/river/stats/__init__.py b/river/stats/__init__.py index 06cee3f43b..88549a5786 100644 --- a/river/stats/__init__.py +++ b/river/stats/__init__.py @@ -8,6 +8,7 @@ from .count import Count from .cov import Cov from .entropy import Entropy +from .ewcov import EWCov from .ewmean import EWMean from .ewvar import EWVar from .iqr import IQR, RollingIQR @@ -37,6 +38,7 @@ "Cov", "ChiSquared", "Entropy", + "EWCov", "EWMean", "EWVar", "IQR", diff --git a/river/stats/ewcov.py b/river/stats/ewcov.py new file mode 100644 index 0000000000..fc2011c051 --- /dev/null +++ b/river/stats/ewcov.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from river import stats + + +class EWCov(stats.base.Bivariate): + """Exponentially weighted covariance. + + This is the bivariate counterpart of `stats.EWVar`. It tracks the covariance between two + variables while giving more weight to recent observations, which is what you want when the + relationship between the variables drifts over time (e.g. the co-movement of two asset + returns during a changing market regime). + + Internally it uses the identity $Cov(x, y) = E[xy] - E[x]E[y]$, with each expectation + estimated by an exponentially weighted mean (`stats.EWMean`). Using the same `fading_factor` + on the diagonal recovers `stats.EWVar` exactly. + + Parameters + ---------- + fading_factor + The closer `fading_factor` is to 1 the more the statistic will adapt to recent values. + + Examples + -------- + + >>> from river import stats + + >>> x = [1, 3, 5, 4] + >>> y = [2, 4, 3, 6] + + >>> ewcov = stats.EWCov(fading_factor=0.5) + >>> for xi, yi in zip(x, y): + ... ewcov.update(xi, yi) + ... print(ewcov.get()) + 0.0 + 1.0 + 0.5 + 0.625 + + References + ---------- + [^1]: [Finch, T., 2009. Incremental calculation of weighted mean and variance. University of Cambridge, 4(11-5), pp.41-42.](https://fanf2.user.srcf.net/hermes/doc/antiforgery/stats.pdf) + [^2]: [Exponential Moving Average on Streaming Data](https://dev.to/nestedsoftware/exponential-moving-average-on-streaming-data-4hhl) + + """ + + def __init__(self, fading_factor=0.5): + if not 0 <= fading_factor <= 1: + raise ValueError("fading_factor is not comprised between 0 and 1") + self.fading_factor = fading_factor + self._mean_x = stats.EWMean(fading_factor) + self._mean_y = stats.EWMean(fading_factor) + self._mean_xy = stats.EWMean(fading_factor) + + @property + def name(self): + return f"ewcov_{self.fading_factor}" + + def update(self, x, y): + self._mean_x.update(x) + self._mean_y.update(y) + self._mean_xy.update(x * y) + + def get(self): + return self._mean_xy.get() - self._mean_x.get() * self._mean_y.get()