Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ miette = { version="7.6.0", features = ["fancy"] }
mimalloc = { version = "0.1.48", features = ["extended", "v3", "debug"] }
libmimalloc-sys = { version = "0.1.44", features = ["extended", "v3"] }
tikv-jemallocator = { version = "0.6.1" }
tikv-jemalloc-ctl = { version = "0.6.1" }
tikv-jemalloc-ctl = { version = "0.6.1", features = ["stats"] }
tikv-jemalloc-sys = "0.6.1"
memchr = "2.8.0"
memmap2 = "0.9"
memory-stats = "1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn consume_current_local(
NodeControlMsg::CollectTelemetry { .. } => observed.telemetry_ticks += 1,
NodeControlMsg::Config { .. } => observed.configs += 1,
NodeControlMsg::DrainIngress { .. }
| NodeControlMsg::MemoryPressureChanged { .. }
| NodeControlMsg::Shutdown { .. }
| NodeControlMsg::DelayedData { .. } => {
panic!("unexpected message in benchmark current local receiver");
Expand Down Expand Up @@ -202,6 +203,7 @@ async fn consume_current_shared(
NodeControlMsg::CollectTelemetry { .. } => observed.telemetry_ticks += 1,
NodeControlMsg::Config { .. } => observed.configs += 1,
NodeControlMsg::DrainIngress { .. }
| NodeControlMsg::MemoryPressureChanged { .. }
| NodeControlMsg::Shutdown { .. }
| NodeControlMsg::DelayedData { .. } => {
panic!("unexpected message in benchmark current shared receiver");
Expand Down
26 changes: 25 additions & 1 deletion rust/otap-dataflow/crates/admin/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Global health and status endpoints.
//! Process-wide health and status endpoints.
//!
//! - GET `/api/v1/status` - list all pipelines and their status
//! - GET `/api/v1/livez` - liveness probe
Expand Down Expand Up @@ -43,6 +43,8 @@ pub(crate) struct ProbeResponse {
probe: &'static str,
status: &'static str,
generated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
failing: Vec<PipelineConditionFailure>,
}
Expand Down Expand Up @@ -83,6 +85,16 @@ pub(crate) async fn livez(State(state): State<AppState>) -> (StatusCode, Json<Pr
}

pub(crate) async fn readyz(State(state): State<AppState>) -> (StatusCode, Json<ProbeResponse>) {
if state.memory_pressure_state.should_fail_readiness() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ProbeResponse::with_message(
"readyz",
"process memory pressure at hard limit",
)),
);
}

let snapshot = state.observed_state_store.snapshot();
let failing = collect_condition_failures(
&snapshot,
Expand Down Expand Up @@ -160,6 +172,7 @@ impl ProbeResponse {
probe,
status: "ok",
generated_at: Utc::now().to_rfc3339(),
message: None,
failing: Vec::new(),
}
}
Expand All @@ -169,9 +182,20 @@ impl ProbeResponse {
probe,
status: "failed",
generated_at: Utc::now().to_rfc3339(),
message: None,
failing,
}
}

