diff --git a/bofire/benchmarks/api.py b/bofire/benchmarks/api.py index e12bf59dc..b859b8ca0 100644 --- a/bofire/benchmarks/api.py +++ b/bofire/benchmarks/api.py @@ -20,13 +20,19 @@ ) from bofire.benchmarks.single import ( Ackley, + Booth, Branin, Branin30, + CrossInTray, + Easom, Hartmann, Hartmann6plus, Himmelblau, + HolderTable, Multinormalpdfs, MultiTaskHimmelblau, + Rosenbrock, + SixHumpCamel, ) @@ -42,11 +48,17 @@ ] AnySingleBenchmark = Union[ Ackley, + Booth, Branin, Branin30, + CrossInTray, + Easom, Hartmann, Hartmann6plus, Himmelblau, + HolderTable, MultiTaskHimmelblau, Multinormalpdfs, + Rosenbrock, + SixHumpCamel, ] diff --git a/bofire/benchmarks/single.py b/bofire/benchmarks/single.py index d08394e82..010823a47 100644 --- a/bofire/benchmarks/single.py +++ b/bofire/benchmarks/single.py @@ -735,3 +735,261 @@ def get_optima(self) -> pd.DataFrame: index=[0], ) return pd.concat([x_opt, self._f(x_opt)], axis=1) + + +class SixHumpCamel(Benchmark): + """Six-Hump Camel function. + + f(x) = (4 - 2.1*x1^2 + x1^4/3)*x1^2 + x1*x2 + (-4 + 4*x2^2)*x2^2 + + Domain: x1 in [-3, 3], x2 in [-2, 2]. + Global minimum: f ≈ -1.0316 at (0.0898, -0.7126) and (-0.0898, 0.7126). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key="x_1", bounds=[-3, 3]), + ContinuousInput(key="x_2", bounds=[-2, 2]), + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x1 = X["x_1"].values + x2 = X["x_2"].values + y = ( + (4 - 2.1 * x1**2 + x1**4 / 3) * x1**2 + + x1 * x2 + + (-4 + 4 * x2**2) * x2**2 + ) + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + return pd.DataFrame( + { + "x_1": [0.0898, -0.0898], + "x_2": [-0.7126, 0.7126], + "y": [-1.0316284534898, -1.0316284534898], + "valid_y": [1, 1], + } + ) + + +class Easom(Benchmark): + """Easom function. + + f(x) = -cos(x1)*cos(x2)*exp(-((x1 - pi)^2 + (x2 - pi)^2)) + + Domain: [-100, 100]^2. + Global minimum: f = -1 at (pi, pi). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key="x_1", bounds=[-100, 100]), + ContinuousInput(key="x_2", bounds=[-100, 100]), + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x1 = X["x_1"].values + x2 = X["x_2"].values + y = ( + -np.cos(x1) + * np.cos(x2) + * np.exp(-((x1 - np.pi) ** 2 + (x2 - np.pi) ** 2)) + ) + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + return pd.DataFrame( + {"x_1": [np.pi], "x_2": [np.pi], "y": [-1.0], "valid_y": [1]} + ) + + +class Rosenbrock(Benchmark): + """Rosenbrock function. + + f(x) = sum_{i=1}^{d-1} [100*(x_{i+1} - x_i^2)^2 + (1 - x_i)^2] + + Domain: [-5, 10]^d (default d=2). + Global minimum: f = 0 at (1, 1, ..., 1). 
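+
+    Example (a minimal usage sketch; it assumes only the ``Benchmark`` API
+    used throughout this module):
+
+        >>> bench = Rosenbrock(dim=2)
+        >>> X = bench.domain.inputs.sample(5)
+        >>> bench.f(X)["y"]  # five non-negative Rosenbrock values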
+ """ + + def __init__(self, dim: PositiveInt = 2, **kwargs): + super().__init__(**kwargs) + self.dim = dim + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key=f"x_{i + 1}", bounds=[-5, 10]) + for i in range(dim) + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x = np.column_stack([X[f"x_{i + 1}"].values for i in range(self.dim)]) + y = np.sum( + 100.0 * (x[:, 1:] - x[:, :-1] ** 2) ** 2 + (1.0 - x[:, :-1]) ** 2, + axis=1, + ) + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + opt = {f"x_{i + 1}": [1.0] for i in range(self.dim)} + opt["y"] = [0.0] + opt["valid_y"] = [1] + return pd.DataFrame(opt) + + +class Booth(Benchmark): + """Booth function. + + f(x) = (x1 + 2*x2 - 7)^2 + (2*x1 + x2 - 5)^2 + + Domain: [-10, 10]^2. + Global minimum: f = 0 at (1, 3). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key="x_1", bounds=[-10, 10]), + ContinuousInput(key="x_2", bounds=[-10, 10]), + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x1 = X["x_1"].values + x2 = X["x_2"].values + y = (x1 + 2 * x2 - 7) ** 2 + (2 * x1 + x2 - 5) ** 2 + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + return pd.DataFrame( + {"x_1": [1.0], "x_2": [3.0], "y": [0.0], "valid_y": [1]} + ) + + +class HolderTable(Benchmark): + """Holder Table function. + + f(x) = -|sin(x1)*cos(x2)*exp(|1 - sqrt(x1^2 + x2^2)/pi|)| + + Domain: [-10, 10]^2. + Global minimum: f ≈ -19.2085 at (±8.05502, ±9.66459). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key="x_1", bounds=[-10, 10]), + ContinuousInput(key="x_2", bounds=[-10, 10]), + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x1 = X["x_1"].values + x2 = X["x_2"].values + y = -np.abs( + np.sin(x1) + * np.cos(x2) + * np.exp(np.abs(1 - np.sqrt(x1**2 + x2**2) / np.pi)) + ) + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + return pd.DataFrame( + { + "x_1": [8.05502, -8.05502, 8.05502, -8.05502], + "x_2": [9.66459, 9.66459, -9.66459, -9.66459], + "y": [-19.2085] * 4, + "valid_y": [1] * 4, + } + ) + + +class CrossInTray(Benchmark): + """Cross-in-Tray function. + + f(x) = -0.0001 * (|sin(x1)*sin(x2)*exp(|100 - sqrt(x1^2+x2^2)/pi|)| + 1)^0.1 + + Domain: [-10, 10]^2. + Global minimum: f ≈ -2.06261 at (±1.3491, ±1.3491). 
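+
+    Example (a minimal sketch; cross-checks the optima table against ``f``):
+
+        >>> bench = CrossInTray()
+        >>> X_opt = bench.get_optima()[["x_1", "x_2"]]
+        >>> bench.f(X_opt)["y"]  # four values near -2.06261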
+ """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._domain = Domain( + inputs=Inputs( + features=[ + ContinuousInput(key="x_1", bounds=[-10, 10]), + ContinuousInput(key="x_2", bounds=[-10, 10]), + ] + ), + outputs=Outputs( + features=[ + ContinuousOutput(key="y", objective=MinimizeObjective(w=1.0)) + ] + ), + ) + + def _f(self, X: pd.DataFrame, **kwargs) -> pd.DataFrame: + x1 = X["x_1"].values + x2 = X["x_2"].values + inner = np.abs( + np.sin(x1) + * np.sin(x2) + * np.exp(np.abs(100 - np.sqrt(x1**2 + x2**2) / np.pi)) + ) + y = -0.0001 * (inner + 1) ** 0.1 + return pd.DataFrame({"y": y, "valid_y": 1}) + + def get_optima(self) -> pd.DataFrame: + return pd.DataFrame( + { + "x_1": [1.3491, -1.3491, 1.3491, -1.3491], + "x_2": [1.3491, 1.3491, -1.3491, -1.3491], + "y": [-2.06261] * 4, + "valid_y": [1] * 4, + } + ) diff --git a/bofire/data_models/strategies/api.py b/bofire/data_models/strategies/api.py index 3d96adc7f..e60dd3a88 100644 --- a/bofire/data_models/strategies/api.py +++ b/bofire/data_models/strategies/api.py @@ -58,8 +58,11 @@ AlwaysTrueCondition, AnyCondition, CombiCondition, + ExpMinRegretGapCondition, FeasibleExperimentCondition, + LogEIPCCondition, NumberOfExperimentsCondition, + UCBLCBRegretBoundCondition, ) from bofire.data_models.strategies.stepwise.stepwise import Step, StepwiseStrategy from bofire.data_models.strategies.strategy import Strategy diff --git a/bofire/data_models/strategies/stepwise/conditions.py b/bofire/data_models/strategies/stepwise/conditions.py index ec263d45b..7665c34cc 100644 --- a/bofire/data_models/strategies/stepwise/conditions.py +++ b/bofire/data_models/strategies/stepwise/conditions.py @@ -1,17 +1,21 @@ from abc import abstractmethod from typing import Annotated, Any, List, Literal, Optional, Union +import numpy as np import pandas as pd -from pydantic import Field, PositiveInt, field_validator +from pydantic import Field, PositiveFloat, PositiveInt, PrivateAttr, field_validator, model_validator from bofire.data_models.base import BaseModel from bofire.data_models.domain.api import Domain from bofire.data_models.objectives.api import ConstrainedObjective + class EvaluateableCondition: @abstractmethod - def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: pass @@ -45,7 +49,9 @@ class FeasibleExperimentCondition(SingleCondition, EvaluateableCondition): n_required_feasible_experiments: PositiveInt = 1 threshold: Annotated[float, Field(ge=0, le=1)] = 0.9 - def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: constrained_outputs = domain.outputs.get_by_objective(ConstrainedObjective) if len(constrained_outputs) == 0: return False @@ -80,7 +86,9 @@ class NumberOfExperimentsCondition(SingleCondition, EvaluateableCondition): type: Literal["NumberOfExperimentsCondition"] = "NumberOfExperimentsCondition" n_experiments: Annotated[int, Field(ge=1)] - def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: if experiments is None: n_experiments = 0 else: @@ -93,7 +101,9 @@ def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: class AlwaysTrueCondition(SingleCondition, EvaluateableCondition): type: Literal["AlwaysTrueCondition"] = 
"AlwaysTrueCondition" - def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: return True @@ -101,7 +111,13 @@ class CombiCondition(Condition, EvaluateableCondition): type: Literal["CombiCondition"] = "CombiCondition" conditions: Annotated[ List[ - Union[NumberOfExperimentsCondition, "CombiCondition", AlwaysTrueCondition] + Union[ + NumberOfExperimentsCondition, + "CombiCondition", + AlwaysTrueCondition, + "UCBLCBRegretBoundCondition", + "ExpMinRegretGapCondition", + ] ], Field(min_length=2), ] @@ -116,19 +132,358 @@ def validate_n_required_conditions(cls, v, info): ) return v - def evaluate(self, domain: Domain, experiments: Optional[pd.DataFrame]) -> bool: + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: n_matched_conditions = 0 for c in self.conditions: - if c.evaluate(domain, experiments): + if c.evaluate(domain, experiments, **kwargs): n_matched_conditions += 1 if n_matched_conditions >= self.n_required_conditions: return True return False +class UCBLCBRegretBoundCondition(SingleCondition, EvaluateableCondition): + """Condition based on the UCB-LCB regret bound from Makarova et al. (2022). + + Returns ``True`` (keep going) while the regret bound + ``min_x_evaluated UCB(x) - min_x_domain LCB(x)`` exceeds the threshold + ``epsilon_BO``, using GP-UCB style bounds + ``mu(x) ± sqrt(beta) * sigma(x)``. + + The threshold ``epsilon_BO`` depends on ``noise_variance``: + + - ``None`` (default): GP-estimated noise ``likelihood.noise``. + - ``"cv"``: corrected CV-fold std of the incumbent + (Nadeau and Bengio, 2003). Requires ``cv_fold_columns``. + - positive float: used directly as the noise variance. + + In all cases the threshold is ``threshold_factor * ``. + + Requires a fitted GP-based strategy (e.g. ``SoboStrategy``) passed via + the ``strategy`` kwarg from the ``StepwiseStrategy``. + + Reference: + Makarova et al. (2022): "Automatic Termination for Hyperparameter + Optimization" (AutoML 2022). + + Attributes: + noise_variance: Noise variance source (see description). + threshold_factor: Multiplier for the threshold (``decay`` in + Makarova et al. 2022 for the CV mode). + cv_fold_columns: Column names with per-fold CV scores; required + when ``noise_variance="cv"``. + topq: Fraction of best observations used for the internal + regret-bound GP. ``1.0`` disables filtering. The main + strategy's GP is unaffected. + min_topq: Minimum observations kept under top-q filtering. + min_experiments: Minimum experiments before termination is checked. + """ + + type: Literal["UCBLCBRegretBoundCondition"] = "UCBLCBRegretBoundCondition" + noise_variance: Optional[Union[PositiveFloat, Literal["cv"]]] = None + threshold_factor: PositiveFloat = 1.0 + cv_fold_columns: Optional[List[str]] = None + topq: Annotated[float, Field(gt=0, le=1)] = 1.0 + min_topq: PositiveInt = 20 + min_experiments: PositiveInt = 5 + + @model_validator(mode="after") + def validate_cv_fold_columns(self): + if self.noise_variance == "cv": + if self.cv_fold_columns is None or len(self.cv_fold_columns) < 2: + raise ValueError( + "cv_fold_columns must be a list of at least 2 column names " + 'when noise_variance="cv".', + ) + return self + + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: + """Check if optimization should continue (``True``) or stop (``False``). + + Args: + domain: The optimization domain. 
+            experiments: Experiments conducted so far.
+            **kwargs: Must include ``strategy`` — the fitted ``BotorchStrategy``.
+
+        Returns:
+            ``True`` if optimization should continue, ``False`` if the
+            regret bound has dropped below ``epsilon_BO``.
+        """
+        strategy = kwargs.get("strategy")
+
+        if strategy is None:
+            return True
+        if not getattr(strategy, "is_fitted", False) or getattr(
+            strategy, "model", None
+        ) is None:
+            return True
+
+        if experiments is None or len(experiments) < self.min_experiments:
+            return True
+
+        from bofire.termination.evaluator import UCBLCBRegretEvaluator
+
+        evaluator = UCBLCBRegretEvaluator()
+
+        # Top-q filtering: refit the regret-bound GP on the best fraction.
+        eval_strategy = strategy
+        eval_experiments = experiments
+        if self.topq < 1.0:
+            output_key = domain.outputs.get_keys()[0]
+            y_values = experiments[output_key].values
+            n = len(y_values)
+            topn = max(self.min_topq, int(n * self.topq))
+            if topn < n:
+                top_indices = np.argsort(y_values)[:topn]
+                eval_experiments = experiments.iloc[top_indices].reset_index(
+                    drop=True,
+                )
+                from bofire.strategies.mapper import map as map_strategy
+
+                try:
+                    eval_strategy = map_strategy(strategy._data_model)
+                    eval_strategy.tell(eval_experiments)
+                except Exception:
+                    return True
+
+        metrics = evaluator.evaluate(
+            eval_strategy, eval_experiments, len(experiments),
+        )
+
+        if not metrics:
+            return True
+
+        regret_bound = metrics["regret_bound"]
+
+        from bofire.termination.thresholds import (
+            compute_threshold_cv,
+            compute_threshold_noise,
+        )
+
+        output_key = domain.outputs.get_keys()[0]
+
+        if isinstance(self.noise_variance, (int, float)):
+            epsilon_bo = compute_threshold_noise(
+                self.noise_variance, self.threshold_factor,
+            )
+        elif self.noise_variance == "cv":
+            epsilon_bo = compute_threshold_cv(
+                experiments, output_key, self.cv_fold_columns,
+                self.threshold_factor,
+            )
+        else:
+            epsilon_bo = compute_threshold_noise(
+                metrics.get("estimated_noise_variance"),
+                self.threshold_factor,
+            )
+
+        if epsilon_bo is None:
+            return True
+
+        return regret_bound >= epsilon_bo
+
+
+class ExpMinRegretGapCondition(SingleCondition, EvaluateableCondition):
+    """Expected minimum regret gap stopping criterion (Ishibashi et al. 2023).
+
+    Returns ``True`` (keep optimising) while the stopping value
+    ``delta_f + ei_diff + kappa * sqrt(KL / 2)`` exceeds the threshold,
+    and ``False`` (stop) otherwise.
+
+    Two threshold modes:
+
+    - ``"adaptive"`` (default): theoretically motivated threshold from the
+      GP noise and posterior variances; Ishibashi et al. (2023), eq. (16).
+    - ``"median"``: heuristic ``rate * median(early values)`` over the
+      first ``start_timing`` stopping values.
+
+    Stateful: keeps the previous-iteration GP model to compare consecutive
+    posteriors. Always returns ``True`` before ``min_experiments`` is
+    reached.
+
+    Reference:
+        Ishibashi et al. (2023): "A stopping criterion for Bayesian
+        optimization by the gap of expected minimum simple regrets"
+        (AISTATS 2023).
+
+    Attributes:
+        threshold_mode: ``"adaptive"`` or ``"median"``.
+        delta: Confidence parameter for beta and the adaptive threshold.
+        rate: Fraction of the median stopping value used as threshold in
+            ``"median"`` mode.
+        start_timing: Number of stopping values collected before the median
+            threshold can be computed (and thus before the condition can
+            trigger).
+        min_experiments: Minimum experiments before checking.
+        beta_scale: Scaling factor for the GP-UCB beta parameter.
+        n_samples_lcb: Random samples for the min-LCB estimate in kappa.
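+
+    Example (a hedged sketch of wiring the condition into a stepwise setup;
+    ``Step``, ``StepwiseStrategy``, and ``SoboStrategy`` refer to the BoFire
+    strategy data models):
+
+        >>> step = Step(
+        ...     strategy_data=SoboStrategy(domain=domain),
+        ...     condition=ExpMinRegretGapCondition(min_experiments=10),
+        ... )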
+ """ + + type: Literal["ExpMinRegretGapCondition"] = "ExpMinRegretGapCondition" + threshold_mode: Literal["adaptive", "median"] = "adaptive" + delta: PositiveFloat = 0.1 + rate: PositiveFloat = 0.1 + start_timing: PositiveInt = 10 + min_experiments: PositiveInt = 5 + beta_scale: PositiveFloat = 1.0 + n_samples_lcb: PositiveInt = 1000 + + _evaluator: Any = PrivateAttr(default=None) + + def _get_evaluator(self): + if self._evaluator is None: + from bofire.termination.evaluator import ExpMinRegretGapEvaluator + + self._evaluator = ExpMinRegretGapEvaluator( + delta=self.delta, + rate=self.rate, + start_timing=self.start_timing, + beta_scale=self.beta_scale, + n_samples_lcb=self.n_samples_lcb, + ) + return self._evaluator + + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: + """Check if optimization should continue (``True``) or stop (``False``). + + Args: + domain: The optimization domain. + experiments: Experiments conducted so far. + **kwargs: Must include ``strategy`` — the fitted ``BotorchStrategy``. + + Returns: + ``True`` if optimization should continue, ``False`` when the + stopping value drops below the selected threshold. + """ + strategy = kwargs.get("strategy") + + if strategy is None: + return True + if not getattr(strategy, "is_fitted", False) or getattr( + strategy, "model", None + ) is None: + return True + + if experiments is None or len(experiments) < self.min_experiments: + return True + + evaluator = self._get_evaluator() + metrics = evaluator.evaluate(strategy, experiments, len(experiments)) + + if not metrics: + return True + + stopping_value = metrics["stopping_value"] + + if self.threshold_mode == "adaptive": + threshold = metrics.get("threshold_adaptive") + else: + threshold = metrics.get("threshold_median") + + if threshold is None: + return True + + return bool(stopping_value >= threshold) + + +class LogEIPCCondition(SingleCondition, EvaluateableCondition): + """Cost-aware stopping criterion (Xie et al., 2025). + + Stops (returns ``False``) when the maximum log expected-improvement-per-cost + over the domain drops to zero or below — i.e. no unevaluated point's + expected improvement is worth its evaluation cost: + + stop when max_x [ LogEI(x) - alpha * log(c(x)) - log(lambda_cost) ] <= 0 + + Ideal for chemical experiments where reagent, time, or equipment costs + matter. The ``cost_column`` attribute lets you record the actual cost of + each experiment and use the running mean as the cost estimate. + + Attributes: + lambda_cost: Exchange rate between cost and improvement. Higher values + favour earlier stopping (require higher improvement-to-cost ratio + to continue). Default ``1.0``. + cost_column: Name of the column in the experiments DataFrame that + records the cost of each experiment. When set, the mean of past + costs is used as the cost estimate. Takes priority over + ``cost_value``. + cost_value: Fixed cost per experiment used when ``cost_column`` is not + provided. Default ``1.0``. + alpha: Exponent applied to the cost in the LogEIPC formula. ``1.0`` + (default) matches the paper's primary formulation. + min_experiments: Minimum experiments before the condition is checked. + n_samples: Random domain samples used to approximate the max LogEIPC. + + Reference: + Xie et al. (2025): "Cost-Aware Stopping for Bayesian Optimization" + (arXiv:2507.12453). 
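+
+    Example (a minimal sketch; assumes per-experiment costs are recorded in
+    a hypothetical ``cost`` column of the experiments DataFrame):
+
+        >>> condition = LogEIPCCondition(
+        ...     lambda_cost=0.05,  # stop once max EI < 0.05 * mean observed cost
+        ...     cost_column="cost",
+        ...     min_experiments=10,
+        ... )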
+ """ + + type: Literal["LogEIPCCondition"] = "LogEIPCCondition" + lambda_cost: PositiveFloat = 1.0 + cost_column: Optional[str] = None + cost_value: PositiveFloat = 1.0 + alpha: PositiveFloat = 1.0 + min_experiments: PositiveInt = 5 + n_samples: PositiveInt = 2000 + search_method: Literal["sample", "optimize"] = "sample" + cost_model: Literal["mean", "gp"] = "mean" + + def evaluate( + self, domain: Domain, experiments: Optional[pd.DataFrame], **kwargs + ) -> bool: + """Check if optimization should continue (``True``) or stop (``False``). + + Args: + domain: The optimization domain. + experiments: Experiments conducted so far. + **kwargs: Must include ``strategy`` — the fitted ``BotorchStrategy``. + + Returns: + ``True`` if optimization should continue, ``False`` when + ``max_log_eipc <= 0``. + """ + strategy = kwargs.get("strategy") + + if strategy is None: + return True + if not getattr(strategy, "is_fitted", False) or getattr( + strategy, "model", None + ) is None: + return True + + if experiments is None or len(experiments) < self.min_experiments: + return True + + from bofire.termination.evaluator import LogEIPCEvaluator + + evaluator = LogEIPCEvaluator( + lambda_cost=self.lambda_cost, + cost_column=self.cost_column, + cost_value=self.cost_value, + alpha=self.alpha, + n_samples=self.n_samples, + search_method=self.search_method, + cost_model=self.cost_model, + ) + metrics = evaluator.evaluate(strategy, experiments, len(experiments)) + + if not metrics: + return True + + return bool(metrics["max_log_eipc"] > 0) + + AnyCondition = Union[ NumberOfExperimentsCondition, CombiCondition, AlwaysTrueCondition, FeasibleExperimentCondition, + UCBLCBRegretBoundCondition, + ExpMinRegretGapCondition, + LogEIPCCondition, ] diff --git a/bofire/runners/run.py b/bofire/runners/run.py index 977521fd1..600677999 100644 --- a/bofire/runners/run.py +++ b/bofire/runners/run.py @@ -75,9 +75,7 @@ def autosafe_results(benchmark): pbar.set_postfix({"Current Best:": f"{metric_values[i]:0.3f}"}) if (i + 1) % safe_interval == 0: autosafe_results(benchmark=benchmark) - return strategy.experiments, pd.Series( - metric_values - ) # ty: ignore[invalid-return-type] + return strategy.experiments, pd.Series(metric_values) def run( diff --git a/bofire/strategies/api.py b/bofire/strategies/api.py index 7cad2a275..355f68c52 100644 --- a/bofire/strategies/api.py +++ b/bofire/strategies/api.py @@ -21,5 +21,5 @@ ) from bofire.strategies.random import RandomStrategy from bofire.strategies.shortest_path import ShortestPathStrategy -from bofire.strategies.stepwise.stepwise import StepwiseStrategy +from bofire.strategies.stepwise.stepwise import OptimizationComplete, StepwiseStrategy from bofire.strategies.strategy import Strategy diff --git a/bofire/strategies/stepwise/stepwise.py b/bofire/strategies/stepwise/stepwise.py index c5dc850d1..2c51bf103 100644 --- a/bofire/strategies/stepwise/stepwise.py +++ b/bofire/strategies/stepwise/stepwise.py @@ -15,6 +15,17 @@ from bofire.transforms.transform import Transform +class OptimizationComplete(Exception): + """Raised by StepwiseStrategy when no step's condition is satisfied. + + This signals that the optimization is complete, e.g., because a + termination condition (such as UCBLCBRegretBoundCondition) has been met + and there is no further step to execute. 
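+
+    Example (a minimal sketch of the intended calling pattern):
+
+        >>> try:
+        ...     candidates = stepwise_strategy.ask(1)
+        ... except OptimizationComplete:
+        ...     ...  # every step condition reported done; stop the loop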
+ """ + + pass + + T = TypeVar("T", pd.DataFrame, Domain) @@ -43,13 +54,28 @@ def has_sufficient_experiments(self) -> bool: return True def get_step(self) -> Tuple[Strategy, Optional[Transform]]: - """Returns the strategy at the current step and the corresponding transform if given.""" + """Returns the strategy at the current step and the corresponding transform if given. + + The strategy for each step is passed to the condition via the + ``strategy`` keyword argument. This allows conditions like + ``UCBLCBRegretBoundCondition`` to access the fitted model. + + Raises: + OptimizationComplete: When no step's condition is satisfied, + signalling that the optimization is done. + """ for i, condition in enumerate(self.conditions): - if condition.evaluate(self.domain, experiments=self.experiments): + if condition.evaluate( + self.domain, + experiments=self.experiments, + strategy=self.strategies[i], + ): return self.strategies[i], self.transforms[ i ] # ty: ignore[invalid-return-type] - raise ValueError("No condition could be satisfied.") + raise OptimizationComplete( + "No step condition is satisfied. Optimization is complete." + ) def _ask(self, candidate_count: Optional[PositiveInt]) -> pd.DataFrame: strategy, transform = self.get_step() diff --git a/bofire/termination/__init__.py b/bofire/termination/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/bofire/termination/api.py b/bofire/termination/api.py new file mode 100644 index 000000000..dc1e477f7 --- /dev/null +++ b/bofire/termination/api.py @@ -0,0 +1,16 @@ +"""API exports for termination module.""" + +from bofire.termination.evaluator import ( + ExpMinRegretGapEvaluator, + LogEIPCEvaluator, + TerminationEvaluator, + UCBLCBRegretEvaluator, +) + + +__all__ = [ + "ExpMinRegretGapEvaluator", + "LogEIPCEvaluator", + "TerminationEvaluator", + "UCBLCBRegretEvaluator", +] diff --git a/bofire/termination/evaluator.py b/bofire/termination/evaluator.py new file mode 100644 index 000000000..e8a359a63 --- /dev/null +++ b/bofire/termination/evaluator.py @@ -0,0 +1,1120 @@ +"""Termination evaluators that compute metrics for termination conditions.""" + +import base64 +import io +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, List, Literal, Optional, Union + +import numpy as np +import pandas as pd +import torch +from botorch.acquisition import AnalyticAcquisitionFunction +from botorch.acquisition.analytic import ( + LogExpectedImprovement, + UpperConfidenceBound, + _log_ei_helper, +) +from botorch.acquisition.analytic import PosteriorTransform +from botorch.utils.transforms import t_batch_mode_transform +from botorch.fit import fit_gpytorch_mll +from botorch.models import SingleTaskGP +from botorch.models.model import Model +from gpytorch.mlls import ExactMarginalLogLikelihood +from scipy.stats import norm + +from bofire.utils.torch_tools import tkwargs + + +class TerminationEvaluator(ABC): + """Base class for termination evaluators. + + Computes metrics from a BO strategy that termination conditions use to + decide whether to stop. + + Args: + delta: Confidence parameter for the GP-UCB beta formula (Srinivas et + al., 2010). Controls how optimistic the confidence bounds are. + Smaller values give larger beta (wider bounds). Default ``0.1``. + beta_scale: Scaling factor applied to the GP-UCB beta. Makarova et + al. use ``0.2``; Ishibashi et al. use ``1.0``. + beta_log_multiplier: Multiplier inside the log term of the beta + formula. Default ``2.0``. 
+ beta_log_denominator: Denominator inside the log term of the beta + formula. Default ``6.0``. + beta_min: Floor on beta to avoid degenerate near-zero bounds. + Default ``0.01``. + beta_t_offset: If set, the beta formula uses + ``t = num_observed - beta_t_offset`` instead of the total + observation count. Useful when papers index beta by BO iteration + rather than total evaluations (e.g. excluding the random init). + """ + + def __init__( + self, + delta: float = 0.1, + beta_scale: float = 0.2, + beta_log_multiplier: float = 2.0, + beta_log_denominator: float = 6.0, + beta_min: float = 0.01, + beta_t_offset: Optional[int] = None, + ): + # GP-UCB beta parameters (Srinivas et al., 2010). + self.delta = delta + self.beta_scale = beta_scale + self.beta_log_multiplier = beta_log_multiplier + self.beta_log_denominator = beta_log_denominator + self.beta_min = beta_min + # If set, use ``t = num_observed - beta_t_offset`` in the beta formula + # (for papers that index beta by BO iteration rather than observation count). + self.beta_t_offset = beta_t_offset + + def _compute_beta(self, dimensionality: int, num_observed: int) -> float: + """Compute beta for UCB/LCB via the GP-UCB formula. + + ``beta_t = beta_scale * 2 * log(d * t^2 * pi^2 / (6 * delta))``. + Makarova et al. use ``beta_scale=0.2``; Ishibashi et al. use ``1.0``. + """ + t = num_observed + if self.beta_t_offset is not None: + t = max(1, num_observed - self.beta_t_offset) + beta = ( + self.beta_scale + * self.beta_log_multiplier + * np.log( + dimensionality + * t**2 + * np.pi**2 + / (self.beta_log_denominator * self.delta) + ) + ) + return max(beta, self.beta_min) + + @staticmethod + def get_output_scale(model: Model) -> float: + """Return the output std from a ``Standardize`` outcome transform, or 1.0. + + BoTorch models with ``Standardize`` return posteriors in original scale, + while the reference stopping criteria (Ishibashi / Makarova) assume + standardized quantities. Callers can divide by this value to convert + back when needed. 
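+
+        Example (a minimal sketch; ``regret_bound`` is assumed to be a
+        quantity in the original output scale):
+
+            >>> scale = TerminationEvaluator.get_output_scale(strategy.model)
+            >>> standardized_bound = regret_bound / scale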
+ """ + if hasattr(model, "outcome_transform"): + try: + return float(model.outcome_transform.stdvs.item()) + except Exception: + pass + return 1.0 + + @staticmethod + def _min_ucb_at_points( + model: Model, + X: torch.Tensor, + sqrt_beta: float, + ) -> float: + """Return min UCB = mu + sqrt(beta)*sigma over ``X``.""" + ucb = UpperConfidenceBound(model=model, beta=sqrt_beta ** 2) + with torch.no_grad(): + return float(ucb(X.unsqueeze(-2)).min().item()) + + @staticmethod + def _min_lcb_by_sampling( + model: Model, + X_evaluated: torch.Tensor, + sqrt_beta: float, + bounds_lower: torch.Tensor, + bounds_upper: torch.Tensor, + n_samples: int = 2000, + batch_size: int = 512, + ) -> float: + """Return min LCB = mu - sqrt(beta)*sigma via random sampling.""" + n_dims = bounds_lower.shape[0] + X_random = bounds_lower + (bounds_upper - bounds_lower) * torch.rand( + n_samples, n_dims, **tkwargs + ) + X_candidates = torch.cat([X_random, X_evaluated], dim=0) + + min_lcb = float("inf") + with torch.no_grad(): + for start in range(0, X_candidates.shape[0], batch_size): + X_batch = X_candidates[start : start + batch_size] + post = model.posterior(X_batch) + m = post.mean.squeeze(-1) + s = post.variance.squeeze(-1).sqrt() + lcb = m - sqrt_beta * s + batch_min = float(lcb.min().item()) + if batch_min < min_lcb: + min_lcb = batch_min + return min_lcb + + def _min_lcb_optimize( + self, + strategy, + experiments: pd.DataFrame, + sqrt_beta: float, + ) -> float: + """Minimise LCB over the domain via the strategy's acqf optimizer.""" + neg_lcb = UpperConfidenceBound( + model=strategy.model, beta=float(sqrt_beta) ** 2, maximize=False + ) + candidates = strategy.acqf_optimizer.optimize( + candidate_count=1, + acqfs=[neg_lcb], + domain=strategy.domain, + experiments=experiments, + ) + + transformed_candidate = strategy.domain.inputs.transform( + candidates, + strategy.input_preprocessing_specs, + ) + X_best = torch.from_numpy(transformed_candidate.values).to(**tkwargs) + with torch.no_grad(): + posterior_best = strategy.model.posterior(X_best) + mean_best = posterior_best.mean.squeeze(-1) + std_best = posterior_best.variance.squeeze(-1).sqrt() + + return float((mean_best - sqrt_beta * std_best).item()) + + def _ucb_lcb_regret_bound( + self, + model: Model, + X_evaluated: torch.Tensor, + sqrt_beta: float, + bounds_lower: torch.Tensor, + bounds_upper: torch.Tensor, + n_samples: int = 2000, + batch_size: int = 512, + strategy=None, + experiments: Optional[pd.DataFrame] = None, + ) -> tuple: + """Upper bound on simple regret via the UCB-LCB gap. + + Returns ``(regret_bound, min_ucb, min_lcb)`` with + ``regret_bound = max(0, min_UCB(evaluated) - min_LCB(domain))``. 
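+
+        Rationale: with high probability ``LCB(x) <= f(x) <= UCB(x)``, so the
+        incumbent is at most ``min_UCB(evaluated)`` while no point in the
+        domain can lie below ``min_LCB(domain)``; the gap therefore upper
+        bounds the incumbent's simple regret.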
+ """ + min_ucb = TerminationEvaluator._min_ucb_at_points( + model, X_evaluated, sqrt_beta, + ) + + if self.lcb_method == "optimize" and strategy is not None and experiments is not None: + min_lcb = self._min_lcb_optimize(strategy, experiments, sqrt_beta) + else: + min_lcb = TerminationEvaluator._min_lcb_by_sampling( + model, X_evaluated, sqrt_beta, bounds_lower, bounds_upper, + n_samples, batch_size, + ) + return max(0.0, min_ucb - min_lcb), min_ucb, min_lcb + + @abstractmethod + def evaluate( + self, + strategy, # BotorchStrategy + experiments: pd.DataFrame, + iteration: int, + ) -> Dict[str, Any]: + """Return a dict of termination metrics for the current iteration.""" + pass + + +# TODO: replace with `from botorch.acquisition.analytic import LogExpectedImprovementPerCost` +# once the BoTorch PR (wgst/botorch feature/log-expected-improvement-with-cost) is merged. +class LogExpectedImprovementPerCost(AnalyticAcquisitionFunction): + r"""Single-outcome Log Expected Improvement per unit cost (analytic). + + Computes the log expected improvement adjusted for the cost of evaluating + the candidate point: + + LogEIC(x) = LogEI(x; best_f) - alpha * log(c(x)) + + where ``LogEI`` is the log expected improvement (Ament et al., 2023) and + ``c(x)`` is the evaluation cost at ``x``. + + This is the acquisition function underlying the cost-aware stopping rule + of Xie et al. (2025): stop when ``max_x LogEIC(x) + log(lambda)`` is + non-positive, i.e. no candidate's expected improvement exceeds its cost + scaled by the exchange rate ``lambda``. + + Args: + model: A fitted single-output GP model. + best_f: Best observed function value (EI baseline). Scalar or + ``(batch_shape,)`` tensor. + cost_callable: ``c(X: Tensor[..., d]) -> Tensor[...]`` — evaluation + cost at each candidate point. Must return strictly positive values. + alpha: Cost exponent in ``c(x)^alpha``. ``1.0`` (default) matches the + primary formulation of Xie et al. (2025). + posterior_transform: Optional posterior transform. + maximize: If ``True``, treat the problem as maximisation. + + References: + Ament et al. (2023): "Unexpected Improvements to Expected Improvement + for Bayesian Optimization" (NeurIPS 2023). + Xie et al. (2025): "Cost-Aware Stopping for Bayesian Optimization" + (arXiv:2507.12453). + """ + + def __init__( + self, + model: Model, + best_f: Union[float, torch.Tensor], + cost_callable: Callable[[torch.Tensor], torch.Tensor], + alpha: float = 1.0, + posterior_transform: Optional[PosteriorTransform] = None, + maximize: bool = False, + ) -> None: + super(AnalyticAcquisitionFunction, self).__init__(model=model) + self.posterior_transform = posterior_transform + self.maximize = maximize + self.register_buffer("best_f", torch.as_tensor(best_f)) + self.cost_callable = cost_callable + self.alpha = alpha + + @t_batch_mode_transform(expected_q=1) + def forward(self, X: torch.Tensor) -> torch.Tensor: + r"""Evaluate LogEIC at ``X``. + + Args: + X: ``(b1 x ... bk) x 1 x d``-dim batched tensor of candidate points. + + Returns: + ``(b1 x ... bk)``-dim tensor of LogEIC values. 
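+
+        Example (a hedged sketch; ``model`` is a fitted single-output GP and
+        the unit-cost callable is purely illustrative):
+
+            >>> acqf = LogExpectedImprovementPerCost(
+            ...     model=model,
+            ...     best_f=train_Y.min(),
+            ...     cost_callable=lambda X: torch.ones(X.shape[:-1]),
+            ... )
+            >>> values = acqf(test_X.unsqueeze(-2))  # (n, d) -> (n,)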
+ """ + posterior = self.model.posterior( + X.squeeze(-2), posterior_transform=self.posterior_transform + ) + mean = posterior.mean.squeeze(-1) + sigma = posterior.variance.squeeze(-1).clamp_min(1e-12).sqrt() + u = (mean - self.best_f.expand_as(mean)) / sigma + if not self.maximize: + u = -u + log_ei = _log_ei_helper(u) + sigma.log() + + costs = self.cost_callable(X.squeeze(-2)) + log_cost = costs.clamp(min=1e-12).log() + return log_ei - self.alpha * log_cost + + + +class UCBLCBRegretEvaluator(TerminationEvaluator): + """Evaluator for the UCB-LCB regret bound (Makarova et al., 2022). + + The bound is ``min_{x in evaluated} UCB(x) - min_{x in domain} LCB(x)`` + using the GP-UCB formulation (Srinivas et al., 2010). Stopping is + triggered when this gap is small — meaning the best candidate already + evaluated is nearly as good as the best anywhere in the domain. + + Single-output minimisation only. + + Args: + delta: Confidence parameter for the GP-UCB beta formula. Default ``0.1``. + beta_scale: Scaling factor for beta. Makarova et al. use ``0.2``. + beta_log_multiplier: Multiplier inside the log term of beta. Default ``2.0``. + beta_log_denominator: Denominator inside the log term of beta. Default ``6.0``. + beta_min: Floor on beta. Default ``0.01``. + beta_t_offset: If set, indexes beta by BO iteration rather than total + observation count (``t = num_observed - beta_t_offset``). + fallback_noise_variance: Noise variance used when it cannot be read + from the GP likelihood (e.g. deterministic surrogates). Default ``1e-4``. + n_samples_lcb: Number of random domain points used when + ``lcb_method="sample"`` to approximate the minimum LCB over the + domain. Default ``2000``. + batch_size: Batch size for GP posterior evaluation during sampling. + Default ``512``. + lcb_method: How the domain-wide minimum LCB is found. + ``"sample"`` (default) draws ``n_samples_lcb`` random points — + matches the reference implementation. ``"optimize"`` uses + BoFire's acquisition function optimizer for higher accuracy. + + References: + Makarova et al. (2022): "Automatic Termination for Hyperparameter + Optimization" (AutoML 2022). + Srinivas et al. (2010): "Gaussian Process Optimization in the Bandit + Setting: No Regret and Experimental Design" (ICML 2010). 
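+
+    Example (a minimal sketch; ``strategy`` is assumed to be a fitted
+    single-objective ``SoboStrategy``):
+
+        >>> evaluator = UCBLCBRegretEvaluator(beta_scale=0.2)
+        >>> metrics = evaluator.evaluate(strategy, strategy.experiments, 0)
+        >>> metrics.get("regret_bound")  # None when not applicable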
+ """ + + def __init__( + self, + delta: float = 0.1, + beta_scale: float = 0.2, + beta_log_multiplier: float = 2.0, + beta_log_denominator: float = 6.0, + beta_min: float = 0.01, + beta_t_offset: Optional[int] = None, + fallback_noise_variance: float = 1e-4, + n_samples_lcb: int = 2000, + batch_size: int = 512, + lcb_method: Literal["sample", "optimize"] = "sample", + ): + super().__init__( + delta=delta, + beta_scale=beta_scale, + beta_log_multiplier=beta_log_multiplier, + beta_log_denominator=beta_log_denominator, + beta_min=beta_min, + beta_t_offset=beta_t_offset, + ) + self.fallback_noise_variance = fallback_noise_variance + self.n_samples_lcb = n_samples_lcb + self.batch_size = batch_size + self.lcb_method = lcb_method + + def evaluate( + self, + strategy, + experiments: pd.DataFrame, + iteration: int, + ) -> Dict[str, Any]: + """Return regret-bound metrics, or an empty dict when not applicable.""" + if not strategy.is_fitted or strategy.model is None: + return {} + + if len(experiments) < 2: + return {} + + if strategy.model.num_outputs != 1: + return {} + + input_keys = strategy.domain.inputs.get_keys() + dimensionality = len(input_keys) + num_observed = len(experiments) + beta = self._compute_beta(dimensionality, num_observed) + sqrt_beta = np.sqrt(beta) + + evaluated_inputs = experiments[input_keys] + transformed_evaluated = strategy.domain.inputs.transform( + evaluated_inputs, + strategy.input_preprocessing_specs, + ) + X_evaluated = torch.from_numpy(transformed_evaluated.values).to(**tkwargs) + + bounds = strategy.domain.inputs.get_bounds( + specs=strategy.input_preprocessing_specs + ) + lower = torch.tensor(bounds[0], **tkwargs) + upper = torch.tensor(bounds[1], **tkwargs) + + regret_bound, min_ucb_evaluated, min_lcb_domain = self._ucb_lcb_regret_bound( + strategy.model, X_evaluated, sqrt_beta, lower, upper, + self.n_samples_lcb, self.batch_size, strategy=strategy, + experiments=experiments, + ) + + # Noise variance from the GP likelihood (used by conditions to + # compute the termination threshold). + try: + estimated_noise_var = strategy.model.likelihood.noise.item() + except Exception: + estimated_noise_var = self.fallback_noise_variance + + return { + "regret_bound": regret_bound, + "min_ucb_evaluated": min_ucb_evaluated, + "min_lcb_domain": min_lcb_domain, + "estimated_noise_variance": estimated_noise_var, + "beta": beta, + } + + +class ExpMinRegretGapEvaluator(TerminationEvaluator): + """Evaluator for the expected minimum regret gap (Ishibashi et al., 2023). + + Computes a stopping value that upper-bounds the change in expected minimum + simple regret between consecutive BO iterations: + + value_t = delta_f + ei_diff + kappa * sqrt(KL / 2) + + where ``delta_f`` is the change in the GP mean at the incumbent, + ``ei_diff`` is the expected improvement from switching incumbents, + ``kappa`` is the UCB-LCB regret bound from the previous model, and + ``KL`` is the KL divergence of the old GP prior vs. the updated + posterior at the newly observed point. + + Stateful: stores the previous GP model and incumbent index between calls. + The first call always returns an empty dict. Single-output minimisation only. + + Args: + delta: Confidence parameter for the GP-UCB beta formula and the + adaptive threshold. Default ``0.1``. + beta_scale: Scaling factor for beta. Ishibashi et al. use ``1.0`` + (no extra scaling). + beta_log_multiplier: Multiplier inside the log term of beta. Default ``2.0``. + beta_log_denominator: Denominator inside the log term of beta. Default ``6.0``. 
+ beta_min: Floor on beta. Default ``0.01``. + beta_t_offset: If set, indexes beta by BO iteration rather than total + observation count (``t = num_observed - beta_t_offset``). + n_samples_lcb: Random domain points for the min-LCB estimate used in + ``kappa``. Default ``1000``. + batch_size: Batch size for GP posterior evaluation during sampling. + Default ``512``. + threshold_mode: How the stopping threshold is computed. + ``"adaptive"`` uses the theoretically motivated threshold from the + GP noise and posterior variances (eq. 16 in the paper). + ``"median"`` uses ``rate * median`` of the first ``start_timing`` + stopping values. ``"adaptive_median"`` (default) combines both — + stops when either threshold is exceeded. + start_timing: Number of stopping values to collect before the median + threshold can be computed. Default ``10``. + rate: Fraction of the median used as the median threshold. + Default ``0.1``. + noise_var_override: If set, overrides the GP's learned noise variance. + Set to ``1e-6`` for noise-free synthetic benchmarks to match the + reference implementation. + lcb_method: How the domain-wide minimum LCB (``kappa``) is found. + ``"sample"`` (default) or ``"optimize"``. + + Reference: + Ishibashi et al. (2023): "A stopping criterion for Bayesian optimization + by the gap of expected minimum simple regrets" (AISTATS 2023). + """ + + def __init__( + self, + delta: float = 0.1, + beta_scale: float = 1.0, # Ishibashi et al. use no extra scaling. + beta_log_multiplier: float = 2.0, + beta_log_denominator: float = 6.0, + beta_min: float = 0.01, + beta_t_offset: Optional[int] = None, + n_samples_lcb: int = 1000, + batch_size: int = 512, + threshold_mode: Literal[ + "adaptive", "median", "adaptive_median" + ] = "adaptive_median", + start_timing: int = 10, + rate: float = 0.1, + noise_var_override: Optional[float] = None, + lcb_method: Literal["sample", "optimize"] = "sample", + ): + super().__init__( + delta=delta, + beta_scale=beta_scale, + beta_log_multiplier=beta_log_multiplier, + beta_log_denominator=beta_log_denominator, + beta_min=beta_min, + beta_t_offset=beta_t_offset, + ) + self.n_samples_lcb = n_samples_lcb + self.batch_size = batch_size + self.threshold_mode = threshold_mode + self.start_timing = start_timing + self.rate = rate + self.lcb_method = lcb_method + # If set, overrides the GP's learned noise variance (the reference + # implementation uses ~1e-6 for noise-free synthetic functions). + self.noise_var_override = noise_var_override + + # Internal state persisted across evaluate() calls. + self._prev_model: Optional[Model] = None + self._prev_incumbent_idx: Optional[int] = None + self._prev_n_experiments: int = 0 + self._prev_input_preprocessing_specs: Optional[Dict] = None + self._seq_values: List[float] = [] + + @staticmethod + def _get_noise_variance(model) -> float: + """Return GP noise variance in the original (un-standardized) output scale. + + BoTorch models with a ``Standardize`` outcome transform learn noise in + standardized space, but posteriors are returned in the original scale, + so the noise must be un-standardized to match: + ``noise_var_original = noise_var_standardized * y_std**2``. 
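+
+        For example, a standardized noise variance of ``0.01`` learned with
+        ``y_std = 5.0`` corresponds to ``0.01 * 5.0**2 = 0.25`` in the
+        original scale.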
+ """ + try: + noise_var = model.likelihood.noise.item() + except Exception: + return 1e-4 + + if hasattr(model, "outcome_transform"): + try: + y_std = model.outcome_transform.stdvs.item() + noise_var = noise_var * y_std**2 + except Exception: + pass + + return noise_var + + def _calc_kl_qp_fast( + self, + old_mean: float, + old_var: float, + new_output: float, + noise_var: float, + ) -> float: + """Closed-form KL(q || p) for a single Gaussian observation.""" + precision = 1.0 / noise_var + k = old_var + m = old_mean + trace_term = k / (k + 1.0 / precision) + logdet_term = np.log(1.0 + precision * k) + se_term = k / ((k + 1.0 / precision) ** 2) * (new_output - m) ** 2 + kl = 0.5 * (-trace_term + logdet_term + se_term) + return max(0.0, float(kl)) + + def _compute_ei_diff( + self, + model: Model, + X_new_incumbent: torch.Tensor, + X_old_incumbent: torch.Tensor, + ) -> float: + """Expected improvement from switching incumbents. + + If the incumbent changed, computes E[max(f(x*_new) - f(x*_old), 0)] + under the new GP's joint posterior at the two incumbent locations. + """ + X_pair = torch.cat([X_new_incumbent, X_old_incumbent], dim=0) + with torch.no_grad(): + posterior = model.posterior(X_pair) + mu = posterior.mean.squeeze(-1) + cov = posterior.distribution.covariance_matrix + + g = float((mu[0] - mu[1]).item()) + var_diff = float( + (cov[0, 0] - 2 * cov[0, 1] + cov[1, 1]).item() + ) + + if var_diff <= 0: + # No uncertainty about the difference between the two incumbents. + beta_val = 0.0 + pdf_val = np.sqrt(1.0 / (2.0 * np.pi)) + cdf_val = 1.0 + else: + beta_val = np.sqrt(var_diff) + u = g / beta_val + pdf_val = float(norm.pdf(u)) + cdf_val = float(norm.cdf(u)) + + return float(beta_val * pdf_val + g * cdf_val) + + def evaluate( + self, + strategy, + experiments: pd.DataFrame, + iteration: int, + ) -> Dict[str, Any]: + """Return regret-gap metrics, or an empty dict on the first call / when not applicable.""" + if not strategy.is_fitted or strategy.model is None: + return {} + if len(experiments) < 2: + return {} + if strategy.model.num_outputs != 1: + return {} + + input_keys = strategy.domain.inputs.get_keys() + output_key = strategy.domain.outputs.get_keys()[0] + dimensionality = len(input_keys) + n_experiments = len(experiments) + + incumbent_idx = int(experiments[output_key].idxmin()) + + # First call: save state and return empty. + if self._prev_model is None or n_experiments <= self._prev_n_experiments: + self._save_state(strategy, experiments, incumbent_idx) + return {} + + # Assume one new point added since the last call; take the last. + new_point_idx = n_experiments - 1 + y_new = float(experiments[output_key].iloc[new_point_idx]) + + preprocessing_specs = strategy.input_preprocessing_specs + all_inputs = experiments[input_keys] + transformed = strategy.domain.inputs.transform( + all_inputs, preprocessing_specs + ) + X_all = torch.from_numpy(transformed.values).to(**tkwargs) + + X_prev = X_all[: self._prev_n_experiments] + x_new = X_all[[new_point_idx]] + x_incumbent_new = X_all[[incumbent_idx]] + x_incumbent_old = X_all[[self._prev_incumbent_idx]] + + # Use the old data count for beta, matching the reference implementation. 
+ sqrt_beta = np.sqrt( + self._compute_beta(dimensionality, self._prev_n_experiments) + ) + + with torch.no_grad(): + old_posterior = self._prev_model.posterior(x_new) + old_mean = float(old_posterior.mean.squeeze().item()) + old_var = float(old_posterior.variance.squeeze().item()) + + noise_var = ( + self.noise_var_override + if self.noise_var_override is not None + else self._get_noise_variance(strategy.model) + ) + + # BoTorch with Standardize: noise_var is in standardized space but + # posterior returns original-scale predictions. Un-standardize noise + # so all quantities are in the same scale (matching reference which + # uses normalize_Y=False where everything is natively in raw scale). + y_std = self.get_output_scale(strategy.model) + noise_var_original = noise_var * y_std**2 + + kl = self._calc_kl_qp_fast(old_mean, old_var, y_new, noise_var_original) + + # delta_f: change in predicted incumbent value. + with torch.no_grad(): + old_mu_at_old_inc = float( + self._prev_model.posterior(x_incumbent_old).mean.squeeze().item() + ) + new_mu_at_new_inc = float( + strategy.model.posterior(x_incumbent_new).mean.squeeze().item() + ) + delta_f = abs(old_mu_at_old_inc - new_mu_at_new_inc) + + # kappa: UCB-LCB regret bound using the old model. + bounds = strategy.domain.inputs.get_bounds( + specs=preprocessing_specs + ) + lower = torch.tensor(bounds[0], **tkwargs) + upper = torch.tensor(bounds[1], **tkwargs) + + kappa, _, _ = self._ucb_lcb_regret_bound( + self._prev_model, X_prev, sqrt_beta, lower, upper, + self.n_samples_lcb, self.batch_size, strategy=strategy, + experiments=experiments, + ) + + # ei_diff: expected improvement from switching incumbents. + if incumbent_idx == self._prev_incumbent_idx: + ei_diff = 0.0 + else: + ei_diff = self._compute_ei_diff( + strategy.model, x_incumbent_new, x_incumbent_old + ) + + stopping_value = delta_f + ei_diff + kappa * np.sqrt(0.5 * kl) + + threshold_adaptive = None + threshold_median = None + + if self.threshold_mode in ("adaptive", "adaptive_median"): + # Adaptive threshold in raw scale (reference uses normalize_Y=False). + threshold_adaptive = self._compute_threshold_adaptive( + self._prev_model, x_incumbent_new, + old_var, + noise_var_original, kappa, + ) + + self._seq_values.append(stopping_value) + if self.threshold_mode in ("median", "adaptive_median"): + threshold_median = self._compute_threshold_median( + self._seq_values, self.start_timing, self.rate, + ) + + self._save_state(strategy, experiments, incumbent_idx) + + return { + "stopping_value": stopping_value, + "delta_f": delta_f, + "ei_diff": ei_diff, + "kappa": kappa, + "kl_divergence": kl, + "threshold_adaptive": threshold_adaptive, + "threshold_median": threshold_median, + "noise_variance": noise_var_original, + "seq_values": list(self._seq_values), + } + + def _save_state( + self, + strategy, + experiments: pd.DataFrame, + incumbent_idx: int, + ) -> None: + """Save current state for comparison at the next evaluate() call.""" + self._prev_model = strategy.model + self._prev_n_experiments = len(experiments) + self._prev_incumbent_idx = incumbent_idx + self._prev_input_preprocessing_specs = strategy.input_preprocessing_specs + + def _compute_threshold_adaptive( + self, + old_model: Model, + x_incumbent_new: torch.Tensor, + var_old_new: float, + noise_var: float, + kappa: float, + ) -> Optional[float]: + """Adaptive threshold from Ishibashi et al. (2023), eq. (16). + + All inputs are in raw Y scale (the reference implementation uses + ``normalize_Y=False``). 
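+
+        Concretely (mirroring the computation below):
+        ``c = sqrt(-2 * log(delta))``,
+        ``denom = sqrt(1 / noise_var) * (var_old_new + noise_var)``,
+        ``eps1 = sqrt(var_old(x*_new)) * sqrt(var_old_new) * c / denom``,
+        ``eps2 = (kappa / 2) * sqrt(var_old_new) * c / denom``,
+        and the returned threshold is ``eps1 + eps2``.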
+ + Args: + old_model: The previous GP model. + x_incumbent_new: Transformed input of the new incumbent ``(1, d)``. + var_old_new: Old GP's predictive variance at the new data point. + noise_var: GP noise variance. + kappa: UCB-LCB regret bound from the old GP. + + Returns: + The adaptive threshold, or ``None`` when the denominator is + non-positive. + """ + c = np.sqrt(-2.0 * np.log(self.delta)) + precision = 1.0 / noise_var + with torch.no_grad(): + var_old_incumbent = float( + old_model.posterior(x_incumbent_new).variance.squeeze().item() + ) + + denom = np.sqrt(precision) * (var_old_new + noise_var) + if denom <= 0: + return None + eps1 = np.sqrt(var_old_incumbent) * np.sqrt(var_old_new) * c / denom + eps2 = (kappa / 2.0) * np.sqrt(var_old_new) * c / denom + return float(eps1 + eps2) + + @staticmethod + def _compute_threshold_median( + seq_values: List[float], + start_timing: int, + rate: float, + ) -> Optional[float]: + """``rate * median(seq_values[:start_timing])``, or ``None`` if not enough values.""" + if len(seq_values) <= start_timing: + return None + return float(rate * np.median(seq_values[:start_timing])) + + def dumps(self) -> str: + """Dumps the internal state to a base64 encoded pickle string. + + Analogous to ``BotorchSurrogate.dumps``, so the state can be embedded + in JSON. + """ + state = { + "prev_model": self._prev_model, + "prev_incumbent_idx": self._prev_incumbent_idx, + "prev_n_experiments": self._prev_n_experiments, + "prev_input_preprocessing_specs": self._prev_input_preprocessing_specs, + "seq_values": self._seq_values, + } + buffer = io.BytesIO() + torch.save(state, buffer) + return base64.b64encode(buffer.getvalue()).decode() + + def loads(self, data: str) -> None: + """Loads the internal state from a base64 encoded pickle string produced by ``dumps``. + + Args: + data: Base64-encoded string previously returned by :meth:`dumps`. + """ + buffer = io.BytesIO(base64.b64decode(data.encode())) + state = torch.load(buffer, weights_only=False) + self._prev_model = state["prev_model"] + self._prev_incumbent_idx = state["prev_incumbent_idx"] + self._prev_n_experiments = state["prev_n_experiments"] + self._prev_input_preprocessing_specs = state[ + "prev_input_preprocessing_specs" + ] + self._seq_values = state["seq_values"] + + +class LogEIPCEvaluator(TerminationEvaluator): + """Cost-aware stopping criterion (Xie et al., 2025). + + Stops when the maximum log expected-improvement-per-cost over the domain + falls to zero or below — meaning no candidate point's expected improvement + is worth its evaluation cost: + + stop when max_x [ LogEI(x) - alpha * log(c(x)) - log(lambda_cost) ] <= 0 + + Equivalently: stop when ``max_x EI(x) <= lambda_cost * c(x)^alpha``. + + Ideal for chemical experiments where reagent, time, or equipment costs + matter. Single-output minimisation only. + + Args: + lambda_cost: Exchange rate between improvement and cost. Stopping + is triggered when the best expected improvement no longer exceeds + ``lambda_cost * c(x)``. Higher values favour earlier stopping. + Think of it as "the minimum expected improvement per unit cost + that justifies one more experiment". With ``cost_value=1.0`` this + is directly a threshold on EI in objective units (e.g. 0.05 means + "stop when EI < 5% of objective range"). + cost_callable: Optional function ``c(X: Tensor[n, d]) -> Tensor[n]`` + that returns a per-point cost for each candidate in transformed + input space. Enables input-dependent costs (e.g. 
reaction time + proportional to temperature: ``lambda X: 1.0 + 3.0 * X[:, 0]``). + Takes priority over ``cost_column`` and ``cost_value``. + Not serialisable — use the evaluator directly rather than via + ``LogEIPCCondition`` when input-dependent costs are needed. + cost_column: Name of the column in the experiments DataFrame that + records the actual cost of each completed experiment. The running + mean is used as the cost estimate for future candidates. Takes + priority over ``cost_value`` when the column is present. + cost_value: Fixed scalar cost per experiment used when neither + ``cost_callable`` nor ``cost_column`` is provided. Default ``1.0`` + (uniform cost — criterion degenerates to a pure LogEI threshold). + alpha: Exponent applied to the cost: ``c(x)^alpha``. ``1.0`` (default) + matches the paper. Values < 1 reduce the influence of cost. + n_samples: Number of random domain points used when + ``search_method="sample"`` to approximate the max LogEIPC. + Increase for higher dimensions where default coverage may be sparse. + batch_size: Batch size for evaluating the GP posterior during sampling, + used to control memory usage. + search_method: How the maximum LogEIPC over the domain is found. + ``"sample"`` (default) draws ``n_samples`` random points — fast, + robust, matches the reference implementation. ``"optimize"`` uses + BoFire's acquisition function optimizer via ``_LogEIPCWithCost`` — + more accurate in high dimensions but slower and can misfire if the + optimizer converges to a local maximum. + cost_model: How cost is estimated when ``cost_column`` is provided. + ``"mean"`` (default) uses the scalar mean of past costs — simple + and backward-compatible. ``"gp"`` fits a ``SingleTaskGP`` to the + observed ``(x, log_cost)`` pairs and uses its posterior mean as a + per-point cost callable, capturing spatial variation in cost. + Matches the paper's "unknown cost" approach. Ignored when + ``cost_callable`` is set directly. + + References: + Ament et al. (2023): "Unexpected Improvements to Expected Improvement + for Bayesian Optimization" (NeurIPS 2023) — LogEIPC acquisition + function (log EI per cost). + Xie et al. (2025): "Cost-Aware Stopping for Bayesian Optimization" + (arXiv:2507.12453) — LogEIPC stopping rule (max LogEIPC ≤ 0) + grounded in Pandora's Box theory. + """ + + def __init__( + self, + lambda_cost: float = 1.0, + cost_column: Optional[str] = None, + cost_value: float = 1.0, + alpha: float = 1.0, + n_samples: int = 2000, + batch_size: int = 512, + cost_callable: Optional[Callable[[torch.Tensor], torch.Tensor]] = None, + search_method: Literal["sample", "optimize"] = "sample", + cost_model: Literal["mean", "gp"] = "mean", + ): + # TerminationEvaluator base class holds beta parameters for UCB/LCB; + # LogEIPC does not use them but we call super().__init__() for + # get_output_scale and other shared utilities. + super().__init__() + self.lambda_cost = lambda_cost + self.cost_column = cost_column + self.cost_value = cost_value + self.alpha = alpha + self.n_samples = n_samples + self.batch_size = batch_size + self.cost_callable = cost_callable + self.search_method = search_method + self.cost_model = cost_model + + def _fit_cost_gp( + self, + strategy, + experiments: pd.DataFrame, + ) -> Optional[Callable[[torch.Tensor], torch.Tensor]]: + """Fit a GP to observed (X, log_cost) pairs and return a cost callable. 
+
+        Matches the "unknown cost" approach from Xie et al., 2025: fits a
+        ``SingleTaskGP`` on log-costs and returns a callable that predicts
+        ``exp(GP_posterior_mean(x))`` at any candidate point.
+
+        Returns ``None`` when there are too few observations or any cost is
+        non-positive (log undefined).
+        """
+        if self.cost_column is None:
+            return None
+
+        input_keys = strategy.domain.inputs.get_keys()
+        valid = experiments[[*input_keys, self.cost_column]].dropna()
+        costs = valid[self.cost_column].values.astype(float)
+
+        if len(valid) < 2 or (costs <= 0).any():
+            return None
+
+        transformed = strategy.domain.inputs.transform(
+            valid[input_keys], strategy.input_preprocessing_specs
+        )
+        X = torch.tensor(transformed.values, **tkwargs)
+        log_c = torch.tensor(np.log(costs), **tkwargs).unsqueeze(-1)
+
+        cost_gp = SingleTaskGP(X, log_c)
+        fit_gpytorch_mll(ExactMarginalLogLikelihood(cost_gp.likelihood, cost_gp))
+        cost_gp.eval()
+
+        def cost_callable(X_cands: torch.Tensor) -> torch.Tensor:
+            with torch.no_grad():
+                return torch.exp(cost_gp.posterior(X_cands).mean.squeeze(-1))
+
+        return cost_callable
+
+    def _get_cost_estimate(self, experiments: pd.DataFrame) -> float:
+        """Return the cost estimate for future evaluations.
+
+        Uses the mean of past experiment costs when ``cost_column`` is
+        available, otherwise returns ``cost_value``.
+        """
+        if (
+            self.cost_column is not None
+            and self.cost_column in experiments.columns
+        ):
+            costs = experiments[self.cost_column].dropna()
+            if len(costs) > 0:
+                return float(costs.mean())
+        return self.cost_value
+
+    def _effective_cost_callable(
+        self, cost_estimate: float
+    ) -> Callable[[torch.Tensor], torch.Tensor]:
+        """Return a cost callable suitable for ``LogExpectedImprovementPerCost``.
+
+        If ``cost_callable`` was supplied by the user, return it directly.
+        Otherwise wrap the scalar ``cost_estimate`` as a constant callable.
+        """
+        if self.cost_callable is not None:
+            return self.cost_callable
+        return lambda X: torch.full(
+            (X.shape[0],), cost_estimate, dtype=X.dtype, device=X.device
+        )
+
+    def _compute_max_log_eipc(
+        self,
+        model: Model,
+        best_f: float,
+        bounds_lower: torch.Tensor,
+        bounds_upper: torch.Tensor,
+        cost_estimate: float,
+    ) -> float:
+        """Return max LogEIPC over the domain by random sampling.
+
+        Delegates per-batch evaluation to ``compute_log_eipc_at``.
+        """
+        n_dims = bounds_lower.shape[0]
+        X_random = bounds_lower + (bounds_upper - bounds_lower) * torch.rand(
+            self.n_samples, n_dims, **tkwargs
+        )
+
+        max_log_eipc = float("-inf")
+        for start in range(0, self.n_samples, self.batch_size):
+            X_batch = X_random[start : start + self.batch_size]
+            batch_vals = self.compute_log_eipc_at(
+                model, X_batch, best_f, cost_estimate
+            )
+            batch_max = float(batch_vals.max())
+            if batch_max > max_log_eipc:
+                max_log_eipc = batch_max
+        return max_log_eipc
+
+    def _optimize_max_log_eipc(
+        self,
+        strategy,
+        experiments: pd.DataFrame,
+        best_f: float,
+        cost_estimate: float,
+    ) -> float:
+        """Return max LogEIPC via BoFire's acquisition function optimizer.
+
+        Uses ``LogExpectedImprovementPerCost`` so that input-dependent cost
+        callables are correctly accounted for during optimisation.
+        """
+        # LogExpectedImprovementPerCost computes LogEI - alpha*log(c(x)).
+        # lambda_cost is a constant shift that doesn't affect the argmax,
+        # so we apply it only when reading back the value.
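+        # Worked example (assumed numbers, for illustration only): with
+        # alpha=1 and a constant cost c(x) == 2.0, the optimizer maximises
+        # LogEI(x) - log(2.0); with lambda_cost=0.5 the value returned below
+        # is that maximum minus log(0.5), i.e. shifted up by about 0.69.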
+ acqf = LogExpectedImprovementPerCost( + model=strategy.model, + best_f=best_f, + cost_callable=self._effective_cost_callable(cost_estimate), + alpha=self.alpha, + ) + candidates = strategy.acqf_optimizer.optimize( + candidate_count=1, + acqfs=[acqf], + domain=strategy.domain, + experiments=experiments, + ) + transformed = strategy.domain.inputs.transform( + candidates, strategy.input_preprocessing_specs + ) + X_best = torch.from_numpy(transformed.values).to(**tkwargs) + with torch.no_grad(): + val = acqf(X_best.unsqueeze(-2)) + return float(val.item()) - float(np.log(self.lambda_cost)) + + def compute_log_eipc_at( + self, + model: Model, + X: torch.Tensor, + best_f: float, + cost_estimate: float, + ) -> np.ndarray: + """Evaluate LogEIPC at given points. Useful for plotting over a dense grid. + + When ``cost_callable`` is set, costs are evaluated per-point; otherwise + the scalar ``cost_estimate`` is used. + + Args: + model: Fitted GP model (e.g. ``strategy.model``). + X: Candidate points shaped ``(n, d)`` in transformed input space. + best_f: Best observed objective value (original scale). + cost_estimate: Scalar cost fallback (used when ``cost_callable`` is None). + + Returns: + Array of shape ``(n,)`` with LogEIPC values. + """ + logei_acqf = LogExpectedImprovement(model=model, best_f=best_f, maximize=False) + with torch.no_grad(): + log_ei = logei_acqf(X.unsqueeze(-2)) # (n,) + costs = self._effective_cost_callable(cost_estimate)(X) # (n,) + log_eipc = ( + log_ei + - self.alpha * torch.log(costs.clamp(min=1e-12)) + - float(np.log(self.lambda_cost)) + ) + return log_eipc.numpy() + + def evaluate( + self, + strategy, + experiments: pd.DataFrame, + iteration: int, + ) -> Dict[str, Any]: + """Return LogEIPC metrics, or an empty dict when not applicable.""" + if not strategy.is_fitted or strategy.model is None: + return {} + if len(experiments) < 2: + return {} + if strategy.model.num_outputs != 1: + return {} + + output_key = strategy.domain.outputs.get_keys()[0] + best_f = float(experiments[output_key].min()) + + cost_estimate = self._get_cost_estimate(experiments) + if cost_estimate <= 0: + return {} + + # When cost_model="gp" and a cost_column is available, replace the + # scalar mean with a GP-fitted per-point cost callable for this call. + # The user-supplied cost_callable always takes priority. + _saved_callable = self.cost_callable + if self.cost_model == "gp" and self.cost_callable is None: + fitted = self._fit_cost_gp(strategy, experiments) + if fitted is not None: + self.cost_callable = fitted + + if self.search_method == "optimize": + max_log_eipc = self._optimize_max_log_eipc( + strategy, experiments, best_f, cost_estimate + ) + else: + bounds = strategy.domain.inputs.get_bounds( + specs=strategy.input_preprocessing_specs + ) + lower = torch.tensor(bounds[0], **tkwargs) + upper = torch.tensor(bounds[1], **tkwargs) + max_log_eipc = self._compute_max_log_eipc( + strategy.model, best_f, lower, upper, cost_estimate + ) + + self.cost_callable = _saved_callable # restore after GP override + + return { + "max_log_eipc": max_log_eipc, + "best_f": best_f, + "cost_estimate": cost_estimate, + "lambda_cost": self.lambda_cost, + } diff --git a/bofire/termination/thresholds.py b/bofire/termination/thresholds.py new file mode 100644 index 000000000..140aea050 --- /dev/null +++ b/bofire/termination/thresholds.py @@ -0,0 +1,72 @@ +"""Threshold helpers used by termination conditions. 
+
+These pure numeric helpers translate various noise / variability estimates
+into a stopping threshold (``epsilon_BO``) consumed by
+:class:`UCBLCBRegretBoundCondition`.
+"""
+
+from typing import List, Optional
+
+import numpy as np
+import pandas as pd
+
+
+def compute_threshold_noise(
+    noise_variance: Optional[float],
+    threshold_factor: float = 1.0,
+) -> Optional[float]:
+    """Compute ``threshold_factor * noise_variance``.
+
+    Args:
+        noise_variance: Observation noise variance, or ``None`` when the
+            GP estimate is unavailable.
+        threshold_factor: Multiplier applied to ``noise_variance``.
+
+    Returns:
+        The threshold, or ``None`` if ``noise_variance`` is unavailable or
+        non-positive.
+    """
+    if noise_variance is None or noise_variance <= 0:
+        return None
+    return threshold_factor * noise_variance
+
+
+def compute_threshold_cv(
+    experiments: pd.DataFrame,
+    output_key: str,
+    cv_fold_columns: List[str],
+    threshold_factor: float = 1.0,
+) -> Optional[float]:
+    """Compute a threshold from cross-validation fold variability.
+
+    Uses the corrected std of the incumbent's per-fold scores
+    (Nadeau and Bengio, Machine Learning, 2003):
+    ``threshold = threshold_factor * sqrt(1/K + 1/(K-1)) * std(fold_scores)``.
+    The incumbent is the row minimising ``output_key``.
+
+    Args:
+        experiments: Experiments conducted so far.
+        output_key: Output column used to locate the incumbent.
+        cv_fold_columns: Columns containing the per-fold CV scores.
+        threshold_factor: Multiplier (``decay`` in Makarova et al. 2022).
+
+    Returns:
+        The corrected CV threshold, or ``None`` if there are no valid
+        observations, fold scores contain NaN, or fold variability is zero.
+    """
+    y_values = experiments[output_key].dropna()
+    if len(y_values) < 1:
+        return None
+    incumbent_idx = y_values.idxmin()
+    fold_scores = (
+        experiments.loc[incumbent_idx, cv_fold_columns]
+        .values.astype(float)
+    )
+    if np.any(np.isnan(fold_scores)):
+        return None
+    k = len(cv_fold_columns)
+    correction = np.sqrt(1.0 / k + 1.0 / (k - 1))
+    fold_std = float(np.std(fold_scores, ddof=0))
+    if fold_std <= 0:
+        return None
+    return float(threshold_factor * correction * fold_std)
diff --git a/tests/bofire/data_models/specs/conditions.py b/tests/bofire/data_models/specs/conditions.py
index 60294226d..b548f7015 100644
--- a/tests/bofire/data_models/specs/conditions.py
+++ b/tests/bofire/data_models/specs/conditions.py
@@ -1,8 +1,10 @@
 from bofire.data_models.strategies.api import (
     AlwaysTrueCondition,
     CombiCondition,
+    ExpMinRegretGapCondition,
     FeasibleExperimentCondition,
     NumberOfExperimentsCondition,
+    UCBLCBRegretBoundCondition,
 )
 from tests.bofire.data_models.specs.specs import Specs
@@ -34,3 +36,53 @@
     FeasibleExperimentCondition,
     lambda: {"n_required_feasible_experiments": 3, "threshold": 0.95},
 )
+
+specs.add_valid(
+    UCBLCBRegretBoundCondition,
+    lambda: {
+        "noise_variance": 0.1,
+        "threshold_factor": 2.0,
+        "cv_fold_columns": None,
+        "topq": 1.0,
+        "min_topq": 20,
+        "min_experiments": 10,
+    },
+)
+
+specs.add_valid(
+    UCBLCBRegretBoundCondition,
+    lambda: {
+        "noise_variance": "cv",
+        "threshold_factor": 0.5,
+        "cv_fold_columns": ["f0", "f1", "f2", "f3", "f4"],
+        "topq": 1.0,
+        "min_topq": 20,
+        "min_experiments": 5,
+    },
+)
+
+specs.add_valid(
+    ExpMinRegretGapCondition,
+    lambda: {
+        "threshold_mode": "adaptive",
+        "delta": 0.1,
+        "rate": 0.1,
+        "start_timing": 10,
+        "min_experiments": 5,
+        "beta_scale": 1.0,
+        "n_samples_lcb": 1000,
+    },
+)
+
+specs.add_valid(
+    ExpMinRegretGapCondition,
+    lambda: {
+        "threshold_mode": "median",
+        "delta": 0.1,
+        "rate": 0.05,
"start_timing": 15, + "min_experiments": 5, + "beta_scale": 1.0, + "n_samples_lcb": 1000, + }, +) diff --git a/tests/bofire/strategies/stepwise/test_stepwise.py b/tests/bofire/strategies/stepwise/test_stepwise.py index 8a7e17565..46bc65ff2 100644 --- a/tests/bofire/strategies/stepwise/test_stepwise.py +++ b/tests/bofire/strategies/stepwise/test_stepwise.py @@ -130,6 +130,9 @@ def test_StepWiseStrategy_get_step(n_experiments, expected_strategy): assert strategy.surrogates_specs == _strategy.surrogate_specs # type: ignore +from bofire.strategies.stepwise.stepwise import OptimizationComplete + + def test_StepWiseStrategy_get_step_invalid(): benchmark = Himmelblau() experiments = benchmark.f(benchmark.domain.inputs.sample(12), return_complete=True) @@ -151,7 +154,7 @@ def test_StepWiseStrategy_get_step_invalid(): ) strategy = cast(strategies.StepwiseStrategy, strategies.map(data_model)) strategy.tell(experiments) - with pytest.raises(ValueError, match="No condition could be satisfied."): + with pytest.raises(OptimizationComplete): strategy.get_step() diff --git a/tests/bofire/termination/__init__.py b/tests/bofire/termination/__init__.py new file mode 100644 index 000000000..c8a9cd030 --- /dev/null +++ b/tests/bofire/termination/__init__.py @@ -0,0 +1 @@ +"""Tests for the termination module.""" diff --git a/tests/bofire/termination/test_condition.py b/tests/bofire/termination/test_condition.py new file mode 100644 index 000000000..f8eb347c0 --- /dev/null +++ b/tests/bofire/termination/test_condition.py @@ -0,0 +1,805 @@ +"""Tests for UCBLCBRegretBoundCondition, ExpMinRegretGapCondition, and StepwiseStrategy termination.""" + +import numpy as np +import pandas as pd +import pytest + +from bofire.benchmarks.single import Himmelblau +from bofire.data_models.strategies.api import ( + AlwaysTrueCondition, + ExpMinRegretGapCondition, + LogEIPCCondition, + NumberOfExperimentsCondition, + RandomStrategy as RandomStrategyDataModel, + SoboStrategy as SoboStrategyDataModel, + Step, + StepwiseStrategy as StepwiseStrategyDataModel, + UCBLCBRegretBoundCondition, +) +from bofire.strategies.api import ( + OptimizationComplete, + RandomStrategy, + SoboStrategy, + StepwiseStrategy, +) + + +@pytest.fixture +def benchmark(): + return Himmelblau() + + +class TestUCBLCBRegretBoundConditionDataModel: + """Tests for the condition data model (serialization, defaults, etc.).""" + + def test_defaults(self): + cond = UCBLCBRegretBoundCondition() + assert cond.noise_variance is None + assert cond.threshold_factor == 1.0 + assert cond.min_experiments == 5 + + def test_custom_params(self): + cond = UCBLCBRegretBoundCondition( + noise_variance=0.1, + threshold_factor=2.0, + min_experiments=10, + ) + assert cond.noise_variance == 0.1 + assert cond.threshold_factor == 2.0 + assert cond.min_experiments == 10 + + def test_serialization(self): + cond = UCBLCBRegretBoundCondition(noise_variance=0.1, threshold_factor=2.0) + data = cond.model_dump() + restored = UCBLCBRegretBoundCondition(**data) + assert restored.noise_variance == cond.noise_variance + assert restored.threshold_factor == cond.threshold_factor + + def test_returns_true_without_strategy(self, benchmark): + """Without a strategy kwarg, condition returns True (keep going).""" + cond = UCBLCBRegretBoundCondition() + assert cond.evaluate(benchmark.domain, None) is True + + def test_returns_true_with_unfitted_strategy(self, benchmark): + """With an unfitted strategy, condition returns True (keep going).""" + strategy = SoboStrategy( + 
data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + cond = UCBLCBRegretBoundCondition() + assert cond.evaluate(benchmark.domain, None, strategy=strategy) is True + + def test_returns_true_with_few_experiments(self, benchmark): + """With fewer than min_experiments, condition returns True.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(3), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = UCBLCBRegretBoundCondition(min_experiments=10) + assert ( + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + def test_evaluates_regret_bound(self, benchmark): + """With a fitted strategy and enough data, condition computes the regret bound.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + # Very tight threshold (manual noise_variance mode): should NOT terminate + cond_tight = UCBLCBRegretBoundCondition( + noise_variance=1e-10, threshold_factor=1.0, min_experiments=5 + ) + assert ( + cond_tight.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + # Very generous threshold (manual noise_variance mode): should terminate + cond_generous = UCBLCBRegretBoundCondition( + noise_variance=1e6, threshold_factor=1.0, min_experiments=5 + ) + assert ( + cond_generous.evaluate(benchmark.domain, experiments, strategy=strategy) + is False + ) + + def test_gp_noise_threshold(self, benchmark): + """Default (noise_variance=None) uses GP estimated noise for threshold.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + # Very large threshold_factor with GP noise → should terminate + cond_generous = UCBLCBRegretBoundCondition( + threshold_factor=1e8, min_experiments=5 + ) + assert cond_generous.noise_variance is None # GP noise mode + assert ( + cond_generous.evaluate(benchmark.domain, experiments, strategy=strategy) + is False + ) + + # Default threshold_factor (1.0) with GP noise ~1e-4 → should NOT terminate + cond_default = UCBLCBRegretBoundCondition(min_experiments=5) + assert ( + cond_default.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + def test_cv_mode_validation(self): + """noise_variance='cv' requires cv_fold_columns with >= 2 columns.""" + with pytest.raises(ValueError, match="cv_fold_columns"): + UCBLCBRegretBoundCondition(noise_variance="cv") + + with pytest.raises(ValueError, match="cv_fold_columns"): + UCBLCBRegretBoundCondition( + noise_variance="cv", cv_fold_columns=["fold_0"] + ) + + # Valid: 2+ fold columns + cond = UCBLCBRegretBoundCondition( + noise_variance="cv", + cv_fold_columns=["fold_0", "fold_1", "fold_2"], + ) + assert cond.noise_variance == "cv" + assert len(cond.cv_fold_columns) == 3 + + def test_cv_mode_serialization(self): + """CV mode conditions serialize and deserialize correctly.""" + cond = UCBLCBRegretBoundCondition( + noise_variance="cv", + cv_fold_columns=["f0", "f1", "f2", "f3", "f4"], + threshold_factor=0.5, + ) + data = 
cond.model_dump()
+        restored = UCBLCBRegretBoundCondition(**data)
+        assert restored.noise_variance == "cv"
+        assert restored.cv_fold_columns == ["f0", "f1", "f2", "f3", "f4"]
+        assert restored.threshold_factor == 0.5
+
+    def test_cv_mode_threshold(self, benchmark):
+        """With noise_variance='cv', uses incumbent's CV fold std for threshold."""
+        random = RandomStrategy(
+            data_model=RandomStrategyDataModel(domain=benchmark.domain)
+        )
+        experiments = benchmark.f(random.ask(10), return_complete=True)
+
+        # Add synthetic fold columns with known variability
+        n_folds = 5
+        fold_cols = [f"y_fold_{i}" for i in range(n_folds)]
+        rng = np.random.RandomState(42)
+        for col in fold_cols:
+            experiments[col] = experiments["y"] + rng.normal(0, 0.5, len(experiments))
+
+        strategy = SoboStrategy(
+            data_model=SoboStrategyDataModel(domain=benchmark.domain)
+        )
+        strategy.tell(experiments)
+
+        # Very large threshold_factor → threshold >> regret bound → should terminate
+        cond_generous = UCBLCBRegretBoundCondition(
+            noise_variance="cv",
+            cv_fold_columns=fold_cols,
+            threshold_factor=1e6,
+            min_experiments=5,
+        )
+        assert (
+            cond_generous.evaluate(benchmark.domain, experiments, strategy=strategy)
+            is False
+        )
+
+        # Very small threshold_factor → should NOT terminate
+        cond_tight = UCBLCBRegretBoundCondition(
+            noise_variance="cv",
+            cv_fold_columns=fold_cols,
+            threshold_factor=1e-10,
+            min_experiments=5,
+        )
+        assert (
+            cond_tight.evaluate(benchmark.domain, experiments, strategy=strategy)
+            is True
+        )
+
+    def test_cv_mode_corrected_std(self, benchmark):
+        """Verify the corrected std uses the Nadeau & Bengio (2003) formula."""
+        random = RandomStrategy(
+            data_model=RandomStrategyDataModel(domain=benchmark.domain)
+        )
+        experiments = benchmark.f(random.ask(10), return_complete=True)
+
+        # Set fold scores for the incumbent (min y row) with known values
+        n_folds = 5
+        fold_cols = [f"y_fold_{i}" for i in range(n_folds)]
+        incumbent_idx = experiments["y"].idxmin()
+
+        # Give all experiments uniform folds, but incumbent gets specific values
+        for col in fold_cols:
+            experiments[col] = experiments["y"]  # zero std
+        known_folds = [1.0, 2.0, 3.0, 4.0, 5.0]
+        for i, col in enumerate(fold_cols):
+            experiments.loc[incumbent_idx, col] = known_folds[i]
+
+        # Expected corrected std (Nadeau & Bengio, 2003, ddof=0)
+        k = 5
+        correction = np.sqrt(1.0 / k + 1.0 / (k - 1))
+        expected_std = float(np.std(known_folds, ddof=0))
+        expected_threshold = correction * expected_std
+
+        # The condition itself is exercised in test_cv_mode_threshold above;
+        # here we only sanity-check the correction constant and threshold.
+        strategy = SoboStrategy(
+            data_model=SoboStrategyDataModel(domain=benchmark.domain)
+        )
+        strategy.tell(experiments)
+
+        # The corrected threshold must be positive, and the correction
+        # factor must equal sqrt(1/k + 1/(k-1)) = sqrt(0.2 + 0.25)
+        assert expected_threshold > 0
+        assert correction == pytest.approx(np.sqrt(0.2 + 0.25), rel=1e-10)
+
+    def test_topq_defaults(self):
+        """Default topq=1.0 (no filtering), min_topq=20."""
+        cond = UCBLCBRegretBoundCondition()
+        assert cond.topq == 1.0
+        assert cond.min_topq == 20
+
+    def test_topq_custom(self):
+        """Custom topq values are accepted and serialized correctly."""
+        cond = UCBLCBRegretBoundCondition(topq=0.5, min_topq=10)
+        assert cond.topq == 0.5
+        assert cond.min_topq == 10
+        data = cond.model_dump()
+        restored = UCBLCBRegretBoundCondition(**data)
+        assert restored.topq == 0.5
+        assert restored.min_topq == 10
+
+    def test_topq_validation(self):
+        """topq must be in (0, 1]."""
+        with pytest.raises(Exception):
UCBLCBRegretBoundCondition(topq=0.0) + with pytest.raises(Exception): + UCBLCBRegretBoundCondition(topq=-0.5) + with pytest.raises(Exception): + UCBLCBRegretBoundCondition(topq=1.5) + + def test_topq_filtering_runs(self, benchmark): + """With topq < 1.0, condition fits a separate GP on filtered data.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(30), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + # topq=0.5 with generous threshold → should still terminate + cond = UCBLCBRegretBoundCondition( + noise_variance=1e6, + threshold_factor=1.0, + topq=0.5, + min_topq=5, + min_experiments=5, + ) + assert ( + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + is False + ) + + # topq=0.5 with very tight threshold → should NOT terminate + cond_tight = UCBLCBRegretBoundCondition( + noise_variance=1e-10, + threshold_factor=1.0, + topq=0.5, + min_topq=5, + min_experiments=5, + ) + assert ( + cond_tight.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + def test_topq_no_filtering_when_1(self, benchmark): + """With topq=1.0 (default), no filtering occurs — uses main strategy.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + # Generous threshold with topq=1.0 → should terminate + cond = UCBLCBRegretBoundCondition( + noise_variance=1e6, topq=1.0, min_experiments=5, + ) + assert ( + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + is False + ) + + +class TestStepwiseStrategyTermination: + """Integration tests for termination via StepwiseStrategy.""" + + def test_optimization_complete_raised_when_converged(self, benchmark): + """StepwiseStrategy should raise OptimizationComplete when + the UCBLCBRegretBoundCondition returns False and no fallback step exists. 
+ """ + domain = benchmark.domain + + # Step 1: Random sampling (up to 10 experiments) + # Step 2: SOBO with very generous termination (should converge quickly) + data_model = StepwiseStrategyDataModel( + domain=domain, + steps=[ + Step( + strategy_data=RandomStrategyDataModel(domain=domain), + condition=NumberOfExperimentsCondition(n_experiments=10), + ), + Step( + strategy_data=SoboStrategyDataModel(domain=domain), + condition=UCBLCBRegretBoundCondition( + threshold_factor=1e6, + min_experiments=2, + ), + ), + ], + ) + strategy = StepwiseStrategy(data_model=data_model) + + # Run the loop until OptimizationComplete is raised + terminated = False + for i in range(50): + try: + candidates = strategy.ask(1) + except OptimizationComplete: + terminated = True + break + + candidates = candidates[domain.inputs.get_keys()] + experiments = benchmark.f(candidates, return_complete=True) + strategy.tell(experiments) + + assert terminated, "Expected OptimizationComplete to be raised" + assert i < 49, "Should terminate before max iterations" + + def test_no_termination_with_always_true_fallback(self, benchmark): + """With AlwaysTrueCondition as last step, no OptimizationComplete is raised.""" + domain = benchmark.domain + + data_model = StepwiseStrategyDataModel( + domain=domain, + steps=[ + Step( + strategy_data=RandomStrategyDataModel(domain=domain), + condition=NumberOfExperimentsCondition(n_experiments=5), + ), + Step( + strategy_data=SoboStrategyDataModel(domain=domain), + condition=AlwaysTrueCondition(), + ), + ], + ) + strategy = StepwiseStrategy(data_model=data_model) + + # Should run all 8 iterations without raising + for i in range(8): + candidates = strategy.ask(1) + candidates = candidates[domain.inputs.get_keys()] + experiments = benchmark.f(candidates, return_complete=True) + strategy.tell(experiments) + + def test_strategy_passed_to_condition(self, benchmark): + """The strategy should be passed to the condition via kwargs.""" + domain = benchmark.domain + + # Use a condition with a min_experiments higher than what we provide + # to ensure the condition receives and uses the strategy + data_model = StepwiseStrategyDataModel( + domain=domain, + steps=[ + Step( + strategy_data=RandomStrategyDataModel(domain=domain), + condition=NumberOfExperimentsCondition(n_experiments=5), + ), + Step( + strategy_data=SoboStrategyDataModel(domain=domain), + condition=UCBLCBRegretBoundCondition( + threshold_factor=1.0, + min_experiments=100, # Very high: never terminates + ), + ), + ], + ) + strategy = StepwiseStrategy(data_model=data_model) + + # Should run all iterations without raising (min_experiments not met) + for i in range(8): + candidates = strategy.ask(1) + candidates = candidates[domain.inputs.get_keys()] + experiments = benchmark.f(candidates, return_complete=True) + strategy.tell(experiments) + + +class TestExpMinRegretGapConditionDataModel: + """Tests for the ExpMinRegretGapCondition data model.""" + + def test_defaults(self): + cond = ExpMinRegretGapCondition() + assert cond.threshold_mode == "adaptive" + assert cond.delta == 0.1 + assert cond.rate == 0.1 + assert cond.start_timing == 10 + assert cond.min_experiments == 5 + assert cond.beta_scale == 1.0 + assert cond.n_samples_lcb == 1000 + + def test_custom_params(self): + cond = ExpMinRegretGapCondition( + threshold_mode="median", + delta=0.05, + rate=0.2, + start_timing=20, + min_experiments=10, + beta_scale=0.5, + n_samples_lcb=500, + ) + assert cond.threshold_mode == "median" + assert cond.delta == 0.05 + assert cond.rate == 0.2 + assert 
cond.start_timing == 20 + + def test_serialization_adaptive(self): + cond = ExpMinRegretGapCondition(threshold_mode="adaptive", delta=0.05) + data = cond.model_dump() + restored = ExpMinRegretGapCondition(**data) + assert restored.threshold_mode == "adaptive" + assert restored.delta == 0.05 + + def test_serialization_median(self): + cond = ExpMinRegretGapCondition( + threshold_mode="median", rate=0.2, start_timing=15, + ) + data = cond.model_dump() + restored = ExpMinRegretGapCondition(**data) + assert restored.threshold_mode == "median" + assert restored.rate == 0.2 + assert restored.start_timing == 15 + + def test_invalid_threshold_mode(self): + with pytest.raises(Exception): + ExpMinRegretGapCondition(threshold_mode="invalid") + + def test_returns_true_without_strategy(self, benchmark): + cond = ExpMinRegretGapCondition() + assert cond.evaluate(benchmark.domain, None) is True + + def test_returns_true_with_unfitted_strategy(self, benchmark): + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + cond = ExpMinRegretGapCondition() + assert cond.evaluate(benchmark.domain, None, strategy=strategy) is True + + def test_returns_true_with_few_experiments(self, benchmark): + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(3), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = ExpMinRegretGapCondition(min_experiments=10) + assert ( + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + def test_first_call_returns_true(self, benchmark): + """First call (no previous model) should always return True.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = ExpMinRegretGapCondition(min_experiments=5) + assert ( + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + is True + ) + + def test_evaluator_is_stateful(self, benchmark): + """The same evaluator should be reused across calls.""" + cond = ExpMinRegretGapCondition(min_experiments=5) + + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + ev1 = cond._evaluator + + # Second call — same evaluator instance + candidates = strategy.ask(1)[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments2 = pd.concat( + [experiments, new_xy], ignore_index=True, + ) + strategy2 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy2.tell(experiments2) + + cond.evaluate(benchmark.domain, experiments2, strategy=strategy2) + assert cond._evaluator is ev1 + + def test_adaptive_mode_runs(self, benchmark): + """Adaptive threshold mode should run without errors over 2 iterations.""" + cond = ExpMinRegretGapCondition( + threshold_mode="adaptive", min_experiments=5, + ) + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments 
= benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + # First call: saves state, returns True + result1 = cond.evaluate(benchmark.domain, experiments, strategy=strategy) + assert result1 is True + + # Second call: computes metrics + candidates = strategy.ask(1)[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments2 = pd.concat( + [experiments, new_xy], ignore_index=True, + ) + strategy2 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy2.tell(experiments2) + + # Should return a boolean + result2 = cond.evaluate(benchmark.domain, experiments2, strategy=strategy2) + assert isinstance(result2, bool) + + def test_median_mode_returns_true_before_start_timing(self, benchmark): + """Median mode should return True before start_timing values collected.""" + cond = ExpMinRegretGapCondition( + threshold_mode="median", start_timing=100, min_experiments=5, + ) + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + cond.evaluate(benchmark.domain, experiments, strategy=strategy) + + # Do a second call to get a stopping value + candidates = strategy.ask(1)[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments2 = pd.concat( + [experiments, new_xy], ignore_index=True, + ) + strategy2 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy2.tell(experiments2) + + # start_timing=100, only 1 value collected → threshold is None → True + result = cond.evaluate(benchmark.domain, experiments2, strategy=strategy2) + assert result is True + + def test_in_stepwise_strategy(self, benchmark): + """ExpMinRegretGapCondition should work inside StepwiseStrategy.""" + domain = benchmark.domain + + data_model = StepwiseStrategyDataModel( + domain=domain, + steps=[ + Step( + strategy_data=RandomStrategyDataModel(domain=domain), + condition=NumberOfExperimentsCondition(n_experiments=10), + ), + Step( + strategy_data=SoboStrategyDataModel(domain=domain), + condition=ExpMinRegretGapCondition( + threshold_mode="adaptive", + min_experiments=5, + ), + ), + ], + ) + strategy = StepwiseStrategy(data_model=data_model) + + # Should run without errors for several iterations + for i in range(15): + try: + candidates = strategy.ask(1) + except OptimizationComplete: + break + candidates = candidates[domain.inputs.get_keys()] + experiments = benchmark.f(candidates, return_complete=True) + strategy.tell(experiments) + + +class TestLogEIPCConditionDataModel: + """Tests for the LogEIPCCondition data model.""" + + def test_defaults(self): + cond = LogEIPCCondition() + assert cond.lambda_cost == 1.0 + assert cond.cost_column is None + assert cond.cost_value == 1.0 + assert cond.alpha == 1.0 + assert cond.min_experiments == 5 + assert cond.n_samples == 2000 + assert cond.search_method == "sample" + assert cond.cost_model == "mean" + + def test_custom_params(self): + cond = LogEIPCCondition( + lambda_cost=0.1, + cost_column="time_seconds", + cost_value=60.0, + alpha=0.5, + min_experiments=10, + n_samples=500, + ) + assert cond.lambda_cost == 0.1 + assert 
cond.cost_column == "time_seconds" + assert cond.cost_value == 60.0 + assert cond.alpha == 0.5 + assert cond.min_experiments == 10 + assert cond.n_samples == 500 + + def test_serialization(self): + cond = LogEIPCCondition(lambda_cost=0.5, cost_value=2.0) + data = cond.model_dump() + restored = LogEIPCCondition(**data) + assert restored.lambda_cost == cond.lambda_cost + assert restored.cost_value == cond.cost_value + + def test_returns_true_without_strategy(self, benchmark): + cond = LogEIPCCondition() + assert cond.evaluate(benchmark.domain, None) is True + + def test_returns_true_with_unfitted_strategy(self, benchmark): + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + cond = LogEIPCCondition() + assert cond.evaluate(benchmark.domain, None, strategy=strategy) is True + + def test_returns_true_with_few_experiments(self, benchmark): + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(3), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = LogEIPCCondition(min_experiments=10) + assert cond.evaluate(benchmark.domain, experiments, strategy=strategy) is True + + def test_evaluate_returns_bool(self, benchmark): + """evaluate() must always return a bool.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = LogEIPCCondition(min_experiments=5) + result = cond.evaluate(benchmark.domain, experiments, strategy=strategy) + assert isinstance(result, bool) + + def test_generous_lambda_does_not_stop(self, benchmark): + """Very small lambda_cost → EI almost always exceeds cost → keep going.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = LogEIPCCondition(lambda_cost=1e-10, min_experiments=5) + assert cond.evaluate(benchmark.domain, experiments, strategy=strategy) is True + + def test_cost_column_used_when_present(self, benchmark): + """When cost_column is set and populated, it should be used.""" + random = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random.ask(10), return_complete=True) + experiments = experiments.copy() + experiments["cost"] = 5.0 + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + + cond = LogEIPCCondition(cost_column="cost", min_experiments=5) + result = cond.evaluate(benchmark.domain, experiments, strategy=strategy) + assert isinstance(result, bool) + + def test_in_stepwise_strategy(self, benchmark): + """LogEIPCCondition should work inside a StepwiseStrategy.""" + domain = benchmark.domain + + data_model = StepwiseStrategyDataModel( + domain=domain, + steps=[ + Step( + strategy_data=RandomStrategyDataModel(domain=domain), + condition=NumberOfExperimentsCondition(n_experiments=10), + ), + Step( + strategy_data=SoboStrategyDataModel(domain=domain), + condition=LogEIPCCondition( + lambda_cost=1.0, + min_experiments=5, + ), + ), + ], + ) + strategy = 
StepwiseStrategy(data_model=data_model)
+
+        for i in range(15):
+            try:
+                candidates = strategy.ask(1)
+            except OptimizationComplete:
+                break
+            candidates = candidates[domain.inputs.get_keys()]
+            experiments = benchmark.f(candidates, return_complete=True)
+            strategy.tell(experiments)
diff --git a/tests/bofire/termination/test_evaluator.py b/tests/bofire/termination/test_evaluator.py
new file mode 100644
index 000000000..c83067fa5
--- /dev/null
+++ b/tests/bofire/termination/test_evaluator.py
@@ -0,0 +1,725 @@
+"""Tests for termination evaluators."""
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from bofire.benchmarks.single import Himmelblau
+
+from bofire.data_models.strategies.api import RandomStrategy as RandomStrategyDataModel
+from bofire.data_models.strategies.api import SoboStrategy as SoboStrategyDataModel
+from bofire.strategies.api import RandomStrategy, SoboStrategy
+from bofire.termination.evaluator import ExpMinRegretGapEvaluator, LogEIPCEvaluator, UCBLCBRegretEvaluator
+
+
+@pytest.fixture
+def benchmark():
+    return Himmelblau()
+
+
+@pytest.fixture
+def trained_strategy(benchmark):
+    """Create a trained SoboStrategy with 10 random initial points."""
+    random_strategy = RandomStrategy(
+        data_model=RandomStrategyDataModel(domain=benchmark.domain)
+    )
+    experiments = benchmark.f(random_strategy.ask(10), return_complete=True)
+
+    strategy = SoboStrategy(data_model=SoboStrategyDataModel(domain=benchmark.domain))
+    strategy.tell(experiments)
+    return strategy, experiments
+
+
+class TestUCBLCBRegretEvaluator:
+    """Unit tests for the UCBLCBRegretEvaluator."""
+
+    def test_evaluate_returns_valid_regret_bound(self, trained_strategy):
+        """Regret bound must be a non-negative float."""
+        strategy, experiments = trained_strategy
+        evaluator = UCBLCBRegretEvaluator()
+
+        result = evaluator.evaluate(strategy, experiments, 0)
+
+        assert isinstance(result["regret_bound"], float)
+        assert result["regret_bound"] >= 0
+
+    def test_returns_all_keys(self, trained_strategy):
+        """Evaluate must return the complete set of expected keys."""
+        strategy, experiments = trained_strategy
+        evaluator = UCBLCBRegretEvaluator()
+
+        result = evaluator.evaluate(strategy, experiments, 0)
+
+        expected_keys = {
+            "regret_bound",
+            "min_ucb_evaluated",
+            "min_lcb_domain",
+            "estimated_noise_variance",
+            "beta",
+        }
+        assert expected_keys == set(result.keys())
+
+    def test_min_lcb_leq_min_ucb(self, trained_strategy):
+        """min LCB(domain) <= min UCB(evaluated), so regret bound >= 0.
+
+        Since LCB(x) <= UCB(x) for all x, and the domain includes the
+        evaluated points, min_x LCB(x) <= min_i UCB(x_i).
+ """ + strategy, experiments = trained_strategy + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result["min_lcb_domain"] <= result["min_ucb_evaluated"] + 1e-6 + + def test_regret_bound_equals_ucb_minus_lcb(self, trained_strategy): + """regret_bound = max(0, min_ucb - min_lcb).""" + strategy, experiments = trained_strategy + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + expected = max(0.0, result["min_ucb_evaluated"] - result["min_lcb_domain"]) + assert abs(result["regret_bound"] - expected) < 1e-10 + + def test_returns_empty_dict_when_not_fitted(self, benchmark): + """Should return empty dict when strategy model is not fitted.""" + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, pd.DataFrame(), 0) + + assert result == {} + + def test_returns_empty_dict_with_too_few_experiments(self, trained_strategy): + """Should return empty dict with fewer than 2 experiments.""" + strategy, experiments = trained_strategy + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, experiments.iloc[:1], 0) + + assert result == {} + + def test_beta_computed_from_gp_ucb_formula(self, trained_strategy): + """Beta should follow the GP-UCB formula: 0.2 * 2 * log(d * t^2 * pi^2 / (6 * delta)).""" + strategy, experiments = trained_strategy + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + d = len(strategy.domain.inputs.get_keys()) # 2 for Himmelblau + t = len(experiments) # 10 + expected_beta = 0.2 * 2.0 * np.log(d * t**2 * np.pi**2 / (6.0 * 0.1)) + assert abs(result["beta"] - expected_beta) < 1e-10 + + def test_beta_scales_with_observations(self, benchmark): + """Beta should increase logarithmically with number of observations.""" + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + evaluator = UCBLCBRegretEvaluator() + + # Fit with 5 points + exp5 = benchmark.f(random_strategy.ask(5), return_complete=True) + strategy5 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy5.tell(exp5) + result5 = evaluator.evaluate(strategy5, exp5, 0) + + # Fit with 15 points + exp15 = benchmark.f(random_strategy.ask(15), return_complete=True) + strategy15 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy15.tell(exp15) + result15 = evaluator.evaluate(strategy15, exp15, 0) + + # More observations → larger beta (logarithmic growth) + assert result15["beta"] > result5["beta"] + + def test_noise_variance_estimated(self, trained_strategy): + """Noise variance should be estimated from the GP likelihood.""" + strategy, experiments = trained_strategy + evaluator = UCBLCBRegretEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result["estimated_noise_variance"] > 0 + + +class TestRegretBoundConvergence: + """Integration tests verifying regret bound behavior over BO iterations.""" + + def test_regret_bound_generally_decreases(self, benchmark): + """Over many BO iterations, the regret bound should generally decrease. + + After fitting with more data, the GP becomes more certain and the + gap between UCB and LCB shrinks. Note: beta grows logarithmically + with t (GP-UCB formula), so with few iterations the beta increase + may initially outpace uncertainty reduction. 
Over enough iterations,
+        the GP variance decrease dominates.
+
+        We use enough initial data for a well-fitted GP on Himmelblau (2D),
+        then check that the regret bound decreases overall across BO
+        iterations. We compare the minimum regret bound over the later
+        iterations against the first computed regret bound — the minimum
+        should be strictly smaller, showing the bound decreases at least
+        some of the time.
+        """
+        import torch
+
+        torch.manual_seed(42)
+
+        random_strategy = RandomStrategy(
+            data_model=RandomStrategyDataModel(domain=benchmark.domain)
+        )
+        experiments = benchmark.f(random_strategy.ask(20), return_complete=True)
+
+        strategy = SoboStrategy(
+            data_model=SoboStrategyDataModel(domain=benchmark.domain)
+        )
+        strategy.tell(experiments)
+        evaluator = UCBLCBRegretEvaluator()
+        evaluator.n_samples_lcb = 500  # enough samples for stable LCB
+
+        regret_bounds = []
+        for i in range(8):
+            result = evaluator.evaluate(strategy, strategy.experiments, i)
+            regret_bounds.append(result["regret_bound"])
+
+            # Run one BO iteration
+            candidates = strategy.ask(1)
+            candidates = candidates[benchmark.domain.inputs.get_keys()]
+            new_experiments = benchmark.f(candidates)
+            new_xy = pd.concat([candidates, new_experiments], axis=1)
+            strategy.tell(pd.concat([strategy.experiments, new_xy]))
+
+        # The minimum regret bound across the later iterations should be
+        # smaller than the first regret bound value (bound decreases
+        # at least some of the time over BO iterations)
+        first_rb = regret_bounds[0]
+        min_all = min(regret_bounds[1:])
+        assert min_all < first_rb
+
+
+class TestExpMinRegretGapEvaluator:
+    """Unit tests for the ExpMinRegretGapEvaluator (Ishibashi et al. 2023)."""
+
+    @pytest.fixture
+    def bo_loop_strategies(self, benchmark):
+        """Simulate 2 BO iterations, returning (strategy_iter1, exp1, strategy_iter2, exp2).
+
+        Iter 1: 10 random points, fit GP.
+        Iter 2: ask 1 candidate → evaluate → refit GP with 11 points.
+ """ + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments1 = benchmark.f(random_strategy.ask(10), return_complete=True) + + strategy1 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy1.tell(experiments1) + + # Get a candidate and evaluate it + candidates = strategy1.ask(1) + candidates = candidates[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments2 = pd.concat( + [experiments1, new_xy], ignore_index=True, + ) + + strategy2 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy2.tell(experiments2) + + return strategy1, experiments1, strategy2, experiments2 + + def test_first_call_returns_empty(self, trained_strategy): + """First call should return empty dict (no previous model to compare).""" + strategy, experiments = trained_strategy + evaluator = ExpMinRegretGapEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result == {} + + def test_second_call_returns_metrics(self, bo_loop_strategies): + """Second call should return the full set of stopping metrics.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + # First call: saves state + result1 = evaluator.evaluate(strategy1, exp1, 0) + assert result1 == {} + + # Second call: computes metrics + result2 = evaluator.evaluate(strategy2, exp2, 1) + assert result2 != {} + + expected_keys = { + "stopping_value", + "delta_f", + "ei_diff", + "kappa", + "kl_divergence", + "threshold_adaptive", + "threshold_median", + "noise_variance", + "seq_values", + } + assert expected_keys == set(result2.keys()) + + def test_stopping_value_non_negative(self, bo_loop_strategies): + """Stopping value must be non-negative.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + evaluator.evaluate(strategy1, exp1, 0) + result = evaluator.evaluate(strategy2, exp2, 1) + + assert result["stopping_value"] >= 0 + + def test_components_non_negative(self, bo_loop_strategies): + """All components (delta_f, ei_diff, kappa, kl) must be non-negative.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + evaluator.evaluate(strategy1, exp1, 0) + result = evaluator.evaluate(strategy2, exp2, 1) + + assert result["delta_f"] >= 0 + assert result["ei_diff"] >= 0 + assert result["kappa"] >= 0 + assert result["kl_divergence"] >= 0 + + def test_stopping_value_is_sum_of_components(self, bo_loop_strategies): + """value = delta_f + ei_diff + kappa * sqrt(KL/2).""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + evaluator.evaluate(strategy1, exp1, 0) + result = evaluator.evaluate(strategy2, exp2, 1) + + expected = ( + result["delta_f"] + + result["ei_diff"] + + result["kappa"] * np.sqrt(0.5 * result["kl_divergence"]) + ) + assert abs(result["stopping_value"] - expected) < 1e-10 + + def test_adaptive_threshold_computed(self, bo_loop_strategies): + """Adaptive threshold should be a positive float.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + evaluator.evaluate(strategy1, exp1, 0) + result = evaluator.evaluate(strategy2, exp2, 1) + + assert result["threshold_adaptive"] is not None + assert result["threshold_adaptive"] > 0 + + def test_median_threshold_none_before_start_timing(self, 
bo_loop_strategies): + """Median threshold should be None when fewer than start_timing values.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + evaluator.start_timing = 100 # much more than 1 value + + evaluator.evaluate(strategy1, exp1, 0) + result = evaluator.evaluate(strategy2, exp2, 1) + + assert result["threshold_median"] is None + + def test_seq_values_accumulate(self, benchmark): + """Sequence of stopping values should grow with each call.""" + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random_strategy.ask(10), return_complete=True) + evaluator = ExpMinRegretGapEvaluator() + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + evaluator.evaluate(strategy, experiments, 0) # first call → empty + + for i in range(3): + candidates = strategy.ask(1) + candidates = candidates[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments = pd.concat( + [experiments, new_xy], ignore_index=True, + ) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + result = evaluator.evaluate(strategy, experiments, i + 1) + assert len(result["seq_values"]) == i + 1 + + def test_median_threshold_after_start_timing(self, benchmark): + """Once enough values are collected, median threshold should be computed.""" + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random_strategy.ask(10), return_complete=True) + evaluator = ExpMinRegretGapEvaluator() + evaluator.start_timing = 3 # low for tests + evaluator.rate = 0.1 + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + evaluator.evaluate(strategy, experiments, 0) + + for i in range(5): + candidates = strategy.ask(1) + candidates = candidates[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments = pd.concat( + [experiments, new_xy], ignore_index=True, + ) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy.tell(experiments) + result = evaluator.evaluate(strategy, experiments, i + 1) + + # After 5 iterations with start_timing=3, median threshold should exist + assert result["threshold_median"] is not None + assert result["threshold_median"] > 0 + # threshold_median = rate * median(first 3 values) + expected = 0.1 * np.median(result["seq_values"][:3]) + assert abs(result["threshold_median"] - expected) < 1e-10 + + def test_kl_divergence_formula(self): + """Test the fast KL divergence formula against known values.""" + evaluator = ExpMinRegretGapEvaluator() + + # When observation equals the GP mean, KL should be small (only from + # the variance reduction term) + kl = evaluator._calc_kl_qp_fast( + old_mean=1.0, old_var=1.0, new_output=1.0, noise_var=0.1 + ) + assert kl >= 0 + + # KL should increase when observation is far from GP mean + kl_far = evaluator._calc_kl_qp_fast( + old_mean=1.0, old_var=1.0, new_output=10.0, noise_var=0.1 + ) + assert kl_far > kl + + # KL should be 0 when old variance is 0 (degenerate case) + kl_zero = evaluator._calc_kl_qp_fast( + old_mean=1.0, old_var=0.0, new_output=1.0, noise_var=0.1 + ) + assert kl_zero == 
0.0 + + def test_returns_empty_unfitted(self, benchmark): + """Should return empty dict when strategy is not fitted.""" + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + evaluator = ExpMinRegretGapEvaluator() + + result = evaluator.evaluate(strategy, pd.DataFrame(), 0) + + assert result == {} + + def test_returns_empty_single_experiment(self, benchmark): + """Should return empty dict with only 1 experiment.""" + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random_strategy.ask(1), return_complete=True) + + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + # Can't tell with 1 experiment on Himmelblau (needs >= 2 for GP) + evaluator = ExpMinRegretGapEvaluator() + result = evaluator.evaluate(strategy, experiments, 0) + + assert result == {} + + def test_dumps_and_loads(self, bo_loop_strategies): + """State should survive a dumps/loads round-trip.""" + strategy1, exp1, strategy2, exp2 = bo_loop_strategies + evaluator = ExpMinRegretGapEvaluator() + + evaluator.evaluate(strategy1, exp1, 0) + evaluator.evaluate(strategy2, exp2, 1) + + # Serialize state + data = evaluator.dumps() + assert isinstance(data, str) + + # Load into a fresh evaluator + evaluator2 = ExpMinRegretGapEvaluator() + evaluator2.loads(data) + + # Internal state should match + assert evaluator2._prev_incumbent_idx == evaluator._prev_incumbent_idx + assert evaluator2._prev_n_experiments == evaluator._prev_n_experiments + assert evaluator2._seq_values == evaluator._seq_values + assert evaluator2._prev_model is not None + + def test_evaluate_after_loads(self, benchmark): + """Evaluator should produce valid metrics after loading state.""" + random_strategy = RandomStrategy( + data_model=RandomStrategyDataModel(domain=benchmark.domain) + ) + experiments = benchmark.f(random_strategy.ask(10), return_complete=True) + + # Iteration 1 + strategy1 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy1.tell(experiments) + evaluator = ExpMinRegretGapEvaluator() + evaluator.evaluate(strategy1, experiments, 0) + + # Iteration 2 + candidates = strategy1.ask(1) + candidates = candidates[benchmark.domain.inputs.get_keys()] + new_exp = benchmark.f(candidates) + new_xy = pd.concat([candidates, new_exp], axis=1) + experiments2 = pd.concat([experiments, new_xy], ignore_index=True) + + strategy2 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy2.tell(experiments2) + evaluator.evaluate(strategy2, experiments2, 1) + + # Serialize and simulate restart + data = evaluator.dumps() + + evaluator_new = ExpMinRegretGapEvaluator() + evaluator_new.loads(data) + + # Iteration 3 — should work with the restored evaluator + candidates3 = strategy2.ask(1) + candidates3 = candidates3[benchmark.domain.inputs.get_keys()] + new_exp3 = benchmark.f(candidates3) + new_xy3 = pd.concat([candidates3, new_exp3], axis=1) + experiments3 = pd.concat([experiments2, new_xy3], ignore_index=True) + + strategy3 = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + strategy3.tell(experiments3) + result = evaluator_new.evaluate(strategy3, experiments3, 2) + + assert result != {} + assert result["stopping_value"] >= 0 + assert len(result["seq_values"]) == 2 # values from iter 2 + iter 3 + + +class TestLogEIPCEvaluator: + """Unit tests for the LogEIPCEvaluator (Xie et al., 2025).""" + + def 
test_evaluate_returns_expected_keys(self, trained_strategy): + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + assert set(result.keys()) == {"max_log_eipc", "best_f", "cost_estimate", "lambda_cost"} + + def test_max_log_eipc_is_float(self, trained_strategy): + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator() + + result = evaluator.evaluate(strategy, experiments, 0) + + assert isinstance(result["max_log_eipc"], float) + + def test_best_f_equals_min_observed(self, trained_strategy): + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator() + output_key = strategy.domain.outputs.get_keys()[0] + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result["best_f"] == pytest.approx(experiments[output_key].min()) + + def test_returns_empty_when_not_fitted(self, benchmark): + strategy = SoboStrategy( + data_model=SoboStrategyDataModel(domain=benchmark.domain) + ) + evaluator = LogEIPCEvaluator() + + result = evaluator.evaluate(strategy, pd.DataFrame(), 0) + + assert result == {} + + def test_returns_empty_with_few_experiments(self, trained_strategy): + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator() + + result = evaluator.evaluate(strategy, experiments.iloc[:1], 0) + + assert result == {} + + def test_cost_column_used_when_present(self, trained_strategy): + strategy, experiments = trained_strategy + experiments = experiments.copy() + experiments["cost"] = 2.5 + + evaluator = LogEIPCEvaluator(cost_column="cost") + result = evaluator.evaluate(strategy, experiments, 0) + + assert result["cost_estimate"] == pytest.approx(2.5) + + def test_cost_value_fallback_when_no_column(self, trained_strategy): + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator(cost_value=3.0) + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result["cost_estimate"] == pytest.approx(3.0) + + def test_higher_lambda_cost_decreases_max_log_eipc(self, trained_strategy): + """Higher lambda_cost shifts the threshold up, lowering max_log_eipc.""" + strategy, experiments = trained_strategy + + result_low = LogEIPCEvaluator(lambda_cost=0.01).evaluate(strategy, experiments, 0) + result_high = LogEIPCEvaluator(lambda_cost=100.0).evaluate(strategy, experiments, 0) + + assert result_high["max_log_eipc"] < result_low["max_log_eipc"] + + def test_returns_empty_with_zero_cost(self, trained_strategy): + """Zero cost_value should return empty dict (undefined log(0)).""" + strategy, experiments = trained_strategy + evaluator = LogEIPCEvaluator(cost_value=0.0) + + result = evaluator.evaluate(strategy, experiments, 0) + + assert result == {} + + def test_cost_callable_used_when_provided(self, trained_strategy): + """cost_callable should override cost_value for per-point evaluation.""" + strategy, experiments = trained_strategy + # Callable returns constant 2.5 — should give same result as cost_value=2.5 + ev_callable = LogEIPCEvaluator( + cost_callable=lambda X: X.new_full((X.shape[0],), 2.5) + ) + ev_scalar = LogEIPCEvaluator(cost_value=2.5) + + r_callable = ev_callable.evaluate(strategy, experiments, 0) + r_scalar = ev_scalar.evaluate(strategy, experiments, 0) + + assert r_callable["max_log_eipc"] == pytest.approx( + r_scalar["max_log_eipc"], abs=0.5 + ) + + def test_search_method_optimize_returns_valid_result(self, trained_strategy): + """search_method='optimize' should return a valid max_log_eipc.""" + strategy, experiments = 
trained_strategy + evaluator = LogEIPCEvaluator(search_method="optimize") + + result = evaluator.evaluate(strategy, experiments, 0) + + assert isinstance(result["max_log_eipc"], float) + + def test_search_method_sample_and_optimize_agree(self, trained_strategy): + """'sample' and 'optimize' should produce results with the same sign.""" + strategy, experiments = trained_strategy + + r_sample = LogEIPCEvaluator( + lambda_cost=1.0, cost_value=1.0, search_method="sample" + ).evaluate(strategy, experiments, 0) + r_opt = LogEIPCEvaluator( + lambda_cost=1.0, cost_value=1.0, search_method="optimize" + ).evaluate(strategy, experiments, 0) + + # Both should agree on stop vs continue + assert (r_sample["max_log_eipc"] > 0) == (r_opt["max_log_eipc"] > 0) + + def test_cost_model_gp_returns_valid_result(self, trained_strategy): + """cost_model='gp' should fit a cost GP and return a valid result.""" + strategy, experiments = trained_strategy + experiments = experiments.copy() + experiments["cost"] = np.random.uniform(1.0, 3.0, len(experiments)) + + evaluator = LogEIPCEvaluator( + cost_column="cost", cost_model="gp" + ) + result = evaluator.evaluate(strategy, experiments, 0) + + assert isinstance(result["max_log_eipc"], float) + + def test_cost_model_gp_doesnt_mutate_cost_callable(self, trained_strategy): + """cost_callable should be None after evaluate() even with cost_model='gp'.""" + strategy, experiments = trained_strategy + experiments = experiments.copy() + experiments["cost"] = 2.0 + + evaluator = LogEIPCEvaluator(cost_column="cost", cost_model="gp") + assert evaluator.cost_callable is None + evaluator.evaluate(strategy, experiments, 0) + assert evaluator.cost_callable is None # restored after call + + def test_cost_model_gp_vs_mean_differ(self, trained_strategy): + """GP cost model should give a different result from scalar mean.""" + strategy, experiments = trained_strategy + experiments = experiments.copy() + # Costs increase with x_1 — spatial variation the GP can learn + experiments["cost"] = 1.0 + experiments["x_1"].abs() + + r_mean = LogEIPCEvaluator( + cost_column="cost", cost_model="mean" + ).evaluate(strategy, experiments, 0) + r_gp = LogEIPCEvaluator( + cost_column="cost", cost_model="gp" + ).evaluate(strategy, experiments, 0) + + # GP-based costs are per-point so max_log_eipc will generally differ + assert r_mean["max_log_eipc"] != pytest.approx(r_gp["max_log_eipc"]) + + +class TestEvaluatorKwargs: + """Tests for overriding evaluator parameters via constructor kwargs.""" + + def test_ucblcb_kwargs_override_defaults(self): + evaluator = UCBLCBRegretEvaluator( + delta=0.05, beta_scale=1.0, n_samples_lcb=500 + ) + assert evaluator.delta == 0.05 + assert evaluator.beta_scale == 1.0 + assert evaluator.n_samples_lcb == 500 + + def test_defaults_preserved_without_kwargs(self): + evaluator = UCBLCBRegretEvaluator() + assert evaluator.delta == 0.1 + assert evaluator.beta_scale == 0.2 + + def test_expmin_kwargs_override_defaults(self): + evaluator = ExpMinRegretGapEvaluator( + delta=0.05, rate=0.2, start_timing=15, beta_scale=0.5 + ) + assert evaluator.delta == 0.05 + assert evaluator.rate == 0.2 + assert evaluator.start_timing == 15 + assert evaluator.beta_scale == 0.5 + assert evaluator._seq_values == [] + + def test_unknown_kwarg_raises(self): + with pytest.raises(TypeError, match="unexpected keyword"): + UCBLCBRegretEvaluator(not_a_real_param=1.0) + + def test_private_kwarg_raises(self): + with pytest.raises(TypeError, match="unexpected keyword"): + ExpMinRegretGapEvaluator(_prev_model="x")
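+
+
+class TestLogEIPCStoppingSketch:
+    """Illustrative sketch of the LogEIPC stopping rule (not from the paper).
+
+    A minimal end-to-end example using only the pieces exercised above:
+    ``max_log_eipc <= 0`` maps to "stop", positive values to "continue".
+    The extreme lambda_cost values are assumptions chosen to force each sign.
+    """
+
+    def test_lambda_cost_controls_stopping_sign(self, trained_strategy):
+        strategy, experiments = trained_strategy
+
+        # An extreme exchange rate makes every candidate "not worth its
+        # cost", so the stopping quantity max LogEI - log(lambda_cost)
+        # should be non-positive (stop)...
+        result = LogEIPCEvaluator(lambda_cost=1e12).evaluate(
+            strategy, experiments, 0
+        )
+        assert result["max_log_eipc"] <= 0
+
+        # ...while a tiny exchange rate keeps the loop running (continue),
+        # mirroring test_generous_lambda_does_not_stop in test_condition.py.
+        result = LogEIPCEvaluator(lambda_cost=1e-12).evaluate(
+            strategy, experiments, 0
+        )
+        assert result["max_log_eipc"] > 0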