diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 490a7de889..1f9c826e7c 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -40,7 +40,6 @@ thiserror.workspace = true serde_json.workspace = true clap.workspace = true mimalloc = { workspace = true, optional = true } -sysinfo.workspace = true [target.'cfg(not(windows))'.dependencies] tikv-jemallocator = { workspace = true, optional = true } diff --git a/rust/otap-dataflow/README.md b/rust/otap-dataflow/README.md index da03445cb2..30de819da9 100644 --- a/rust/otap-dataflow/README.md +++ b/rust/otap-dataflow/README.md @@ -351,6 +351,67 @@ The `views` sub-module contains zero-copy machinery for: - interpreting OTLP bytes using views to build OTAP records - interpreting OTAP records using views to encode OTLP bytes +## Embedding in Custom Distributions + +The engine crates are designed as reusable libraries. A custom binary can +link the same controller, factory, and node crates and register its own +components via `linkme` distributed slices -- exactly how `src/main.rs` works. + +The `otap_df_controller::startup` module provides three helpers that every +embedding binary typically needs: + +- **`validate_engine_components`** -- Checks that every node URN in a + config maps to a registered component and runs per-component config + validation. +- **`apply_cli_overrides`** -- Merges core-allocation and HTTP-admin + bind overrides into an `OtelDataflowSpec`. +- **`system_info`** -- Returns a formatted string with CPU/memory info + and all registered component URNs, useful for `--help` banners or + diagnostics. + +A minimal custom binary looks like this: + +```rust +use otap_df_config::engine::OtelDataflowSpec; +use otap_df_controller::{Controller, startup}; + +// Side-effect imports to register components via linkme. +use otap_df_core_nodes as _; +// Bring your own contrib/custom nodes as needed. + +// Reference the pipeline factory (or define your own). 
+use otap_df_otap::OTAP_PIPELINE_FACTORY; + +fn main() -> Result<(), Box<dyn std::error::Error>> { + otap_df_otap::crypto::install_crypto_provider()?; + + let mut cfg = OtelDataflowSpec::from_file("pipeline.yaml")?; + + // Apply any CLI overrides (core count, admin bind address, etc.) + startup::apply_cli_overrides(&mut cfg, Some(4), None, None); + + // Validate that every node URN in the config is registered. + startup::validate_engine_components(&cfg, &OTAP_PIPELINE_FACTORY)?; + + // Print diagnostics. + // Pass "system" here for the minimal example; in practice, align this + // string with your binary's allocator feature (e.g. "jemalloc", "mimalloc"). + println!("{}", startup::system_info(&OTAP_PIPELINE_FACTORY, "system")); + + // Run the engine. + let controller = Controller::new(&OTAP_PIPELINE_FACTORY); + controller.run_forever(cfg)?; + Ok(()) +} +``` + +This pattern is analogous to the builder approach used by projects like +`bindplane-otel-collector` in the Go ecosystem. The default `src/main.rs` +is itself a thin wrapper over these library calls. + +For a complete runnable example, see +[`examples/custom_collector.rs`](examples/custom_collector.rs). 
+ ## Development Setup **Requirements**: diff --git a/rust/otap-dataflow/crates/controller/Cargo.toml b/rust/otap-dataflow/crates/controller/Cargo.toml index 5ca7227f2b..2b0b198ddd 100644 --- a/rust/otap-dataflow/crates/controller/Cargo.toml +++ b/rust/otap-dataflow/crates/controller/Cargo.toml @@ -22,6 +22,7 @@ otap-df-engine = { workspace = true } otap-df-telemetry = { workspace = true } otap-df-admin = { workspace = true } +sysinfo = { workspace = true } thiserror = { workspace = true } miette = { workspace = true } core_affinity = { workspace = true } diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 65c82d5331..b4017a4487 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -75,7 +75,7 @@ use otap_df_engine::topic::{ InMemoryBackend, PipelineTopicBinding, TopicBroker, TopicOptions, TopicPublishOutcomeConfig, TopicSet, }; -use otap_df_state::store::ObservedStateStore; +use otap_df_state::store::{ObservedStateHandle, ObservedStateStore}; use otap_df_telemetry::event::{EngineEvent, ErrorSummary, ObservedEventReporter}; use otap_df_telemetry::registry::TelemetryRegistryHandle; use otap_df_telemetry::reporter::MetricsReporter; @@ -91,6 +91,8 @@ use std::thread; /// Error types and helpers for the controller module. pub mod error; +/// Reusable startup helpers (validation, CLI overrides, system info). +pub mod startup; /// Utilities to spawn async tasks on dedicated threads with graceful shutdown. pub mod thread_task; @@ -264,14 +266,52 @@ impl Result<(), Error> { - self.run_with_mode(engine_config, RunMode::ParkMainThread) + self.run_with_mode( + engine_config, + RunMode::ParkMainThread, + None::, + ) + } + + /// Starts the controller and invokes `observer` with an + /// [`ObservedStateHandle`] as soon as the pipeline state store is ready. + /// + /// The callback fires once, before the engine blocks. 
Use it to obtain + /// zero-overhead, in-process access to pipeline liveness, readiness, and + /// health without going through the admin HTTP server. + pub fn run_forever_with_observer( + &self, + engine_config: OtelDataflowSpec, + observer: F, + ) -> Result<(), Error> + where + F: FnOnce(ObservedStateHandle), + { + self.run_with_mode(engine_config, RunMode::ParkMainThread, Some(observer)) } /// Starts the controller with the given engine configurations. /// /// Runs until pipelines are shut down, then closes telemetry/admin services. pub fn run_till_shutdown(&self, engine_config: OtelDataflowSpec) -> Result<(), Error> { - self.run_with_mode(engine_config, RunMode::ShutdownWhenDone) + self.run_with_mode( + engine_config, + RunMode::ShutdownWhenDone, + None::, + ) + } + + /// Like [`run_till_shutdown`](Self::run_till_shutdown), but invokes + /// `observer` with an [`ObservedStateHandle`] before blocking. + pub fn run_till_shutdown_with_observer( + &self, + engine_config: OtelDataflowSpec, + observer: F, + ) -> Result<(), Error> + where + F: FnOnce(ObservedStateHandle), + { + self.run_with_mode(engine_config, RunMode::ShutdownWhenDone, Some(observer)) } fn map_topic_spec_to_options( @@ -957,11 +997,15 @@ impl( &self, engine_config: OtelDataflowSpec, run_mode: RunMode, - ) -> Result<(), Error> { + observer: Option, + ) -> Result<(), Error> + where + F: FnOnce(ObservedStateHandle), + { let num_pipeline_groups = engine_config.groups.len(); let resolved_config = engine_config.resolve(); let (engine, pipelines, observability_pipeline) = resolved_config.into_parts(); @@ -993,6 +1037,12 @@ impl, + core_id_range: Option, +) -> Option { + match (core_id_range, num_cores) { + (Some(range), _) => Some(range), + (None, Some(0)) => Some(CoreAllocation::AllCores), + (None, Some(count)) => Some(CoreAllocation::CoreCount { count }), + (None, None) => None, + } +} + +/// Converts an optional bind-address string into [`HttpAdminSettings`]. 
+#[must_use] +pub fn http_admin_bind_override(http_admin_bind: Option) -> Option { + http_admin_bind.map(|bind_address| HttpAdminSettings { bind_address }) +} + +/// Applies core-allocation and HTTP-admin bind overrides to an +/// [`OtelDataflowSpec`]. +/// +/// This is the standard way for CLI entry points to merge command-line flags +/// into a parsed configuration before starting the engine. +pub fn apply_cli_overrides( + engine_cfg: &mut OtelDataflowSpec, + num_cores: Option, + core_id_range: Option, + http_admin_bind: Option, +) { + if let Some(core_allocation) = core_allocation_override(num_cores, core_id_range) { + engine_cfg + .policies + .set_resources(ResourcesPolicy { core_allocation }); + } + if let Some(http_admin) = http_admin_bind_override(http_admin_bind) { + engine_cfg.engine.http_admin = Some(http_admin); + } +} + +/// Validates that every node in a single pipeline references a component URN +/// registered in the given [`PipelineFactory`], and runs per-component config +/// validation. +/// +/// Structural config validation (connections, node references, policies) is +/// already performed during config deserialization +/// ([`OtelDataflowSpec::from_file`]). This function adds the semantic check +/// that all referenced components are actually compiled into the binary, and +/// validates their node-specific config statically. +/// +/// **Scope:** This is *static* validation only -- it checks that config values +/// can be deserialized into the expected types. It does **not** detect runtime +/// issues such as port conflicts, unreachable endpoints, or missing files. 
+pub fn validate_pipeline_components( + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + pipeline_cfg: &PipelineConfig, + factory: &PipelineFactory, +) -> Result<(), Box> { + for (node_id, node_cfg) in pipeline_cfg.node_iter() { + let kind = node_cfg.kind(); + let urn_str = node_cfg.r#type.as_str(); + + let validate_config_fn = match kind { + NodeKind::Receiver => factory + .get_receiver_factory_map() + .get(urn_str) + .map(|f| f.validate_config), + NodeKind::Processor | NodeKind::ProcessorChain => factory + .get_processor_factory_map() + .get(urn_str) + .map(|f| f.validate_config), + NodeKind::Exporter => factory + .get_exporter_factory_map() + .get(urn_str) + .map(|f| f.validate_config), + }; + + match validate_config_fn { + None => { + let kind_name = match kind { + NodeKind::Receiver => "receiver", + NodeKind::Processor | NodeKind::ProcessorChain => "processor", + NodeKind::Exporter => "exporter", + }; + return Err(std::io::Error::other(format!( + "Unknown {} component `{}` in pipeline_group={} pipeline={} node={}", + kind_name, + urn_str, + pipeline_group_id.as_ref(), + pipeline_id.as_ref(), + node_id.as_ref() + )) + .into()); + } + Some(validate_fn) => { + validate_fn(&node_cfg.config).map_err(|e| { + std::io::Error::other(format!( + "Invalid config for component `{}` in pipeline_group={} pipeline={} node={}: {}", + urn_str, + pipeline_group_id.as_ref(), + pipeline_id.as_ref(), + node_id.as_ref(), + e + )) + })?; + } + } + } + + Ok(()) +} + +/// Validates that every node in every pipeline (including the observability +/// pipeline, if configured) references a component URN registered in the +/// given [`PipelineFactory`]. +/// +/// This is the top-level validation entry point that iterates over all +/// pipeline groups, all pipelines within each group, and the optional +/// observability pipeline. 
+pub fn validate_engine_components( + engine_cfg: &OtelDataflowSpec, + factory: &PipelineFactory, +) -> Result<(), Box> { + for (pipeline_group_id, pipeline_group) in &engine_cfg.groups { + for (pipeline_id, pipeline_cfg) in &pipeline_group.pipelines { + validate_pipeline_components(pipeline_group_id, pipeline_id, pipeline_cfg, factory)?; + } + } + + // Also validate the observability pipeline nodes, if configured. + if let Some(obs_pipeline) = &engine_cfg.engine.observability.pipeline { + let obs_group_id: PipelineGroupId = SYSTEM_PIPELINE_GROUP_ID.into(); + let obs_pipeline_id: PipelineId = SYSTEM_OBSERVABILITY_PIPELINE_ID.into(); + let obs_pipeline_config = obs_pipeline.clone().into_pipeline_config(); + validate_pipeline_components( + &obs_group_id, + &obs_pipeline_id, + &obs_pipeline_config, + factory, + )?; + } + + Ok(()) +} + +/// Returns a human-readable string with system information and all component +/// URNs registered in the given [`PipelineFactory`]. +/// +/// `memory_allocator` should describe the active global allocator (e.g. +/// `"jemalloc"`, `"mimalloc"`, or `"system"`). The library cannot detect this +/// automatically because allocator selection is a feature of the final binary +/// crate. +/// +/// Useful for diagnostics, `--help` output, or startup banners in any +/// distribution. +#[must_use] +pub fn system_info( + factory: &PipelineFactory, + memory_allocator: &str, +) -> String { + let available_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + let build_mode = if cfg!(debug_assertions) { + "debug" + } else { + "release" + }; + + let mut sys = System::new_all(); + sys.refresh_memory(); + let total_memory_gb = sys.total_memory() as f64 / 1_073_741_824.0; + let available_memory_gb = sys.available_memory() as f64 / 1_073_741_824.0; + + let debug_warning = if cfg!(debug_assertions) { + "\n\n⚠️ WARNING: This binary was compiled in debug mode. 
+ Debug builds are NOT recommended for production, benchmarks, or performance testing. + Use 'cargo build --release' for optimal performance." + } else { + "" + }; + + let mut receivers_sorted: Vec<&str> = + factory.get_receiver_factory_map().keys().copied().collect(); + let mut processors_sorted: Vec<&str> = factory + .get_processor_factory_map() + .keys() + .copied() + .collect(); + let mut exporters_sorted: Vec<&str> = + factory.get_exporter_factory_map().keys().copied().collect(); + receivers_sorted.sort(); + processors_sorted.sort(); + exporters_sorted.sort(); + + format!( + "System Information: + Available CPU cores: {} + Available memory: {:.2} GB / {:.2} GB + Build mode: {} + Memory allocator: {} + +Available Component URNs: + Receivers: {} + Processors: {} + Exporters: {} + +Example configuration files can be found in the configs/ directory.{}", + available_cores, + available_memory_gb, + total_memory_gb, + build_mode, + memory_allocator, + receivers_sorted.join(", "), + processors_sorted.join(", "), + exporters_sorted.join(", "), + debug_warning + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use otap_df_config::policy::Policies; + + fn minimal_engine_yaml() -> &'static str { + r#" +version: otel_dataflow/v1 +engine: + http_admin: + bind_address: "127.0.0.1:18080" +groups: + default: + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"# + } + + #[test] + fn core_allocation_override_prefers_range() { + let range = CoreAllocation::CoreSet { + set: vec![otap_df_config::policy::CoreRange { start: 2, end: 4 }], + }; + let resolved = core_allocation_override(Some(3), Some(range.clone())); + assert_eq!(resolved, Some(range)); + } + + #[test] + fn core_allocation_override_maps_num_cores() { + assert_eq!( + core_allocation_override(Some(5), None), + Some(CoreAllocation::CoreCount { count: 5 }) + ); + 
assert_eq!( + core_allocation_override(Some(0), None), + Some(CoreAllocation::AllCores) + ); + assert_eq!(core_allocation_override(None, None), None); + } + + #[test] + fn http_admin_bind_override_sets_custom_bind() { + let settings = http_admin_bind_override(Some("0.0.0.0:18080".to_string())); + assert_eq!( + settings.map(|s| s.bind_address), + Some("0.0.0.0:18080".to_string()) + ); + } + + #[test] + fn http_admin_bind_override_none_keeps_config_value() { + assert!(http_admin_bind_override(None).is_none()); + } + + #[test] + fn apply_cli_overrides_updates_top_level_resources_and_http_admin() { + let mut cfg = + OtelDataflowSpec::from_yaml(minimal_engine_yaml()).expect("base config should parse"); + apply_cli_overrides(&mut cfg, Some(3), None, Some("0.0.0.0:28080".to_string())); + + assert_eq!( + Policies::resolve([&cfg.policies]).resources.core_allocation, + CoreAllocation::CoreCount { count: 3 } + ); + assert_eq!( + cfg.engine + .http_admin + .as_ref() + .map(|s| s.bind_address.as_str()), + Some("0.0.0.0:28080") + ); + + let resolved = cfg.resolve(); + let main = resolved + .pipelines + .iter() + .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") + .expect("default/main should exist"); + assert_eq!( + main.policies.resources.core_allocation, + CoreAllocation::CoreCount { count: 3 } + ); + } + + #[test] + fn apply_cli_overrides_only_changes_global_resources_policy() { + let yaml = r#" +version: otel_dataflow/v1 +policies: + resources: + core_allocation: + type: core_count + count: 9 +engine: {} +groups: + default: + policies: + resources: + core_allocation: + type: core_count + count: 5 + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let mut cfg = OtelDataflowSpec::from_yaml(yaml).expect("config should parse"); + apply_cli_overrides(&mut cfg, Some(2), None, None); + 
+ // CLI updates top-level/global policy. + assert_eq!( + Policies::resolve([&cfg.policies]).resources.core_allocation, + CoreAllocation::CoreCount { count: 2 } + ); + + // Pipeline resolution keeps precedence (group-level over top-level). + let resolved = cfg.resolve(); + let main = resolved + .pipelines + .iter() + .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") + .expect("default/main should exist"); + assert_eq!( + main.policies.resources.core_allocation, + CoreAllocation::CoreCount { count: 5 } + ); + } + + #[test] + fn cli_num_cores_not_shadowed_by_implicit_default_resources() { + let yaml = r#" +version: otel_dataflow/v1 +engine: {} +groups: + default: + policies: + channel_capacity: + pdata: 500 + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let mut cfg = OtelDataflowSpec::from_yaml(yaml).expect("config should parse"); + apply_cli_overrides(&mut cfg, Some(4), None, None); + + let resolved = cfg.resolve(); + let main = resolved + .pipelines + .iter() + .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") + .expect("default/main should exist"); + assert_eq!( + main.policies.resources.core_allocation, + CoreAllocation::CoreCount { count: 4 }, + "--num-cores 4 must not be shadowed by an implicit group-level resources default" + ); + } +} diff --git a/rust/otap-dataflow/examples/custom_collector.rs b/rust/otap-dataflow/examples/custom_collector.rs new file mode 100644 index 0000000000..c1395cc055 --- /dev/null +++ b/rust/otap-dataflow/examples/custom_collector.rs @@ -0,0 +1,170 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! A minimal custom collector built on top of the OTAP dataflow engine +//! used as a library. +//! +//! 
This example demonstrates how to embed the engine in a custom binary +//! using the `otap_df_controller::startup` helpers, without copying any +//! startup logic from the default `src/main.rs`. +//! +//! # Usage +//! +//! ```bash +//! cargo run --example custom_collector -- --config configs/fake-perf.yaml +//! cargo run --example custom_collector -- --config configs/fake-perf.yaml --poll-status +//! ``` + +use std::path::PathBuf; + +use clap::Parser; +use otap_df_config::engine::OtelDataflowSpec; +use otap_df_config::policy::{CoreAllocation, CoreRange}; +// Side-effect imports: link the crates so their `linkme` distributed-slice +// registrations (component factories) are visible in `OTAP_PIPELINE_FACTORY`. +use otap_df_contrib_nodes as _; +use otap_df_controller::Controller; +use otap_df_controller::startup; +use otap_df_core_nodes as _; +use otap_df_otap::OTAP_PIPELINE_FACTORY; + +/// A minimal custom OTAP dataflow collector. +/// +/// Shows how the engine can be embedded as a library. All startup logic +/// (validation, CLI overrides, system info) comes from +/// `otap_df_controller::startup`. +#[derive(Parser, Debug)] +#[command( + author, + version, + about = "Custom OTAP Collector (library mode example)" +)] +struct Args { + /// Path to the engine configuration file (.yaml or .json) + #[arg(short = 'c', long, value_name = "FILE")] + config: PathBuf, + + /// Number of cores to use (0 for all available cores) + #[arg(long)] + num_cores: Option, + + /// Inclusive range of CPU core IDs to pin threads to (e.g. "0-3") + #[arg(long, value_name = "RANGE", value_parser = parse_core_id_allocation, conflicts_with = "num_cores")] + core_id_range: Option, + + /// Address to bind the HTTP admin server to (e.g. 
"127.0.0.1:8080") + #[arg(long)] + http_admin_bind: Option, + + /// Validate the configuration and exit without starting the engine + #[arg(long)] + validate_and_exit: bool, + + /// Periodically print pipeline health snapshots to stderr + #[arg(long)] + poll_status: bool, +} + +fn parse_core_id_allocation(s: &str) -> Result { + Ok(CoreAllocation::CoreSet { + set: s + .split(',') + .map(|part| { + part.trim() + .parse::() + .map(|n| CoreRange { start: n, end: n }) + .or_else(|_| { + let normalized = part.replace("..=", "-").replace("..", "-"); + let mut parts = normalized.split('-'); + let start = parts + .next() + .ok_or("missing start")? + .trim() + .parse::() + .map_err(|_| "invalid start".to_string())?; + let end = parts + .next() + .ok_or("missing end")? + .trim() + .parse::() + .map_err(|_| "invalid end".to_string())?; + Ok(CoreRange { start, end }) + }) + }) + .collect::, String>>()?, + }) +} + +fn memory_allocator_name() -> &'static str { + if cfg!(feature = "mimalloc") { + "mimalloc" + } else if cfg!(all(feature = "jemalloc", not(windows))) { + "jemalloc" + } else { + "system" + } +} + +fn main() -> Result<(), Box> { + // Install the TLS crypto provider before any network operations. + otap_df_otap::crypto::install_crypto_provider() + .map_err(|e| format!("Failed to install rustls crypto provider: {e}"))?; + + let args = Args::parse(); + + // Print system diagnostics and registered components. + println!( + "{}", + startup::system_info(&OTAP_PIPELINE_FACTORY, memory_allocator_name()) + ); + + // Load and patch the configuration. + let mut engine_cfg = OtelDataflowSpec::from_file(&args.config)?; + startup::apply_cli_overrides( + &mut engine_cfg, + args.num_cores, + args.core_id_range, + args.http_admin_bind, + ); + + // Validate that every node URN maps to a registered component. 
+ startup::validate_engine_components(&engine_cfg, &OTAP_PIPELINE_FACTORY)?; + + if args.validate_and_exit { + println!("Configuration '{}' is valid.", args.config.display()); + return Ok(()); + } + + // Run the engine, obtaining an ObservedStateHandle for in-process health checks. + let poll_status = args.poll_status; + let controller = Controller::new(&OTAP_PIPELINE_FACTORY); + let result = controller.run_forever_with_observer(engine_cfg, |handle| { + eprintln!("[observer] ObservedStateHandle obtained"); + if poll_status { + std::thread::spawn(move || { + loop { + std::thread::sleep(std::time::Duration::from_secs(5)); + let snapshot = handle.snapshot(); + for (key, status) in &snapshot { + eprintln!( + "[observer] pipeline {}:{} -> {:?}", + key.pipeline_group_id().as_ref(), + key.pipeline_id().as_ref(), + status + ); + } + } + }); + } + }); + match result { + Ok(_) => { + println!("Pipeline completed successfully"); + Ok(()) + } + Err(e) => { + eprintln!("Pipeline failed: {e}"); + std::process::exit(1); + } + } +} diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index 805fd3eee7..a1168e1255 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -5,24 +5,29 @@ use clap::Parser; use otap_df_config::config_provider::{ConfigFormat, resolve_config}; -use otap_df_config::engine::{ - HttpAdminSettings, OtelDataflowSpec, SYSTEM_OBSERVABILITY_PIPELINE_ID, SYSTEM_PIPELINE_GROUP_ID, -}; -use otap_df_config::node::NodeKind; -use otap_df_config::pipeline::PipelineConfig; -use otap_df_config::policy::{CoreAllocation, CoreRange, ResourcesPolicy}; -use otap_df_config::{PipelineGroupId, PipelineId}; +use otap_df_config::engine::OtelDataflowSpec; +use otap_df_config::policy::{CoreAllocation, CoreRange}; // Keep this side-effect import so the crate is linked and its `linkme` // distributed-slice registrations (contrib nodes) are visible // in `OTAP_PIPELINE_FACTORY` at runtime. 
use otap_df_contrib_nodes as _; use otap_df_controller::Controller; +use otap_df_controller::startup; // Keep this side-effect import so the crate is linked and its `linkme` // distributed-slice registrations (core nodes) are visible // in `OTAP_PIPELINE_FACTORY` at runtime. use otap_df_core_nodes as _; use otap_df_otap::OTAP_PIPELINE_FACTORY; -use sysinfo::System; + +fn memory_allocator_name() -> &'static str { + if cfg!(feature = "mimalloc") { + "mimalloc" + } else if cfg!(all(feature = "jemalloc", not(windows))) { + "jemalloc" + } else { + "system" + } +} #[cfg(all( not(windows), @@ -92,7 +97,7 @@ static GLOBAL: Jemalloc = Jemalloc; version, about, long_about = None, - after_help = system_info(), + after_help = startup::system_info(&OTAP_PIPELINE_FACTORY, memory_allocator_name()), after_long_help = concat!( "EXAMPLES:\n", " ", env!("CARGO_BIN_NAME"), " --config file:/etc/config.yaml\n", @@ -169,129 +174,6 @@ fn parse_core_id_range(s: &str) -> Result { Ok(CoreRange { start, end }) } -fn core_allocation_override( - num_cores: Option, - core_id_range: Option, -) -> Option { - match (core_id_range, num_cores) { - (Some(range), _) => Some(range), - (None, Some(0)) => Some(CoreAllocation::AllCores), - (None, Some(count)) => Some(CoreAllocation::CoreCount { count }), - (None, None) => None, - } -} - -fn http_admin_bind_override(http_admin_bind: Option) -> Option { - http_admin_bind.map(|bind_address| HttpAdminSettings { bind_address }) -} - -fn apply_cli_overrides( - engine_cfg: &mut OtelDataflowSpec, - num_cores: Option, - core_id_range: Option, - http_admin_bind: Option, -) { - if let Some(core_allocation) = core_allocation_override(num_cores, core_id_range) { - engine_cfg - .policies - .set_resources(ResourcesPolicy { core_allocation }); - } - if let Some(http_admin) = http_admin_bind_override(http_admin_bind) { - engine_cfg.engine.http_admin = Some(http_admin); - } -} - -/// Validates that every node in a pipeline references a component URN -/// that is registered 
in the `OTAP_PIPELINE_FACTORY`. -/// -/// Note: structural config validation (connections, node references, policies) -/// is already performed during config deserialization (`OtelDataflowSpec::from_file`). -/// This function adds the semantic check that all referenced components are actually -/// compiled into this binary, and validates their node-specific config statically. -/// -/// **Scope:** This is *static* validation only — it checks that the config values -/// can be deserialized into the expected types. It does **not** detect runtime -/// issues such as port conflicts, unreachable endpoints, missing files, or other -/// conditions that only manifest when the engine actually starts. -fn validate_pipeline_components( - pipeline_group_id: &PipelineGroupId, - pipeline_id: &PipelineId, - pipeline_cfg: &PipelineConfig, -) -> Result<(), Box> { - for (node_id, node_cfg) in pipeline_cfg.node_iter() { - let kind = node_cfg.kind(); - let urn_str = node_cfg.r#type.as_str(); - - let validate_config_fn = match kind { - NodeKind::Receiver => OTAP_PIPELINE_FACTORY - .get_receiver_factory_map() - .get(urn_str) - .map(|f| f.validate_config), - NodeKind::Processor | NodeKind::ProcessorChain => OTAP_PIPELINE_FACTORY - .get_processor_factory_map() - .get(urn_str) - .map(|f| f.validate_config), - NodeKind::Exporter => OTAP_PIPELINE_FACTORY - .get_exporter_factory_map() - .get(urn_str) - .map(|f| f.validate_config), - }; - - match validate_config_fn { - None => { - let kind_name = match kind { - NodeKind::Receiver => "receiver", - NodeKind::Processor | NodeKind::ProcessorChain => "processor", - NodeKind::Exporter => "exporter", - }; - return Err(std::io::Error::other(format!( - "Unknown {} component `{}` in pipeline_group={} pipeline={} node={}", - kind_name, - urn_str, - pipeline_group_id.as_ref(), - pipeline_id.as_ref(), - node_id.as_ref() - )) - .into()); - } - Some(validate_fn) => { - validate_fn(&node_cfg.config).map_err(|e| { - std::io::Error::other(format!( - "Invalid 
config for component `{}` in pipeline_group={} pipeline={} node={}: {}", - urn_str, - pipeline_group_id.as_ref(), - pipeline_id.as_ref(), - node_id.as_ref(), - e - )) - })?; - } - } - } - - Ok(()) -} - -fn validate_engine_components( - engine_cfg: &OtelDataflowSpec, -) -> Result<(), Box> { - for (pipeline_group_id, pipeline_group) in &engine_cfg.groups { - for (pipeline_id, pipeline_cfg) in &pipeline_group.pipelines { - validate_pipeline_components(pipeline_group_id, pipeline_id, pipeline_cfg)?; - } - } - - // Also validate the observability pipeline nodes, if configured. - if let Some(obs_pipeline) = &engine_cfg.engine.observability.pipeline { - let obs_group_id: PipelineGroupId = SYSTEM_PIPELINE_GROUP_ID.into(); - let obs_pipeline_id: PipelineId = SYSTEM_OBSERVABILITY_PIPELINE_ID.into(); - let obs_pipeline_config = obs_pipeline.clone().into_pipeline_config(); - validate_pipeline_components(&obs_group_id, &obs_pipeline_id, &obs_pipeline_config)?; - } - - Ok(()) -} - fn main() -> Result<(), Box> { // Install the rustls crypto provider selected by the crypto-* feature flag. // This must happen before any TLS connections (reqwest, tonic, etc.). 
@@ -306,16 +188,19 @@ fn main() -> Result<(), Box> { validate_and_exit, } = Args::parse(); - println!("{}", system_info()); + println!( + "{}", + startup::system_info(&OTAP_PIPELINE_FACTORY, memory_allocator_name()) + ); let resolved = resolve_config(config.as_deref())?; let mut engine_cfg = match resolved.format { ConfigFormat::Json => OtelDataflowSpec::from_json(&resolved.content)?, ConfigFormat::Yaml => OtelDataflowSpec::from_yaml(&resolved.content)?, }; - apply_cli_overrides(&mut engine_cfg, num_cores, core_id_range, http_admin_bind); + startup::apply_cli_overrides(&mut engine_cfg, num_cores, core_id_range, http_admin_bind); - validate_engine_components(&engine_cfg)?; + startup::validate_engine_components(&engine_cfg, &OTAP_PIPELINE_FACTORY)?; if validate_and_exit { println!("Configuration '{}' is valid.", resolved.source); @@ -336,115 +221,9 @@ fn main() -> Result<(), Box> { } } -fn system_info() -> String { - // Your custom logic here - this could read files, check system state, etc. - let available_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - - let build_mode = if cfg!(debug_assertions) { - "debug" - } else { - "release" - }; - - let memory_allocator = if cfg!(feature = "mimalloc") { - "mimalloc" - } else if cfg!(all(feature = "jemalloc", not(windows))) { - "jemalloc" - } else { - "system" - }; - - let mut sys = System::new_all(); - sys.refresh_memory(); - let total_memory_gb = sys.total_memory() as f64 / 1_073_741_824.0; - let available_memory_gb = sys.available_memory() as f64 / 1_073_741_824.0; - - let debug_warning = if cfg!(debug_assertions) { - "\n\n⚠️ WARNING: This binary was compiled in debug mode. - Debug builds are NOT recommended for production, benchmarks, or performance testing. - Use 'cargo build --release' for optimal performance." 
- } else { - "" - }; - - // Get available OTAP components - let receivers: Vec<&str> = OTAP_PIPELINE_FACTORY - .get_receiver_factory_map() - .keys() - .copied() - .collect(); - let processors: Vec<&str> = OTAP_PIPELINE_FACTORY - .get_processor_factory_map() - .keys() - .copied() - .collect(); - let exporters: Vec<&str> = OTAP_PIPELINE_FACTORY - .get_exporter_factory_map() - .keys() - .copied() - .collect(); - - let mut receivers_sorted = receivers; - let mut processors_sorted = processors; - let mut exporters_sorted = exporters; - receivers_sorted.sort(); - processors_sorted.sort(); - exporters_sorted.sort(); - - format!( - "System Information: - Available CPU cores: {} - Available memory: {:.2} GB / {:.2} GB - Build mode: {} - Memory allocator: {} - -Available Component URNs: - Receivers: {} - Processors: {} - Exporters: {} - -Example configuration files can be found in the configs/ directory.{}", - available_cores, - available_memory_gb, - total_memory_gb, - build_mode, - memory_allocator, - receivers_sorted.join(", "), - processors_sorted.join(", "), - exporters_sorted.join(", "), - debug_warning - ) -} - #[cfg(test)] mod tests { use super::*; - use otap_df_config::policy::Policies; - - fn minimal_engine_yaml() -> &'static str { - r#" -version: otel_dataflow/v1 -engine: - http_admin: - bind_address: "127.0.0.1:18080" -groups: - default: - pipelines: - main: - nodes: - receiver: - type: "urn:test:receiver:example" - config: null - exporter: - type: "urn:test:exporter:example" - config: null - connections: - - from: receiver - to: exporter -"# - } #[test] fn parse_core_range_ok() { @@ -510,42 +289,6 @@ groups: ); } - #[test] - fn core_allocation_override_prefers_range() { - let range = CoreAllocation::CoreSet { - set: vec![CoreRange { start: 2, end: 4 }], - }; - let resolved = core_allocation_override(Some(3), Some(range.clone())); - assert_eq!(resolved, Some(range)); - } - - #[test] - fn core_allocation_override_maps_num_cores() { - assert_eq!( - 
core_allocation_override(Some(5), None), - Some(CoreAllocation::CoreCount { count: 5 }) - ); - assert_eq!( - core_allocation_override(Some(0), None), - Some(CoreAllocation::AllCores) - ); - assert_eq!(core_allocation_override(None, None), None); - } - - #[test] - fn http_admin_bind_override_sets_custom_bind() { - let settings = http_admin_bind_override(Some("0.0.0.0:18080".to_string())); - assert_eq!( - settings.map(|s| s.bind_address), - Some("0.0.0.0:18080".to_string()) - ); - } - - #[test] - fn http_admin_bind_override_none_keeps_config_value() { - assert!(http_admin_bind_override(None).is_none()); - } - #[test] fn parse_validate_and_exit_flag() { let args = Args::parse_from([ @@ -567,6 +310,9 @@ groups: #[test] fn validate_unknown_component_rejected() { + use otap_df_config::pipeline::PipelineConfig; + use otap_df_config::{PipelineGroupId, PipelineId}; + let pipeline_group_id: PipelineGroupId = "test_group".into(); let pipeline_id: PipelineId = "test_pipeline".into(); let yaml = r#" @@ -586,8 +332,13 @@ connections: PipelineConfig::from_yaml(pipeline_group_id.clone(), pipeline_id.clone(), yaml) .expect("pipeline YAML should parse"); - let err = validate_pipeline_components(&pipeline_group_id, &pipeline_id, &pipeline_cfg) - .expect_err("semantic component validation should fail"); + let err = startup::validate_pipeline_components( + &pipeline_group_id, + &pipeline_id, + &pipeline_cfg, + &OTAP_PIPELINE_FACTORY, + ) + .expect_err("semantic component validation should fail"); assert!(err.to_string().contains("Unknown receiver component")); } @@ -646,132 +397,4 @@ connections: ); assert_eq!(args.num_cores, None); } - - #[test] - fn apply_cli_overrides_updates_top_level_resources_and_http_admin() { - let mut cfg = - OtelDataflowSpec::from_yaml(minimal_engine_yaml()).expect("base config should parse"); - apply_cli_overrides(&mut cfg, Some(3), None, Some("0.0.0.0:28080".to_string())); - - assert_eq!( - Policies::resolve([&cfg.policies]).resources.core_allocation, - 
CoreAllocation::CoreCount { count: 3 } - ); - assert_eq!( - cfg.engine - .http_admin - .as_ref() - .map(|s| s.bind_address.as_str()), - Some("0.0.0.0:28080") - ); - - let resolved = cfg.resolve(); - let main = resolved - .pipelines - .iter() - .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") - .expect("default/main should exist"); - assert_eq!( - main.policies.resources.core_allocation, - CoreAllocation::CoreCount { count: 3 } - ); - } - - #[test] - fn apply_cli_overrides_only_changes_global_resources_policy() { - let yaml = r#" -version: otel_dataflow/v1 -policies: - resources: - core_allocation: - type: core_count - count: 9 -engine: {} -groups: - default: - policies: - resources: - core_allocation: - type: core_count - count: 5 - pipelines: - main: - nodes: - receiver: - type: "urn:test:receiver:example" - config: null - exporter: - type: "urn:test:exporter:example" - config: null - connections: - - from: receiver - to: exporter -"#; - let mut cfg = OtelDataflowSpec::from_yaml(yaml).expect("config should parse"); - apply_cli_overrides(&mut cfg, Some(2), None, None); - - // CLI updates top-level/global policy. - assert_eq!( - Policies::resolve([&cfg.policies]).resources.core_allocation, - CoreAllocation::CoreCount { count: 2 } - ); - - // Pipeline resolution keeps precedence (group-level over top-level). - let resolved = cfg.resolve(); - let main = resolved - .pipelines - .iter() - .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") - .expect("default/main should exist"); - assert_eq!( - main.policies.resources.core_allocation, - CoreAllocation::CoreCount { count: 5 } - ); - } - - /// Regression test for the bug where a group-level `policies:` block that - /// only configures `channel_capacity` (or another non-resources field) would - /// cause serde to fill `resources` with `AllCores` default, silently - /// shadowing a `--num-cores` CLI flag written to the top-level config. 
- #[test] - fn cli_num_cores_not_shadowed_by_implicit_default_resources() { - let yaml = r#" -version: otel_dataflow/v1 -engine: {} -groups: - default: - policies: - channel_capacity: - pdata: 500 - pipelines: - main: - nodes: - receiver: - type: "urn:test:receiver:example" - config: null - exporter: - type: "urn:test:exporter:example" - config: null - connections: - - from: receiver - to: exporter -"#; - let mut cfg = OtelDataflowSpec::from_yaml(yaml).expect("config should parse"); - // The group has a policies block (for channel_capacity) but no resources. - // Before the fix, serde would fill in resources=AllCores at the group level, - // and the resolver would return that instead of the CLI value. - apply_cli_overrides(&mut cfg, Some(4), None, None); - - let resolved = cfg.resolve(); - let main = resolved - .pipelines - .iter() - .find(|p| p.pipeline_group_id.as_ref() == "default" && p.pipeline_id.as_ref() == "main") - .expect("default/main should exist"); - assert_eq!( - main.policies.resources.core_allocation, - CoreAllocation::CoreCount { count: 4 }, - "--num-cores 4 must not be shadowed by an implicit group-level resources default" - ); - } }