/// Builds a failed probe payload that carries a human-readable message
/// instead of per-pipeline condition failures.
fn with_message(probe: &'static str, message: impl Into<String>) -> Self {
    let message = Some(message.into());
    Self {
        probe,
        message,
        status: "failed",
        failing: Vec::new(),
        generated_at: Utc::now().to_rfc3339(),
    }
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions rust/otap-dataflow/crates/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tower::ServiceBuilder;
use crate::error::Error;
use otap_df_config::engine::HttpAdminSettings;
use otap_df_engine::control::PipelineAdminSender;
use otap_df_engine::memory_limiter::MemoryPressureState;
use otap_df_state::store::ObservedStateHandle;
use otap_df_telemetry::log_tap::InternalLogTapHandle;
use otap_df_telemetry::registry::TelemetryRegistryHandle;
Expand All @@ -40,6 +41,9 @@ struct AppState {

/// The control message senders for controlling pipelines.
ctrl_msg_senders: Arc<Mutex<Vec<Arc<dyn PipelineAdminSender>>>>,

/// Shared process-wide memory pressure state.
memory_pressure_state: MemoryPressureState,
}

/// Run the admin HTTP server until shutdown is requested.
Expand All @@ -48,6 +52,7 @@ pub async fn run(
observed_store: ObservedStateHandle,
ctrl_msg_senders: Vec<Arc<dyn PipelineAdminSender>>,
metrics_registry: TelemetryRegistryHandle,
memory_pressure_state: MemoryPressureState,
log_tap: Option<InternalLogTapHandle>,
cancel: CancellationToken,
) -> Result<(), Error> {
Expand All @@ -56,6 +61,7 @@ pub async fn run(
metrics_registry,
log_tap,
ctrl_msg_senders: Arc::new(Mutex::new(ctrl_msg_senders)),
memory_pressure_state,
};

let api_routes = Router::new()
Expand Down
36 changes: 35 additions & 1 deletion rust/otap-dataflow/crates/config/src/byte_units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,30 @@ where
Ok(Some(bytes as u32))
}

/// Deserialize an optional byte size as `u64`.
///
/// Accepts either a plain number of bytes or a human-readable size
/// string (e.g. "6 GiB") parsed through [`Byte`].
pub fn deserialize_u64<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
where
    D: Deserializer<'de>,
{
    match Option::<Value>::deserialize(deserializer)? {
        None => Ok(None),
        Some(Value::Number(bytes)) => Ok(Some(bytes)),
        Some(Value::String(text)) => {
            let parsed: Byte = text.parse().map_err(DeError::custom)?;
            Ok(Some(parsed.as_u64()))
        }
    }
}

#[cfg(test)]
mod tests {
use super::deserialize;
use super::{deserialize, deserialize_u64};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -124,4 +145,17 @@ mod tests {
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
assert_eq!(cfg.value, Some(512));
}

#[derive(Debug, Deserialize)]
struct Holder64 {
    // Optional byte size; accepts raw numbers or unit-suffixed strings.
    #[serde(default, deserialize_with = "deserialize_u64")]
    value: Option<u64>,
}

#[test]
fn deserialize_u64_supports_large_values() {
    // 6 GiB exceeds u32::MAX, so this exercises the u64-wide path.
    const SIX_GIB: u64 = 6 * 1024 * 1024 * 1024;
    let cfg: Holder64 =
        serde_yaml::from_str("value: 6 GiB").expect("should parse large byte values");
    assert_eq!(cfg.value, Some(SIX_GIB));
}
}
36 changes: 36 additions & 0 deletions rust/otap-dataflow/crates/config/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,42 @@ groups:
assert!(config.engine.observability.pipeline.is_some());
}

#[test]
fn from_yaml_requires_explicit_memory_limiter_mode() {
    // A memory_limiter policy that omits the `mode` field must be rejected
    // at deserialization time instead of silently picking a default.
    // NOTE(review): YAML nesting below reconstructed from key paths — the
    // exact original indentation was lost; confirm against the repo file.
    let yaml = r#"
version: otel_dataflow/v1
policies:
  resources:
    memory_limiter:
      source: auto
      soft_limit: 1 GiB
      hard_limit: 2 GiB
engine: {}
groups:
  default:
    pipelines:
      main:
        nodes:
          receiver:
            type: "urn:test:receiver:example"
            config: null
          exporter:
            type: "urn:test:exporter:example"
            config: null
        connections:
          - from: receiver
            to: exporter
"#;

    let err = OtelDataflowSpec::from_yaml(yaml).expect_err("should reject missing mode");
    match err {
        // serde surfaces the missing field as a DeserializationError whose
        // details string names the absent `mode` key.
        Error::DeserializationError { details, .. } => {
            assert!(details.contains("missing field `mode`"));
        }
        other => panic!("expected deserialization error, got: {other:?}"),
    }
}

#[test]
fn from_yaml_rejects_reserved_system_group() {
let yaml = r#"
Expand Down
27 changes: 27 additions & 0 deletions rust/otap-dataflow/crates/config/src/engine/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ impl OtelDataflowSpec {
if let Err(e) = pipeline_group.validate(pipeline_group_id) {
errors.push(e);
}
if pipeline_group
.policies
.as_ref()
.and_then(|policies| policies.resources.as_ref())
.and_then(|resources| resources.memory_limiter.as_ref())
.is_some()
{
errors.push(Error::InvalidUserConfig {
error: format!(
"groups.{pipeline_group_id}.policies.resources.memory_limiter is not supported; configure the process-wide limiter only at top-level policies.resources.memory_limiter"
),
});
}
for (pipeline_id, pipeline) in &pipeline_group.pipelines {
if pipeline
.policies()
.and_then(|policies| policies.resources.as_ref())
.and_then(|resources| resources.memory_limiter.as_ref())
.is_some()
{
errors.push(Error::InvalidUserConfig {
error: format!(
"groups.{pipeline_group_id}.pipelines.{pipeline_id}.policies.resources.memory_limiter is not supported; configure the process-wide limiter only at top-level policies.resources.memory_limiter"
),
});
}
}
}

if !errors.is_empty() {
Expand Down
Loading
Loading