Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ miette = { version="7.6.0", features = ["fancy"] }
mimalloc = { version = "0.1.48", features = ["extended", "v3", "debug"] }
libmimalloc-sys = { version = "0.1.44", features = ["extended", "v3"] }
tikv-jemallocator = { version = "0.6.1" }
tikv-jemalloc-ctl = { version = "0.6.1" }
tikv-jemalloc-ctl = { version = "0.6.1", features = ["stats"] }
tikv-jemalloc-sys = "0.6.1"
memchr = "2.8.0"
memmap2 = "0.9"
memory-stats = "1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn consume_current_local(
NodeControlMsg::CollectTelemetry { .. } => observed.telemetry_ticks += 1,
NodeControlMsg::Config { .. } => observed.configs += 1,
NodeControlMsg::DrainIngress { .. }
| NodeControlMsg::MemoryPressureChanged { .. }
| NodeControlMsg::Shutdown { .. }
| NodeControlMsg::DelayedData { .. } => {
panic!("unexpected message in benchmark current local receiver");
Expand Down Expand Up @@ -202,6 +203,7 @@ async fn consume_current_shared(
NodeControlMsg::CollectTelemetry { .. } => observed.telemetry_ticks += 1,
NodeControlMsg::Config { .. } => observed.configs += 1,
NodeControlMsg::DrainIngress { .. }
| NodeControlMsg::MemoryPressureChanged { .. }
| NodeControlMsg::Shutdown { .. }
| NodeControlMsg::DelayedData { .. } => {
panic!("unexpected message in benchmark current shared receiver");
Expand Down
26 changes: 25 additions & 1 deletion rust/otap-dataflow/crates/admin/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Global health and status endpoints.
//! Process-wide health and status endpoints.
//!
//! - GET `/api/v1/status` - list all pipelines and their status
//! - GET `/api/v1/livez` - liveness probe
Expand Down Expand Up @@ -43,6 +43,8 @@ pub(crate) struct ProbeResponse {
probe: &'static str,
status: &'static str,
generated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
failing: Vec<PipelineConditionFailure>,
}
Expand Down Expand Up @@ -83,6 +85,16 @@ pub(crate) async fn livez(State(state): State<AppState>) -> (StatusCode, Json<Pr
}

pub(crate) async fn readyz(State(state): State<AppState>) -> (StatusCode, Json<ProbeResponse>) {
if state.memory_pressure_state.should_fail_readiness() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ProbeResponse::with_message(
"readyz",
"process memory pressure at hard limit",
)),
);
}

let snapshot = state.observed_state_store.snapshot();
let failing = collect_condition_failures(
&snapshot,
Expand Down Expand Up @@ -160,6 +172,7 @@ impl ProbeResponse {
probe,
status: "ok",
generated_at: Utc::now().to_rfc3339(),
message: None,
failing: Vec::new(),
}
}
Expand All @@ -169,9 +182,20 @@ impl ProbeResponse {
probe,
status: "failed",
generated_at: Utc::now().to_rfc3339(),
message: None,
failing,
}
}

