Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/releases/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

## compose

- The mini-batch methods of `compose.Pipeline`, `compose.TransformerUnion`, `compose.Select`, and `compose.TransformerProduct` now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, so a whole pipeline can be mini-batched on a non-pandas backend. The input backend is preserved on output, including the pandas index, and `pandas` is no longer required unless the input is a pandas frame. `TransformerProduct` keeps the pandas `Sparse[uint8]` fast path when crossing one-hot encoded features.
- `compose.Pipeline` now forwards extra keyword arguments (such as the timestamp `t` used by `utils.TimeRolling`, or a sample weight `w`) to each step whose method declares them, and drops them for steps that don't. This makes `feature_extraction.Agg`/`TargetAgg` backed by `utils.TimeRolling` work inside a pipeline via `model.learn_one(x, y, t=t)`. Routing applies to `learn_one` and to the predict-time methods (`predict_one`, `predict_proba_one`, `score_one`, `transform_one`), so it also works under `compose.learn_during_predict` where unsupervised steps learn during `predict_one(x, t=t)`. Fixes [#1600](https://github.com/online-ml/river/issues/1600). The accepted arguments are determined once when the pipeline plan is built, so pipelines with no extra arguments keep their previous speed.

## covariance
Expand Down Expand Up @@ -78,6 +79,7 @@
- `preprocessing.FeatureHasher` now hashes with MurmurHash3 in Rust, making it much faster. It gains an `alternate_sign` parameter (default `True`, matching scikit-learn) and returns a plain `dict`. Hashed feature indices differ from previous versions.
- `preprocessing.OneHotEncoder` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. The pandas path keeps returning `Sparse[uint8]` columns; other backends return dense integer columns, as they have no sparse-array equivalent. `transform_many` only requires `pandas` when the input is a pandas frame.
- `preprocessing.OrdinalEncoder` mini-batch methods (`learn_many`, `predict_many`, `predict_proba_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only. The input backend is preserved on output, including the pandas index. These methods no longer require `pandas` to be installed.
- `preprocessing.StandardScaler` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. Classic numpy-backed pandas keeps the historical fast path (and its float dtype, e.g. `float32`); other backends are scaled through a `float64` path. `transform_many` only requires `pandas` when the input is a pandas frame. Outputs are unchanged on the pandas path.

## proba

Expand Down
12 changes: 6 additions & 6 deletions river/compose/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from river import base

if typing.TYPE_CHECKING:
import pandas as pd
from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries

from . import func, union

Expand Down Expand Up @@ -779,7 +779,7 @@ def print_title(title, indent=False):

# Mini-batch methods

def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params):
def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None, **params):
"""Fit to a mini-batch.

Parameters
Expand Down Expand Up @@ -822,7 +822,7 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params):
else:
last_step.learn_many(X=X, **params)

def _transform_many(self, X: pd.DataFrame):
def _transform_many(self, X: IntoDataFrame):
"""This methods takes care of applying the first n - 1 steps of the pipeline, which are
supposedly transformers. It also returns the final step so that other functions can do
something with it.
Expand All @@ -846,7 +846,7 @@ def _transform_many(self, X: pd.DataFrame):
last_step = next(steps)
return X, last_step

def transform_many(self, X: pd.DataFrame):
def transform_many(self, X: IntoDataFrame):
"""Apply each transformer in the pipeline to some features.

The final step in the pipeline will be applied if it is a transformer. If not, then it will
Expand All @@ -861,12 +861,12 @@ def transform_many(self, X: pd.DataFrame):
return last_step.transform_many(X)
return X

def predict_many(self, X: pd.DataFrame):
def predict_many(self, X: IntoDataFrame):
"""Call transform_many, and then predict_many on the final step."""
X, last_step = self._transform_many(X=X)
return last_step.predict_many(X=X)

def predict_proba_many(self, X: pd.DataFrame):
def predict_proba_many(self, X: IntoDataFrame):
"""Call transform_many, and then predict_proba_many on the final step."""
X, last_step = self._transform_many(X=X)
return last_step.predict_proba_many(X=X)
Expand Down
28 changes: 23 additions & 5 deletions river/compose/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import functools
import itertools
import typing

import narwhals.stable.v2 as nw
import numpy as np

from river import utils

if typing.TYPE_CHECKING:
pass

from . import union

__all__ = ["TransformerProduct"]
Expand Down Expand Up @@ -89,8 +86,29 @@ def transform_one(self, x):
}

def transform_many(self, X):
pd = utils.pandas.import_pandas()
outputs = [t.transform_many(X) for t in self.transformers.values()]
# Classic pandas keeps the sparse-aware fast path (preserving the memory savings when
# crossing one-hot encoded features); every other backend takes the agnostic path.
if utils.dataframe.into_frame(outputs[0]).implementation.is_pandas():
return self._transform_many_pandas(X, outputs)
return self._transform_many_narwhals(outputs)

def _transform_many_narwhals(self, outputs):
# Cross every column of each output with every column of the others, multiplying them
# element-wise. Series share the input's backend, so the products do too.
frames = [utils.dataframe.into_frame(output) for output in outputs]
crossed = []
for combo in itertools.product(*(frame.columns for frame in frames)):
name = "*".join(combo)
product = functools.reduce(
lambda a, b: a * b,
(frame[col] for frame, col in zip(frames, combo)),
)
crossed.append(product.rename(name).to_frame())
return nw.concat(crossed, how="horizontal").to_native()

def _transform_many_pandas(self, X, outputs):
pd = utils.pandas.import_pandas()

def multiply(a, b):
# Fast-track for sparse[uint8] * sparse[uint8]
Expand Down
18 changes: 11 additions & 7 deletions river/compose/select.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from __future__ import annotations

from river import base
import typing

from river import base, utils

if typing.TYPE_CHECKING:
from narwhals.stable.v2.typing import IntoDataFrameT

__all__ = ["Discard", "Select", "SelectType"]

Expand Down Expand Up @@ -130,12 +135,11 @@ def __init__(self, *keys: base.typing.FeatureName):
def transform_one(self, x):
return {i: x[i] for i in self.keys}

def transform_many(self, X):
# INFO: has either side-effects or doesn't have copy - choose your poison
# REFLECTION: worth adding `copy=True` parameter to the object constructor to allow both?
# << convention is to have pure methods/functions
return X.loc[:, list(self.keys)].copy()
# return X.loc[:, self.keys]
def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT:
# `select` returns a fresh frame in the caller's own backend (and keeps the pandas index),
# so the method stays pure — no view aliasing back onto the input.
keys = typing.cast("list[str]", list(self.keys))
return utils.dataframe.into_frame(X).select(keys).to_native()

def __str__(self):
return str(sorted(self.keys))
Expand Down
24 changes: 15 additions & 9 deletions river/compose/union.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import types
import typing

import narwhals.stable.v2 as nw

from river import base, utils

if typing.TYPE_CHECKING:
import pandas as pd
from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries

from . import func

Expand Down Expand Up @@ -284,7 +286,7 @@ def transform_one(self, x):

# Mini-batch methods

def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None):
def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None):
"""Update each transformer.

Parameters
Expand All @@ -303,10 +305,14 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None):
t.learn_many(X)

def transform_many(self, X):
"""Passes the data through each transformer and packs the results together."""
pd = utils.pandas.import_pandas()
return pd.concat(
(t.transform_many(X) for t in self.transformers.values()),
copy=False,
axis=1,
)
"""Passes the data through each transformer and packs the results together.

The per-transformer outputs are concatenated column-wise via narwhals, so the result is
rebuilt in the caller's own backend (pandas, polars, pyarrow, ...). All transformers see
the same rows in the same order, so a positional horizontal concat matches the historical
index-aligned `pandas.concat(axis=1)`.
"""
frames = [
utils.dataframe.into_frame(t.transform_many(X)) for t in self.transformers.values()
]
return nw.concat(frames, how="horizontal").to_native()
69 changes: 56 additions & 13 deletions river/preprocessing/scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import numbers
import typing

import narwhals.stable.v2 as nw
import numpy as np

from river import base, stats, utils

if typing.TYPE_CHECKING:
import pandas as pd
from narwhals.stable.v2.typing import IntoDataFrame, IntoDataFrameT

__all__ = [
"AdaptiveStandardScaler",
Expand Down Expand Up @@ -299,7 +301,7 @@ def transform_one(self, x):
return result
return {i: xi - means[i] for i, xi in x.items()}

def learn_many(self, X: pd.DataFrame):
def learn_many(self, X: IntoDataFrame) -> None:
"""Update with a mini-batch of features.

Note that the update formulas for mean and variance are slightly different than in the
Expand All @@ -310,30 +312,32 @@ def learn_many(self, X: pd.DataFrame):
Parameters
----------
X
A dataframe where each column is a feature.
A dataframe where each column is a feature. Any narwhals-supported eager backend
(pandas, polars, pyarrow, ...) is accepted.
"""
Xnw = utils.dataframe.into_frame(X)

if self.window_size is not None:
# Row-by-row to preserve correct rolling-window semantics.
columns = X.columns
for row in X.values:
self.learn_one(dict(zip(columns, row)))
for row in Xnw.iter_rows(named=True):
self.learn_one(row)
return

# Operating on X.values, which is a view to the underlying numpy array, is slightly faster
# than operating on X
columns = X.columns
X = X.values
# Drop to a float64 numpy matrix for the compute core; the column labels drive the
# per-feature statistics, so the batch may add/drop/reorder columns between calls.
columns = Xnw.columns
X_np = utils.dataframe.to_numpy(Xnw)

# In the rest of this method, old_* refers to the existing statistics, whilst new_* refers
# to the statistics of the current mini-batch.

new_means = np.nanmean(X, axis=0)
new_means = np.nanmean(X_np, axis=0)
# We could call np.var, but we already have the mean so we can be smart
if self.with_std:
new_vars = np.einsum("ij,ij->j", X, X) / len(X) - new_means**2
new_vars = np.einsum("ij,ij->j", X_np, X_np) / len(X_np) - new_means**2
else:
new_vars = []
new_counts = np.sum(~np.isnan(X), axis=0)
new_counts = np.sum(~np.isnan(X_np), axis=0)

for col, new_mean, new_var, new_count in itertools.zip_longest(
columns, new_means, new_vars, new_counts
Expand All @@ -352,16 +356,34 @@ def learn_many(self, X: pd.DataFrame):
).item()
self.counts[col] += new_count.item()

def transform_many(self, X: pd.DataFrame):
def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT:
"""Scale a mini-batch of features.

Classic numpy-backed pandas keeps the historical fast path, which preserves the input's
float dtype (e.g. ``float32`` stays ``float32``). Every other narwhals-supported backend
(polars, pyarrow, nullable/arrow-backed pandas, ...) is scaled through a backend-agnostic
path whose compute runs in ``float64``.

Parameters
----------
X
A dataframe where each column is a feature. An exception will be raised if any of
the features has not been seen during a previous call to `learn_many`.

"""
Xnw = utils.dataframe.into_frame(X)
# The fast path relies on `.values` yielding a numeric numpy view, which only holds for
# classic numpy-backed pandas; nullable/arrow-backed pandas return object arrays and so
# take the agnostic path alongside polars/pyarrow.
if Xnw.implementation.is_pandas() and all(
isinstance(dtype, np.dtype) for dtype in typing.cast("pd.DataFrame", X).dtypes
):
native = self._transform_many_pandas(typing.cast("pd.DataFrame", X))
else:
native = self._transform_many_narwhals(Xnw)
return typing.cast("IntoDataFrameT", native)

def _transform_many_pandas(self, X: pd.DataFrame) -> pd.DataFrame:
pd = utils.pandas.import_pandas()
# Determine dtype of input
dtypes = X.dtypes.unique()
Expand All @@ -387,6 +409,27 @@ def transform_many(self, X: pd.DataFrame):

return pd.DataFrame(Xt, index=X.index, columns=X.columns, copy=False)

def _transform_many_narwhals(self, Xnw: nw.DataFrame[IntoDataFrameT]) -> IntoDataFrameT:
columns = Xnw.columns

if self.window_size is None:
means = np.array([self.means[c] for c in columns], dtype=np.float64)
else:
means = np.array([self.means[c].get() for c in columns], dtype=np.float64)
Xt = utils.dataframe.to_numpy(Xnw) - means

if self.with_std:
if self.window_size is None:
stds = np.array([self.vars[c] ** 0.5 for c in columns], dtype=np.float64)
else:
stds = np.array([self.vars[c].get() ** 0.5 for c in columns], dtype=np.float64)
np.divide(Xt, stds, where=stds > 0, out=Xt)

native = utils.dataframe.to_native_frame(
{col: Xt[:, j] for j, col in enumerate(columns)}, like=Xnw
)
return typing.cast("IntoDataFrameT", native)


class MinMaxScaler(base.Transformer):
"""Scales the data to a fixed range from 0 to 1.
Expand Down
Loading