diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b8001cea..37628224e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ and this project adheres to [Pragmatic Versioning](https://github.com/experiment - `WeightedMeanFeature` and `MolecularWeightedMeanFeature` engineered features for weighted-mean behavior. - `plot_gp_slice_plotly` now supports fixed input features that can be a mix of `ContinuousInput` and `CategoricalInput` (with string categorical fixed values). - Configurable `noise_constraint` support for GP-based surrogates (`SingleTaskGP`, `MixedSingleTaskGP`, `TanimotoGP`, and `MultiTaskGP`) and corresponding linear/polynomial wrappers. -- Generalized NChooseK constraint support in DoE: `min_count > 0` is now supported, non-zero lower bounds (`lb > 0`) are allowed for NChooseK features, and `nchoosek_constraints_as_bounds` generates deactivation patterns for all activity levels `k ∈ [min_count, max_count]`. When `min_count=0`, the all-zero (fully inactive) pattern is now included naturally. `none_also_valid=True` with `min_count > 0` explicitly adds the all-zero pattern. +- Generalized NChooseK constraint support in DoE: `min_count > 0` is now supported, non-zero lower bounds (`lb > 0`) are allowed for NChooseK features, overlapping NChooseK constraints (shared features) are handled via incremental pairwise merge with consistency filtering, and `nchoosek_constraints_as_bounds` generates deactivation patterns for all activity levels `k ∈ [min_count, max_count]`. ### Changed diff --git a/bofire/strategies/doe/utils.py b/bofire/strategies/doe/utils.py index af8698005..dd5a67dbd 100644 --- a/bofire/strategies/doe/utils.py +++ b/bofire/strategies/doe/utils.py @@ -556,125 +556,190 @@ def hessian(self, x: np.ndarray, *args): return hessian -def check_nchoosek_constraints_as_bounds(domain: Domain) -> None: - """Checks if NChooseK constraints of domain can be formulated as bounds. +def _constraint_patterns( + all_keys: list[str], + constraint: NChooseKConstraint, +) -> list[dict[int, bool]]: + """Generate dicts for one constraint.""" + patterns_of_constraint = [] + n_features = len(constraint.features) + ind = [i for i, key in enumerate(all_keys) if key in constraint.features] + for k in range(max(constraint.min_count, 1), constraint.max_count + 1): + n_inactive = n_features - k + if n_inactive > 0: + for combo in combinations(ind, r=n_inactive): + inactive_set = set(combo) + patterns_of_constraint.append( + {idx: (idx not in inactive_set) for idx in ind} + ) + else: + patterns_of_constraint.append({idx: True for idx in ind}) + return patterns_of_constraint + + +def _merge_two( + patterns_of_constraint_a: list[dict[int, bool]], + patterns_of_constraint_b: list[dict[int, bool]], +) -> list[dict[int, bool]]: + """Merge two pattern lists, filtering conflicts. + + Uses a hash-join on shared feature indices: patterns from A are + bucketed by their assignment to shared keys, so each pattern from B + only needs to merge with the bucket that agrees on those keys. + + pattern_of_constraint_a: {feature_index: is_active} dict from constraint A + pattern_of_constraint_b: {feature_index: is_active} dict from constraint B + + returns: merged {feature_index: is_active} dict if no conflicts, else None + + """ + # Identify shared feature indices between the two sides. + shared = set(patterns_of_constraint_a[0].keys()).intersection( + set(patterns_of_constraint_b[0].keys()) + ) + + patterns_of_merged_constraints = [] + if shared: + # Bucket patterns by their assignment on shared indices. + # if activation_per_feature= {0: True, 1:True , 2: False} is a pattern and shared=[0,2], the key is (True, False) + sorted_shared = sorted(shared) + map_pattern_of_shared_to_global_pattern = { + tuple(activation_per_feature[idx] for idx in sorted_shared): [] + for activation_per_feature in patterns_of_constraint_a + } + for activation_per_feature in patterns_of_constraint_a: + pattern_of_shared = tuple( + activation_per_feature[idx] for idx in sorted_shared + ) + map_pattern_of_shared_to_global_pattern[pattern_of_shared].append( + activation_per_feature + ) + + for activation_per_feature_b in patterns_of_constraint_b: + pattern_of_shared = tuple( + activation_per_feature_b[idx] for idx in sorted_shared + ) + for activation_per_feature_a in map_pattern_of_shared_to_global_pattern.get( + pattern_of_shared, () + ): + merged = dict(activation_per_feature_a) + merged.update(activation_per_feature_b) + patterns_of_merged_constraints.append(merged) + else: + # No shared features — full cross-product. + for activation_per_feature_b in patterns_of_constraint_b: + for activation_per_feature_a in patterns_of_constraint_a: + merged = dict(activation_per_feature_a) + merged.update(activation_per_feature_b) + patterns_of_merged_constraints.append(merged) + + return patterns_of_merged_constraints + + +def _get_nchoosek_combined_patterns( + domain: Domain, +) -> list[tuple[int, ...]]: + """Generate combined deactivation patterns across all NChooseK constraints. + + For each constraint, per-feature active/inactive patterns are generated + for every allowed activity level. When multiple constraints share + features, patterns are merged incrementally (pairwise) rather than via + a single giant Cartesian product, pruning inconsistent combinations + early. + + Consistency: If a feature is shared by multiple constraints, it must be active in all or + inactive in all to yield a valid combined pattern. - For every feature referenced by an NChooseKConstraint the lower bound must - be >= 0 (so that "inactive" variables can be pinned to 0) and the upper - bound must be > 0. The feature sets of different NChooseKConstraints must - be pairwise disjoint. + Returns: + Tuples containing the domain-level indices of features to deactivate + (pin to 0) for one experiment slot. Duplicates are possible when + constraints are disjoint; the caller should deduplicate if needed. + """ + nchoosek_constraints = [ + c for c in domain.constraints if isinstance(c, NChooseKConstraint) + ] + + if not nchoosek_constraints: + return [] + + all_keys = domain.inputs.get_keys() + + # Incremental pairwise merge — each step only materialises a generator + merged_patterns = _constraint_patterns(all_keys, nchoosek_constraints[0]) + for constraint in nchoosek_constraints[1:]: + merged_patterns = _merge_two( + merged_patterns, _constraint_patterns(all_keys, constraint) + ) - Note: a non-zero lower bound (e.g. 0.1) is fine because the bounds-based - approach overrides the bounds of inactive variables to [0, 0] per - experiment while keeping the original [lb, ub] for active variables. + allowed_constraint_patterns = [] + for merged in merged_patterns: + allowed_constraint_patterns.append( + tuple(sorted(idx for idx, active in merged.items() if not active)) + ) + return allowed_constraint_patterns + + +def _build_nchoosek_combined_patterns( + domain: Domain, +) -> list[tuple[int, ...]]: + """Collect combined deactivation patterns. Args: - domain (Domain): Domain whose NChooseK constraints should be checked + domain: Domain whose NChooseK constraints should be combined. + + Returns: + A deduplicated list of inactive-index tuples. + Raises: + ValueError: When no valid combined patterns exist (contradictory + constraints). """ - # collect NChooseK constraints - if len(domain.constraints) == 0: - return + result = list(set(_get_nchoosek_combined_patterns(domain))) - nchoosek_constraints = [] - for c in domain.constraints: - if isinstance(c, NChooseKConstraint): - nchoosek_constraints.append(c) - - if len(nchoosek_constraints) == 0: - return - - # check if the domains of all NChooseK constraints are compatible to linearization - for c in nchoosek_constraints: - for name in np.unique(c.features): - input = domain.inputs.get_by_key(name) - if input.bounds[0] < 0: - raise ValueError( - f"Constraint {c} cannot be formulated as bounds. " - f"Feature '{name}' has lower bound {input.bounds[0]} < 0.", - ) - if input.bounds[1] < 0: - raise ValueError( - f"Constraint {c} cannot be formulated as bounds. " - f"Feature '{name}' has upper bound {input.bounds[1]} < 0.", - ) + if not result and any( + isinstance(c, NChooseKConstraint) for c in domain.constraints + ): + raise ValueError( + "No valid combined NChooseK patterns exist. " + "The overlapping constraints are contradictory." + ) - # check if the parameter names of two nchoose overlap - for c in nchoosek_constraints: - for _c in nchoosek_constraints: - if c != _c: - for name in c.features: - if name in _c.features: - raise ValueError( - f"Domain {domain} cannot be used for formulation as bounds. \ - names attribute of NChooseK constraints must be pairwise disjoint.", - ) + return result def nchoosek_constraints_as_bounds( domain: Domain, n_experiments: int, -) -> List: - """Determines the box bounds for the decision variables +) -> list: + """Determines the bounds for the optimization problem that correspond to the NChooseK constraints of the domain. Args: domain (Domain): Domain to find the bounds for. n_experiments (int): number of experiments for the design to be determined. Returns: - A list of tuples containing bounds that respect NChooseK constraint imposed - onto the decision variables. + A list of tuples containing bounds that respect NChooseK constraints + imposed onto the decision variables. """ - check_nchoosek_constraints_as_bounds(domain) # bounds without NChooseK constraints bounds = np.array( [p.bounds for p in domain.inputs.get(ContinuousInput)] * n_experiments, ) - if len(domain.constraints) > 0: - for constraint in domain.constraints: - if isinstance(constraint, NChooseKConstraint): - n_features = len(constraint.features) - # find indices of constraint features in the domain input keys - ind = [ - i - for i, p in enumerate(domain.inputs.get_keys()) - if p in constraint.features - ] - - # Build all valid deactivation patterns for every allowed - # activity level k in [min_count, max_count]. For each k, - # exactly (n_features - k) variables are set to zero. - all_inactive_patterns = [] - for k in range(max(constraint.min_count, 1), constraint.max_count + 1): - n_inactive = n_features - k - if n_inactive > 0: - all_inactive_patterns.extend( - list(combinations(ind, r=n_inactive)) - ) - - if len(all_inactive_patterns) > 0: - # patterns may have different lengths (different activity - # levels), so we keep them as a plain list of tuples - # instead of converting to a numpy array. - np.random.shuffle( - all_inactive_patterns - ) # shuffle patterns to avoid biasing the optimization towards certain patterns - - # set bounds to zero for the chosen inactive variables per experiment - D = len(domain.inputs) - for i in range(n_experiments): - pattern = all_inactive_patterns[i % len(all_inactive_patterns)] - for idx in pattern: - bounds[idx + i * D, :] = [0, 0] - if i % len(all_inactive_patterns) == ( - len(all_inactive_patterns) - 1 - ): - np.random.shuffle(all_inactive_patterns) - else: - pass + combined_patterns = _build_nchoosek_combined_patterns(domain) + + if combined_patterns: + np.random.shuffle(combined_patterns) + + D = len(domain.inputs) + for i in range(n_experiments): + pattern = combined_patterns[i % len(combined_patterns)] + for idx in pattern: + bounds[idx + i * D, :] = [0, 0] + if i % len(combined_patterns) == (len(combined_patterns) - 1): + np.random.shuffle(combined_patterns) # convert bounds to list of tuples bounds = [(b[0], b[1]) for b in bounds] diff --git a/tests/bofire/strategies/doe/test_utils.py b/tests/bofire/strategies/doe/test_utils.py index ee54a4981..50c6fc04b 100644 --- a/tests/bofire/strategies/doe/test_utils.py +++ b/tests/bofire/strategies/doe/test_utils.py @@ -25,7 +25,6 @@ from bofire.strategies.doe.utils import ( ConstraintWrapper, _minimize, - check_nchoosek_constraints_as_bounds, constraints_as_scipy_constraints, convert_formula_to_string, formula_str_to_fully_continuous, @@ -590,123 +589,6 @@ def test_minimize(): ) -def test_check_nchoosek_constraints_as_bounds(): - # define domain: possible to formulate as bounds, no NChooseK constraints - domain = Domain.from_lists( - inputs=[ContinuousInput(key=f"x{i + 1}", bounds=(0, 1)) for i in range(4)], - outputs=[ContinuousOutput(key="y")], - ) - check_nchoosek_constraints_as_bounds(domain) - - domain = Domain.from_lists( - inputs=[ContinuousInput(key=f"x{i + 1}", bounds=(0, 1)) for i in range(4)], - outputs=[ContinuousOutput(key="y")], - constraints=[], - ) - check_nchoosek_constraints_as_bounds(domain) - - domain = Domain.from_lists( - inputs=[ContinuousInput(key=f"x{i + 1}", bounds=(0, 1)) for i in range(4)], - outputs=[ContinuousOutput(key="y")], - constraints=[ - LinearEqualityConstraint(features=["x1", "x2"], coefficients=[1, 1], rhs=0), - ], - ) - check_nchoosek_constraints_as_bounds(domain) - - # n-choose-k constraints when variables can be negative - domain = Domain.from_lists( - inputs=[ - ContinuousInput(key=f"x{1}", bounds=(0, 1)), - ContinuousInput(key=f"x{2}", bounds=(0, 2)), - ContinuousInput(key=f"x{3}", bounds=(0, 3)), - ContinuousInput(key=f"x{4}", bounds=(0, 4)), - ], - outputs=[ContinuousOutput(key="y")], - constraints=[ - LinearEqualityConstraint(features=["x1", "x2"], coefficients=[1, 1], rhs=0), - LinearInequalityConstraint( - features=["x3", "x4"], - coefficients=[1, 1], - rhs=0, - ), - NChooseKConstraint( - features=["x1", "x2"], - max_count=1, - min_count=0, - none_also_valid=True, - ), - NChooseKConstraint( - features=["x3", "x4"], - max_count=1, - min_count=0, - none_also_valid=True, - ), - ], - ) - check_nchoosek_constraints_as_bounds(domain) - - # NChooseK with non-zero lower bounds should be allowed: inactive variables - # are pinned to [0, 0] by the bounds formulation regardless of the original lb. - domain = Domain.from_lists( - inputs=[ - ContinuousInput(key=f"x{i + 1}", bounds=(0.1, 1), allow_zero=True) - for i in range(4) - ], - outputs=[ContinuousOutput(key="y")], - constraints=[ - NChooseKConstraint( - features=["x1", "x2"], - max_count=1, - min_count=0, - none_also_valid=True, - ), - ], - ) - check_nchoosek_constraints_as_bounds(domain) - - domain = Domain.from_lists( - inputs=[ - ContinuousInput(key=f"x{1}", bounds=(0.1, 1.0), allow_zero=True), - ContinuousInput(key=f"x{2}", bounds=(0.1, 1.0), allow_zero=True), - ContinuousInput(key=f"x{3}", bounds=(0.1, 1.0)), - ContinuousInput(key=f"x{4}", bounds=(0.1, 1.0)), - ], - outputs=[ContinuousOutput(key="y")], - constraints=[ - NChooseKConstraint( - features=["x1", "x2"], - max_count=1, - min_count=0, - none_also_valid=True, - ), - ], - ) - check_nchoosek_constraints_as_bounds(domain) - - # Not allowed: names parameters of two NChooseK overlap - domain = Domain.from_lists( - inputs=[ContinuousInput(key=f"x{i + 1}", bounds=(0, 1)) for i in range(4)], - outputs=[ContinuousOutput(key="y")], - constraints=[ - NChooseKConstraint( - features=["x1", "x2"], - max_count=1, - min_count=0, - none_also_valid=True, - ), - NChooseKConstraint( - features=["x2", "x3", "x4"], - max_count=2, - min_count=0, - none_also_valid=True, - ), - ], - ) - with pytest.raises(ValueError): - check_nchoosek_constraints_as_bounds(domain) - - def test_nchoosek_constraints_as_bounds(): # define domain: no NChooseK constraints domain = Domain.from_lists( @@ -1128,7 +1010,63 @@ def test_nchoosek_bounds_none_also_valid(): ) +def test_multi_nchoosek_bounds_known_patterns(): + """Test nchoosek_constraints_as_bounds against known expected activity patterns.""" + n_features = 3 + d = Domain.from_lists( + inputs=[ + ContinuousInput(key=f"x{i}", bounds=(0.0, 1.0)) for i in range(n_features) + ], + outputs=[ContinuousOutput(key="y")], + constraints=[ + NChooseKConstraint( + features=["x0", "x1"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + NChooseKConstraint( + features=["x1", "x2"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + ], + ) + n_experiments = 12 + bounds = nchoosek_constraints_as_bounds(d, n_experiments=n_experiments) + + D = n_features + assert len(bounds) == D * n_experiments + + # extract the activity pattern (1=active, 0=pinned-to-zero) per experiment + observed_patterns = set() + for i in range(n_experiments): + exp_bounds = bounds[i * D : (i + 1) * D] + pattern = tuple(1 if b != (0.0, 0.0) else 0 for b in exp_bounds) + observed_patterns.add(pattern) + # every active slot must keep its original bounds + for j, b in enumerate(exp_bounds): + if b != (0.0, 0.0): + assert b == ( + 0.0, + 1.0, + ), f"exp {i}, feature {j}: expected (0.0, 1.0), got {b}" + + expected_patterns = { + (0, 1, 0), # x1 active, x0 and x2 inactive + (1, 0, 1), # x0 and x2 active, x1 inactive + } + assert observed_patterns == expected_patterns, ( + f"Expected patterns {sorted(expected_patterns)}, " + f"got {sorted(observed_patterns)}" + ) + + # every pattern must have between min_count and max_count active features + for pat in observed_patterns: + active = sum(pat) + assert 1 <= active <= 2, f"Pattern {pat} has {active} active features" + + if __name__ == "__main__": - test_formula_str_to_fully_continuous() - test_formula_str_to_fully_continuous_only_categoricals() - only_continuous_inputs_formula_str_to_fully_continuous() + test_multi_nchoosek_bounds_known_patterns() diff --git a/tests/bofire/strategies/test_doe.py b/tests/bofire/strategies/test_doe.py index ec6ae8276..11ba34303 100644 --- a/tests/bofire/strategies/test_doe.py +++ b/tests/bofire/strategies/test_doe.py @@ -1307,5 +1307,303 @@ def test_nchoosek_none_valid(): assert active >= 2, f"Too few active features: {active}" +def test_nchoosek_overlapping_formulation(): + """Test overlapping NChooseK constraints with a formulation constraint. + + Scenario (filler / expander formulation): + - Features: h2o, oil, compound (all in [0, 1], sum == 1) + - Filler constraint: [h2o, oil] choose 1 (exactly one filler) + - Expander constraint: [h2o, compound] choose 1 (exactly one expander) + + Because h2o is shared, the Cartesian product filters down to: + - h2o=on, oil=off, compound=off → water fills and expands + - h2o=off, oil=on, compound=on → oil as filler, compound as expander + + Both patterns have exactly 2 active features, consistent with the + formulation constraint (sum == 1 over 2 active components). + """ + d = Domain.from_lists( + inputs=[ + ContinuousInput(key="h2o", bounds=(0.0, 1.0)), + ContinuousInput(key="oil", bounds=(0.0, 1.0)), + ContinuousInput(key="compound", bounds=(0.0, 1.0)), + ], + outputs=[ContinuousOutput(key="y")], + constraints=[ + NChooseKConstraint( + features=["h2o", "oil"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + NChooseKConstraint( + features=["h2o", "compound"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + LinearEqualityConstraint( + features=["h2o", "oil", "compound"], + coefficients=[1.0, 1.0, 1.0], + rhs=1.0, + ), + ], + ) + + # --- Verify the bounds patterns --- + n_exp = 10 + bounds = nchoosek_constraints_as_bounds(d, n_experiments=n_exp) + # Domain sorts inputs alphabetically: compound, h2o, oil + sorted_keys = d.inputs.get_keys(ContinuousInput) + D = len(sorted_keys) + + observed_named_patterns = set() + for i in range(n_exp): + exp_bounds = bounds[i * D : (i + 1) * D] + active_keys = frozenset( + sorted_keys[j] for j, b in enumerate(exp_bounds) if b != (0.0, 0.0) + ) + observed_named_patterns.add(active_keys) + + # Only 2 valid combinations should survive: + # {h2o} → water fills and expands + # {oil, compound} → oil as filler, compound as expander + expected_named = {frozenset(["h2o"]), frozenset(["oil", "compound"])} + assert ( + observed_named_patterns == expected_named + ), f"Expected patterns {expected_named}, got {observed_named_patterns}" + + # --- Verify the optimizer produces a valid design --- + data_model = data_models.DoEStrategy( + domain=d, + criterion=DOptimalityCriterion(formula="linear"), + ) + strategy = DoEStrategy(data_model=data_model) + candidates = strategy.ask(candidate_count=n_exp, raise_validation_error=False) + assert candidates.shape == (n_exp, D) + + for _, row in candidates.iterrows(): + h2o, oil, compound = row["h2o"], row["oil"], row["compound"] + # Either water-only (1 active) or oil+compound (2 active) + is_water = abs(oil) < 1e-6 and abs(compound) < 1e-6 + is_oil_compound = abs(h2o) < 1e-6 + assert is_water or is_oil_compound, ( + f"Invalid pattern: h2o={h2o:.4f}, oil={oil:.4f}, " + f"compound={compound:.4f}" + ) + + # --- Example 2: max_count > 1 on an overlapping constraint --- + # + # Cleaning-agent formulation: + # Features: water, alcohol, surfactant (all in [0,1], sum == 1) + # Solvent constraint: [water, alcohol] choose 1..2 (one or both solvents) + # Active constraint: [water, surfactant] choose 1 (exactly one active) + # + # water is shared. Cartesian product (3×2 = 6) after consistency filter: + # Solvent{water} × Active{water} → {water} ✓ + # Solvent{water} × Active{surfactant} → water on/off conflict ✗ + # Solvent{alcohol} × Active{water} → water off/on conflict ✗ + # Solvent{alcohol} × Active{surfactant} → {alcohol, surfactant} ✓ + # Solvent{water, alcohol} × Active{water} → {water, alcohol} ✓ + # Solvent{water, alcohol} × Active{surfactant} → water on/off conflict ✗ + # + # Valid patterns: {water}, {alcohol, surfactant}, {water, alcohol} + d2 = Domain.from_lists( + inputs=[ + ContinuousInput(key="water", bounds=(0.0, 1.0)), + ContinuousInput(key="alcohol", bounds=(0.0, 1.0)), + ContinuousInput(key="surfactant", bounds=(0.0, 1.0)), + ], + outputs=[ContinuousOutput(key="y")], + constraints=[ + NChooseKConstraint( + features=["water", "alcohol"], + min_count=1, + max_count=2, + none_also_valid=False, + ), + NChooseKConstraint( + features=["water", "surfactant"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + LinearEqualityConstraint( + features=["water", "alcohol", "surfactant"], + coefficients=[1.0, 1.0, 1.0], + rhs=1.0, + ), + ], + ) + + n_exp2 = 15 + bounds2 = nchoosek_constraints_as_bounds(d2, n_experiments=n_exp2) + sorted_keys2 = d2.inputs.get_keys(ContinuousInput) + D2 = len(sorted_keys2) + + observed2 = set() + for i in range(n_exp2): + exp_bounds = bounds2[i * D2 : (i + 1) * D2] + active_keys = frozenset( + sorted_keys2[j] for j, b in enumerate(exp_bounds) if b != (0.0, 0.0) + ) + observed2.add(active_keys) + + expected2 = { + frozenset(["water"]), + frozenset(["alcohol", "surfactant"]), + frozenset(["water", "alcohol"]), + } + assert ( + observed2 == expected2 + ), f"max_count>1 example: expected {expected2}, got {observed2}" + + # Verify optimizer design respects per-constraint limits + data_model2 = data_models.DoEStrategy( + domain=d2, + criterion=DOptimalityCriterion(formula="linear"), + ) + strategy2 = DoEStrategy(data_model=data_model2) + candidates2 = strategy2.ask(candidate_count=n_exp2, raise_validation_error=False) + assert candidates2.shape == (n_exp2, D2) + + for _, row in candidates2.iterrows(): + vals = {k: row[k] for k in ["water", "alcohol", "surfactant"]} + active = frozenset(k for k, v in vals.items() if abs(v) > 1e-6) + + # Solvent constraint: 1..2 of {water, alcohol} active + solvent_active = sum(1 for k in ["water", "alcohol"] if k in active) + assert 0 <= solvent_active <= 2, f"Solvent constraint violated: {active}" + + # Active constraint: at most 1 of {water, surfactant} active + active_active = sum(1 for k in ["water", "surfactant"] if k in active) + assert active_active <= 1, f"Active constraint violated: {active}" + + # Consistency: if surfactant is on, water must be off (and vice versa) + if "surfactant" in active: + assert "water" not in active, f"water+surfactant conflict: {active}" + + +def test_nchoosek_overlapping_formulation_complex(): + """Test 3 overlapping NChooseK constraints with shared + disjoint features. + + Scenario (paint formulation): + - Features: water, resin, pigment, solvent, additive (all in [0,1], sum==1) + - Binder constraint: [water, resin] choose 1 (exactly one binder base) + - Carrier constraint: [water, solvent] choose 1 (exactly one carrier) + - Colorant constraint: [pigment, additive] choose 1 (exactly one colorant) + + water is shared between binder and carrier. The Cartesian product is + {binder} × {colorant} × {carrier} = 2×2×2 = 8, but the consistency + filter on water kills 4, leaving 4 valid combined patterns: + + 1. {water, pigment} → water is both binder and carrier + 2. {water, additive} → water is both binder and carrier + 3. {resin, pigment, solvent} → resin as binder, solvent as carrier + 4. {resin, additive, solvent} → resin as binder, solvent as carrier + + Notably the active count varies: 2 or 3 per pattern. + """ + d = Domain.from_lists( + inputs=[ + ContinuousInput(key="water", bounds=(0.0, 1.0)), + ContinuousInput(key="resin", bounds=(0.0, 1.0)), + ContinuousInput(key="pigment", bounds=(0.0, 1.0)), + ContinuousInput(key="solvent", bounds=(0.0, 1.0)), + ContinuousInput(key="additive", bounds=(0.0, 1.0)), + ], + outputs=[ContinuousOutput(key="y")], + constraints=[ + NChooseKConstraint( + features=["water", "resin"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + NChooseKConstraint( + features=["water", "solvent"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + NChooseKConstraint( + features=["pigment", "additive"], + min_count=1, + max_count=1, + none_also_valid=False, + ), + LinearEqualityConstraint( + features=["water", "resin", "pigment", "solvent", "additive"], + coefficients=[1.0, 1.0, 1.0, 1.0, 1.0], + rhs=1.0, + ), + ], + ) + + # --- Verify the bounds patterns --- + n_exp = 20 + bounds = nchoosek_constraints_as_bounds(d, n_experiments=n_exp) + sorted_keys = d.inputs.get_keys(ContinuousInput) + D = len(sorted_keys) + + observed_named_patterns = set() + for i in range(n_exp): + exp_bounds = bounds[i * D : (i + 1) * D] + active_keys = frozenset( + sorted_keys[j] for j, b in enumerate(exp_bounds) if b != (0.0, 0.0) + ) + observed_named_patterns.add(active_keys) + + expected_named = { + frozenset(["water", "pigment"]), + frozenset(["water", "additive"]), + frozenset(["resin", "pigment", "solvent"]), + frozenset(["resin", "additive", "solvent"]), + } + assert observed_named_patterns == expected_named, ( + f"Expected {len(expected_named)} patterns {expected_named}, " + f"got {len(observed_named_patterns)} patterns {observed_named_patterns}" + ) + + # Verify active counts: must be 2 or 3 + for pat in observed_named_patterns: + assert len(pat) in (2, 3), f"Unexpected active count {len(pat)} in {pat}" + + # --- Verify the optimizer produces a valid design --- + data_model = data_models.DoEStrategy( + domain=d, + criterion=DOptimalityCriterion(formula="linear"), + ) + strategy = DoEStrategy(data_model=data_model) + candidates = strategy.ask(candidate_count=n_exp, raise_validation_error=False) + assert candidates.shape == (n_exp, D) + + for _, row in candidates.iterrows(): + vals = {k: row[k] for k in ["water", "resin", "pigment", "solvent", "additive"]} + active = frozenset(k for k, v in vals.items() if abs(v) > 1e-6) + + # The optimizer may converge an "active" variable to near-zero, so + # the observed active set can be a subset of a valid pattern. + # We only check per-constraint satisfaction (at most 1 active per + # choose-1 group). + + # Binder constraint: at most one of {water, resin} active + binder_active = sum(1 for k in ["water", "resin"] if k in active) + assert binder_active <= 1, f"Binder constraint violated: {active}" + + # Carrier constraint: at most one of {water, solvent} active + carrier_active = sum(1 for k in ["water", "solvent"] if k in active) + assert carrier_active <= 1, f"Carrier constraint violated: {active}" + + # Colorant constraint: at most one of {pigment, additive} active + colorant_active = sum(1 for k in ["pigment", "additive"] if k in active) + assert colorant_active <= 1, f"Colorant constraint violated: {active}" + + # Consistency: if water is active, resin and solvent must be off + if "water" in active: + assert "resin" not in active, f"water+resin conflict: {active}" + assert "solvent" not in active, f"water+solvent conflict: {active}" + + if __name__ == "__main__": test_nchoosek_none_valid()