/// Build a "failed" probe response that carries only a human-readable
/// explanation (e.g. process-wide memory pressure) and no per-pipeline
/// failure details.
fn with_message(probe: &'static str, message: impl Into<String>) -> Self {
    // Timestamp first so the struct literal below stays a plain field list.
    let generated_at = Utc::now().to_rfc3339();
    Self {
        probe,
        status: "failed",
        generated_at,
        message: Some(message.into()),
        failing: Vec::new(),
    }
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions rust/otap-dataflow/crates/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tower::ServiceBuilder;
use crate::error::Error;
use otap_df_config::engine::HttpAdminSettings;
use otap_df_engine::control::PipelineAdminSender;
use otap_df_engine::memory_limiter::MemoryPressureState;
use otap_df_state::store::ObservedStateHandle;
use otap_df_telemetry::log_tap::InternalLogTapHandle;
use otap_df_telemetry::registry::TelemetryRegistryHandle;
Expand All @@ -40,6 +41,9 @@ struct AppState {

/// The control message senders for controlling pipelines.
ctrl_msg_senders: Arc<Mutex<Vec<Arc<dyn PipelineAdminSender>>>>,

/// Shared process-wide memory pressure state.
memory_pressure_state: MemoryPressureState,
}

/// Run the admin HTTP server until shutdown is requested.
Expand All @@ -48,6 +52,7 @@ pub async fn run(
observed_store: ObservedStateHandle,
ctrl_msg_senders: Vec<Arc<dyn PipelineAdminSender>>,
metrics_registry: TelemetryRegistryHandle,
memory_pressure_state: MemoryPressureState,
log_tap: Option<InternalLogTapHandle>,
cancel: CancellationToken,
) -> Result<(), Error> {
Expand All @@ -56,6 +61,7 @@ pub async fn run(
metrics_registry,
log_tap,
ctrl_msg_senders: Arc::new(Mutex::new(ctrl_msg_senders)),
memory_pressure_state,
};

let api_routes = Router::new()
Expand Down
36 changes: 35 additions & 1 deletion rust/otap-dataflow/crates/config/src/byte_units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,30 @@ where
Ok(Some(bytes as u32))
}

/// Deserialize an optional byte size as `u64`.
///
/// The value may be given either as a raw number of bytes or as a
/// human-readable size string (e.g. `"6 GiB"`), which is parsed via the
/// `byte` crate. A missing/null value deserializes to `None`.
pub fn deserialize_u64<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
where
    D: Deserializer<'de>,
{
    // Dispatch directly on the optional intermediate value instead of
    // unwrapping it in a separate guard step.
    match Option::<Value>::deserialize(deserializer)? {
        None => Ok(None),
        Some(Value::Number(bytes)) => Ok(Some(bytes)),
        Some(Value::String(text)) => {
            let parsed: Byte = text.parse().map_err(DeError::custom)?;
            Ok(Some(parsed.as_u64()))
        }
    }
}

#[cfg(test)]
mod tests {
use super::deserialize;
use super::{deserialize, deserialize_u64};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -124,4 +145,17 @@ mod tests {
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
assert_eq!(cfg.value, Some(512));
}

// Test harness struct: drives `deserialize_u64` through serde's
// `deserialize_with` hook, mirroring how real config structs consume it.
#[derive(Debug, Deserialize)]
struct Holder64 {
#[serde(default, deserialize_with = "deserialize_u64")]
value: Option<u64>,
}

// Sizes above u32::MAX must survive parsing — exercising the reason the
// u64 variant of the byte-size deserializer exists.
#[test]
fn deserialize_u64_supports_large_values() {
    let parsed = serde_yaml::from_str::<Holder64>("value: 6 GiB")
        .expect("should parse large byte values");
    let expected: u64 = 6 << 30; // 6 GiB expressed in bytes
    assert_eq!(parsed.value, Some(expected));
}
}
36 changes: 36 additions & 0 deletions rust/otap-dataflow/crates/config/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,42 @@ groups:
assert!(config.engine.observability.pipeline.is_some());
}

// Regression test: a memory-limiter config that sets limits but omits the
// `mode` field must be rejected at deserialization time rather than falling
// back to an implicit default.
// NOTE(review): the indentation inside the YAML raw string below may have
// been flattened by the diff rendering — verify against the repository copy.
#[test]
fn from_yaml_requires_explicit_memory_limiter_mode() {
let yaml = r#"
version: otel_dataflow/v1
policies:
resources:
memory_limiter:
source: auto
soft_limit: 1 GiB
hard_limit: 2 GiB
engine: {}
groups:
default:
pipelines:
main:
nodes:
receiver:
type: "urn:test:receiver:example"
config: null
exporter:
type: "urn:test:exporter:example"
config: null
connections:
- from: receiver
to: exporter
"#;

let err = OtelDataflowSpec::from_yaml(yaml).expect_err("should reject missing mode");
// The error must surface serde's "missing field `mode`" detail so the user
// can see exactly which key was omitted.
match err {
Error::DeserializationError { details, .. } => {
assert!(details.contains("missing field `mode`"));
}
other => panic!("expected deserialization error, got: {other:?}"),
}
}

#[test]
fn from_yaml_rejects_reserved_system_group() {
let yaml = r#"
Expand Down
27 changes: 27 additions & 0 deletions rust/otap-dataflow/crates/config/src/engine/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ impl OtelDataflowSpec {
if let Err(e) = pipeline_group.validate(pipeline_group_id) {
errors.push(e);
}
if pipeline_group
.policies
.as_ref()
.and_then(|policies| policies.resources.as_ref())
.and_then(|resources| resources.memory_limiter.as_ref())
.is_some()
{
errors.push(Error::InvalidUserConfig {
error: format!(
"groups.{pipeline_group_id}.policies.resources.memory_limiter is not supported; configure the process-wide limiter only at top-level policies.resources.memory_limiter"
),
});
}
for (pipeline_id, pipeline) in &pipeline_group.pipelines {
if pipeline
.policies()
.and_then(|policies| policies.resources.as_ref())
.and_then(|resources| resources.memory_limiter.as_ref())
.is_some()
{
errors.push(Error::InvalidUserConfig {
error: format!(
"groups.{pipeline_group_id}.pipelines.{pipeline_id}.policies.resources.memory_limiter is not supported; configure the process-wide limiter only at top-level policies.resources.memory_limiter"
),
});
}
}
}

if !errors.is_empty() {
Expand Down
Loading
Loading