Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
- Add generic database connection mechanism ([#1163]).
- Add `config_overrides` module with `KeyValueOverridesProvider` trait, enabling
structured config file formats (e.g. JSON) in addition to key-value overrides ([#1177]).
- Add `Scaler` CRD ([#1190], [#1195]).

### Changed

Expand All @@ -28,9 +29,11 @@ All notable changes to this project will be documented in this file.

[#1163]: https://github.com/stackabletech/operator-rs/pull/1163
[#1177]: https://github.com/stackabletech/operator-rs/pull/1177
[#1190]: https://github.com/stackabletech/operator-rs/pull/1190
[#1191]: https://github.com/stackabletech/operator-rs/pull/1191
[#1192]: https://github.com/stackabletech/operator-rs/pull/1192
[#1194]: https://github.com/stackabletech/operator-rs/pull/1194
[#1195]: https://github.com/stackabletech/operator-rs/pull/1195

## [0.109.0] - 2026-04-07

Expand Down
82 changes: 62 additions & 20 deletions crates/stackable-operator/crds/Scaler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,80 @@ spec:
type: string
state:
description: The current state of the scaler state machine.
oneOf:
- required:
- idle
- required:
- preScaling
- required:
- scaling
- required:
- postScaling
- required:
- failed
properties:
details:
failed:
description: |-
A hook returned an error.

The scaler stays here until the user applies the [`Annotation::autoscaling_retry`] annotation
to trigger a reset to [`ScalerState::Idle`].
properties:
failedIn:
description: In which state the scaling operation failed.
description: Which stage produced the error.
enum:
- preScaling
- scaling
- postScaling
- PreScaling
- Scaling
- PostScaling
type: string
previous_replicas:
maximum: 65535.0
minimum: 0.0
type: uint16
reason:
description: Human-readable error message from the hook.
type: string
required:
- failedIn
- reason
type: object
idle:
description: No scaling operation is in progress.
type: object
postScaling:
description: |-
Running the `post_scale` hook (e.g. cluster rebalance).

This stage additionally tracks the previous replica count to be able derive the direction
of the scaling operation.
properties:
previousReplicas:
format: uint16
maximum: 65535.0
minimum: 0.0
type: integer
required:
- previousReplicas
type: object
preScaling:
description: Running the `pre_scale` hook (e.g. data offload).
type: object
scaling:
description: |-
Waiting for the StatefulSet to converge to the new replica count.

This stage additionally tracks the previous replica count to be able derive the direction
of the scaling operation.
properties:
previousReplicas:
format: uint16
maximum: 65535.0
minimum: 0.0
type: integer
required:
- previousReplicas
type: object
state:
enum:
- idle
- preScaling
- scaling
- postScaling
- failed
type: string
required:
- state
type: object
required:
- lastTransitionTime
- replicas
- state
- lastTransitionTime
type: object
required:
- spec
Expand Down
109 changes: 54 additions & 55 deletions crates/stackable-operator/src/crd/scaler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::borrow::Cow;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use kube::CustomResource;
use schemars::JsonSchema;
Expand All @@ -21,7 +19,7 @@ pub mod versioned {
),
namespaced
))]
#[derive(Clone, Debug, PartialEq, CustomResource, Deserialize, Serialize, JsonSchema)]
#[derive(Clone, Debug, PartialEq, Eq, CustomResource, Deserialize, Serialize, JsonSchema)]
pub struct ScalerSpec {
/// Desired replica count.
///
Expand All @@ -40,7 +38,7 @@ pub mod versioned {
}

/// Status of a StackableScaler.
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ScalerStatus {
/// The current total number of replicas targeted by the managed StatefulSet.
Expand All @@ -59,25 +57,15 @@ pub struct ScalerStatus {
pub last_transition_time: Time,
}

// We use `#[serde(tag)]` and `#[serde(content)]` here to circumvent Kubernetes restrictions in their
// structural schema subset of OpenAPI schemas. They don't allow one variant to be typed as a string
// and others to be typed as objects. We therefore encode the variant data in a separate details
// key/object. With this, all variants can be encoded as strings, while the status can still contain
// additional data in an extra field when needed.
#[derive(Clone, Debug, Deserialize, Serialize, strum::Display)]
#[serde(
tag = "state",
content = "details",
rename_all = "camelCase",
rename_all_fields = "camelCase"
)]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, JsonSchema, strum::Display)]
#[serde(rename_all = "camelCase", rename_all_fields = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ScalerState {
/// No scaling operation is in progress.
Idle,
Idle {},

/// Running the `pre_scale` hook (e.g. data offload).
PreScaling,
PreScaling {},

/// Waiting for the StatefulSet to converge to the new replica count.
///
Expand All @@ -104,44 +92,9 @@ pub enum ScalerState {
},
}

// We manually implement the JSON schema instead of deriving it, because kube's schema transformer
// cannot handle the derived JsonSchema and proceeds to hit the following error: "Property "state"
// has the schema ... but was already defined as ... in another subschema. The schemas for a
// property used in multiple subschemas must be identical".
impl JsonSchema for ScalerState {
fn schema_name() -> Cow<'static, str> {
"ScalerState".into()
}

fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "object",
"required": ["state"],
"properties": {
"state": {
"type": "string",
"enum": ["idle", "preScaling", "scaling", "postScaling", "failed"]
},
"details": {
"type": "object",
"properties": {
"failedIn": generator.subschema_for::<FailedInState>(),
"previous_replicas": {
"type": "uint16",
"minimum": u16::MIN,
"maximum": u16::MAX
},
"reason": { "type": "string" }
}
}
}
})
}
}

/// In which state the scaling operation failed.
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub enum FailedInState {
/// The `pre_scale` hook returned an error.
PreScaling,
Expand All @@ -152,3 +105,49 @@ pub enum FailedInState {
/// The `post_scale` hook returned an error.
PostScaling,
}

#[cfg(test)]
mod tests {
use rstest::rstest;

use super::*;
use crate::{
test_utils::serialize_to_yaml_with_singleton_map, utils::yaml_from_str_singleton_map,
};

#[rstest]
#[case::idle("idle: {}", ScalerState::Idle { })]
#[case::pre_scaling("preScaling: {}", ScalerState::PreScaling { })]
#[case::scaling("scaling:
previousReplicas: 42", ScalerState::Scaling { previous_replicas: 42 })]
#[case::post_scaling("postScaling:
previousReplicas: 42", ScalerState::PostScaling { previous_replicas: 42 })]
#[case::failed("failed:
failedIn: PreScaling
reason: bruh moment", ScalerState::Failed {
failed_in: FailedInState::PreScaling,
reason: "bruh moment".to_owned()
} )]
fn parse_state(#[case] input: &str, #[case] expected: ScalerState) {
let parsed: ScalerState =
yaml_from_str_singleton_map(input).expect("invalid test YAML input");
assert_eq!(parsed, expected);
}

#[rstest]
#[case::idle(ScalerState::Idle { }, "idle: {}\n")]
#[case::pre_scaling(ScalerState::PreScaling { }, "preScaling: {}\n")]
#[case::scaling(ScalerState::Scaling { previous_replicas: 42 }, "scaling:
previousReplicas: 42\n")]
#[case::post_scaling(ScalerState::PostScaling { previous_replicas: 42 }, "postScaling:
previousReplicas: 42\n")]
#[case::failed(ScalerState::Failed { failed_in: FailedInState::PreScaling, reason: "bruh moment".to_owned() }, "failed:
failedIn: PreScaling
reason: bruh moment\n")]
fn serialize_state(#[case] input: ScalerState, #[case] expected: &str) {
let serialized =
serialize_to_yaml_with_singleton_map(&input).expect("serialization always passes");

assert_eq!(serialized, expected);
}
}
1 change: 1 addition & 0 deletions crates/stackable-operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod product_config_utils;
pub mod product_logging;
pub mod role_utils;
pub mod status;
pub mod test_utils;
pub mod utils;
pub mod validation;

Expand Down
17 changes: 17 additions & 0 deletions crates/stackable-operator/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/// Please use only in tests, as we have non-ideal error handling in case serde_yaml produced
/// non-utf8 output.
pub fn serialize_to_yaml_with_singleton_map<S>(input: &S) -> Result<String, serde_yaml::Error>
where
S: serde::Serialize,
{
use serde::ser::Error as _;

let mut buffer = Vec::new();
let mut serializer = serde_yaml::Serializer::new(&mut buffer);
serde_yaml::with::singleton_map_recursive::serialize(input, &mut serializer)?;
String::from_utf8(buffer).map_err(|err| {
serde_yaml::Error::custom(format!(
"For *some* reason, serde_yaml produced non-utf8 output: {err}"
))
})
}
Loading