2 changes: 2 additions & 0 deletions docs/releases/unreleased.md
@@ -16,6 +16,7 @@

## compose

- The mini-batch methods of `compose.Pipeline`, `compose.TransformerUnion`, `compose.Select`, and `compose.TransformerProduct` now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, so a whole pipeline can be mini-batched on a non-pandas backend. The input backend is preserved on output, including the pandas index, and `pandas` is no longer required unless the input is a pandas frame. `TransformerProduct` keeps the pandas `Sparse[uint8]` fast path when crossing one-hot encoded features.
- `compose.Pipeline` now forwards extra keyword arguments (such as the timestamp `t` used by `utils.TimeRolling`, or a sample weight `w`) to each step whose method declares them, and drops them for steps that don't. This makes `feature_extraction.Agg`/`TargetAgg` backed by `utils.TimeRolling` work inside a pipeline via `model.learn_one(x, y, t=t)`. Routing applies to `learn_one` and to the predict-time methods (`predict_one`, `predict_proba_one`, `score_one`, `transform_one`), so it also works under `compose.learn_during_predict` where unsupervised steps learn during `predict_one(x, t=t)`. Fixes [#1600](https://github.com/online-ml/river/issues/1600). The accepted arguments are determined once when the pipeline plan is built, so pipelines with no extra arguments keep their previous speed.

## covariance
@@ -78,6 +79,7 @@
- `preprocessing.FeatureHasher` now hashes with MurmurHash3 in Rust, making it much faster. It gains an `alternate_sign` parameter (default `True`, matching scikit-learn) and returns a plain `dict`. Hashed feature indices differ from previous versions.
- `preprocessing.OneHotEncoder` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. The pandas path keeps returning `Sparse[uint8]` columns; other backends return dense integer columns, as they have no sparse-array equivalent. `transform_many` only requires `pandas` when the input is a pandas frame.
- `preprocessing.OrdinalEncoder` mini-batch methods (`learn_many`, `predict_many`, `predict_proba_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only. The input backend is preserved on output, including the pandas index. These methods no longer require `pandas` to be installed.
- `preprocessing.StandardScaler` mini-batch methods (`learn_many`, `transform_many`) now accept and return any [narwhals](https://github.com/narwhals-dev/narwhals)-supported eager backend (pandas, polars, pyarrow, ...) instead of being pandas-only, preserving the input backend (including the pandas index) on output. Classic numpy-backed pandas keeps the historical fast path (and its float dtype, e.g. `float32`); other backends are scaled through a `float64` path. `transform_many` only requires `pandas` when the input is a pandas frame. Outputs are unchanged on the pandas path.

## proba

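The keyword routing described in the `compose.Pipeline` release note can be illustrated with a small sketch. This is not river's implementation: `accepted_kwargs`, `route`, `TimeAware`, and `Plain` are hypothetical names, and the sketch inspects the signature on every call, whereas the note says the pipeline resolves the accepted arguments once when its plan is built.

```python
import inspect


def accepted_kwargs(method):
    """Return the keyword names `method` declares, or None if it takes **kwargs."""
    params = list(inspect.signature(method).parameters.values())
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return None  # the method accepts any keyword argument
    named = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    return frozenset(p.name for p in params if p.kind in named)


def route(method, *args, **extra):
    """Call `method`, forwarding only the extra keyword arguments it declares."""
    accepted = accepted_kwargs(method)
    if accepted is not None:
        extra = {k: v for k, v in extra.items() if k in accepted}
    return method(*args, **extra)


class TimeAware:  # hypothetical step that declares a timestamp `t`
    def learn_one(self, x, t=None):
        return ("time-aware", t)


class Plain:  # hypothetical step that declares no extra arguments
    def learn_one(self, x):
        return ("plain",)


# `t` reaches the step that declares it; `w` and `t` are dropped elsewhere.
print(route(TimeAware().learn_one, {"a": 1}, t=3, w=2.0))
print(route(Plain().learn_one, {"a": 1}, t=3))
```

Caching the result of `accepted_kwargs` per step would be one way to keep the per-call overhead close to zero when no extra arguments are passed.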
12 changes: 6 additions & 6 deletions river/compose/pipeline.py
@@ -12,7 +12,7 @@
 from river import base

 if typing.TYPE_CHECKING:
-    import pandas as pd
+    from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries

 from . import func, union

@@ -779,7 +779,7 @@ def print_title(title, indent=False):

     # Mini-batch methods

-    def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params):
+    def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None, **params):
         """Fit to a mini-batch.

         Parameters
@@ -822,7 +822,7 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params):
         else:
             last_step.learn_many(X=X, **params)

-    def _transform_many(self, X: pd.DataFrame):
+    def _transform_many(self, X: IntoDataFrame):
         """This methods takes care of applying the first n - 1 steps of the pipeline, which are
         supposedly transformers. It also returns the final step so that other functions can do
         something with it.
@@ -846,7 +846,7 @@ def _transform_many(self, X: pd.DataFrame):
         last_step = next(steps)
         return X, last_step

-    def transform_many(self, X: pd.DataFrame):
+    def transform_many(self, X: IntoDataFrame):
         """Apply each transformer in the pipeline to some features.

         The final step in the pipeline will be applied if it is a transformer. If not, then it will
@@ -861,12 +861,12 @@ def transform_many(self, X: pd.DataFrame):
             return last_step.transform_many(X)
         return X

-    def predict_many(self, X: pd.DataFrame):
+    def predict_many(self, X: IntoDataFrame):
         """Call transform_many, and then predict_many on the final step."""
         X, last_step = self._transform_many(X=X)
         return last_step.predict_many(X=X)

-    def predict_proba_many(self, X: pd.DataFrame):
+    def predict_proba_many(self, X: IntoDataFrame):
         """Call transform_many, and then predict_proba_many on the final step."""
         X, last_step = self._transform_many(X=X)
         return last_step.predict_proba_many(X=X)
28 changes: 23 additions & 5 deletions river/compose/product.py
@@ -2,15 +2,12 @@

 import functools
 import itertools
-import typing

+import narwhals.stable.v2 as nw
 import numpy as np

 from river import utils

-if typing.TYPE_CHECKING:
-    pass
-
 from . import union

 __all__ = ["TransformerProduct"]
@@ -89,8 +86,29 @@ def transform_one(self, x):
         }

     def transform_many(self, X):
-        pd = utils.pandas.import_pandas()
         outputs = [t.transform_many(X) for t in self.transformers.values()]
+        # Classic pandas keeps the sparse-aware fast path (preserving the memory savings when
+        # crossing one-hot encoded features); every other backend takes the agnostic path.
+        if utils.dataframe.into_frame(outputs[0]).implementation.is_pandas():
+            return self._transform_many_pandas(X, outputs)
+        return self._transform_many_narwhals(outputs)
+
+    def _transform_many_narwhals(self, outputs):
+        # Cross every column of each output with every column of the others, multiplying them
+        # element-wise. Series share the input's backend, so the products do too.
+        frames = [utils.dataframe.into_frame(output) for output in outputs]
+        crossed = []
+        for combo in itertools.product(*(frame.columns for frame in frames)):
+            name = "*".join(combo)
+            product = functools.reduce(
+                lambda a, b: a * b,
+                (frame[col] for frame, col in zip(frames, combo)),
+            )
+            crossed.append(product.rename(name).to_frame())
+        return nw.concat(crossed, how="horizontal").to_native()
+
+    def _transform_many_pandas(self, X, outputs):
+        pd = utils.pandas.import_pandas()

         def multiply(a, b):
             # Fast-track for sparse[uint8] * sparse[uint8]
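The column crossing performed by `_transform_many_narwhals` can be sketched without any dataframe library. The example below is a stand-in under stated assumptions: each "frame" is a plain dict mapping a column name to a list of numbers, and `cross_columns` is a hypothetical helper, not part of river or narwhals.

```python
import functools
import itertools
import operator


def cross_columns(frames):
    """Cross every column of each frame with every column of the others.

    `frames` is a list of dicts mapping a column name to a list of numbers,
    standing in for the per-transformer dataframes.
    """
    crossed = {}
    for combo in itertools.product(*(frame.keys() for frame in frames)):
        columns = [frame[name] for frame, name in zip(frames, combo)]
        # Multiply the selected columns element-wise, one row at a time.
        crossed["*".join(combo)] = [
            functools.reduce(operator.mul, row) for row in zip(*columns)
        ]
    return crossed


colours = {"red": [1, 0], "blue": [0, 1]}
sizes = {"small": [1, 1], "large": [0, 1]}
result = cross_columns([colours, sizes])
print(result)
```

As in the diff, the output has one column per combination of input columns, named by joining the source names with `*`.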
18 changes: 11 additions & 7 deletions river/compose/select.py
@@ -1,6 +1,11 @@
 from __future__ import annotations

-from river import base
+import typing
+
+from river import base, utils
+
+if typing.TYPE_CHECKING:
+    from narwhals.stable.v2.typing import IntoDataFrameT

 __all__ = ["Discard", "Select", "SelectType"]

@@ -130,12 +135,11 @@ def __init__(self, *keys: base.typing.FeatureName):
     def transform_one(self, x):
         return {i: x[i] for i in self.keys}

-    def transform_many(self, X):
-        # INFO: has either side-effects or doesn't have copy - choose your poison
-        # REFLECTION: worth adding `copy=True` parameter to the object constructor to allow both?
-        # << convention is to have pure methods/functions
-        return X.loc[:, list(self.keys)].copy()
-        # return X.loc[:, self.keys]
+    def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT:
+        # `select` returns a fresh frame in the caller's own backend (and keeps the pandas index),
+        # so the method stays pure — no view aliasing back onto the input.
+        keys = typing.cast("list[str]", list(self.keys))
+        return utils.dataframe.into_frame(X).select(keys).to_native()

     def __str__(self):
         return str(sorted(self.keys))
24 changes: 15 additions & 9 deletions river/compose/union.py
@@ -3,10 +3,12 @@
 import types
 import typing

+import narwhals.stable.v2 as nw
+
 from river import base, utils

 if typing.TYPE_CHECKING:
-    import pandas as pd
+    from narwhals.stable.v2.typing import IntoDataFrame, IntoSeries

 from . import func

@@ -284,7 +286,7 @@ def transform_one(self, x):

     # Mini-batch methods

-    def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None):
+    def learn_many(self, X: IntoDataFrame, y: IntoSeries | None = None):
         """Update each transformer.

         Parameters
@@ -303,10 +305,14 @@ def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None):
                 t.learn_many(X)

     def transform_many(self, X):
-        """Passes the data through each transformer and packs the results together."""
-        pd = utils.pandas.import_pandas()
-        return pd.concat(
-            (t.transform_many(X) for t in self.transformers.values()),
-            copy=False,
-            axis=1,
-        )
+        """Passes the data through each transformer and packs the results together.
+
+        The per-transformer outputs are concatenated column-wise via narwhals, so the result is
+        rebuilt in the caller's own backend (pandas, polars, pyarrow, ...). All transformers see
+        the same rows in the same order, so a positional horizontal concat matches the historical
+        index-aligned `pandas.concat(axis=1)`.
+        """
+        frames = [
+            utils.dataframe.into_frame(t.transform_many(X)) for t in self.transformers.values()
+        ]
+        return nw.concat(frames, how="horizontal").to_native()
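The docstring's argument, that a positional horizontal concat is equivalent to the old index-aligned one as long as every transformer returns the same rows in the same order, can be sketched with plain dicts. `concat_horizontal` is a hypothetical helper used only for illustration; the row-count and duplicate-name checks are assumptions of this sketch, not a description of how narwhals behaves on every backend.

```python
def concat_horizontal(frames):
    """Join column-oriented frames side by side, matching rows by position.

    Each frame is a dict mapping a column name to a list of values.
    """
    result = {}
    n_rows = None
    for frame in frames:
        for name, values in frame.items():
            if n_rows is None:
                n_rows = len(values)
            elif len(values) != n_rows:
                raise ValueError(
                    f"column {name!r} has {len(values)} rows, expected {n_rows}"
                )
            if name in result:
                raise ValueError(f"duplicate column name: {name!r}")
            result[name] = list(values)  # copy so the inputs are never aliased
    return result


scaled = {"x_scaled": [-1.0, 0.0, 1.0]}
encoded = {"c_a": [1, 0, 0], "c_b": [0, 1, 1]}
union = concat_horizontal([scaled, encoded])
print(union)
```

Row `i` of the result is built from row `i` of every input, which is exactly what index alignment produced when all inputs shared one index.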
78 changes: 49 additions & 29 deletions river/preprocessing/scale.py
@@ -6,12 +6,15 @@
 import numbers
 import typing

+import narwhals.stable.v2 as nw
 import numpy as np

 from river import base, stats, utils

 if typing.TYPE_CHECKING:
-    import pandas as pd
+    from collections.abc import Mapping
+
+    from narwhals.stable.v2.typing import IntoDataFrame, IntoDataFrameT

 __all__ = [
     "AdaptiveStandardScaler",
@@ -23,6 +26,17 @@
     "StandardScaler",
 ]

+NW_TO_NP_DTYPES: Mapping[nw.dtypes.DType, np.number] = {
+    nw.Int8(): np.float16(),
+    nw.Int16(): np.float32(),
+    nw.Int32(): np.float64(),
+    nw.UInt8(): np.float16(),
+    nw.UInt16(): np.float32(),
+    nw.UInt32(): np.float64(),
+    nw.Float32(): np.float32(),
+    nw.Float64(): np.float64(),
+}
+

 def safe_div(a, b):
     """Return a if b is nil, else divides a by b.
@@ -299,7 +313,7 @@ def transform_one(self, x):
             return result
         return {i: xi - means[i] for i, xi in x.items()}

-    def learn_many(self, X: pd.DataFrame):
+    def learn_many(self, X: IntoDataFrame) -> None:
         """Update with a mini-batch of features.

         Note that the update formulas for mean and variance are slightly different than in the
@@ -310,30 +324,32 @@ def learn_many(self, X: pd.DataFrame):
         Parameters
         ----------
         X
-            A dataframe where each column is a feature.
+            A dataframe where each column is a feature. Any narwhals-supported eager backend
+            (pandas, polars, pyarrow, ...) is accepted.
         """
+        Xnw = utils.dataframe.into_frame(X)
+
         if self.window_size is not None:
             # Row-by-row to preserve correct rolling-window semantics.
-            columns = X.columns
-            for row in X.values:
-                self.learn_one(dict(zip(columns, row)))
+            for row in Xnw.iter_rows(named=True):
+                self.learn_one(row)
             return

-        # Operating on X.values, which is a view to the underlying numpy array, is slightly faster
-        # than operating on X
-        columns = X.columns
-        X = X.values
+        # Drop to a float64 numpy matrix for the compute core; the column labels drive the
+        # per-feature statistics, so the batch may add/drop/reorder columns between calls.
+        columns = Xnw.columns
+        X_np = utils.dataframe.to_numpy(Xnw)

         # In the rest of this method, old_* refers to the existing statistics, whilst new_* refers
         # to the statistics of the current mini-batch.

-        new_means = np.nanmean(X, axis=0)
+        new_means = np.nanmean(X_np, axis=0)
         # We could call np.var, but we already have the mean so we can be smart
         if self.with_std:
-            new_vars = np.einsum("ij,ij->j", X, X) / len(X) - new_means**2
+            new_vars = np.einsum("ij,ij->j", X_np, X_np) / len(X_np) - new_means**2
         else:
             new_vars = []
-        new_counts = np.sum(~np.isnan(X), axis=0)
+        new_counts = np.sum(~np.isnan(X_np), axis=0)

         for col, new_mean, new_var, new_count in itertools.zip_longest(
             columns, new_means, new_vars, new_counts
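The hunk above computes per-batch means and variances and then folds them into the running statistics; the folding step itself is collapsed in this view. As a rough illustration of that kind of update, here is the standard pairwise merge of two (count, mean, population variance) summaries in plain Python. It is a sketch, not river's code: `batch_stats` and `merge_stats` are hypothetical names, and missing values are assumed absent, whereas the diff handles them with `np.nanmean`.

```python
import math
import statistics


def batch_stats(values):
    """Return (count, mean, population variance) of one mini-batch."""
    n = len(values)
    mean = sum(values) / n
    # Same shortcut as the diff: E[x^2] - mean^2, reusing the mean.
    var = sum(v * v for v in values) / n - mean**2
    return n, mean, var


def merge_stats(old, new):
    """Merge two (count, mean, population variance) summaries into one."""
    old_count, old_mean, old_var = old
    new_count, new_mean, new_var = new
    total = old_count + new_count
    if total == 0:
        return 0, 0.0, 0.0
    delta = new_mean - old_mean
    mean = old_mean + delta * new_count / total
    var = (
        old_count * old_var
        + new_count * new_var
        + delta**2 * old_count * new_count / total
    ) / total
    return total, mean, var


first, second = [1.0, 2.0, 3.0], [4.0, 5.0, 6.0, 7.0]
count, mean, var = merge_stats(batch_stats(first), batch_stats(second))

# The merged summary agrees with statistics computed over all rows at once.
assert count == 7
assert math.isclose(mean, statistics.fmean(first + second))
assert math.isclose(var, statistics.pvariance(first + second))
```

The merge needs only the three summary values per feature, which is why the scaler can learn from mini-batches without keeping earlier rows.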
@@ -352,40 +368,44 @@ def learn_many(self, X: pd.DataFrame):
             ).item()
             self.counts[col] += new_count.item()

-    def transform_many(self, X: pd.DataFrame):
+    def transform_many(self, X: IntoDataFrameT) -> IntoDataFrameT:
         """Scale a mini-batch of features.

+        Every narwhals-supported backend (pandas, polars, pyarrow, nullable/arrow-backed
+        pandas, ...) takes the same backend-agnostic path. The compute dtype is inferred from
+        the input schema, so a feature's float dtype is preserved (e.g. ``float32`` stays
+        ``float32``); integer columns are widened to the matching float and anything else falls
+        back to ``float64``.
+
         Parameters
         ----------
         X
             A dataframe where each column is a feature. An exception will be raised if any of
             the features has not been seen during a previous call to `learn_many`.

         """
-        pd = utils.pandas.import_pandas()
-        # Determine dtype of input
-        dtypes = X.dtypes.unique()
-        dtype = dtypes[0] if len(dtypes) == 1 else np.float64
-
-        # Check if the dtype is integer type and convert to corresponding float type
-        if np.issubdtype(dtype, np.integer):
-            bytes_size = dtype.itemsize
-            dtype = np.dtype(f"float{bytes_size * 8}")  # type: ignore[operator]
+        Xnw = utils.dataframe.into_frame(X)
+        schema = Xnw.schema
+        columns = schema.names()
+        dtypes = {NW_TO_NP_DTYPES.get(dtype, np.float64()) for dtype in schema.dtypes()}
+        dtype = np.result_type(*dtypes)

         if self.window_size is None:
-            means = np.array([self.means[c] for c in X.columns], dtype=dtype)
+            means = np.array([self.means[c] for c in columns], dtype=dtype)
         else:
-            means = np.array([self.means[c].get() for c in X.columns], dtype=dtype)
-        Xt = X.values - means
+            means = np.array([self.means[c].get() for c in columns], dtype=dtype)
+
+        Xt = utils.dataframe.to_numpy(Xnw, dtype=dtype) - means

         if self.with_std:
             if self.window_size is None:
-                stds = np.array([self.vars[c] ** 0.5 for c in X.columns], dtype=dtype)
+                stds = np.array([self.vars[c] ** 0.5 for c in columns], dtype=dtype)
             else:
-                stds = np.array([self.vars[c].get() ** 0.5 for c in X.columns], dtype=dtype)
+                stds = np.array([self.vars[c].get() ** 0.5 for c in columns], dtype=dtype)
             np.divide(Xt, stds, where=stds > 0, out=Xt)

-        return pd.DataFrame(Xt, index=X.index, columns=X.columns, copy=False)
+        native = utils.dataframe.to_native_frame(Xt, columns=columns, like=Xnw)
+        return typing.cast("IntoDataFrameT", native)


 class MinMaxScaler(base.Transformer):
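The scaling step in `transform_many` subtracts the mean and then divides by the standard deviation only where that deviation is positive (`np.divide(..., where=stds > 0, out=Xt)`), so constant features are centred but never divided by zero. A plain-Python sketch of that rule follows; `scale_rows` is a hypothetical helper, and the rows are lists of floats rather than a numpy matrix.

```python
def scale_rows(rows, means, stds):
    """Centre each value, then divide by its column's std when that std is positive."""
    return [
        [(x - m) / s if s > 0 else x - m for x, m, s in zip(row, means, stds)]
        for row in rows
    ]


rows = [[1.0, 5.0], [3.0, 5.0]]
# The second feature is constant, so its std is 0 and it is only centred.
scaled = scale_rows(rows, means=[2.0, 5.0], stds=[1.0, 0.0])
print(scaled)
```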