diff --git a/rust/otap-dataflow/configs/fake-with-extension.yaml b/rust/otap-dataflow/configs/fake-with-extension.yaml new file mode 100644 index 0000000000..82e21b53aa --- /dev/null +++ b/rust/otap-dataflow/configs/fake-with-extension.yaml @@ -0,0 +1,36 @@ +version: otel_dataflow/v1 + +policies: + channel_capacity: + control: + node: 100 + pipeline: 100 + pdata: 100 + +engine: {} + +groups: + default: + pipelines: + main: + extensions: + sample_kv_store: + type: "urn:otap:extension:sample_shared_key_value_store" + + nodes: + generator: + type: "urn:otel:receiver:traffic_generator" + config: + traffic_config: + max_batch_size: 10 + signals_per_second: 10 + log_weight: 100 + + exporter: + type: "urn:otel:exporter:noop" + capabilities: + key_value_store: "sample_kv_store" + + connections: + - from: generator + to: exporter diff --git a/rust/otap-dataflow/crates/config/src/lib.rs b/rust/otap-dataflow/crates/config/src/lib.rs index 320b4cbf96..f5042eacbb 100644 --- a/rust/otap-dataflow/crates/config/src/lib.rs +++ b/rust/otap-dataflow/crates/config/src/lib.rs @@ -78,6 +78,9 @@ pub type PipelineId = Cow<'static, str>; /// The id of a node in the pipeline. pub type NodeId = Cow<'static, str>; +/// The id of a capability binding (e.g., "bearer_token_provider"). +pub type CapabilityId = Cow<'static, str>; + /// The URN of a node type. pub use node_urn::NodeUrn; diff --git a/rust/otap-dataflow/crates/config/src/node.rs b/rust/otap-dataflow/crates/config/src/node.rs index f9cf2b6614..26a7c655a2 100644 --- a/rust/otap-dataflow/crates/config/src/node.rs +++ b/rust/otap-dataflow/crates/config/src/node.rs @@ -11,13 +11,53 @@ use crate::error::Error; use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute}; use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy}; -use crate::{Description, NodeUrn, PortName}; +use crate::{CapabilityId, Description, NodeId, NodeUrn, PortName}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; +/// Deserializes a `HashMap` while rejecting duplicate keys. +/// +/// Standard serde deserialization into `HashMap` silently overwrites earlier +/// entries when keys are duplicated in the source. This function detects that +/// and returns an error so the user gets immediate feedback. +fn deserialize_no_dup_keys<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + use serde::de::{MapAccess, Visitor}; + use std::fmt; + + struct NoDupVisitor; + + impl<'de> Visitor<'de> for NoDupVisitor { + type Value = HashMap; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a map with no duplicate keys") + } + + fn visit_map>(self, mut map: A) -> Result { + let mut result = HashMap::new(); + while let Some((key, value)) = map.next_entry::()? { + if result.contains_key(key.as_str()) { + return Err(serde::de::Error::custom(format!( + "duplicate capability key '{key}'" + ))); + } + let _ = result.insert(CapabilityId::from(key), NodeId::from(value)); + } + Ok(result) + } + } + + deserializer.deserialize_map(NoDupVisitor) +} + /// User configuration for a node in the pipeline. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(deny_unknown_fields)] @@ -60,6 +100,23 @@ pub struct NodeUserConfig { #[schemars(extend("x-kubernetes-preserve-unknown-fields" = true))] pub config: Value, + /// Capability bindings mapping capability names to extension instance names. + /// + /// Each entry maps a capability (e.g., `bearer_token_provider`) to the name + /// of an extension instance declared in the pipeline's `extensions` section. + /// + /// Example: + /// ```yaml + /// capabilities: + /// bearer_token_provider: azure_auth + /// ``` + #[serde( + default, + skip_serializing_if = "HashMap::is_empty", + deserialize_with = "deserialize_no_dup_keys" + )] + pub capabilities: HashMap, + /// Entity configuration for the node. /// /// Currently, we support entity::extend::identity_attributes, for example: @@ -112,6 +169,8 @@ pub enum NodeKind { Processor, /// A sink of signals Exporter, + /// A provider of shared capabilities (e.g., auth, service discovery). + Extension, // ToDo(LQ) : Add more node kinds as needed. // A connector between two pipelines @@ -126,6 +185,7 @@ impl From for Cow<'static, str> { NodeKind::Receiver => "receiver".into(), NodeKind::Processor => "processor".into(), NodeKind::Exporter => "exporter".into(), + NodeKind::Extension => "extension".into(), NodeKind::ProcessorChain => "processor_chain".into(), } } @@ -145,6 +205,7 @@ impl NodeUserConfig { default_output: None, entity: None, config: Value::Null, + capabilities: HashMap::new(), header_capture: None, header_propagation: None, } @@ -163,6 +224,7 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: Value::Null, + capabilities: HashMap::new(), header_capture: None, header_propagation: None, } @@ -181,6 +243,7 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: Value::Null, + capabilities: HashMap::new(), header_capture: None, header_propagation: None, } @@ -196,6 +259,7 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: user_config, + capabilities: HashMap::new(), header_capture: None, header_propagation: None, } @@ -228,6 +292,7 @@ impl NodeUserConfig { NodeKind::Processor => "processor", NodeKind::Exporter => "exporter", NodeKind::ProcessorChain => "processor_chain", + NodeKind::Extension => "extension", NodeKind::Receiver => unreachable!(), } ), @@ -243,6 +308,7 @@ impl NodeUserConfig { NodeKind::Receiver => "receiver", NodeKind::Processor => "processor", NodeKind::ProcessorChain => "processor_chain", + NodeKind::Extension => "extension", NodeKind::Exporter => unreachable!(), } ), @@ -518,6 +584,23 @@ config: ); } + #[test] + fn capabilities_rejects_duplicate_keys_yaml() { + let yaml = r#" +type: "urn:otel:exporter:test" +capabilities: + bearer_token_provider: ext_a + bearer_token_provider: ext_b +"#; + let result: Result = serde_yaml::from_str(yaml); + let err = result.expect_err("should reject duplicate capability keys"); + let msg = err.to_string(); + assert!( + msg.contains("duplicate"), + "error should mention duplicate: {msg}" + ); + } + #[test] fn header_capture_on_processor_is_rejected() { let mut cfg = NodeUserConfig::new_processor_config("processor:batch"); @@ -560,4 +643,22 @@ config: cfg.validate_transport_header_fields("test", &mut errors); assert!(errors.is_empty()); } + + #[test] + fn capabilities_rejects_duplicate_keys_json() { + let json = r#"{ + "type": "urn:otel:exporter:test", + "capabilities": { + "bearer_token_provider": "ext_a", + "bearer_token_provider": "ext_b" + } + }"#; + let result: Result = serde_json::from_str(json); + let err = result.expect_err("should reject duplicate capability keys"); + let msg = err.to_string(); + assert!( + msg.contains("duplicate"), + "error should mention duplicate: {msg}" + ); + } } diff --git a/rust/otap-dataflow/crates/config/src/node_urn.rs b/rust/otap-dataflow/crates/config/src/node_urn.rs index 39ce668a24..5df2c1cd6c 100644 --- a/rust/otap-dataflow/crates/config/src/node_urn.rs +++ b/rust/otap-dataflow/crates/config/src/node_urn.rs @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str { NodeKind::Receiver => "receiver", NodeKind::Processor | NodeKind::ProcessorChain => "processor", NodeKind::Exporter => "exporter", + NodeKind::Extension => "extension", } } @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result { "receiver" => Ok(NodeKind::Receiver), "processor" => Ok(NodeKind::Processor), "exporter" => Ok(NodeKind::Exporter), + "extension" => Ok(NodeKind::Extension), _ => Err(invalid_plugin_urn( raw, - format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"), + format!( + "expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`" + ), )), } } diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index 71c268b2f5..bd2f340a85 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -41,10 +41,22 @@ pub struct PipelineConfig { #[serde(default, skip_serializing_if = "Option::is_none")] policies: Option, - /// All nodes in this pipeline, keyed by node ID. + /// All data-path nodes in this pipeline, keyed by node ID. + /// + /// This includes receivers, processors, and exporters — but NOT extensions. + /// Extensions are configured in the sibling `extensions` section. #[serde(default)] nodes: PipelineNodes, + /// Pipeline extensions, keyed by extension ID. + /// + /// Extensions are long-lived components that run alongside the pipeline and + /// expose functionality (e.g., authentication, service discovery) to other + /// components. Unlike nodes, extensions do NOT participate in data-path + /// connections. + #[serde(default, skip_serializing_if = "PipelineNodes::is_empty")] + extensions: PipelineNodes, + /// Explicit graph connections between nodes. /// /// When provided, these connections are used as the authoritative topology for @@ -474,17 +486,28 @@ impl PipelineConfig { self.policies.as_ref() } - /// Returns a reference to the main pipeline nodes. + /// Returns a reference to the main pipeline nodes (receivers, processors, exporters). #[must_use] pub const fn nodes(&self) -> &PipelineNodes { &self.nodes } - /// Returns an iterator visiting all nodes in the pipeline. + /// Returns a reference to the pipeline extensions. + #[must_use] + pub const fn extensions(&self) -> &PipelineNodes { + &self.extensions + } + + /// Returns an iterator visiting all data-path nodes in the pipeline. pub fn node_iter(&self) -> impl Iterator)> { self.nodes.iter() } + /// Returns an iterator visiting all extension nodes in the pipeline. + pub fn extension_iter(&self) -> impl Iterator)> { + self.extensions.iter() + } + /// Returns true if the pipeline graph is defined with top-level connections. #[must_use] pub fn has_connections(&self) -> bool { @@ -496,11 +519,16 @@ impl PipelineConfig { self.connections.iter() } - /// Creates a consuming iterator over the nodes in the pipeline. + /// Creates a consuming iterator over the data-path nodes in the pipeline. pub fn node_into_iter(self) -> impl Iterator)> { self.nodes.into_iter() } + /// Creates a consuming iterator over the extensions in the pipeline. + pub fn extension_into_iter(self) -> impl Iterator)> { + self.extensions.into_iter() + } + /// Remove unconnected nodes from the main pipeline graph and return removed node descriptors. /// /// Connectivity is defined by top-level `connections`: @@ -526,6 +554,8 @@ impl PipelineConfig { !has_incoming || !has_outgoing } NodeKind::Exporter => !has_incoming, + // Extensions are in a separate section and never appear in `nodes`. + NodeKind::Extension => false, }; if should_remove { @@ -590,17 +620,20 @@ impl PipelineConfig { r#type: PipelineType::Otap, policies, nodes, + extensions: PipelineNodes::default(), connections, } } - /// Normalize plugin URNs for pipeline nodes. + /// Normalize plugin URNs for pipeline nodes and extensions. fn canonicalize_plugin_urns( &mut self, pipeline_group_id: &PipelineGroupId, pipeline_id: &PipelineId, ) -> Result<(), Error> { self.nodes + .canonicalize_plugin_urns(pipeline_group_id, pipeline_id)?; + self.extensions .canonicalize_plugin_urns(pipeline_group_id, pipeline_id) } @@ -631,6 +664,8 @@ impl PipelineConfig { &mut errors, ); + self.validate_capability_bindings(&mut errors); + if !errors.is_empty() { Err(Error::InvalidConfiguration { errors }) } else { @@ -638,6 +673,43 @@ impl PipelineConfig { } } + /// Validates that every capability binding references an extension that + /// exists in the `extensions:` section, and that extensions themselves + /// do not declare capability bindings (they provide capabilities, not + /// consume them). + fn validate_capability_bindings(&self, errors: &mut Vec) { + // Check that capability bindings on nodes reference valid extensions + for (node_id, node_config) in self.nodes.iter() { + for (capability_name, extension_name) in &node_config.capabilities { + if !self.extensions.contains_key(extension_name.as_ref()) { + errors.push(Error::InvalidUserConfig { + error: format!( + "Node '{}' binds capability '{}' to extension '{}', \ + but no extension with that name exists in the `extensions` section.", + node_id.as_ref(), + capability_name, + extension_name, + ), + }); + } + } + } + + // Check that extensions don't have capability bindings + for (ext_id, ext_config) in self.extensions.iter() { + if !ext_config.capabilities.is_empty() { + errors.push(Error::InvalidUserConfig { + error: format!( + "Extension '{}' has a `capabilities` section, but extensions \ + provide capabilities — they don't consume them. \ + Move capability bindings to the nodes that need them.", + ext_id.as_ref(), + ), + }); + } + } + } + fn validate_connections( &self, nodes: &PipelineNodes, @@ -884,6 +956,7 @@ fn prune_connection( pub struct PipelineConfigBuilder { description: Option, nodes: HashMap, + extensions: HashMap, duplicate_nodes: Vec, pending_connections: Vec, } @@ -902,6 +975,7 @@ impl PipelineConfigBuilder { Self { description: None, nodes: HashMap::new(), + extensions: HashMap::new(), duplicate_nodes: Vec::new(), pending_connections: Vec::new(), } @@ -936,6 +1010,7 @@ impl PipelineConfigBuilder { outputs: Vec::new(), default_output: None, config: config.unwrap_or(Value::Null), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }, @@ -974,6 +1049,36 @@ impl PipelineConfigBuilder { self.add_node(id, node_type, config) } + /// Add an extension (configured as a sibling to nodes, not as a node). + pub fn add_extension, U: Into>( + mut self, + id: S, + node_type: U, + config: Option, + ) -> Self { + let id = id.into(); + let node_type = node_type.into(); + if self.extensions.contains_key(&id) { + self.duplicate_nodes.push(id.clone()); + } else { + _ = self.extensions.insert( + id.clone(), + NodeUserConfig { + r#type: node_type, + description: None, + entity: None, + outputs: Vec::new(), + default_output: None, + config: config.unwrap_or(Value::Null), + header_capture: None, + header_propagation: None, + capabilities: HashMap::new(), + }, + ); + } + self + } + /// Connects a source node output port to one or more target nodes /// with a given dispatch policy. pub fn connect( @@ -1174,6 +1279,11 @@ impl PipelineConfigBuilder { .into_iter() .map(|(id, node)| (id, Arc::new(node))) .collect(), + extensions: self + .extensions + .into_iter() + .map(|(id, node)| (id, Arc::new(node))) + .collect(), connections: built_connections, policies: None, r#type: pipeline_type, @@ -2502,4 +2612,368 @@ sink: assert_eq!(removed_ids.len(), 3); assert!(config.connection_iter().next().is_none()); } + + // ── Extension config tests ────────────────────────────────────── + + #[test] + fn test_extensions_parsed_separately_from_nodes() { + let config = PipelineConfigBuilder::new() + .add_receiver("recv", "urn:test:receiver:example", None) + .add_exporter("exp", "urn:test:exporter:example", None) + .add_extension("auth", "urn:test:extension:auth", None) + .connect("recv", "", ["exp"], DispatchPolicy::Broadcast) + .build(PipelineType::Otap, "g", "p") + .unwrap(); + + // Extensions should NOT appear in node_iter + let node_names: Vec<_> = config.node_iter().map(|(id, _)| id.as_ref()).collect(); + assert!(node_names.contains(&"recv")); + assert!(node_names.contains(&"exp")); + assert!(!node_names.contains(&"auth")); + + // Extensions should appear in extension_iter + let ext_names: Vec<_> = config.extension_iter().map(|(id, _)| id.as_ref()).collect(); + assert_eq!(ext_names, vec!["auth"]); + } + + #[test] + fn test_extension_with_config_and_capabilities() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + capabilities: + bearer_token_provider: "auth" + +extensions: + auth: + type: "urn:test:extension:auth" + config: + method: "managed_identity" + scope: "https://example.com/.default" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + + // Extension parsed with config + let (_, ext_cfg) = config.extension_iter().next().unwrap(); + assert_eq!(ext_cfg.r#type.as_ref(), "urn:test:extension:auth"); + assert_eq!(ext_cfg.config["method"], "managed_identity"); + + // Exporter has capabilities binding + let (_, exp_cfg) = config + .node_iter() + .find(|(id, _)| id.as_ref() == "exporter") + .unwrap(); + assert_eq!( + exp_cfg.capabilities.get("bearer_token_provider"), + Some(&crate::NodeId::from("auth".to_string())) + ); + } + + #[test] + fn test_extension_kind_from_urn() { + use crate::node::NodeKind; + use crate::node_urn; + + let urn = + node_urn::validate_plugin_urn("urn:test:extension:auth", NodeKind::Extension).unwrap(); + assert_eq!(urn.kind(), NodeKind::Extension); + } + + #[test] + fn test_same_name_in_nodes_and_extensions_allowed() { + // Nodes and extensions are separate namespaces — same name is valid. + let config = PipelineConfigBuilder::new() + .add_receiver("myname", "urn:test:receiver:example", None) + .add_exporter("exp", "urn:test:exporter:example", None) + .add_extension("myname", "urn:test:extension:auth", None) + .connect("myname", "", ["exp"], DispatchPolicy::Broadcast) + .build(PipelineType::Otap, "g", "p") + .unwrap(); + + assert!(config.nodes().contains_key("myname")); + assert!(config.extensions().contains_key("myname")); + } + + #[test] + fn test_extension_urn_in_nodes_section_parsed_with_extension_kind() { + // Parsing does not enforce section/kind alignment. + // An extension URN under `nodes:` is accepted and keeps Extension kind. + let yaml = r#" +nodes: + auth: + type: "urn:test:extension:auth" + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + + // The URN kind is preserved as Extension. + let (_, auth_cfg) = config + .node_iter() + .find(|(id, _)| id.as_ref() == "auth") + .unwrap(); + assert_eq!(auth_cfg.kind(), NodeKind::Extension); + // Any stricter enforcement belongs to downstream engine/runtime validation. + } + + #[test] + fn test_receiver_urn_in_extensions_section_parsed() { + // A receiver URN placed in the `extensions:` section parses + // but has receiver kind — the engine should reject this at + // build time since the extension factory won't find it. + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + +extensions: + misplaced: + type: "urn:test:receiver:example" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + + let (_, ext_cfg) = config.extension_iter().next().unwrap(); + // The URN parses as receiver kind even though it's in extensions + assert_eq!(ext_cfg.kind(), NodeKind::Receiver); + } + + #[test] + fn test_empty_extensions_section_allowed() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + +extensions: {} + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + assert_eq!(config.extension_iter().count(), 0); + } + + #[test] + fn test_missing_extensions_section_allowed() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + assert_eq!(config.extension_iter().count(), 0); + } + + #[test] + fn test_capabilities_empty_allowed() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + capabilities: {} + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + let (_, exp_cfg) = config + .node_iter() + .find(|(id, _)| id.as_ref() == "exporter") + .unwrap(); + assert!(exp_cfg.capabilities.is_empty()); + } + + #[test] + fn test_multiple_extensions_parsed() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + capabilities: + bearer_token_provider: "auth" + key_value_store: "kv" + +extensions: + auth: + type: "urn:test:extension:auth" + config: + method: dev + kv: + type: "urn:test:extension:kv_store" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + + let ext_names: Vec<_> = config + .extension_iter() + .map(|(id, _)| id.as_ref().to_string()) + .collect(); + assert_eq!(ext_names.len(), 2); + assert!(ext_names.contains(&"auth".to_string())); + assert!(ext_names.contains(&"kv".to_string())); + + // Exporter binds two capabilities + let (_, exp_cfg) = config + .node_iter() + .find(|(id, _)| id.as_ref() == "exporter") + .unwrap(); + assert_eq!(exp_cfg.capabilities.len(), 2); + assert_eq!(exp_cfg.capabilities["bearer_token_provider"], "auth"); + assert_eq!(exp_cfg.capabilities["key_value_store"], "kv"); + } + + #[test] + fn test_capability_binding_to_nonexistent_extension_rejected() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + capabilities: + bearer_token_provider: "nonexistent_auth" + +connections: + - from: receiver + to: exporter +"#; + let result = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml); + match result { + Err(Error::InvalidConfiguration { errors }) => { + assert!( + errors.iter().any(|e| matches!( + e, + Error::InvalidUserConfig { error } if error.contains("nonexistent_auth") + )), + "expected error naming 'nonexistent_auth', got: {errors:?}" + ); + } + other => panic!("expected InvalidConfiguration, got {other:?}"), + } + } + + #[test] + fn test_capability_binding_to_existing_extension_passes() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + capabilities: + bearer_token_provider: "auth" + +extensions: + auth: + type: "urn:test:extension:auth" + +connections: + - from: receiver + to: exporter +"#; + let config = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml).unwrap(); + let result = config.validate(&"g".into(), &"p".into()); + assert!( + result.is_ok(), + "capability binding to existing extension should pass: {:?}", + result.err() + ); + } + + #[test] + fn test_capabilities_on_extension_rejected() { + let yaml = r#" +nodes: + receiver: + type: "urn:test:receiver:example" + exporter: + type: "urn:test:exporter:example" + +extensions: + auth: + type: "urn:test:extension:auth" + capabilities: + some_capability: "other_ext" + +connections: + - from: receiver + to: exporter +"#; + let result = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml); + match result { + Err(Error::InvalidConfiguration { errors }) => { + assert!( + errors.iter().any(|e| matches!( + e, + Error::InvalidUserConfig { error } if error.contains("provide capabilities") + )), + "expected error about extensions providing capabilities, got: {errors:?}" + ); + } + other => panic!("expected InvalidConfiguration, got {other:?}"), + } + } + + #[test] + fn test_extensions_at_group_level_rejected_by_serde() { + // PipelineGroupConfig uses #[serde(deny_unknown_fields)], + // so `extensions:` at the group level is rejected by deserialization. + let yaml = r#" +pipelines: + main: + nodes: + recv: + type: "urn:test:receiver:example" + exp: + type: "urn:test:exporter:example" + connections: + - from: recv + to: exp +extensions: + auth: + type: "urn:test:extension:auth" +"#; + let result: Result = + serde_yaml::from_str(yaml); + assert!( + result.is_err(), + "extensions at group level should be rejected by serde" + ); + } } diff --git a/rust/otap-dataflow/crates/controller/src/startup.rs b/rust/otap-dataflow/crates/controller/src/startup.rs index 78d52377fc..aa0bea21f4 100644 --- a/rust/otap-dataflow/crates/controller/src/startup.rs +++ b/rust/otap-dataflow/crates/controller/src/startup.rs @@ -111,6 +111,13 @@ pub fn validate_pipeline_components( .get_exporter_factory_map() .get(urn_str) .map(|f| f.validate_config), + NodeKind::Extension => { + // Extensions are not yet validated here because PipelineFactory + // does not have an extension factory registry. Once one is added, + // this should look up and validate extension configs similarly to + // receivers/processors/exporters. + continue; + } }; match validate_config_fn { @@ -119,6 +126,7 @@ pub fn validate_pipeline_components( NodeKind::Receiver => "receiver", NodeKind::Processor | NodeKind::ProcessorChain => "processor", NodeKind::Exporter => "exporter", + NodeKind::Extension => unreachable!("handled above"), }; return Err(std::io::Error::other(format!( "Unknown {} component `{}` in pipeline_group={} pipeline={} node={}", diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/fanout_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/fanout_processor/mod.rs index 908932bd9c..96ef7173ef 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/fanout_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/fanout_processor/mod.rs @@ -1266,6 +1266,7 @@ mod tests { "await_ack": await_ack, "destinations": destinations_cfg, }), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1324,6 +1325,7 @@ mod tests { ], "await_ack": "primary" }), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, } @@ -1374,6 +1376,7 @@ mod tests { outputs: (0..65).map(|i| PortName::from(format!("p{i}"))).collect(), default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1413,6 +1416,7 @@ mod tests { outputs: vec!["p1".into(), "p2".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1445,6 +1449,7 @@ mod tests { outputs: vec!["p1".into(), "p2".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1469,6 +1474,7 @@ mod tests { entity: None, default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1504,6 +1510,7 @@ mod tests { outputs: vec!["primary".into(), "backup".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1538,6 +1545,7 @@ mod tests { outputs: vec!["dest".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1585,6 +1593,7 @@ mod tests { outputs: vec!["primary".into(), "a".into(), "b".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1628,6 +1637,7 @@ mod tests { outputs: vec!["primary".into(), "fb1".into(), "fb2".into()], default_output: None, config: json!({}), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -1680,6 +1690,7 @@ mod tests { ], "await_ack": "primary" }), + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; @@ -2543,6 +2554,7 @@ mod tests { outputs: outputs.clone(), default_output: None, config, + capabilities: HashMap::new(), header_capture: None, header_propagation: None, }; diff --git a/rust/otap-dataflow/crates/engine/src/config.rs b/rust/otap-dataflow/crates/engine/src/config.rs index d1b7d3a326..90a658c08f 100644 --- a/rust/otap-dataflow/crates/engine/src/config.rs +++ b/rust/otap-dataflow/crates/engine/src/config.rs @@ -63,6 +63,17 @@ pub struct ExporterConfig { pub input_pdata_channel: PdataChannelConfig, } +/// Runtime configuration for an extension. +/// +/// Extensions only have a control channel — they do not process pipeline data. +#[derive(Clone, Debug)] +pub struct ExtensionConfig { + /// Name of the extension. + pub name: NodeId, + /// Configuration for control channel. + pub control_channel: ControlChannelConfig, +} + impl ReceiverConfig { /// Creates a new receiver configuration with default channel capacities. pub fn new(name: T) -> Self @@ -171,3 +182,28 @@ impl ExporterConfig { } } } + +impl ExtensionConfig { + /// Creates a new extension configuration with default channel capacities. + #[must_use] + pub fn new(name: T) -> Self + where + T: Into, + { + Self::with_control_channel_capacity(name, DEFAULT_CONTROL_CHANNEL_CAPACITY) + } + + /// Creates a new extension configuration with explicit control channel capacity. + #[must_use] + pub fn with_control_channel_capacity(name: T, control_channel_capacity: usize) -> Self + where + T: Into, + { + ExtensionConfig { + name: name.into(), + control_channel: ControlChannelConfig { + capacity: control_channel_capacity, + }, + } + } +} diff --git a/rust/otap-dataflow/crates/engine/src/error.rs b/rust/otap-dataflow/crates/engine/src/error.rs index a6423d54b1..dca0ca388c 100644 --- a/rust/otap-dataflow/crates/engine/src/error.rs +++ b/rust/otap-dataflow/crates/engine/src/error.rs @@ -323,6 +323,13 @@ pub enum Error { plugin_urn: NodeUrn, }, + /// An extension was placed in the `nodes` section instead of `extensions`. + #[error("Extension `{node}` was placed in `nodes` but belongs in the `extensions` section")] + ExtensionInNodesSection { + /// The node name that was misconfigured. + node: NodeName, + }, + /// Unknown node. #[error("Unknown node `{node}`")] UnknownNode { @@ -503,6 +510,7 @@ impl Error { Error::SpmcSharedNotSupported { .. } => "SpmcSharedNotSupported", Error::TooManyNodes {} => "TooManyNodes", Error::UnknownExporter { .. } => "UnknownExporter", + Error::ExtensionInNodesSection { .. } => "ExtensionInNodesSection", Error::UnknownNode { .. } => "UnknownNode", Error::UnknownOutputPort { .. } => "UnknownOutputPort", Error::UnknownProcessor { .. } => "UnknownProcessor", diff --git a/rust/otap-dataflow/crates/engine/src/lib.rs b/rust/otap-dataflow/crates/engine/src/lib.rs index 94d9eb2292..11e73d2a6e 100644 --- a/rust/otap-dataflow/crates/engine/src/lib.rs +++ b/rust/otap-dataflow/crates/engine/src/lib.rs @@ -627,6 +627,9 @@ impl PipelineFactory { kind: "ProcessorChain".into(), }); } + otap_df_config::node::NodeKind::Extension => { + return Err(Error::ExtensionInNodesSection { node: name.clone() }); + } }; let node_id = build_state.next_node_id(name.clone(), node_type, pipe_node)?; let _ = node_ids.insert(name.clone(), node_id); @@ -726,6 +729,9 @@ impl PipelineFactory { // ToDo(LQ): Implement processor chain optimization to eliminate intermediary channels. unreachable!("rejected in first pass"); } + otap_df_config::node::NodeKind::Extension => { + return Err(Error::ExtensionInNodesSection { node: name.clone() }); + } } } @@ -818,6 +824,10 @@ impl PipelineFactory { kind: "ProcessorChain".into(), }); } + otap_df_config::node::NodeKind::Extension => { + // Extensions don't participate in wiring contracts. + continue; + } }; _ = contracts_by_node.insert(node_name.as_ref().to_string().into(), contract); diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index a1168e1255..ac58560c62 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -155,8 +155,8 @@ fn parse_core_id_allocation(s: &str) -> Result { fn parse_core_id_range(s: &str) -> Result { // Accept formats: "a..=b", "a..b", "a-b" let normalized = s.replace("..=", "-").replace("..", "-"); - let mut parts = normalized.split('-'); - let start = parts + let mut parts: std::str::Split<'_, char> = normalized.split('-'); + let start: usize = parts .next() .ok_or_else(|| "missing start of core id range".to_string())? .trim()