From 8aac6f5bbe21d4b759ac37fdff9add508a8fc614 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Fri, 26 Jun 2026 16:24:33 +0200 Subject: [PATCH 1/3] feat(preprocessing,compose): make StandardScaler & compose mini-batching dataframe-agnostic via narwhals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Route StandardScaler's and the composition primitives' mini-batch methods through the same narwhals boundary as the GLM/OneHotEncoder paths, so a whole pipeline can be mini-batched on any narwhals-supported eager backend (pandas, polars, pyarrow, nullable/arrow-backed pandas, ...). The numpy compute cores are untouched and the input backend (including the pandas index) is rebuilt on output. preprocessing/scale.py — StandardScaler: - learn_many wraps via into_frame and drops to a float64 numpy matrix; the windowed branch iterates rows backend-agnostically. - transform_many keeps a verbatim classic-pandas fast path (in-place divide, no-copy frame, float-dtype preservation) and adds an agnostic float64 path for every other backend. Pandas output is byte-for-byte unchanged. compose: - Pipeline: type hints only — the orchestration was already backend-agnostic. - TransformerUnion: pd.concat(axis=1) -> nw.concat(how="horizontal"). - Select: X.loc[...].copy() -> narwhals select (still pure). - TransformerProduct: keeps the pandas Sparse[uint8] fast path, adds an agnostic elementwise-product path for other backends. Adds cross-backend tests (mixed-dtype TrumpApproval, chunked learning, emerging/ disappearing/reordered features, native-backend round-trip) using the frame_backend fixture. Pandas remains the oracle. Refs #1919. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/releases/unreleased.md | 2 + river/compose/pipeline.py | 12 +- river/compose/product.py | 28 ++++- river/compose/select.py | 18 +-- river/compose/union.py | 24 ++-- river/preprocessing/scale.py | 69 ++++++++-- river/preprocessing/test_scale.py | 202 ++++++++++++++++++++++++++++++ 7 files changed, 315 insertions(+), 40 deletions(-) diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index 44392b1f87..5b169da26d 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -16,6 +16,7 @@ ## compose +- The mini-batch methods of `compose.Pipeline`, `compose.TransformerUnion`, `compose.Select`, and `compose.TransformerProduct` now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, so a whole pipeline can be mini-batched on a non-pandas backend. The input backend is preserved on output, including the pandas index, and `pandas` is no longer required unless the input is a pandas frame. `TransformerProduct` keeps the pandas `Sparse[uint8]` fast path when crossing one-hot encoded features. - `compose.Pipeline` now forwards extra keyword arguments (such as the timestamp `t` used by `utils.TimeRolling`, or a sample weight `w`) to each step whose method declares them, and drops them for steps that don't. This makes `feature_extraction.Agg`/`TargetAgg` backed by `utils.TimeRolling` work inside a pipeline via `model.learn_one(x, y, t=t)`. Routing applies to `learn_one` and to the predict-time methods (`predict_one`, `predict_proba_one`, `score_one`, `transform_one`), so it also works under `compose.learn_during_predict` where unsupervised steps learn during `predict_one(x, t=t)`. Fixes [#1600](https://github.com/online-ml/river/issues/1600). The accepted arguments are determined once when the pipeline plan is built, so pipelines with no extra arguments keep their previous speed. ## covariance @@ -78,6 +79,7 @@ - `preprocessing.FeatureHasher` now hashes with MurmurHash3 in Rust, making it much faster. It gains an `alternate_sign` parameter (default `True`, matching scikit-learn) and returns a plain `dict`. Hashed feature indices differ from previous versions. - `preprocessing.OneHotEncoder` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. The pandas path keeps returning `Sparse[uint8]` columns; other backends return dense integer columns, as they have no sparse-array equivalent. `transform_many` only requires `pandas` when the input is a pandas frame. - `preprocessing.OrdinalEncoder` mini-batch methods (`learn_many`, `predict_many`, `predict_proba_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only. The input backend is preserved on output, including the pandas index. These methods no longer require `pandas` to be installed. +- `preprocessing.StandardScaler` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. Classic numpy-backed pandas keeps the historical fast path (and its float dtype, e.g. `float32`); other backends are scaled through a `float64` path. `transform_many` only requires `pandas` when the input is a pandas frame. Outputs are unchanged on the pandas path. ## proba diff --git a/river/compose/pipeline.py b/river/compose/pipeline.py index 0b3e7bf427..8eb95eb3fc 100644 --- a/river/compose/pipeline.py +++ b/river/compose/pipeline.py @@ -12,7 +12,7 @@ from river import base if typing.TYPE_CHECKING: - import pandas as pd + from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries from . import func, union @@ -779,7 +779,7 @@ def print_title(title, indent=False): # Mini-batch methods - def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params): + def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None, **params): """Fit to a mini-batch. Parameters @@ -822,7 +822,7 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params): else: last_step.learn_many(X=X, **params) - def _transform_many(self, X: pd.DataFrame): + def _transform_many(self, X: IntoDataFrame): """This methods takes care of applying the first n - 1 steps of the pipeline, which are supposedly transformers. It also returns the final step so that other functions can do something with it. @@ -846,7 +846,7 @@ def _transform_many(self, X: pd.DataFrame): last_step = next(steps) return X, last_step - def transform_many(self, X: pd.DataFrame): + def transform_many(self, X: IntoDataFrame): """Apply each transformer in the pipeline to some features. The final step in the pipeline will be applied if it is a transformer. If not, then it will @@ -861,12 +861,12 @@ def transform_many(self, X: pd.DataFrame): return last_step.transform_many(X) return X - def predict_many(self, X: pd.DataFrame): + def predict_many(self, X: IntoDataFrame): """Call transform_many, and then predict_many on the final step.""" X, last_step = self._transform_many(X=X) return last_step.predict_many(X=X) - def predict_proba_many(self, X: pd.DataFrame): + def predict_proba_many(self, X: IntoDataFrame): """Call transform_many, and then predict_proba_many on the final step.""" X, last_step = self._transform_many(X=X) return last_step.predict_proba_many(X=X) diff --git a/river/compose/product.py b/river/compose/product.py index 468a7575a5..d4734b4a36 100644 --- a/river/compose/product.py +++ b/river/compose/product.py @@ -2,15 +2,12 @@ import functools import itertools -import typing +import narwhals.stable.v2 as nw import numpy as np from river import utils -if typing.TYPE_CHECKING: - pass - from . import union __all__ = ["TransformerProduct"] @@ -89,8 +86,29 @@ def transform_one(self, x): } def transform_many(self, X): - pd = utils.pandas.import_pandas() outputs = [t.transform_many(X) for t in self.transformers.values()] + # Classic pandas keeps the sparse-aware fast path (preserving the memory savings when + # crossing one-hot encoded features); every other backend takes the agnostic path. + if utils.dataframe.into_frame(outputs[0]).implementation.is_pandas(): + return self._transform_many_pandas(X, outputs) + return self._transform_many_narwhals(outputs) + + def _transform_many_narwhals(self, outputs): + # Cross every column of each output with every column of the others, multiplying them + # element-wise. Series share the input's backend, so the products do too. + frames = [utils.dataframe.into_frame(output) for output in outputs] + crossed = [] + for combo in itertools.product(*(frame.columns for frame in frames)): + name = "*".join(combo) + product = functools.reduce( + lambda a, b: a * b, + (frame[col] for frame, col in zip(frames, combo)), + ) + crossed.append(product.rename(name).to_frame()) + return nw.concat(crossed, how="horizontal").to_native() + + def _transform_many_pandas(self, X, outputs): + pd = utils.pandas.import_pandas() def multiply(a, b): # Fast-track for sparse[uint8] * sparse[uint8] diff --git a/river/compose/select.py b/river/compose/select.py index df13c6542a..a38622339a 100644 --- a/river/compose/select.py +++ b/river/compose/select.py @@ -1,6 +1,11 @@ from __future__ import annotations -from river import base +import typing + +from river import base, utils + +if typing.TYPE_CHECKING: + from narwhals.stable.v2.typing import IntoDataFrameT __all__ = ["Discard", "Select", "SelectType"] @@ -130,12 +135,11 @@ def __init__(self, *keys: base.typing.FeatureName): def transform_one(self, x): return {i: x[i] for i in self.keys} - def transform_many(self, X): - # INFO: has either side-effects or doesn't have copy - choose your poison - # REFLECTION: worth adding `copy=True` parameter to the object constructor to allow both? - # << convention is to have pure methods/functions - return X.loc[:, list(self.keys)].copy() - # return X.loc[:, self.keys] + def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT: + # `select` returns a fresh frame in the caller's own backend (and keeps the pandas index), + # so the method stays pure — no view aliasing back onto the input. + keys = typing.cast("list[str]", list(self.keys)) + return utils.dataframe.into_frame(X).select(keys).to_native() def __str__(self): return str(sorted(self.keys)) diff --git a/river/compose/union.py b/river/compose/union.py index 216539cdab..77d93ea903 100644 --- a/river/compose/union.py +++ b/river/compose/union.py @@ -3,10 +3,12 @@ import types import typing +import narwhals.stable.v2 as nw + from river import base, utils if typing.TYPE_CHECKING: - import pandas as pd + from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries from . import func @@ -284,7 +286,7 @@ def transform_one(self, x): # Mini-batch methods - def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None): + def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None): """Update each transformer. Parameters @@ -303,10 +305,14 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None): t.learn_many(X) def transform_many(self, X): - """Passes the data through each transformer and packs the results together.""" - pd = utils.pandas.import_pandas() - return pd.concat( - (t.transform_many(X) for t in self.transformers.values()), - copy=False, - axis=1, - ) + """Passes the data through each transformer and packs the results together. + + The per-transformer outputs are concatenated column-wise via narwhals, so the result is + rebuilt in the caller's own backend (pandas, polars, pyarrow, ...). All transformers see + the same rows in the same order, so a positional horizontal concat matches the historical + index-aligned `pandas.concat(axis=1)`. + """ + frames = [ + utils.dataframe.into_frame(t.transform_many(X)) for t in self.transformers.values() + ] + return nw.concat(frames, how="horizontal").to_native() diff --git a/river/preprocessing/scale.py b/river/preprocessing/scale.py index fe209e428e..a67baa28ab 100644 --- a/river/preprocessing/scale.py +++ b/river/preprocessing/scale.py @@ -6,12 +6,14 @@ import numbers import typing +import narwhals.stable.v2 as nw import numpy as np from river import base, stats, utils if typing.TYPE_CHECKING: import pandas as pd + from narwhals.stable.v2.typing import IntoDataFrame, IntoDataFrameT __all__ = [ "AdaptiveStandardScaler", @@ -299,7 +301,7 @@ def transform_one(self, x): return result return {i: xi - means[i] for i, xi in x.items()} - def learn_many(self, X: pd.DataFrame): + def learn_many(self, X: IntoDataFrame) -> None: """Update with a mini-batch of features. Note that the update formulas for mean and variance are slightly different than in the @@ -310,30 +312,32 @@ def learn_many(self, X: pd.DataFrame): Parameters ---------- X - A dataframe where each column is a feature. + A dataframe where each column is a feature. Any narwhals-supported eager backend + (pandas, polars, pyarrow, ...) is accepted. """ + Xnw = utils.dataframe.into_frame(X) + if self.window_size is not None: # Row-by-row to preserve correct rolling-window semantics. - columns = X.columns - for row in X.values: - self.learn_one(dict(zip(columns, row))) + for row in Xnw.iter_rows(named=True): + self.learn_one(row) return - # Operating on X.values, which is a view to the underlying numpy array, is slightly faster - # than operating on X - columns = X.columns - X = X.values + # Drop to a float64 numpy matrix for the compute core; the column labels drive the + # per-feature statistics, so the batch may add/drop/reorder columns between calls. + columns = Xnw.columns + X_np = utils.dataframe.to_numpy(Xnw) # In the rest of this method, old_* refers to the existing statistics, whilst new_* refers # to the statistics of the current mini-batch. - new_means = np.nanmean(X, axis=0) + new_means = np.nanmean(X_np, axis=0) # We could call np.var, but we already have the mean so we can be smart if self.with_std: - new_vars = np.einsum("ij,ij->j", X, X) / len(X) - new_means**2 + new_vars = np.einsum("ij,ij->j", X_np, X_np) / len(X_np) - new_means**2 else: new_vars = [] - new_counts = np.sum(~np.isnan(X), axis=0) + new_counts = np.sum(~np.isnan(X_np), axis=0) for col, new_mean, new_var, new_count in itertools.zip_longest( columns, new_means, new_vars, new_counts @@ -352,9 +356,14 @@ def learn_many(self, X: pd.DataFrame): ).item() self.counts[col] += new_count.item() - def transform_many(self, X: pd.DataFrame): + def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT: """Scale a mini-batch of features. + Classic numpy-backed pandas keeps the historical fast path, which preserves the input's + float dtype (e.g. ``float32`` stays ``float32``). Every other narwhals-supported backend + (polars, pyarrow, nullable/arrow-backed pandas, ...) is scaled through a backend-agnostic + path whose compute runs in ``float64``. + Parameters ---------- X @@ -362,6 +371,19 @@ def transform_many(self, X: pd.DataFrame): the features has not been seen during a previous call to `learn_many`. """ + Xnw = utils.dataframe.into_frame(X) + # The fast path relies on `.values` yielding a numeric numpy view, which only holds for + # classic numpy-backed pandas; nullable/arrow-backed pandas return object arrays and so + # take the agnostic path alongside polars/pyarrow. + if Xnw.implementation.is_pandas() and all( + isinstance(dtype, np.dtype) for dtype in typing.cast("pd.DataFrame", X).dtypes + ): + native = self._transform_many_pandas(typing.cast("pd.DataFrame", X)) + else: + native = self._transform_many_narwhals(Xnw) + return typing.cast("IntoDataFrameT", native) + + def _transform_many_pandas(self, X: pd.DataFrame) -> pd.DataFrame: pd = utils.pandas.import_pandas() # Determine dtype of input dtypes = X.dtypes.unique() @@ -387,6 +409,27 @@ def transform_many(self, X: pd.DataFrame): return pd.DataFrame(Xt, index=X.index, columns=X.columns, copy=False) + def _transform_many_narwhals(self, Xnw: nw.DataFrame[IntoDataFrameT]) -> IntoDataFrameT: + columns = Xnw.columns + + if self.window_size is None: + means = np.array([self.means[c] for c in columns], dtype=np.float64) + else: + means = np.array([self.means[c].get() for c in columns], dtype=np.float64) + Xt = utils.dataframe.to_numpy(Xnw) - means + + if self.with_std: + if self.window_size is None: + stds = np.array([self.vars[c] ** 0.5 for c in columns], dtype=np.float64) + else: + stds = np.array([self.vars[c].get() ** 0.5 for c in columns], dtype=np.float64) + np.divide(Xt, stds, where=stds > 0, out=Xt) + + native = utils.dataframe.to_native_frame( + {col: Xt[:, j] for j, col in enumerate(columns)}, like=Xnw + ) + return typing.cast("IntoDataFrameT", native) + class MinMaxScaler(base.Transformer): """Scales the data to a fixed range from 0 to 1. diff --git a/river/preprocessing/test_scale.py b/river/preprocessing/test_scale.py index 9b979f2575..89ee43853e 100644 --- a/river/preprocessing/test_scale.py +++ b/river/preprocessing/test_scale.py @@ -3,12 +3,18 @@ import math import pickle import random +import typing +import narwhals.stable.v2 as nw import numpy as np import pandas as pd +import pytest from river import datasets, preprocessing, stats, stream, utils +if typing.TYPE_CHECKING: + from river.conftest import FrameBackend + def _pd_split(df, n): """Split a pandas DataFrame or Series into n chunks without triggering swapaxes deprecation.""" @@ -530,3 +536,199 @@ def test_issue_1313(): dtype: object """ + + +# `StandardScaler`'s mini-batch path is routed through narwhals: classic numpy-backed pandas keeps +# the historical fast path (preserving its float dtype), while every other backend is scaled +# through a backend-agnostic float64 path. These tests pin the cross-backend behaviour, using the +# pandas path as the oracle. + + +def _numeric_batch(n: int = 40) -> dict[str, list[float]]: + """Build a reproducible batch of two numeric columns.""" + rng = random.Random(42) + return { + "x": [rng.uniform(8, 12) for _ in range(n)], + "y": [rng.uniform(-5, 5) for _ in range(n)], + } + + +def _trump_columns() -> dict[str, list[float]]: + """The TrumpApproval dataset as a column dict (mixed int64/float64 features).""" + return pd.read_csv(datasets.TrumpApproval().path).to_dict(orient="list") + + +def _chunk_dict(data: dict[str, list], n: int) -> list[dict[str, list]]: + """Split a column dict into `n` row-wise chunks, preserving column order.""" + length = len(next(iter(data.values()))) + bounds = np.array_split(range(length), n) + return [{col: [values[i] for i in idx] for col, values in data.items()} for idx in bounds] + + +# Mini-batches with a shifting column set: ``a`` disappears then reappears, ``c`` emerges, and the +# column order is permuted between calls. `StandardScaler` advertises support for this, so it must +# behave identically on every backend. +EMERGING_BATCHES: list[dict[str, list[float]]] = [ + {"a": [1.0, 2.0, 3.0, 4.0], "b": [10.0, 20.0, 30.0, 40.0]}, + {"c": [-1.0, -2.0, -3.0], "b": [50.0, 60.0, 70.0]}, + {"b": [80.0, 90.0], "a": [5.0, 6.0], "c": [-4.0, -5.0]}, +] +# A frame whose columns were all seen during the batches above, used to probe `transform_many`. +EMERGING_TRANSFORM: dict[str, list[float]] = { + "a": [1.5, 6.5, 3.0], + "b": [15.0, 95.0, 45.0], + "c": [-1.5, -4.5, -2.0], +} + + +def _frame_to_numpy(native) -> np.ndarray: + """Read a native frame back into a float64 numpy matrix via narwhals.""" + return np.asarray(nw.from_native(native, eager_only=True).to_numpy(), dtype=np.float64) + + +@pytest.mark.parametrize("with_std", [True, False]) +@pytest.mark.parametrize("window_size", [None, 7]) +def test_standard_scaler_transform_many_backend_agnostic( + frame_backend: FrameBackend, with_std: bool, window_size: int | None +) -> None: + """`transform_many` yields the same values on every backend as the pandas reference.""" + data = _numeric_batch() + + reference = preprocessing.StandardScaler(with_std=with_std, window_size=window_size) + reference.learn_many(pd.DataFrame(data)) + expected = reference.transform_many(pd.DataFrame(data)).to_numpy() + + scaler = preprocessing.StandardScaler(with_std=with_std, window_size=window_size) + scaler.learn_many(frame_backend.frame(data)) + got = _frame_to_numpy(scaler.transform_many(frame_backend.frame(data))) + + np.testing.assert_allclose(got, expected, atol=1e-9) + + +@pytest.mark.parametrize("window_size", [None, 7]) +def test_standard_scaler_learn_many_backend_agnostic( + frame_backend: FrameBackend, window_size: int | None +) -> None: + """`learn_many` accumulates identical running statistics across backends.""" + data = _numeric_batch() + + reference = preprocessing.StandardScaler(window_size=window_size) + reference.learn_many(pd.DataFrame(data)) + + scaler = preprocessing.StandardScaler(window_size=window_size) + scaler.learn_many(frame_backend.frame(data)) + + for col in data: + ref_mean = reference.means[col].get() if window_size else reference.means[col] + got_mean = scaler.means[col].get() if window_size else scaler.means[col] + ref_var = reference.vars[col].get() if window_size else reference.vars[col] + got_var = scaler.vars[col].get() if window_size else scaler.vars[col] + assert scaler.counts[col] == reference.counts[col] + assert math.isclose(got_mean, ref_mean, abs_tol=1e-9) + assert math.isclose(got_var, ref_var, abs_tol=1e-9) + + +def test_standard_scaler_transform_many_returns_native_backend( + frame_backend: FrameBackend, +) -> None: + """The output frame is rebuilt in the caller's own backend.""" + data = _numeric_batch() + native = frame_backend.frame(data) + + scaler = preprocessing.StandardScaler() + scaler.learn_many(native) + out = scaler.transform_many(native) + + assert type(out) is type(native) + + +def test_standard_scaler_transform_many_chunked_backend_agnostic( + frame_backend: FrameBackend, +) -> None: + """Learning in mini-batches then transforming matches the pandas reference.""" + data = _numeric_batch(60) + pdf = pd.DataFrame(data) + + reference = preprocessing.StandardScaler() + for chunk in _pd_split(pdf, 5): + reference.learn_many(chunk) + expected = reference.transform_many(pdf).to_numpy() + + scaler = preprocessing.StandardScaler() + native = frame_backend.frame(data) + # Slice the native frame into the same row ranges via narwhals. + nw_frame = nw.from_native(native, eager_only=True) + bounds = np.array_split(range(len(pdf)), 5) + for idx in bounds: + scaler.learn_many(nw_frame[int(idx[0]) : int(idx[-1]) + 1].to_native()) + got = _frame_to_numpy(scaler.transform_many(native)) + + np.testing.assert_allclose(got, expected, atol=1e-9) + + +@pytest.mark.parametrize("with_std", [True, False]) +def test_standard_scaler_real_dataset_backend_agnostic( + frame_backend: FrameBackend, with_std: bool +) -> None: + """A real, mixed-dtype dataset (TrumpApproval) scales identically on every backend. + + The frame mixes an ``int64`` column with several ``float64`` ones and is learned in chunks, + exercising both the running-statistics merge and the integer-to-float promotion path. + """ + data = _trump_columns() + chunks = _chunk_dict(data, 8) + + reference = preprocessing.StandardScaler(with_std=with_std) + for chunk in chunks: + reference.learn_many(pd.DataFrame(chunk)) + expected = reference.transform_many(pd.DataFrame(data)).to_numpy() + + scaler = preprocessing.StandardScaler(with_std=with_std) + for chunk in chunks: + scaler.learn_many(frame_backend.frame(chunk)) + got = _frame_to_numpy(scaler.transform_many(frame_backend.frame(data))) + + np.testing.assert_allclose(got, expected, atol=1e-9) + + +@pytest.mark.parametrize("with_std", [True, False]) +@pytest.mark.parametrize("window_size", [None, 5]) +def test_standard_scaler_emerging_features_learn_backend_agnostic( + frame_backend: FrameBackend, with_std: bool, window_size: int | None +) -> None: + """Adding/removing/reordering columns between `learn_many` calls is backend-agnostic. + + Each feature's running statistics must depend only on the rows in which it actually appears, + identically across backends, even as the column set and order change between mini-batches. + """ + reference = preprocessing.StandardScaler(with_std=with_std, window_size=window_size) + scaler = preprocessing.StandardScaler(with_std=with_std, window_size=window_size) + for batch in EMERGING_BATCHES: + reference.learn_many(pd.DataFrame(batch)) + scaler.learn_many(frame_backend.frame(batch)) + + for col in ("a", "b", "c"): + ref_mean = reference.means[col].get() if window_size else reference.means[col] + got_mean = scaler.means[col].get() if window_size else scaler.means[col] + assert scaler.counts[col] == reference.counts[col] + assert math.isclose(got_mean, ref_mean, abs_tol=1e-9) + if with_std: + ref_var = reference.vars[col].get() if window_size else reference.vars[col] + got_var = scaler.vars[col].get() if window_size else scaler.vars[col] + assert math.isclose(got_var, ref_var, abs_tol=1e-9) + + +def test_standard_scaler_emerging_features_transform_backend_agnostic( + frame_backend: FrameBackend, +) -> None: + """After learning on a shifting column set, `transform_many` matches the pandas oracle.""" + reference = preprocessing.StandardScaler() + scaler = preprocessing.StandardScaler() + for batch in EMERGING_BATCHES: + reference.learn_many(pd.DataFrame(batch)) + scaler.learn_many(frame_backend.frame(batch)) + + expected = reference.transform_many(pd.DataFrame(EMERGING_TRANSFORM)).to_numpy() + got = _frame_to_numpy(scaler.transform_many(frame_backend.frame(EMERGING_TRANSFORM))) + + np.testing.assert_allclose(got, expected, atol=1e-9) From f6379ded633a1f6042bd01715c69065cc2cae103 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Fri, 26 Jun 2026 16:29:07 +0200 Subject: [PATCH 2/3] test(utils): update transform_many pandas-requirement test for narwhals StandardScaler `StandardScaler.transform_many` no longer imports pandas unconditionally: it only needs pandas on the classic-pandas fast path. The old test passed `object()` (which now fails at the narwhals boundary, and isn't a valid `IntoDataFrameT`). Split it into two: a pandas input still raises ImportError when pandas is missing, while a polars input goes through the agnostic path and works without pandas. Co-Authored-By: Claude Opus 4.8 (1M context) --- river/utils/test_pandas.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/river/utils/test_pandas.py b/river/utils/test_pandas.py index 4329dc7e28..b788273199 100644 --- a/river/utils/test_pandas.py +++ b/river/utils/test_pandas.py @@ -10,11 +10,35 @@ def _raise_missing_pandas() -> None: raise ImportError("`pandas` is required for this operation.") -def test_transform_many_requires_pandas(monkeypatch: pytest.MonkeyPatch) -> None: +def test_transform_many_requires_pandas_only_for_pandas_input( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # `StandardScaler.transform_many` keeps a pandas fast path, so a pandas input still needs + # pandas. `learn_many` is routed through narwhals and never imports pandas. + import pandas as pd + monkeypatch.setattr(pandas_utils, "import_pandas", _raise_missing_pandas) + scaler = preprocessing.StandardScaler() + scaler.learn_many(pd.DataFrame({"x": [1.0, 2.0, 3.0]})) with pytest.raises(ImportError, match="pandas"): - preprocessing.StandardScaler().transform_many(object()) + scaler.transform_many(pd.DataFrame({"x": [1.0, 2.0, 3.0]})) + + +def test_transform_many_does_not_require_pandas_for_polars( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # A non-pandas backend takes the agnostic path, so `transform_many` must work even when + # pandas is unavailable, returning the caller's own backend. + monkeypatch.setattr(pandas_utils, "import_pandas", _raise_missing_pandas) + pl = pytest.importorskip("polars") + + scaler = preprocessing.StandardScaler() + scaler.learn_many(pl.DataFrame({"x": [1.0, 2.0, 3.0]})) + out = scaler.transform_many(pl.DataFrame({"x": [1.0, 2.0, 3.0]})) + + assert isinstance(out, pl.DataFrame) + assert len(out) == 3 def test_predict_many_does_not_require_pandas(monkeypatch: pytest.MonkeyPatch) -> None: From bc7104bf8745c49cbaa48f25148f6f692188e7c9 Mon Sep 17 00:00:00 2001 From: Francesco Bruzzesi <42817048+FBruzzesi@users.noreply.github.com> Date: Mon, 29 Jun 2026 22:01:52 +0200 Subject: [PATCH 3/3] perf(utils, preprocessing): Build native frames directly from 2D numpy, simplify `StandardScaler` (#1935) * Allow to create dataframe directly from 2d numpy array * fixup square array issue * simplify StandardScaler --- river/preprocessing/scale.py | 79 +++++++++++-------------------- river/preprocessing/test_scale.py | 8 ++-- river/utils/dataframe.py | 68 ++++++++++++++++++++------ river/utils/test_pandas.py | 13 +++-- 4 files changed, 94 insertions(+), 74 deletions(-) diff --git a/river/preprocessing/scale.py b/river/preprocessing/scale.py index a67baa28ab..234a5189d9 100644 --- a/river/preprocessing/scale.py +++ b/river/preprocessing/scale.py @@ -12,7 +12,8 @@ from river import base, stats, utils if typing.TYPE_CHECKING: - import pandas as pd + from collections.abc import Mapping + from narwhals.stable.v2.typing import IntoDataFrame, IntoDataFrameT __all__ = [ @@ -25,6 +26,17 @@ "StandardScaler", ] +NW_TO_NP_DTYPES: Mapping[nw.dtypes.DType, np.number] = { + nw.Int8(): np.float16(), + nw.Int16(): np.float32(), + nw.Int32(): np.float64(), + nw.UInt8(): np.float16(), + nw.UInt16(): np.float32(), + nw.UInt32(): np.float64(), + nw.Float32(): np.float32(), + nw.Float64(): np.float64(), +} + def safe_div(a, b): """Return a if b is nil, else divides a by b. @@ -359,10 +371,11 @@ def learn_many(self, X: IntoDataFrame) -> None: def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT: """Scale a mini-batch of features. - Classic numpy-backed pandas keeps the historical fast path, which preserves the input's - float dtype (e.g. ``float32`` stays ``float32``). Every other narwhals-supported backend - (polars, pyarrow, nullable/arrow-backed pandas, ...) is scaled through a backend-agnostic - path whose compute runs in ``float64``. + Every narwhals-supported backend (pandas, polars, pyarrow, nullable/arrow-backed + pandas, ...) takes the same backend-agnostic path. The compute dtype is inferred from + the input schema, so a feature's float dtype is preserved (e.g. ``float32`` stays + ``float32``); integer columns are widened to the matching float and anything else falls + back to ``float64``. Parameters ---------- @@ -372,62 +385,26 @@ def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT: """ Xnw = utils.dataframe.into_frame(X) - # The fast path relies on `.values` yielding a numeric numpy view, which only holds for - # classic numpy-backed pandas; nullable/arrow-backed pandas return object arrays and so - # take the agnostic path alongside polars/pyarrow. - if Xnw.implementation.is_pandas() and all( - isinstance(dtype, np.dtype) for dtype in typing.cast("pd.DataFrame", X).dtypes - ): - native = self._transform_many_pandas(typing.cast("pd.DataFrame", X)) - else: - native = self._transform_many_narwhals(Xnw) - return typing.cast("IntoDataFrameT", native) - - def _transform_many_pandas(self, X: pd.DataFrame) -> pd.DataFrame: - pd = utils.pandas.import_pandas() - # Determine dtype of input - dtypes = X.dtypes.unique() - dtype = dtypes[0] if len(dtypes) == 1 else np.float64 - - # Check if the dtype is integer type and convert to corresponding float type - if np.issubdtype(dtype, np.integer): - bytes_size = dtype.itemsize - dtype = np.dtype(f"float{bytes_size * 8}") # type: ignore[operator] + schema = Xnw.schema + columns = schema.names() + dtypes = {NW_TO_NP_DTYPES.get(dtype, np.float64()) for dtype in schema.dtypes()} + dtype = np.result_type(*dtypes) if self.window_size is None: - means = np.array([self.means[c] for c in X.columns], dtype=dtype) + means = np.array([self.means[c] for c in columns], dtype=dtype) else: - means = np.array([self.means[c].get() for c in X.columns], dtype=dtype) - Xt = X.values - means + means = np.array([self.means[c].get() for c in columns], dtype=dtype) - if self.with_std: - if self.window_size is None: - stds = np.array([self.vars[c] ** 0.5 for c in X.columns], dtype=dtype) - else: - stds = np.array([self.vars[c].get() ** 0.5 for c in X.columns], dtype=dtype) - np.divide(Xt, stds, where=stds > 0, out=Xt) - - return pd.DataFrame(Xt, index=X.index, columns=X.columns, copy=False) - - def _transform_many_narwhals(self, Xnw: nw.DataFrame[IntoDataFrameT]) -> IntoDataFrameT: - columns = Xnw.columns - - if self.window_size is None: - means = np.array([self.means[c] for c in columns], dtype=np.float64) - else: - means = np.array([self.means[c].get() for c in columns], dtype=np.float64) - Xt = utils.dataframe.to_numpy(Xnw) - means + Xt = utils.dataframe.to_numpy(Xnw, dtype=dtype) - means if self.with_std: if self.window_size is None: - stds = np.array([self.vars[c] ** 0.5 for c in columns], dtype=np.float64) + stds = np.array([self.vars[c] ** 0.5 for c in columns], dtype=dtype) else: - stds = np.array([self.vars[c].get() ** 0.5 for c in columns], dtype=np.float64) + stds = np.array([self.vars[c].get() ** 0.5 for c in columns], dtype=dtype) np.divide(Xt, stds, where=stds > 0, out=Xt) - native = utils.dataframe.to_native_frame( - {col: Xt[:, j] for j, col in enumerate(columns)}, like=Xnw - ) + native = utils.dataframe.to_native_frame(Xt, columns=columns, like=Xnw) return typing.cast("IntoDataFrameT", native) diff --git a/river/preprocessing/test_scale.py b/river/preprocessing/test_scale.py index 89ee43853e..1b2bb102f8 100644 --- a/river/preprocessing/test_scale.py +++ b/river/preprocessing/test_scale.py @@ -538,10 +538,10 @@ def test_issue_1313(): """ -# `StandardScaler`'s mini-batch path is routed through narwhals: classic numpy-backed pandas keeps -# the historical fast path (preserving its float dtype), while every other backend is scaled -# through a backend-agnostic float64 path. These tests pin the cross-backend behaviour, using the -# pandas path as the oracle. +# `StandardScaler`'s mini-batch path is routed through narwhals: every backend takes the same +# backend-agnostic path, which infers the compute dtype from the input schema and so preserves the +# input's float dtype. These tests pin the cross-backend behaviour, using the pandas path as the +# oracle. def _numeric_batch(n: int = 40) -> dict[str, list[float]]: diff --git a/river/utils/dataframe.py b/river/utils/dataframe.py index 8e2f0e7aa0..986064f690 100644 --- a/river/utils/dataframe.py +++ b/river/utils/dataframe.py @@ -27,7 +27,7 @@ IntoSeries, IntoSeriesT, ) - from numpy.typing import NDArray + from numpy.typing import DTypeLike, NDArray from scipy import sparse __all__ = [ @@ -40,15 +40,16 @@ ] -def to_numpy(frame: nw.DataFrame[Any]) -> NDArray[np.float64]: - """Extract a `float64` numpy matrix from a narwhals dataframe for the numpy compute core. +def to_numpy(frame: nw.DataFrame[Any], dtype: DTypeLike = np.float64) -> NDArray[Any]: + """Extract a numpy matrix from a narwhals dataframe for the numpy compute core. A pandas frame backed by pyarrow (`ArrowDtype`) columns returns an ``object`` array from ``.to_numpy()``, which breaks downstream ufuncs (e.g. ``np.exp`` raises *"loop of ufunc does - not support argument 0 of type float"*). Coercing to ``float64`` at the boundary keeps the - core backend-agnostic; ``np.asarray`` is a no-op when the frame already yields ``float64``. + not support argument 0 of type float"*). Coercing to a floating ``dtype`` (``float64`` by + default) at the boundary keeps the core backend-agnostic; ``np.asarray`` is a no-op when the + frame already yields that dtype. """ - return np.asarray(frame.to_numpy(), dtype=np.float64) + return np.asarray(frame.to_numpy(), dtype=dtype) def into_frame(X: IntoDataFrameT) -> nw.DataFrame[IntoDataFrameT]: @@ -89,8 +90,26 @@ def to_native_series( return typing.cast("IntoSeries", native) +@typing.overload def to_native_frame( - data: Mapping[Any, NDArray[Any] | Sequence[Any]], *, like: nw.DataFrame[Any] + data: Mapping[Any, NDArray[Any] | Sequence[Any]], + *, + like: nw.DataFrame[Any], + columns: None = None, +) -> IntoDataFrame: ... + + +@typing.overload +def to_native_frame( + data: NDArray[Any], *, like: nw.DataFrame[Any], columns: Sequence[Any] +) -> IntoDataFrame: ... + + +def to_native_frame( + data: Mapping[Any, NDArray[Any] | Sequence[Any]] | NDArray[Any], + *, + like: nw.DataFrame[Any], + columns: Sequence[Any] | None = None, ) -> IntoDataFrame: """Build a native dataframe matching the backend (and pandas index) of `like`. @@ -101,16 +120,37 @@ def to_native_frame( Parameters ---------- data - A mapping from column label to column values (numpy arrays from the numpy core). + Either a mapping from column label to column values, or a single 2D numpy array. The + array form takes the fast `narwhals.from_numpy` path (no per-column slicing) and + requires `columns` to name its columns. like The narwhals dataframe the call received as input. Its backend determines the return type, and its index (pandas only) is carried over to the result. + columns + Column labels for the array form of `data`. Ignored when `data` is a mapping. """ impl = like.implementation - if not impl.is_pandas_like(): - data = {str(key): value for key, value in data.items()} - frame = nw.from_dict(data, backend=impl).to_native() + if isinstance(data, np.ndarray): + if columns is None: + raise ValueError("`columns` must be provided when `data` is a numpy array.") + # narwhals requires string column names for non-pandas backends; pandas keeps the + # original labels (mirroring the mapping path above). + names = list(columns) if impl.is_pandas_like() else [str(col) for col in columns] + # `nw.from_numpy` does not expose `orient`, so polars infers row/column orientation. + # For a square array with column names pinned (as here), it breaks the tie from the + # memory layout and reads a Fortran-contiguous array column-major: a silent transpose. + # `to_numpy` returns exactly such an F-contiguous array for a polars frame, and + # arithmetic on it (e.g. the centering/scaling in `StandardScaler`) keeps that layout. + # Forcing C-contiguity pins the read to row-major. Cost: a no-op (returns `data` + # unchanged) when `data` is already C-contiguous; otherwise a single O(rows * cols) + # memcpy, on the same order as building the frame, so negligible relative to the + # conversion itself. + frame = nw.from_numpy(np.ascontiguousarray(data), schema=names, backend=impl).to_native() + else: + if not impl.is_pandas_like(): + data = {str(key): value for key, value in data.items()} + frame = nw.from_dict(data, backend=impl).to_native() # Carry over the pandas index; no-op for non-pandas backends (maybe_get_index -> None). if (index := nw.maybe_get_index(like)) is not None: frame.index = index @@ -146,9 +186,9 @@ def sparse_to_native_frame( ns = nw.get_native_namespace(like) frame = ns.DataFrame.sparse.from_spmatrix(matrix, columns=list(columns)) else: - dense = matrix.toarray() - data = {str(col): dense[:, j] for j, col in enumerate(columns)} - frame = nw.from_dict(data, backend=impl).to_native() + dense = typing.cast("NDArray[Any]", matrix.toarray()) + schema = [str(col) for col in columns] + frame = nw.from_numpy(dense, schema=schema, backend=impl).to_native() # Carry over the pandas index; no-op for non-pandas backends (maybe_get_index -> None). if (index := nw.maybe_get_index(like)) is not None: frame.index = index diff --git a/river/utils/test_pandas.py b/river/utils/test_pandas.py index b788273199..4570e07c8e 100644 --- a/river/utils/test_pandas.py +++ b/river/utils/test_pandas.py @@ -10,19 +10,22 @@ def _raise_missing_pandas() -> None: raise ImportError("`pandas` is required for this operation.") -def test_transform_many_requires_pandas_only_for_pandas_input( +def test_transform_many_does_not_require_pandas_helper_for_pandas_input( monkeypatch: pytest.MonkeyPatch, ) -> None: - # `StandardScaler.transform_many` keeps a pandas fast path, so a pandas input still needs - # pandas. `learn_many` is routed through narwhals and never imports pandas. + # `StandardScaler` mini-batching is fully routed through narwhals: there is no longer a + # pandas-specific fast path, so neither `learn_many` nor `transform_many` go through the + # `import_pandas` helper, even for pandas input. import pandas as pd monkeypatch.setattr(pandas_utils, "import_pandas", _raise_missing_pandas) scaler = preprocessing.StandardScaler() scaler.learn_many(pd.DataFrame({"x": [1.0, 2.0, 3.0]})) - with pytest.raises(ImportError, match="pandas"): - scaler.transform_many(pd.DataFrame({"x": [1.0, 2.0, 3.0]})) + out = scaler.transform_many(pd.DataFrame({"x": [1.0, 2.0, 3.0]})) + + assert isinstance(out, pd.DataFrame) + assert len(out) == 3 def test_transform_many_does_not_require_pandas_for_polars(