diff --git a/rust/otap-dataflow/crates/config/src/engine.rs b/rust/otap-dataflow/crates/config/src/engine.rs index bdfe3c0fb4..c0022dc64f 100644 --- a/rust/otap-dataflow/crates/config/src/engine.rs +++ b/rust/otap-dataflow/crates/config/src/engine.rs @@ -154,6 +154,7 @@ impl EngineObservabilityPolicies { health: self.health, telemetry: self.telemetry, resources: None, + transport_headers: None, } } @@ -1452,6 +1453,229 @@ groups: assert!(parsed.groups.contains_key("default")); } + #[test] + fn resolve_transport_headers_pipeline_overrides_group() { + let yaml = r#" +version: otel_dataflow/v1 +engine: {} +groups: + default: + policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-group-header"] + pipelines: + with_override: + policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-pipeline-header"] + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse"); + let resolved = config.resolve(); + + let pipeline = resolved + .pipelines + .iter() + .find(|p| p.pipeline_id.as_ref() == "with_override") + .expect("pipeline should be resolved"); + + // Pipeline-level transport_headers should win over group-level. 
+ let policy = pipeline + .policies + .transport_headers + .as_ref() + .expect("transport_headers should be resolved"); + assert_eq!(policy.header_capture.headers.len(), 1); + assert_eq!( + policy.header_capture.headers[0].match_names, + vec!["x-pipeline-header"] + ); + } + + #[test] + fn resolve_transport_headers_group_overrides_engine() { + let yaml = r#" +version: otel_dataflow/v1 +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-engine-header"] +engine: {} +groups: + default: + policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-group-header"] + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse"); + let resolved = config.resolve(); + + let pipeline = resolved + .pipelines + .iter() + .find(|p| p.pipeline_id.as_ref() == "main") + .expect("pipeline should be resolved"); + + // Group-level transport_headers should win over engine-level. 
+ let policy = pipeline + .policies + .transport_headers + .as_ref() + .expect("transport_headers should be resolved"); + assert_eq!(policy.header_capture.headers.len(), 1); + assert_eq!( + policy.header_capture.headers[0].match_names, + vec!["x-group-header"] + ); + } + + #[test] + fn resolve_transport_headers_inherits_from_engine() { + let yaml = r#" +version: otel_dataflow/v1 +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-engine-header"] + header_propagation: + default: + selector: all_captured +engine: {} +groups: + default: + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse"); + let resolved = config.resolve(); + + let pipeline = resolved + .pipelines + .iter() + .find(|p| p.pipeline_id.as_ref() == "main") + .expect("pipeline should be resolved"); + + // Pipeline and group don't define transport_headers, so engine-level + // should be inherited. 
+ let policy = pipeline + .policies + .transport_headers + .as_ref() + .expect("transport_headers should be inherited from engine level"); + assert_eq!(policy.header_capture.headers.len(), 1); + assert_eq!( + policy.header_capture.headers[0].match_names, + vec!["x-engine-header"] + ); + assert_eq!( + policy.header_propagation.default.selector, + crate::transport_headers_policy::PropagationSelector::AllCaptured + ); + } + + #[test] + fn resolve_observability_pipeline_has_no_transport_headers() { + let yaml = r#" +version: otel_dataflow/v1 +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-engine-header"] +engine: + observability: + pipeline: + nodes: + itr: + type: "urn:otel:receiver:internal_telemetry" + config: {} + sink: + type: "urn:otel:exporter:console" + config: {} + connections: + - from: itr + to: sink +groups: + default: + pipelines: + main: + nodes: + receiver: + type: "urn:test:receiver:example" + config: null + exporter: + type: "urn:test:exporter:example" + config: null + connections: + - from: receiver + to: exporter +"#; + let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse"); + let resolved = config.resolve(); + + // The observability pipeline should NOT inherit transport_headers from + // the engine level (it's explicitly set to None during resolution). + let obs = resolved + .pipelines + .iter() + .find(|p| p.role == ResolvedPipelineRole::ObservabilityInternal) + .expect("observability pipeline should be resolved"); + assert!( + obs.policies.transport_headers.is_none(), + "observability pipeline should not have transport_headers" + ); + + // Regular pipelines should still inherit engine-level transport_headers. 
+ let main = resolved + .pipelines + .iter() + .find(|p| p.pipeline_id.as_ref() == "main") + .expect("main pipeline should be resolved"); + assert!( + main.policies.transport_headers.is_some(), + "regular pipelines should inherit transport_headers from engine level" + ); + } + #[test] fn bundled_configs_parse_as_engine_configs() { let mut dirs = vec![PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../configs")]; diff --git a/rust/otap-dataflow/crates/config/src/engine/resolve.rs b/rust/otap-dataflow/crates/config/src/engine/resolve.rs index 116d197a2b..8865c99896 100644 --- a/rust/otap-dataflow/crates/config/src/engine/resolve.rs +++ b/rust/otap-dataflow/crates/config/src/engine/resolve.rs @@ -133,8 +133,10 @@ impl OtelDataflowSpec { .map(|p| p.clone().into_policies()) .unwrap_or_default(); let mut policies = Policies::resolve([&obs_as_policies, &self.policies]); - // Observability pipelines use default resources. + // Observability pipelines use default resources and do not + // capture/propagate transport headers. policies.resources = ResourcesPolicy::default(); + policies.transport_headers = None; pipelines.push(ResolvedPipelineConfig { pipeline_group_id: SYSTEM_PIPELINE_GROUP_ID.into(), pipeline_id: SYSTEM_OBSERVABILITY_PIPELINE_ID.into(), diff --git a/rust/otap-dataflow/crates/config/src/lib.rs b/rust/otap-dataflow/crates/config/src/lib.rs index f50483be82..320b4cbf96 100644 --- a/rust/otap-dataflow/crates/config/src/lib.rs +++ b/rust/otap-dataflow/crates/config/src/lib.rs @@ -36,6 +36,10 @@ pub mod settings; /// TLS configuration. pub mod tls; pub mod topic; +/// Transport header core types and capture/propagation engines. +pub mod transport_headers; +/// Transport header capture and propagation policy declarations. 
+pub mod transport_headers_policy; pub use topic::{ SubscriptionGroupName, TopicAckPropagationMode, TopicAckPropagationPolicies, TopicBackendKind, TopicBroadcastOnLagPolicy, TopicImplSelectionPolicy, TopicName, diff --git a/rust/otap-dataflow/crates/config/src/node.rs b/rust/otap-dataflow/crates/config/src/node.rs index 3e96a83d80..f9cf2b6614 100644 --- a/rust/otap-dataflow/crates/config/src/node.rs +++ b/rust/otap-dataflow/crates/config/src/node.rs @@ -8,7 +8,9 @@ //! //! A node can expose multiple named output ports. +use crate::error::Error; use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute}; +use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy}; use crate::{Description, NodeUrn, PortName}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -75,6 +77,28 @@ pub struct NodeUserConfig { /// ``` #[serde(default, skip_serializing_if = "Option::is_none")] pub entity: Option, + + /// Node-level header capture policy override (receivers only). + /// + /// When set on a receiver node, this policy **fully replaces** the + /// pipeline-level `transport_headers.header_capture` policy for this + /// node. When absent, the pipeline-level policy applies. + /// + /// Setting this field on a processor or exporter node is a + /// configuration error. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub header_capture: Option, + + /// Node-level header propagation policy override (exporters only). + /// + /// When set on an exporter node, this policy **fully replaces** the + /// pipeline-level `transport_headers.header_propagation` policy for this + /// node. When absent, the pipeline-level policy applies. + /// + /// Setting this field on a processor or receiver node is a + /// configuration error. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub header_propagation: Option, } /// Node kinds @@ -121,6 +145,8 @@ impl NodeUserConfig { default_output: None, entity: None, config: Value::Null, + header_capture: None, + header_propagation: None, } } @@ -137,6 +163,8 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: Value::Null, + header_capture: None, + header_propagation: None, } } @@ -153,6 +181,8 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: Value::Null, + header_capture: None, + header_propagation: None, } } @@ -166,6 +196,8 @@ impl NodeUserConfig { outputs: Vec::new(), default_output: None, config: user_config, + header_capture: None, + header_propagation: None, } } @@ -180,6 +212,44 @@ impl NodeUserConfig { .unwrap_or_default() } + /// Validates transport header policy fields on this node and pushes any + /// errors into the provided vector. Receivers may only declare + /// `header_capture`; exporters may only declare `header_propagation`; + /// processors may declare neither. 
+    pub fn validate_transport_header_fields(&self, node_name: &str, errors: &mut Vec<Error>) {
+        let kind = self.kind();
+        // Label used in the error messages below. The match is exhaustive on
+        // purpose: a new `NodeKind` variant becomes a compile error here
+        // instead of a runtime `unreachable!()` panic in an error path.
+        let kind_label = match kind {
+            NodeKind::Receiver => "receiver",
+            NodeKind::Processor => "processor",
+            NodeKind::Exporter => "exporter",
+            NodeKind::ProcessorChain => "processor_chain",
+        };
+
+        // `header_capture` is a receiver-only override.
+        if self.header_capture.is_some() && kind != NodeKind::Receiver {
+            errors.push(Error::InvalidUserConfig {
+                error: format!(
+                    "node `{node_name}`: `header_capture` is only allowed on receiver nodes \
+                     (this node is a {kind_label})"
+                ),
+            });
+        }
+
+        // `header_propagation` is an exporter-only override. Both checks run
+        // independently, so a single node can contribute two errors.
+        if self.header_propagation.is_some() && kind != NodeKind::Exporter {
+            errors.push(Error::InvalidUserConfig {
+                error: format!(
+                    "node `{node_name}`: `header_propagation` is only allowed on exporter nodes \
+                     (this node is a {kind_label})"
+                ),
+            });
+        }
+    }
+
     /// Adds an output port to this node declaration.
pub fn add_output>(&mut self, port_name: P) { let port_name: PortName = port_name.into(); @@ -386,4 +456,108 @@ config: {} assert!(cfg.entity.is_some()); assert!(cfg.identity_attributes().is_empty()); } + + // -- Transport header node-level override tests -- + + #[test] + fn receiver_with_header_capture_override() { + let yaml = r#" +type: "receiver:otap" +header_capture: + headers: + - match_names: ["x-request-id"] + store_as: request_id +config: + listening_addr: "0.0.0.0:50051" +"#; + let cfg: NodeUserConfig = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(cfg.kind(), NodeKind::Receiver)); + assert!(cfg.header_capture.is_some()); + assert!(cfg.header_propagation.is_none()); + + // No validation errors for receiver + header_capture + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("test_node", &mut errors); + assert!(errors.is_empty()); + + let capture = cfg.header_capture.as_ref().unwrap(); + assert_eq!(capture.headers.len(), 1); + assert_eq!(capture.headers[0].match_names, vec!["x-request-id"]); + assert_eq!(capture.headers[0].store_as.as_deref(), Some("request_id")); + } + + #[test] + fn exporter_with_header_propagation_override() { + let yaml = r#" +type: "exporter:otap" +header_propagation: + default: + selector: all_captured + overrides: + - match: + stored_names: ["authorization"] + action: drop +config: + grpc_endpoint: "http://127.0.0.1:50051" +"#; + let cfg: NodeUserConfig = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(cfg.kind(), NodeKind::Exporter)); + assert!(cfg.header_capture.is_none()); + assert!(cfg.header_propagation.is_some()); + + // No validation errors for exporter + header_propagation + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("test_node", &mut errors); + assert!(errors.is_empty()); + + let propagation = cfg.header_propagation.as_ref().unwrap(); + assert_eq!(propagation.overrides.len(), 1); + assert_eq!( + propagation.overrides[0].match_rule.stored_names, + 
vec!["authorization"] + ); + } + + #[test] + fn header_capture_on_processor_is_rejected() { + let mut cfg = NodeUserConfig::new_processor_config("processor:batch"); + cfg.header_capture = Some(HeaderCapturePolicy::default()); + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("batch", &mut errors); + assert_eq!(errors.len(), 1); + assert!(errors[0].to_string().contains("header_capture")); + assert!(errors[0].to_string().contains("processor")); + } + + #[test] + fn header_capture_on_exporter_is_rejected() { + let mut cfg = NodeUserConfig::new_exporter_config("exporter:otap"); + cfg.header_capture = Some(HeaderCapturePolicy::default()); + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("otap_export", &mut errors); + assert_eq!(errors.len(), 1); + assert!(errors[0].to_string().contains("header_capture")); + assert!(errors[0].to_string().contains("exporter")); + } + + #[test] + fn header_propagation_on_receiver_is_rejected() { + let mut cfg = NodeUserConfig::new_receiver_config("receiver:otap"); + cfg.header_propagation = Some(HeaderPropagationPolicy::default()); + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("otap_ingest", &mut errors); + assert_eq!(errors.len(), 1); + assert!(errors[0].to_string().contains("header_propagation")); + assert!(errors[0].to_string().contains("receiver")); + } + + #[test] + fn receiver_without_override_has_no_validation_errors() { + let cfg = NodeUserConfig::new_receiver_config("receiver:otap"); + assert!(cfg.header_capture.is_none()); + assert!(cfg.header_propagation.is_none()); + let mut errors = Vec::new(); + cfg.validate_transport_header_fields("test", &mut errors); + assert!(errors.is_empty()); + } } diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index e73d2ea62b..71c268b2f5 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -610,6 +610,7 @@ 
impl PipelineConfig { /// - Duplicate node IDs /// - Duplicate output ports (same source node + port name) /// - Invalid hyper-edges (missing source or target nodes) + /// - Invalid node-level transport header policy overrides pub fn validate( &self, pipeline_group_id: &PipelineGroupId, @@ -617,6 +618,11 @@ impl PipelineConfig { ) -> Result<(), Error> { let mut errors = Vec::new(); + // Validate node-level transport header policy fields. + for (node_name, node_config) in self.nodes.iter() { + node_config.validate_transport_header_fields(node_name, &mut errors); + } + self.validate_connections( &self.nodes, &self.connections, @@ -930,6 +936,8 @@ impl PipelineConfigBuilder { outputs: Vec::new(), default_output: None, config: config.unwrap_or(Value::Null), + header_capture: None, + header_propagation: None, }, ); } @@ -2367,6 +2375,109 @@ sink: ); } + #[test] + fn test_header_capture_on_exporter_rejected_by_validate() { + let yaml = r#" + nodes: + recv: + type: "receiver:otap" + config: {} + exp: + type: "exporter:otap" + header_capture: + headers: + - match_names: ["x-tenant-id"] + config: {} + connections: + - from: recv + to: exp + "#; + let result = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml); + match result { + Err(Error::InvalidConfiguration { errors }) => { + let msgs: Vec<_> = errors + .iter() + .filter_map(|e| match e { + Error::InvalidUserConfig { error } => Some(error.as_str()), + _ => None, + }) + .collect(); + assert!( + msgs.iter().any(|m| m.contains("header_capture")), + "expected a header_capture validation error, got: {msgs:?}" + ); + } + other => panic!( + "expected Err(InvalidConfiguration) with header_capture error, got {other:?}" + ), + } + } + + #[test] + fn test_header_propagation_on_receiver_rejected_by_validate() { + let yaml = r#" + nodes: + recv: + type: "receiver:otap" + header_propagation: + default: + selector: all_captured + config: {} + exp: + type: "exporter:otap" + config: {} + connections: + - from: recv + to: exp 
+ "#; + let result = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml); + match result { + Err(Error::InvalidConfiguration { errors }) => { + let msgs: Vec<_> = errors + .iter() + .filter_map(|e| match e { + Error::InvalidUserConfig { error } => Some(error.as_str()), + _ => None, + }) + .collect(); + assert!( + msgs.iter().any(|m| m.contains("header_propagation")), + "expected a header_propagation validation error, got: {msgs:?}" + ); + } + other => panic!( + "expected Err(InvalidConfiguration) with header_propagation error, got {other:?}" + ), + } + } + + #[test] + fn test_valid_header_overrides_pass_validation() { + let yaml = r#" + nodes: + recv: + type: "receiver:otap" + header_capture: + headers: + - match_names: ["x-tenant-id"] + config: {} + exp: + type: "exporter:otap" + header_propagation: + default: + selector: all_captured + config: {} + connections: + - from: recv + to: exp + "#; + let result = super::PipelineConfig::from_yaml("g".into(), "p".into(), yaml); + assert!( + result.is_ok(), + "expected valid pipeline with correct header overrides, got: {result:?}" + ); + } + #[test] fn test_remove_unconnected_prunes_connections_for_removed_nodes() { let yaml = r#" diff --git a/rust/otap-dataflow/crates/config/src/policy.rs b/rust/otap-dataflow/crates/config/src/policy.rs index 7da02414a8..bfb9c8238b 100644 --- a/rust/otap-dataflow/crates/config/src/policy.rs +++ b/rust/otap-dataflow/crates/config/src/policy.rs @@ -4,6 +4,7 @@ //! Engine and pipeline policy declarations. use crate::health::HealthPolicy; +use crate::transport_headers_policy::TransportHeadersPolicy; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -36,6 +37,13 @@ pub struct Policies { /// applies. #[serde(default, skip_serializing_if = "Option::is_none")] pub(crate) resources: Option, + /// Transport headers policy controlling header capture at receivers + /// and propagation at exporters. 
+    ///
+    /// When absent, transport headers are not captured or propagated
+    /// (the feature is entirely opt-in).
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub(crate) transport_headers: Option<TransportHeadersPolicy>,
 }
 
 impl Policies {
@@ -51,6 +59,7 @@ impl Policies {
         let mut health = None;
         let mut telemetry = None;
         let mut resources = None;
+        let mut transport_headers = None;
         for scope in scopes {
             if channel_capacity.is_none() {
                 channel_capacity = scope.channel_capacity.as_ref();
@@ -64,13 +73,17 @@ impl Policies {
             if resources.is_none() {
                 resources = scope.resources.as_ref();
             }
+            if transport_headers.is_none() {
+                transport_headers = scope.transport_headers.as_ref();
+            }
         }
 
         ResolvedPolicies {
             channel_capacity: channel_capacity.cloned().unwrap_or_default(),
             health: health.cloned().unwrap_or_default(),
             telemetry: telemetry.cloned().unwrap_or_default(),
             resources: resources.cloned().unwrap_or_default(),
+            transport_headers: transport_headers.cloned(),
         }
     }
 
@@ -116,6 +129,9 @@ pub struct ResolvedPolicies {
     pub telemetry: TelemetryPolicy,
     /// Resources policy.
     pub resources: ResourcesPolicy,
+    /// Transport headers policy. `None` when the feature is not configured
+    /// (opt-in only -- no headers are captured or propagated by default).
+    pub transport_headers: Option<TransportHeadersPolicy>,
 }
 /// instrumentation overhead.
 #[derive(
diff --git a/rust/otap-dataflow/crates/config/src/transport_headers.rs b/rust/otap-dataflow/crates/config/src/transport_headers.rs
new file mode 100644
index 0000000000..5a91f95cd5
--- /dev/null
+++ b/rust/otap-dataflow/crates/config/src/transport_headers.rs
@@ -0,0 +1,457 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Protocol-neutral transport header abstraction for end-to-end header
+//! propagation through the pipeline.
+//!
+//! Transport headers represent request-scoped metadata captured from inbound
+//! transport protocols (gRPC metadata, HTTP headers) and carried through the
+//! pipeline context.
+//!
+//! 
The abstraction preserves: +//! - Duplicate header names (multiple entries with the same logical name) +//! - Binary values (e.g. gRPC binary metadata with `-bin` suffix) +//! - Original wire names for lossless round-tripping +//! - Normalized logical names for policy matching + +use std::fmt; + +/// Kind of header value. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ValueKind { + /// UTF-8 text value. + Text, + /// Arbitrary binary value (e.g. gRPC `-bin` metadata). + Binary, +} + +impl fmt::Display for ValueKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ValueKind::Text => write!(f, "text"), + ValueKind::Binary => write!(f, "binary"), + } + } +} + +/// A single captured transport header. +/// +/// Each entry records both the normalized logical name (used for policy +/// matching) and the original wire name observed on ingress (used for +/// lossless re-emission on egress). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TransportHeader { + /// Normalized logical name used for matching and policy lookup. + pub name: String, + /// Original header or metadata name observed on ingress. + pub wire_name: String, + /// Whether the value is text or binary. + pub value_kind: ValueKind, + /// Raw value bytes. + pub value: Vec, +} + +impl TransportHeader { + /// Create a new text transport header. + #[must_use] + pub fn text( + name: impl Into, + wire_name: impl Into, + value: impl Into>, + ) -> Self { + Self { + name: name.into(), + wire_name: wire_name.into(), + value_kind: ValueKind::Text, + value: value.into(), + } + } + + /// Create a new binary transport header. + #[must_use] + pub fn binary( + name: impl Into, + wire_name: impl Into, + value: impl Into>, + ) -> Self { + Self { + name: name.into(), + wire_name: wire_name.into(), + value_kind: ValueKind::Binary, + value: value.into(), + } + } + + /// Returns the value as a UTF-8 string, if it is valid text. 
+ #[must_use] + pub fn value_as_str(&self) -> Option<&str> { + std::str::from_utf8(&self.value).ok() + } +} + +/// An ordered collection of captured transport headers. +/// +/// Headers are stored as a `Vec` to preserve insertion order and allow +/// duplicate names (same logical name appearing multiple times), which +/// is valid in both HTTP and gRPC metadata. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct TransportHeaders { + headers: Vec, +} + +impl TransportHeaders { + /// Create an empty header collection. + #[must_use] + pub fn new() -> Self { + Self { + headers: Vec::new(), + } + } + + /// Create a header collection with pre-allocated capacity. + #[must_use] + pub fn with_capacity(capacity: usize) -> Self { + Self { + headers: Vec::with_capacity(capacity), + } + } + + /// Add a header to the collection. + pub fn push(&mut self, header: TransportHeader) { + self.headers.push(header); + } + + /// Remove all headers from the collection. + pub fn clear(&mut self) { + self.headers.clear(); + } + + /// Returns `true` if there are no headers. + #[must_use] + pub fn is_empty(&self) -> bool { + self.headers.is_empty() + } + + /// Returns the number of headers. + #[must_use] + pub fn len(&self) -> usize { + self.headers.len() + } + + /// Iterate over all headers. + pub fn iter(&self) -> impl Iterator { + self.headers.iter() + } + + /// Find all headers matching a normalized name (case-sensitive match on + /// the logical name). + pub fn find_by_name<'a>(&'a self, name: &'a str) -> impl Iterator { + self.headers.iter().filter(move |h| h.name == name) + } + + /// Returns a slice of all headers. 
+ #[must_use] + pub fn as_slice(&self) -> &[TransportHeader] { + &self.headers + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::transport_headers_policy::{ + CaptureDefaults, CaptureRule, HeaderCapturePolicy, HeaderPropagationPolicy, NameStrategy, + PropagationAction, PropagationDefault, PropagationMatch, PropagationOverride, + PropagationSelector, + }; + + #[test] + fn find_by_name_returns_matching_headers() { + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text("tenant", "X-Tenant", b"a".to_vec())); + headers.push(TransportHeader::text( + "request-id", + "X-Request-Id", + b"b".to_vec(), + )); + headers.push(TransportHeader::text("tenant", "X-Tenant", b"c".to_vec())); + + let tenants: Vec<_> = headers.find_by_name("tenant").collect(); + assert_eq!(tenants.len(), 2); + assert_eq!(tenants[0].value, b"a"); + assert_eq!(tenants[1].value, b"c"); + } + + #[test] + fn duplicate_names_preserved() { + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text("key", "Key", b"val1".to_vec())); + headers.push(TransportHeader::text("key", "Key", b"val2".to_vec())); + assert_eq!(headers.len(), 2); + } + + #[test] + fn value_as_str_for_text() { + let h = TransportHeader::text("name", "Name", b"hello".to_vec()); + assert_eq!(h.value_as_str(), Some("hello")); + } + + #[test] + fn value_as_str_for_invalid_utf8() { + let h = TransportHeader::binary("name-bin", "name-bin", vec![0xFF, 0xFE]); + assert_eq!(h.value_as_str(), None); + } + + // -- Capture engine tests ------------------------------------------------ + + fn make_capture_policy(rules: Vec) -> HeaderCapturePolicy { + HeaderCapturePolicy { + defaults: CaptureDefaults::default(), + headers: rules, + } + } + + fn rule(names: &[&str], store_as: Option<&str>) -> CaptureRule { + CaptureRule { + match_names: names.iter().map(|s| s.to_string()).collect(), + store_as: store_as.map(|s| s.to_string()), + sensitive: false, + value_kind: None, + } + } + + #[test] + fn 
capture_empty_policy_captures_nothing() { + let policy = HeaderCapturePolicy::default(); + let pairs = vec![("X-Tenant-Id", b"abc" as &[u8])]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert!(result.is_empty()); + assert!(stats.is_none()); + } + + #[test] + fn capture_matching_headers() { + let policy = make_capture_policy(vec![ + rule(&["x-tenant-id"], Some("tenant_id")), + rule(&["x-request-id"], None), + ]); + + let pairs: Vec<(&str, &[u8])> = vec![ + ("X-Tenant-Id", b"t-123"), + ("X-Request-Id", b"r-456"), + ("X-Unmatched", b"ignored"), + ]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert!(stats.is_none()); + assert_eq!(result.len(), 2); + assert_eq!(result.as_slice()[0].name, "tenant_id"); + assert_eq!(result.as_slice()[0].wire_name, "X-Tenant-Id"); + assert_eq!(result.as_slice()[0].value, b"t-123"); + assert_eq!(result.as_slice()[1].name, "x-request-id"); + } + + #[test] + fn capture_case_insensitive_matching() { + let policy = make_capture_policy(vec![rule(&["x-tenant-id"], None)]); + + let pairs: Vec<(&str, &[u8])> = vec![("X-TENANT-ID", b"val")]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert!(stats.is_none()); + assert_eq!(result.len(), 1); + assert_eq!(result.as_slice()[0].name, "x-tenant-id"); + assert_eq!(result.as_slice()[0].wire_name, "X-TENANT-ID"); + } + + #[test] + fn capture_respects_max_entries() { + let mut policy = make_capture_policy(vec![rule(&["x-key"], None)]); + policy.defaults.max_entries = 2; + + let pairs: Vec<(&str, &[u8])> = vec![("x-key", b"1"), ("x-key", b"2"), ("x-key", b"3")]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert_eq!(result.len(), 2); + let stats = stats.expect("should report skipped headers"); + 
assert_eq!(stats.skipped_max_entries, 1); + assert_eq!(stats.skipped_name_too_long, 0); + assert_eq!(stats.skipped_value_too_long, 0); + } + + #[test] + fn capture_drops_oversized_value() { + let mut policy = make_capture_policy(vec![rule(&["x-key"], None)]); + policy.defaults.max_value_bytes = 3; + + let pairs: Vec<(&str, &[u8])> = vec![("x-key", b"toolong"), ("x-key", b"ok")]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert_eq!(result.len(), 1); + assert_eq!(result.as_slice()[0].value, b"ok"); + let stats = stats.expect("should report skipped headers"); + assert_eq!(stats.skipped_value_too_long, 1); + assert_eq!(stats.skipped_max_entries, 0); + assert_eq!(stats.skipped_name_too_long, 0); + } + + #[test] + fn capture_binary_detection() { + let policy = make_capture_policy(vec![rule(&["auth-token-bin"], None)]); + + let pairs: Vec<(&str, &[u8])> = vec![("auth-token-bin", &[0xFF, 0x00])]; + let mut result = TransportHeaders::new(); + let stats = policy.capture_from_pairs(pairs.into_iter(), &mut result); + assert!(stats.is_none()); + assert_eq!(result.len(), 1); + assert_eq!(result.as_slice()[0].value_kind, ValueKind::Binary); + } + + // -- Propagation policy tests -------------------------------------------- + + #[test] + fn propagate_all_captured_default() { + let policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + ..PropagationDefault::default() + }, + vec![], + ); + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text( + "tenant_id", + "X-Tenant-Id", + b"t-1".to_vec(), + )); + headers.push(TransportHeader::text( + "request_id", + "X-Request-Id", + b"r-1".to_vec(), + )); + + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 2); + assert_eq!(propagated[0].header_name, "X-Tenant-Id"); + assert_eq!(propagated[1].header_name, "X-Request-Id"); + } + + #[test] + fn 
propagate_override_drops_auth() { + let policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + ..PropagationDefault::default() + }, + vec![PropagationOverride { + match_rule: PropagationMatch { + stored_names: vec!["authorization".to_string()], + }, + action: PropagationAction::Drop, + name: None, + on_error: None, + }], + ); + + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text( + "tenant_id", + "X-Tenant-Id", + b"t-1".to_vec(), + )); + headers.push(TransportHeader::text( + "authorization", + "Authorization", + b"Bearer secret".to_vec(), + )); + + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1); + assert_eq!(propagated[0].header_name, "X-Tenant-Id"); + } + + #[test] + fn propagate_selector_none_drops_all_unless_override() { + let policy = HeaderPropagationPolicy { + default: PropagationDefault { + selector: PropagationSelector::None, + ..PropagationDefault::default() + }, + overrides: vec![PropagationOverride { + match_rule: PropagationMatch { + stored_names: vec!["tenant_id".to_string()], + }, + action: PropagationAction::Propagate, + name: None, + on_error: None, + }], + }; + + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text( + "tenant_id", + "X-Tenant-Id", + b"t-1".to_vec(), + )); + headers.push(TransportHeader::text( + "request_id", + "X-Request-Id", + b"r-1".to_vec(), + )); + + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1); + assert_eq!(propagated[0].header_name, "X-Tenant-Id"); + } + + #[test] + fn propagate_stored_name_strategy() { + let policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + name: NameStrategy::StoredName, + ..PropagationDefault::default() + }, + vec![], + ); + + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text( + "tenant_id", + "X-Tenant-Id", 
+ b"t-1".to_vec(), + )); + + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1); + assert_eq!(propagated[0].header_name, "tenant_id"); + } + + #[test] + fn propagate_named_selector() { + let policy = HeaderPropagationPolicy { + default: PropagationDefault { + selector: PropagationSelector::Named(vec!["tenant_id".to_string()]), + ..PropagationDefault::default() + }, + overrides: vec![], + }; + + let mut headers = TransportHeaders::new(); + headers.push(TransportHeader::text( + "tenant_id", + "X-Tenant-Id", + b"t-1".to_vec(), + )); + headers.push(TransportHeader::text( + "request_id", + "X-Request-Id", + b"r-1".to_vec(), + )); + + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1); + assert_eq!(propagated[0].header_name, "X-Tenant-Id"); + } +} diff --git a/rust/otap-dataflow/crates/config/src/transport_headers_policy.rs b/rust/otap-dataflow/crates/config/src/transport_headers_policy.rs new file mode 100644 index 0000000000..31f5535b23 --- /dev/null +++ b/rust/otap-dataflow/crates/config/src/transport_headers_policy.rs @@ -0,0 +1,561 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Transport header capture and propagation policy declarations. +//! +//! This policy family controls which inbound transport headers are captured +//! by receivers and which captured headers are propagated by exporters. +//! +//! Extraction and propagation are explicit and opt-in. The default behavior +//! is not to forward any inbound headers. +//! +//! 
TODO: Implement the sensitive capability for headers + +use std::fmt; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::transport_headers::{TransportHeader, TransportHeaders, ValueKind}; + +// -- Stats types -------------------------------------------------------------- + +/// Statistics returned by [`HeaderCapturePolicy::capture_from_pairs`] when +/// one or more matching headers could not be captured due to policy limits. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CaptureStats { + /// Matching headers skipped because `max_entries` was already reached. + pub skipped_max_entries: usize, + /// Matching headers skipped because the wire name exceeded `max_name_bytes`. + pub skipped_name_too_long: usize, + /// Matching headers skipped because the value exceeded `max_value_bytes`. + pub skipped_value_too_long: usize, +} + +impl fmt::Display for CaptureStats { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "capture limits exceeded: {} skipped (max_entries), {} skipped (name too long), {} skipped (value too long)", + self.skipped_max_entries, self.skipped_name_too_long, self.skipped_value_too_long + ) + } +} + +impl std::error::Error for CaptureStats {} + +/// A single header selected for propagation +#[derive(Debug)] +pub struct PropagatedHeader<'a> { + /// The wire name to use on the outbound request. + /// + /// Points to `TransportHeader::wire_name` when the name strategy + /// is [`NameStrategy::Preserve`], or `TransportHeader::name` when + /// [`NameStrategy::StoredName`]. + pub header_name: &'a str, + /// Whether the value is text or binary. + pub value_kind: &'a ValueKind, + /// Raw value bytes. + pub value: &'a [u8], +} + +/// Transport headers policy controlling capture at receivers and +/// propagation at exporters. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(deny_unknown_fields)]
+pub struct TransportHeadersPolicy {
+    /// Header capture rules applied by receivers.
+    #[serde(default)]
+    pub header_capture: HeaderCapturePolicy,
+    /// Header propagation rules applied by exporters.
+    #[serde(default)]
+    pub header_propagation: HeaderPropagationPolicy,
+}
+
+// -- Header Capture -----------------------------------------------------------
+
+/// Policy controlling which inbound transport headers are captured by
+/// receivers and stored in the pipeline context.
+///
+/// The default value has no rules and therefore captures nothing.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(deny_unknown_fields)]
+pub struct HeaderCapturePolicy {
+    /// Default limits applied to all captured headers.
+    #[serde(default)]
+    pub(crate) defaults: CaptureDefaults,
+    /// Per-header capture rules. Only headers matching at least one rule
+    /// are captured.
+    #[serde(default)]
+    pub(crate) headers: Vec<CaptureRule>,
+}
+
+impl HeaderCapturePolicy {
+    /// Create a new capture policy from the given defaults and rules.
+    #[must_use]
+    pub fn new(defaults: CaptureDefaults, headers: Vec<CaptureRule>) -> Self {
+        Self { defaults, headers }
+    }
+
+    /// Returns `true` when no capture rules are defined, meaning the policy
+    /// will not capture any headers.
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.headers.is_empty()
+    }
+
+    /// Capture headers from an iterator of `(wire_name, value)` pairs.
+    ///
+    /// Each pair is matched against the capture rules. Only headers
+    /// matching at least one rule are captured, subject to the configured
+    /// limits. The first rule that matches a header decides its stored
+    /// name and value kind. The `result` collection is cleared before
+    /// populating.
+    ///
+    /// Returns `None` when all matching headers were captured successfully,
+    /// or `Some(CaptureStats)` when one or more matching headers had to be
+    /// skipped due to policy limits.
+ pub fn capture_from_pairs<'a>( + &self, + pairs: impl Iterator, + result: &mut TransportHeaders, + ) -> Option { + result.clear(); + + if self.is_empty() { + return None; + } + + let defaults = &self.defaults; + let mut skipped_max_entries: usize = 0; + let mut skipped_name_too_long: usize = 0; + let mut skipped_value_too_long: usize = 0; + + for (wire_name, value) in pairs { + if let Some(matched_rule) = self.find_matching_rule(wire_name) { + // Enforce entry count limit. + if result.len() >= defaults.max_entries { + skipped_max_entries += 1; + continue; + } + + // Enforce name length limit — drop oversized names. + if wire_name.len() > defaults.max_name_bytes { + skipped_name_too_long += 1; + continue; + } + + // Enforce value length limit — drop oversized values. + if value.len() > defaults.max_value_bytes { + skipped_value_too_long += 1; + continue; + } + + let name = matched_rule + .store_as + .clone() + .unwrap_or_else(|| wire_name.to_ascii_lowercase()); + + let value_kind = match matched_rule.value_kind { + Some(ValueKindConfig::Text) => ValueKind::Text, + Some(ValueKindConfig::Binary) => ValueKind::Binary, + None => { + if wire_name.ends_with("-bin") { + ValueKind::Binary + } else { + ValueKind::Text + } + } + }; + + result.push(TransportHeader { + name, + wire_name: wire_name.to_string(), + value_kind, + value: value.to_vec(), + }); + } + } + + if skipped_max_entries > 0 || skipped_name_too_long > 0 || skipped_value_too_long > 0 { + Some(CaptureStats { + skipped_max_entries, + skipped_name_too_long, + skipped_value_too_long, + }) + } else { + None + } + } + + /// Find the first capture rule whose `match_names` contains the given + /// wire name (case-insensitive comparison). + fn find_matching_rule(&self, wire_name: &str) -> Option<&CaptureRule> { + self.headers.iter().find(|rule| { + rule.match_names + .iter() + .any(|m| wire_name.eq_ignore_ascii_case(m)) + }) + } +} + +/// Default limits for header capture. 
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(deny_unknown_fields)]
+pub struct CaptureDefaults {
+    /// Maximum number of headers captured per message.
+    #[serde(default = "default_max_entries")]
+    pub max_entries: usize,
+    /// Maximum byte length of a header name.
+    #[serde(default = "default_max_name_bytes")]
+    pub max_name_bytes: usize,
+    /// Maximum byte length of a header value.
+    #[serde(default = "default_max_value_bytes")]
+    pub max_value_bytes: usize,
+    /// Action taken when a header violates a limit.
+    #[serde(default)]
+    pub on_error: ErrorAction,
+}
+
+// Hand-written so `Default::default()` yields the same limits serde uses for
+// omitted fields; a derived impl would set them all to zero.
+impl Default for CaptureDefaults {
+    fn default() -> Self {
+        Self {
+            max_entries: default_max_entries(),
+            max_name_bytes: default_max_name_bytes(),
+            max_value_bytes: default_max_value_bytes(),
+            on_error: ErrorAction::default(),
+        }
+    }
+}
+
+/// Fallback for [`CaptureDefaults::max_entries`] when the field is omitted.
+const fn default_max_entries() -> usize {
+    32
+}
+
+/// Fallback for [`CaptureDefaults::max_name_bytes`] when the field is omitted.
+const fn default_max_name_bytes() -> usize {
+    128
+}
+
+/// Fallback for [`CaptureDefaults::max_value_bytes`] when the field is omitted.
+const fn default_max_value_bytes() -> usize {
+    4096
+}
+
+/// A single header capture rule.
+///
+/// Headers whose wire name matches any entry in `match_names`
+/// (case-insensitive) are captured.
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(deny_unknown_fields)]
+pub struct CaptureRule {
+    /// Wire header names to match (case-insensitive).
+    pub match_names: Vec<String>,
+    /// Normalized logical name to store the header under. If omitted,
+    /// defaults to the observed wire name, ASCII-lowercased.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub store_as: Option<String>,
+    /// Whether this header contains sensitive data (e.g. auth tokens).
+    /// Sensitive headers may receive special treatment in logging and
+    /// debug output.
+    /// TODO: Implement the sensitive capability for headers
+    #[serde(default)]
+    pub sensitive: bool,
+    /// Override the auto-detected value kind.
When omitted, binary is + /// inferred from the gRPC `-bin` suffix convention; otherwise text + /// is assumed. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub value_kind: Option, +} + +/// Configured value kind for a capture rule. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ValueKindConfig { + /// UTF-8 text. + Text, + /// Arbitrary binary bytes. + Binary, +} + +// -- Header Propagation ------------------------------------------------------- + +/// Policy controlling which captured transport headers are propagated by +/// exporters onto outbound requests. +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct HeaderPropagationPolicy { + /// Default propagation behavior applied to all captured headers. + #[serde(default)] + pub(crate) default: PropagationDefault, + /// Per-header overrides applied after the default. + #[serde(default)] + pub(crate) overrides: Vec, +} + +impl HeaderPropagationPolicy { + /// Create a new propagation policy from the given default behavior and overrides. + #[must_use] + pub fn new(default: PropagationDefault, overrides: Vec) -> Self { + Self { default, overrides } + } + + /// Returns an iterator over headers that should be propagated on + /// egress. Each [`PropagatedHeader`] borrows from the captured + /// headers + /// + /// Headers whose policy action is [`PropagationAction::Drop`] are + /// silently skipped. The [`PropagatedHeader::egress_name`] field + /// points to either the original wire name or the stored name, + /// depending on the resolved [`NameStrategy`]. 
+ pub fn propagate<'a>( + &'a self, + headers: &'a TransportHeaders, + ) -> impl Iterator> { + headers.iter().filter_map(move |header| { + let (action, name_strategy) = self.resolve_action(header); + if action == PropagationAction::Drop { + return None; + } + let header_name = match name_strategy { + NameStrategy::Preserve => &header.wire_name, + NameStrategy::StoredName => &header.name, + }; + Some(PropagatedHeader { + header_name, + value_kind: &header.value_kind, + value: &header.value, + }) + }) + } + + /// Determine the action and name strategy for a single header by + /// checking overrides first, then falling back to the default. + fn resolve_action(&self, header: &TransportHeader) -> (PropagationAction, NameStrategy) { + // Check overrides first. + for ov in &self.overrides { + if ov + .match_rule + .stored_names + .iter() + .any(|s| header.name.eq_ignore_ascii_case(s)) + { + let name_strategy = ov.name.unwrap_or(self.default.name); + return (ov.action, name_strategy); + } + } + + // Check whether the header passes the default selector. + let selected = match &self.default.selector { + PropagationSelector::AllCaptured => true, + PropagationSelector::None => false, + PropagationSelector::Named(names) => { + names.iter().any(|n| header.name.eq_ignore_ascii_case(n)) + } + }; + + if selected { + (self.default.action, self.default.name) + } else { + (PropagationAction::Drop, self.default.name) + } + } +} + +/// Default propagation behavior. +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct PropagationDefault { + /// Which captured headers to select for propagation. + #[serde(default)] + pub selector: PropagationSelector, + /// Default action for selected headers. + #[serde(default)] + pub action: PropagationAction, + /// How to derive the outbound header name from the stored header. + #[serde(default)] + pub name: NameStrategy, + /// Action taken when a header cannot be propagated. 
+    #[serde(default)]
+    pub on_error: ErrorAction,
+}
+
+/// Selects which captured headers are candidates for propagation.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum PropagationSelector {
+    /// Propagate all captured headers (subject to overrides).
+    AllCaptured,
+    /// Do not propagate any captured headers by default (overrides may
+    /// still select specific headers).
+    #[default]
+    None,
+    /// Propagate only headers whose stored names appear in this list.
+    Named(Vec<String>),
+}
+
+/// Action to take for a header during propagation.
+#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum PropagationAction {
+    /// Include the header on the outbound request.
+    #[default]
+    Propagate,
+    /// Exclude the header from the outbound request.
+    Drop,
+}
+
+/// Strategy for mapping the stored header name to the outbound wire name.
+#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum NameStrategy {
+    /// Use the original wire name observed on ingress.
+    #[default]
+    Preserve,
+    /// Use the normalized stored name.
+    StoredName,
+}
+
+/// Action taken when a header violates a policy constraint.
+///
+/// `Drop` is currently the only variant.
+#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum ErrorAction {
+    /// Silently drop the offending header.
+    #[default]
+    Drop,
+}
+
+/// A per-header propagation override.
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[serde(deny_unknown_fields)]
+pub struct PropagationOverride {
+    /// Matching criteria for this override.
+    #[serde(rename = "match")]
+    pub match_rule: PropagationMatch,
+    /// Action to take for matched headers. Defaults to `propagate`.
+ #[serde(default)] + pub action: PropagationAction, + /// Override the name strategy for matched headers. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name: Option, + /// Override the error action for matched headers. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub on_error: Option, +} + +/// Matching criteria for propagation overrides. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct PropagationMatch { + /// Match headers whose stored (normalized) name appears in this list. + pub stored_names: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_capture_policy_captures_nothing() { + let policy = HeaderCapturePolicy::default(); + assert!(policy.is_empty()); + assert_eq!(policy.defaults.max_entries, 32); + assert_eq!(policy.defaults.max_name_bytes, 128); + assert_eq!(policy.defaults.max_value_bytes, 4096); + assert_eq!(policy.defaults.on_error, ErrorAction::Drop); + } + + #[test] + fn default_propagation_policy() { + let policy = HeaderPropagationPolicy::default(); + assert_eq!(policy.default.selector, PropagationSelector::None); + assert_eq!(policy.default.action, PropagationAction::Propagate); + assert_eq!(policy.default.name, NameStrategy::Preserve); + assert_eq!(policy.default.on_error, ErrorAction::Drop); + assert!(policy.overrides.is_empty()); + } + + #[test] + fn capture_policy_serde_roundtrip() { + let yaml = r#" +defaults: + max_entries: 16 + max_name_bytes: 64 + max_value_bytes: 2048 + on_error: drop +headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + - match_names: ["authorization"] + sensitive: true + - match_names: ["x-request-id"] +"#; + let policy: HeaderCapturePolicy = serde_yaml::from_str(yaml).expect("parse"); + assert_eq!(policy.defaults.max_entries, 16); + assert_eq!(policy.defaults.on_error, ErrorAction::Drop); + assert_eq!(policy.headers.len(), 3); + 
assert_eq!(policy.headers[0].store_as.as_deref(), Some("tenant_id")); + assert!(policy.headers[1].sensitive); + assert_eq!(policy.headers[2].match_names, vec!["x-request-id"]); + + // roundtrip + let json = serde_json::to_string(&policy).expect("serialize"); + let back: HeaderCapturePolicy = serde_json::from_str(&json).expect("deserialize"); + assert_eq!(back, policy); + } + + #[test] + fn propagation_policy_serde_roundtrip() { + let yaml = r#" +default: + selector: all_captured + action: propagate + name: preserve + on_error: drop +overrides: + - match: + stored_names: ["authorization"] + action: drop +"#; + let policy: HeaderPropagationPolicy = serde_yaml::from_str(yaml).expect("parse"); + assert_eq!(policy.overrides.len(), 1); + assert_eq!( + policy.overrides[0].match_rule.stored_names, + vec!["authorization"] + ); + assert_eq!(policy.overrides[0].action, PropagationAction::Drop); + + let json = serde_json::to_string(&policy).expect("serialize"); + let back: HeaderPropagationPolicy = serde_json::from_str(&json).expect("deserialize"); + assert_eq!(back, policy); + } + + #[test] + fn full_transport_headers_policy_serde() { + let yaml = r#" +header_capture: + defaults: + max_entries: 32 + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id +header_propagation: + default: + selector: all_captured + overrides: + - match: + stored_names: ["authorization"] + action: drop +"#; + let policy: TransportHeadersPolicy = serde_yaml::from_str(yaml).expect("parse"); + assert_eq!(policy.header_capture.headers.len(), 1); + assert_eq!(policy.header_propagation.overrides.len(), 1); + } + + #[test] + fn selector_named_variant() { + let yaml = r#"!named +- tenant_id +- request_id +"#; + let selector: PropagationSelector = serde_yaml::from_str(yaml).expect("parse"); + assert_eq!( + selector, + PropagationSelector::Named(vec!["tenant_id".to_string(), "request_id".to_string()]) + ); + } +} diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs 
b/rust/otap-dataflow/crates/controller/src/lib.rs index b4017a4487..082895a3ab 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -55,6 +55,7 @@ use otap_df_config::topic::{ TopicAckPropagationMode, TopicBackendKind, TopicBroadcastOnLagPolicy, TopicImplSelectionPolicy, TopicSpec, }; +use otap_df_config::transport_headers_policy::TransportHeadersPolicy; use otap_df_config::{ DeployedPipelineKey, PipelineGroupId, PipelineId, PipelineKey, SubscriptionGroupName, TopicName, pipeline::PipelineConfig, @@ -1233,6 +1234,7 @@ impl, telemetry_reporting_interval: std::time::Duration, pipeline_factory: &'static PipelineFactory, pipeline_context: PipelineContext, @@ -1739,6 +1745,7 @@ impl { pdata_receiver: Option>, /// Telemetry guard for node lifecycle cleanup. telemetry: Option, + /// Pre-resolved propagation policy for transport header forwarding. + propagation_policy: Option, }, /// An exporter with a `Send` implementation. Shared { @@ -73,6 +76,8 @@ pub enum ExporterWrapper { pdata_receiver: Option>, /// Telemetry guard for node lifecycle cleanup. telemetry: Option, + /// Pre-resolved propagation policy for transport header forwarding. + propagation_policy: Option, }, } @@ -113,6 +118,7 @@ impl ExporterWrapper { control_receiver: LocalReceiver::mpsc(control_receiver), pdata_receiver: None, // This will be set later telemetry: None, + propagation_policy: None, } } @@ -139,6 +145,7 @@ impl ExporterWrapper { control_receiver: SharedReceiver::mpsc(control_receiver), pdata_receiver: None, // This will be set later telemetry: None, + propagation_policy: None, } } @@ -152,6 +159,7 @@ impl ExporterWrapper { control_sender, control_receiver, pdata_receiver, + propagation_policy, .. 
} => ExporterWrapper::Local { node_id, @@ -162,6 +170,7 @@ impl ExporterWrapper { control_receiver, pdata_receiver, telemetry: Some(guard), + propagation_policy, }, ExporterWrapper::Shared { node_id, @@ -171,6 +180,7 @@ impl ExporterWrapper { control_sender, control_receiver, pdata_receiver, + propagation_policy, .. } => ExporterWrapper::Shared { node_id, @@ -181,6 +191,7 @@ impl ExporterWrapper { control_receiver, pdata_receiver, telemetry: Some(guard), + propagation_policy, }, } } @@ -208,6 +219,7 @@ impl ExporterWrapper { exporter, pdata_receiver, telemetry, + propagation_policy, .. } => { let (control_sender, control_receiver) = @@ -230,6 +242,7 @@ impl ExporterWrapper { control_receiver, pdata_receiver, telemetry, + propagation_policy, } } ExporterWrapper::Shared { @@ -241,6 +254,7 @@ impl ExporterWrapper { exporter, pdata_receiver, telemetry, + propagation_policy, .. } => { let (control_sender, control_receiver) = @@ -263,6 +277,7 @@ impl ExporterWrapper { control_receiver, pdata_receiver, telemetry, + propagation_policy, } } } @@ -301,6 +316,7 @@ impl ExporterWrapper { exporter, control_receiver, pdata_receiver, + propagation_policy, .. }, metrics_reporter, @@ -323,6 +339,7 @@ impl ExporterWrapper { effect_handler .core .set_completion_emission_metrics(completion_emission_metrics.clone()); + effect_handler.set_propagation_policy(propagation_policy); let inbox = ExporterInbox::new( Receiver::Local(control_receiver), pdata_rx, @@ -337,6 +354,7 @@ impl ExporterWrapper { exporter, control_receiver, pdata_receiver, + propagation_policy, .. 
}, metrics_reporter, @@ -359,6 +377,7 @@ impl ExporterWrapper { effect_handler .core .set_completion_emission_metrics(completion_emission_metrics); + effect_handler.set_propagation_policy(propagation_policy); let inbox = shared::ExporterInbox::new( control_receiver, pdata_rx, @@ -369,6 +388,55 @@ impl ExporterWrapper { } } } + + /// Returns the wrapper with the given pre-resolved propagation policy for + /// transport header forwarding. + pub(crate) fn with_propagation_policy(self, policy: Option) -> Self { + match self { + ExporterWrapper::Local { + node_id, + user_config, + runtime_config, + exporter, + control_sender, + control_receiver, + pdata_receiver, + telemetry, + .. + } => ExporterWrapper::Local { + node_id, + user_config, + runtime_config, + exporter, + control_sender, + control_receiver, + pdata_receiver, + telemetry, + propagation_policy: policy, + }, + ExporterWrapper::Shared { + node_id, + user_config, + runtime_config, + exporter, + control_sender, + control_receiver, + pdata_receiver, + telemetry, + .. + } => ExporterWrapper::Shared { + node_id, + user_config, + runtime_config, + exporter, + control_sender, + control_receiver, + pdata_receiver, + telemetry, + propagation_policy: policy, + }, + } + } } #[async_trait::async_trait(?Send)] @@ -1327,4 +1395,70 @@ mod tests { Message::Control(NodeControlMsg::Shutdown { .. }) )); } + + // -- with_propagation_policy tests ---------------------------------------- + + use otap_df_config::transport_headers_policy::HeaderPropagationPolicy; + + #[test] + fn test_with_propagation_policy_none_by_default() { + let test_runtime = TestRuntime::::new(); + let wrapper = ExporterWrapper::local( + TestExporter::new(test_runtime.counters()), + test_node(test_runtime.config().name.clone()), + Arc::new(NodeUserConfig::new_exporter_config("test")), + test_runtime.config(), + ); + + match wrapper { + ExporterWrapper::Local { + propagation_policy, .. 
+ } => assert!(propagation_policy.is_none(), "should be None by default"), + _ => panic!("expected Local variant"), + } + } + + #[test] + fn test_with_propagation_policy_local() { + let test_runtime = TestRuntime::::new(); + let wrapper = ExporterWrapper::local( + TestExporter::new(test_runtime.counters()), + test_node(test_runtime.config().name.clone()), + Arc::new(NodeUserConfig::new_exporter_config("test")), + test_runtime.config(), + ) + .with_propagation_policy(Some(HeaderPropagationPolicy::default())); + + match wrapper { + ExporterWrapper::Local { + propagation_policy, .. + } => assert!( + propagation_policy.is_some(), + "should be set after with_propagation_policy", + ), + _ => panic!("expected Local variant"), + } + } + + #[test] + fn test_with_propagation_policy_shared() { + let test_runtime = TestRuntime::::new(); + let wrapper = ExporterWrapper::shared( + TestExporter::new(test_runtime.counters()), + test_node(test_runtime.config().name.clone()), + Arc::new(NodeUserConfig::new_exporter_config("test")), + test_runtime.config(), + ) + .with_propagation_policy(Some(HeaderPropagationPolicy::default())); + + match wrapper { + ExporterWrapper::Shared { + propagation_policy, .. 
+ } => assert!( + propagation_policy.is_some(), + "should be set after with_propagation_policy", + ), + _ => panic!("expected Shared variant"), + } + } } diff --git a/rust/otap-dataflow/crates/engine/src/lib.rs b/rust/otap-dataflow/crates/engine/src/lib.rs index 7cadf00111..94d9eb2292 100644 --- a/rust/otap-dataflow/crates/engine/src/lib.rs +++ b/rust/otap-dataflow/crates/engine/src/lib.rs @@ -34,6 +34,9 @@ use otap_df_config::{ node::NodeUserConfig, pipeline::{DispatchPolicy, PipelineConfig}, policy::{ChannelCapacityPolicy, TelemetryPolicy}, + transport_headers_policy::{ + HeaderCapturePolicy, HeaderPropagationPolicy, TransportHeadersPolicy, + }, }; use otap_df_telemetry::INTERNAL_TELEMETRY_RECEIVER_URN; use otap_df_telemetry::InternalTelemetrySettings; @@ -540,6 +543,7 @@ impl PipelineFactory { mut config: PipelineConfig, channel_capacity_policy: ChannelCapacityPolicy, telemetry_policy: TelemetryPolicy, + transport_headers_policy: Option, internal_telemetry: Option, ) -> Result, Error> { let mut receivers = Vec::new(); @@ -673,6 +677,7 @@ impl PipelineFactory { node_config.clone(), channel_capacity_policy.control.node, channel_capacity_policy.pdata, + &transport_headers_policy, ) }, )?; @@ -711,6 +716,7 @@ impl PipelineFactory { node_config.clone(), channel_capacity_policy.control.node, channel_capacity_policy.pdata, + &transport_headers_policy, ) }, )?; @@ -1349,6 +1355,7 @@ impl PipelineFactory { node_config: Arc, control_channel_capacity: usize, pdata_channel_capacity: usize, + transport_headers_policy: &Option, ) -> Result, Error> { let pipeline_group_id = pipeline_ctx.pipeline_group_id(); let pipeline_id = pipeline_ctx.pipeline_id(); @@ -1383,13 +1390,16 @@ impl PipelineFactory { ); let create = factory.create; + let capture_policy = resolve_capture_policy(&node_config, transport_headers_policy); + let receiver = create( (*pipeline_ctx).clone(), node_id.clone(), node_config, &runtime_config, ) - .map_err(|e| Error::ConfigError(Box::new(e)))?; + .map_err(|e| 
Error::ConfigError(Box::new(e)))? + .with_capture_policy(capture_policy); otel_debug!( "receiver.create.complete", @@ -1471,6 +1481,7 @@ impl PipelineFactory { node_config: Arc, control_channel_capacity: usize, pdata_channel_capacity: usize, + transport_headers_policy: &Option, ) -> Result, Error> { let pipeline_group_id = pipeline_ctx.pipeline_group_id(); let pipeline_id = pipeline_ctx.pipeline_id(); @@ -1505,13 +1516,16 @@ impl PipelineFactory { ); let create = factory.create; + let propagation_policy = resolve_propagation_policy(&node_config, transport_headers_policy); + let exporter = create( (*pipeline_ctx).clone(), node_id.clone(), node_config, &exporter_config, ) - .map_err(|e| Error::ConfigError(Box::new(e)))?; + .map_err(|e| Error::ConfigError(Box::new(e)))? + .with_propagation_policy(propagation_policy); otel_debug!( "exporter.create.complete", @@ -1525,6 +1539,44 @@ impl PipelineFactory { } } +/// Resolves the effective capture policy for a receiver node. +/// +/// Node-level `header_capture` takes precedence over the pipeline-level +/// `transport_headers_policy`. Returns `None` when neither is configured. +fn resolve_capture_policy( + node_config: &NodeUserConfig, + transport_headers_policy: &Option, +) -> Option { + node_config + .header_capture + .as_ref() + .or_else(|| { + transport_headers_policy + .as_ref() + .map(|thp| &thp.header_capture) + }) + .cloned() +} + +/// Resolves the effective propagation policy for an exporter node. +/// +/// Node-level `header_propagation` takes precedence over the pipeline-level +/// `transport_headers_policy`. Returns `None` when neither is configured. 
+fn resolve_propagation_policy( + node_config: &NodeUserConfig, + transport_headers_policy: &Option, +) -> Option { + node_config + .header_propagation + .as_ref() + .or_else(|| { + transport_headers_policy + .as_ref() + .map(|thp| &thp.header_propagation) + }) + .cloned() +} + trait TelemetryWrapped: Sized { fn with_control_channel_metrics( self, @@ -2032,9 +2084,164 @@ fn stable_hash64(value: &str) -> u64 { #[cfg(test)] mod test { use super::*; + use otap_df_config::transport_headers_policy::{ + CaptureDefaults, CaptureRule, HeaderCapturePolicy, HeaderPropagationPolicy, + PropagationAction, PropagationDefault, PropagationSelector, + }; #[test] fn test_interests() { assert_eq!(Interests::ACKS | Interests::NACKS, Interests::ACKS_OR_NACKS); } + + // -- resolve_capture_policy tests ----------------------------------------- + + fn make_capture_policy_with_rule(name: &str) -> HeaderCapturePolicy { + HeaderCapturePolicy::new( + CaptureDefaults::default(), + vec![CaptureRule { + match_names: vec![name.to_owned()], + store_as: None, + sensitive: false, + value_kind: None, + }], + ) + } + + #[test] + fn test_resolve_capture_policy_node_overrides_pipeline() { + let node_policy = make_capture_policy_with_rule("x-node-header"); + let pipeline_policy = make_capture_policy_with_rule("x-pipeline-header"); + + let mut node_config = NodeUserConfig::new_receiver_config("test_receiver"); + node_config.header_capture = Some(node_policy.clone()); + + let transport_headers_policy = Some(TransportHeadersPolicy { + header_capture: pipeline_policy, + ..Default::default() + }); + + let policy = resolve_capture_policy(&node_config, &transport_headers_policy); + assert!(policy.is_some(), "should resolve a policy"); + + // Verify the node-level policy was used by checking that a + // "x-node-header" is captured while "x-pipeline-header" is not. 
+ let policy = policy.unwrap(); + let mut captured = otap_df_config::transport_headers::TransportHeaders::new(); + let _ = policy.capture_from_pairs( + [("x-node-header", b"val" as &[u8])].into_iter(), + &mut captured, + ); + assert_eq!(captured.len(), 1); + let _ = policy.capture_from_pairs( + [("x-pipeline-header", b"val" as &[u8])].into_iter(), + &mut captured, + ); + assert_eq!(captured.len(), 0); + } + + #[test] + fn test_resolve_capture_policy_falls_back_to_pipeline() { + let pipeline_policy = make_capture_policy_with_rule("x-pipeline-header"); + + let node_config = NodeUserConfig::new_receiver_config("test_receiver"); + // node_config.header_capture is None by default + + let transport_headers_policy = Some(TransportHeadersPolicy { + header_capture: pipeline_policy, + ..Default::default() + }); + + let policy = resolve_capture_policy(&node_config, &transport_headers_policy); + assert!(policy.is_some(), "should fall back to pipeline policy"); + + let policy = policy.unwrap(); + let mut captured = otap_df_config::transport_headers::TransportHeaders::new(); + let _ = policy.capture_from_pairs( + [("x-pipeline-header", b"val" as &[u8])].into_iter(), + &mut captured, + ); + assert_eq!(captured.len(), 1); + } + + #[test] + fn test_resolve_capture_policy_none_when_both_absent() { + let node_config = NodeUserConfig::new_receiver_config("test_receiver"); + let transport_headers_policy = None; + + let policy = resolve_capture_policy(&node_config, &transport_headers_policy); + assert!(policy.is_none()); + } + + // -- resolve_propagation_policy tests ------------------------------------- + + fn make_propagation_policy(action: PropagationAction) -> HeaderPropagationPolicy { + HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + action, + ..Default::default() + }, + vec![], + ) + } + + #[test] + fn test_resolve_propagation_policy_node_overrides_pipeline() { + let node_policy = 
make_propagation_policy(PropagationAction::Propagate); + let pipeline_policy = make_propagation_policy(PropagationAction::Drop); + + let mut node_config = NodeUserConfig::new_exporter_config("test_exporter"); + node_config.header_propagation = Some(node_policy); + + let transport_headers_policy = Some(TransportHeadersPolicy { + header_propagation: pipeline_policy, + ..Default::default() + }); + + let policy = resolve_propagation_policy(&node_config, &transport_headers_policy); + assert!(policy.is_some(), "should resolve a policy"); + + // Verify node-level policy (Propagate) was used, not pipeline (Drop). + let policy = policy.unwrap(); + let mut headers = otap_df_config::transport_headers::TransportHeaders::new(); + headers.push(otap_df_config::transport_headers::TransportHeader::text( + "x-test", "x-test", b"val", + )); + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1, "node policy should propagate"); + } + + #[test] + fn test_resolve_propagation_policy_falls_back_to_pipeline() { + let pipeline_policy = make_propagation_policy(PropagationAction::Propagate); + + let node_config = NodeUserConfig::new_exporter_config("test_exporter"); + // node_config.header_propagation is None by default + + let transport_headers_policy = Some(TransportHeadersPolicy { + header_propagation: pipeline_policy, + ..Default::default() + }); + + let policy = resolve_propagation_policy(&node_config, &transport_headers_policy); + assert!(policy.is_some(), "should fall back to pipeline policy"); + + let policy = policy.unwrap(); + let mut headers = otap_df_config::transport_headers::TransportHeaders::new(); + headers.push(otap_df_config::transport_headers::TransportHeader::text( + "x-test", "x-test", b"val", + )); + let propagated: Vec<_> = policy.propagate(&headers).collect(); + assert_eq!(propagated.len(), 1, "pipeline policy should propagate"); + } + + #[test] + fn test_resolve_propagation_policy_none_when_both_absent() { + let node_config 
= NodeUserConfig::new_exporter_config("test_exporter"); + let transport_headers_policy = None; + + let policy = resolve_propagation_policy(&node_config, &transport_headers_policy); + assert!(policy.is_none()); + } } diff --git a/rust/otap-dataflow/crates/engine/src/local/exporter.rs b/rust/otap-dataflow/crates/engine/src/local/exporter.rs index c5b4cf4acc..a2898279b6 100644 --- a/rust/otap-dataflow/crates/engine/src/local/exporter.rs +++ b/rust/otap-dataflow/crates/engine/src/local/exporter.rs @@ -41,6 +41,7 @@ use crate::message::ExporterInbox; use crate::node::NodeId; use crate::terminal_state::TerminalState; use async_trait::async_trait; +use otap_df_config::transport_headers_policy::HeaderPropagationPolicy; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::reporter::MetricsReporter; @@ -96,6 +97,9 @@ pub trait Exporter { pub struct EffectHandler { pub(crate) core: EffectHandlerCore, _pd: PhantomData, + /// Propagation policy for filtering captured headers on egress. + /// `None` when no propagation policy is configured (zero overhead). + propagation_policy: Option, } impl EffectHandler { @@ -106,6 +110,7 @@ impl EffectHandler { EffectHandler { core: EffectHandlerCore::new(node_id, metrics_reporter), _pd: PhantomData, + propagation_policy: None, } } @@ -121,6 +126,19 @@ impl EffectHandler { self.core.node_interests() } + /// Returns the propagation policy if a header propagation policy is configured. + /// + /// Returns `None` when no propagation policy is active (zero overhead). + #[must_use] + pub fn propagation_policy(&self) -> Option<&HeaderPropagationPolicy> { + self.propagation_policy.as_ref() + } + + /// Sets the propagation policy for transport header filtering. + pub fn set_propagation_policy(&mut self, policy: Option) { + self.propagation_policy = policy; + } + /// Print an info message to stdout. 
/// /// This method provides a standardized way for exporters to output diff --git a/rust/otap-dataflow/crates/engine/src/local/receiver.rs b/rust/otap-dataflow/crates/engine/src/local/receiver.rs index dfdf41fbce..3ef14f7248 100644 --- a/rust/otap-dataflow/crates/engine/src/local/receiver.rs +++ b/rust/otap-dataflow/crates/engine/src/local/receiver.rs @@ -45,6 +45,7 @@ use crate::terminal_state::TerminalState; use async_trait::async_trait; use otap_df_channel::error::RecvError; use otap_df_config::PortName; +use otap_df_config::transport_headers_policy::HeaderCapturePolicy; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::reporter::MetricsReporter; @@ -134,6 +135,9 @@ pub struct EffectHandler { pub(crate) core: EffectHandlerCore, /// Output-port router. pub router: OutputRouter>, + /// Capture policy for extracting transport headers from inbound metadata. + /// `None` when no capture policy is configured (zero overhead). + capture_policy: Option, } /// Implementation for the `!Send` effect handler. @@ -150,7 +154,11 @@ impl EffectHandler { let mut core = EffectHandlerCore::new(node_id.clone(), metrics_reporter); core.set_runtime_ctrl_msg_sender(node_request_sender); let router = OutputRouter::new(node_id, msg_senders, default_port); - EffectHandler { core, router } + EffectHandler { + core, + router, + capture_policy: None, + } } /// Returns the id of the receiver associated with this handler. @@ -183,6 +191,19 @@ impl EffectHandler { self.core.node_interests() } + /// Returns the capture policy if a header capture policy is configured. + /// + /// Returns `None` when no capture policy is active (zero overhead). + #[must_use] + pub fn capture_policy(&self) -> Option<&HeaderCapturePolicy> { + self.capture_policy.as_ref() + } + + /// Sets the capture policy for transport header extraction. 
+ pub fn set_capture_policy(&mut self, policy: Option) { + self.capture_policy = policy; + } + /// Sends a message to the next node(s) in the pipeline using the default port. /// /// If a default port is configured (either explicitly or deduced when a single port is diff --git a/rust/otap-dataflow/crates/engine/src/receiver.rs b/rust/otap-dataflow/crates/engine/src/receiver.rs index 1d270156c7..5483ac38f4 100644 --- a/rust/otap-dataflow/crates/engine/src/receiver.rs +++ b/rust/otap-dataflow/crates/engine/src/receiver.rs @@ -29,6 +29,7 @@ use otap_df_channel::error::SendError; use otap_df_channel::mpsc; use otap_df_config::PortName; use otap_df_config::node::NodeUserConfig; +use otap_df_config::transport_headers_policy::HeaderCapturePolicy; use otap_df_telemetry::reporter::MetricsReporter; use std::collections::HashMap; use std::sync::Arc; @@ -62,6 +63,8 @@ pub enum ReceiverWrapper { telemetry: Option, /// Whether outgoing messages need source node tagging. source_tag: SourceTagging, + /// Pre-resolved capture policy for transport header extraction. + capture_policy: Option, }, /// A receiver with a `Send` implementation. Shared { @@ -86,6 +89,8 @@ pub enum ReceiverWrapper { telemetry: Option, /// Whether outgoing messages need source node tagging. source_tag: SourceTagging, + /// Pre-resolved capture policy for transport header extraction. + capture_policy: Option, }, } @@ -127,6 +132,7 @@ impl ReceiverWrapper { pdata_receiver: None, telemetry: None, source_tag: SourceTagging::Disabled, + capture_policy: None, } } @@ -154,6 +160,7 @@ impl ReceiverWrapper { pdata_receiver: None, telemetry: None, source_tag: SourceTagging::Disabled, + capture_policy: None, } } @@ -169,6 +176,7 @@ impl ReceiverWrapper { pdata_senders, pdata_receiver, source_tag, + capture_policy, .. 
} => ReceiverWrapper::Local { node_id, @@ -181,6 +189,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry: Some(guard), source_tag, + capture_policy, }, ReceiverWrapper::Shared { node_id, @@ -192,6 +201,7 @@ impl ReceiverWrapper { pdata_senders, pdata_receiver, source_tag, + capture_policy, .. } => ReceiverWrapper::Shared { node_id, @@ -204,6 +214,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry: Some(guard), source_tag, + capture_policy, }, } } @@ -233,6 +244,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry, source_tag, + capture_policy, .. } => { let (control_sender, control_receiver) = @@ -257,6 +269,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry, source_tag, + capture_policy, } } ReceiverWrapper::Shared { @@ -270,6 +283,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry, source_tag, + capture_policy, .. } => { let (control_sender, control_receiver) = @@ -294,6 +308,7 @@ impl ReceiverWrapper { pdata_receiver, telemetry, source_tag, + capture_policy, } } } @@ -317,6 +332,7 @@ impl ReceiverWrapper { pdata_senders, user_config, source_tag, + capture_policy, .. }, metrics_reporter, @@ -341,6 +357,7 @@ impl ReceiverWrapper { metrics_reporter, ); effect_handler.set_source_tagging(source_tag); + effect_handler.set_capture_policy(capture_policy); effect_handler .core .set_pipeline_completion_msg_sender(pipeline_completion_msg_tx); @@ -355,6 +372,7 @@ impl ReceiverWrapper { pdata_senders, user_config, source_tag, + capture_policy, .. }, metrics_reporter, @@ -379,6 +397,7 @@ impl ReceiverWrapper { metrics_reporter, ); effect_handler.set_source_tagging(source_tag); + effect_handler.set_capture_policy(capture_policy); effect_handler .core .set_pipeline_completion_msg_sender(pipeline_completion_msg_tx); @@ -475,6 +494,65 @@ impl NodeWithPDataSender for ReceiverWrapper { } } +impl ReceiverWrapper { + /// Returns the wrapper with the given pre-resolved capture policy for + /// transport header extraction.
+ pub(crate) fn with_capture_policy(self, policy: Option) -> Self { + match self { + ReceiverWrapper::Local { + node_id, + user_config, + runtime_config, + receiver, + control_sender, + control_receiver, + pdata_senders, + pdata_receiver, + telemetry, + source_tag, + .. + } => ReceiverWrapper::Local { + node_id, + user_config, + runtime_config, + receiver, + control_sender, + control_receiver, + pdata_senders, + pdata_receiver, + telemetry, + source_tag, + capture_policy: policy, + }, + ReceiverWrapper::Shared { + node_id, + user_config, + runtime_config, + receiver, + control_sender, + control_receiver, + pdata_senders, + pdata_receiver, + telemetry, + source_tag, + .. + } => ReceiverWrapper::Shared { + node_id, + user_config, + runtime_config, + receiver, + control_sender, + control_receiver, + pdata_senders, + pdata_receiver, + telemetry, + source_tag, + capture_policy: policy, + }, + } + } +} + #[cfg(test)] mod tests { use super::ReceiverWrapper; @@ -785,4 +863,69 @@ mod tests { .run_test(scenario(port_rx)) .run_validation(validation_procedure()); } + + // -- with_capture_policy tests -------------------------------------------- + + use otap_df_config::transport_headers_policy::HeaderCapturePolicy; + + #[test] + fn test_with_capture_policy_none_by_default() { + let (port_tx, _port_rx) = oneshot::channel(); + let test_runtime = TestRuntime::::new(); + let wrapper = ReceiverWrapper::local( + TestReceiver::new(test_runtime.counters(), port_tx), + test_node("recv"), + Arc::new(NodeUserConfig::new_receiver_config("test")), + test_runtime.config(), + ); + + match wrapper { + ReceiverWrapper::Local { capture_policy, .. 
} => { + assert!(capture_policy.is_none(), "should be None by default") + } + _ => panic!("expected Local variant"), + } + } + + #[test] + fn test_with_capture_policy_local() { + let (port_tx, _port_rx) = oneshot::channel(); + let test_runtime = TestRuntime::::new(); + let wrapper = ReceiverWrapper::local( + TestReceiver::new(test_runtime.counters(), port_tx), + test_node("recv"), + Arc::new(NodeUserConfig::new_receiver_config("test")), + test_runtime.config(), + ) + .with_capture_policy(Some(HeaderCapturePolicy::default())); + + match wrapper { + ReceiverWrapper::Local { capture_policy, .. } => assert!( + capture_policy.is_some(), + "should be set after with_capture_policy" + ), + _ => panic!("expected Local variant"), + } + } + + #[test] + fn test_with_capture_policy_shared() { + let (port_tx, _port_rx) = oneshot::channel(); + let test_runtime = TestRuntime::::new(); + let wrapper = ReceiverWrapper::shared( + TestReceiver::new(test_runtime.counters(), port_tx), + test_node("recv"), + Arc::new(NodeUserConfig::new_receiver_config("test")), + test_runtime.config(), + ) + .with_capture_policy(Some(HeaderCapturePolicy::default())); + + match wrapper { + ReceiverWrapper::Shared { capture_policy, .. 
} => assert!( + capture_policy.is_some(), + "should be set after with_capture_policy" + ), + _ => panic!("expected Shared variant"), + } + } } diff --git a/rust/otap-dataflow/crates/engine/src/shared/exporter.rs b/rust/otap-dataflow/crates/engine/src/shared/exporter.rs index 5ee4e8c617..fb0a8e5fa6 100644 --- a/rust/otap-dataflow/crates/engine/src/shared/exporter.rs +++ b/rust/otap-dataflow/crates/engine/src/shared/exporter.rs @@ -42,6 +42,7 @@ use crate::terminal_state::TerminalState; use crate::{Interests, ReceivedAtNode}; use async_trait::async_trait; use otap_df_channel::error::RecvError; +use otap_df_config::transport_headers_policy::HeaderPropagationPolicy; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::reporter::MetricsReporter; @@ -97,6 +98,9 @@ pub trait Exporter { pub struct EffectHandler { pub(crate) core: EffectHandlerCore, _pd: PhantomData, + /// Propagation policy for filtering captured headers on egress. + /// `None` when no propagation policy is configured (zero overhead). + propagation_policy: Option, } impl EffectHandler { @@ -107,6 +111,7 @@ impl EffectHandler { EffectHandler { core: EffectHandlerCore::new(node_id, metrics_reporter), _pd: PhantomData, + propagation_policy: None, } } @@ -122,6 +127,19 @@ impl EffectHandler { self.core.node_interests() } + /// Returns the propagation policy if a header propagation policy is configured. + /// + /// Returns `None` when no propagation policy is active (zero overhead). + #[must_use] + pub fn propagation_policy(&self) -> Option<&HeaderPropagationPolicy> { + self.propagation_policy.as_ref() + } + + /// Sets the propagation policy for transport header filtering. + pub fn set_propagation_policy(&mut self, policy: Option) { + self.propagation_policy = policy; + } + /// Print an info message to stdout. 
/// /// This method provides a standardized way for exporters to output diff --git a/rust/otap-dataflow/crates/engine/src/shared/receiver.rs b/rust/otap-dataflow/crates/engine/src/shared/receiver.rs index 3d49605a9f..623fd21d0f 100644 --- a/rust/otap-dataflow/crates/engine/src/shared/receiver.rs +++ b/rust/otap-dataflow/crates/engine/src/shared/receiver.rs @@ -45,6 +45,7 @@ use crate::terminal_state::TerminalState; use async_trait::async_trait; use otap_df_channel::error::RecvError; use otap_df_config::PortName; +use otap_df_config::transport_headers_policy::HeaderCapturePolicy; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::reporter::MetricsReporter; @@ -102,6 +103,9 @@ pub struct EffectHandler { pub(crate) core: EffectHandlerCore, /// Output-port router. pub router: OutputRouter>, + /// Capture policy for extracting transport headers from inbound metadata. + /// `None` when no capture policy is configured (zero overhead). + capture_policy: Option, } /// Implementation for the `Send` effect handler. @@ -121,7 +125,11 @@ impl EffectHandler { let mut core = EffectHandlerCore::new(node_id.clone(), metrics_reporter); core.set_runtime_ctrl_msg_sender(runtime_ctrl_msg_sender); let router = OutputRouter::new(node_id, msg_senders, default_port); - EffectHandler { core, router } + EffectHandler { + core, + router, + capture_policy: None, + } } /// Returns the name of the receiver associated with this handler. @@ -154,6 +162,19 @@ impl EffectHandler { self.core.node_interests() } + /// Returns the capture policy if a header capture policy is configured. + /// + /// Returns `None` when no capture policy is active (zero overhead). + #[must_use] + pub fn capture_policy(&self) -> Option<&HeaderCapturePolicy> { + self.capture_policy.as_ref() + } + + /// Sets the capture policy for transport header extraction. 
+ pub fn set_capture_policy(&mut self, policy: Option) { + self.capture_policy = policy; + } + /// Sends a message to the next node(s) in the pipeline. /// /// # Errors diff --git a/rust/otap-dataflow/crates/otap/src/lib.rs b/rust/otap-dataflow/crates/otap/src/lib.rs index cb76f9a678..a6c483ef75 100644 --- a/rust/otap-dataflow/crates/otap/src/lib.rs +++ b/rust/otap-dataflow/crates/otap/src/lib.rs @@ -55,6 +55,10 @@ pub mod object_store; /// Cryptographic provider initialization (see [`crypto::install_crypto_provider`]). pub mod crypto; +/// Protocol-neutral transport header abstraction for end-to-end header +/// propagation through the pipeline. +pub mod transport_headers; + /// TLS utilities #[cfg(feature = "experimental-tls")] pub mod tls_utils; diff --git a/rust/otap-dataflow/crates/otap/src/pdata.rs b/rust/otap-dataflow/crates/otap/src/pdata.rs index bf258f2a7c..65ca9ac7d8 100644 --- a/rust/otap-dataflow/crates/otap/src/pdata.rs +++ b/rust/otap-dataflow/crates/otap/src/pdata.rs @@ -24,10 +24,23 @@ use otap_df_engine::{ }; use otap_df_pdata::OtapPayload; -/// Context for OTAP requests +use crate::transport_headers::TransportHeaders; + +/// Context for OTAP requests. +/// +/// Carries two independent concerns: +/// - **Routing stack**: Ack/Nack routing frames used by the pipeline engine +/// for result notification. Reset at transport boundaries (topic hops). +/// - **Transport headers**: Protocol-neutral request-scoped metadata captured +/// from inbound transport headers. Preserved across transport boundaries. #[derive(Clone, Debug, Default, PartialEq)] pub struct Context { stack: Vec, + /// Transport headers captured from inbound protocol metadata. + /// + /// `None` when no headers have been captured (the common case, zero + /// additional allocation). 
+ transport_headers: Option, } impl Context { @@ -37,6 +50,7 @@ impl Context { pub fn with_capacity(capacity: usize) -> Self { Self { stack: Vec::with_capacity(capacity), + transport_headers: None, } } @@ -272,6 +286,17 @@ impl Context { }); } + /// Returns a reference to the captured transport headers, if any. + #[must_use] + pub fn transport_headers(&self) -> Option<&TransportHeaders> { + self.transport_headers.as_ref() + } + + /// Set the transport headers for this context. + pub fn set_transport_headers(&mut self, headers: TransportHeaders) { + self.transport_headers = Some(headers); + } + /// Get the source node for this context. #[must_use] pub fn source_node(&self) -> Option { @@ -392,15 +417,22 @@ impl OtapPdata { &self.payload } - /// Clone this pdata while resetting transport context to default. + /// Clone this pdata while resetting Ack/Nack routing state to default. /// /// This is used at transport boundaries (for example topic hops between /// pipelines) where in-process Ack/Nack routing state must not leak across /// boundaries. + /// + /// Transport headers are **preserved** because they represent + /// request-scoped metadata (tenant ID, auth, trace context) that should + /// survive cross-pipeline hops. #[must_use] pub fn clone_without_context(&self) -> Self { Self { - context: Context::default(), + context: Context { + stack: Vec::new(), + transport_headers: self.context.transport_headers.clone(), + }, payload: self.payload.clone(), } } @@ -514,6 +546,24 @@ impl OtapPdata { pub fn has_ack_or_nack_interests(&self) -> bool { self.context.has_ack_or_nack_subscribers() } + + /// Returns a reference to the captured transport headers, if any. + #[must_use] + pub fn transport_headers(&self) -> Option<&TransportHeaders> { + self.context.transport_headers() + } + + /// Set transport headers on this pdata's context. 
+ pub fn set_transport_headers(&mut self, headers: TransportHeaders) { + self.context.set_transport_headers(headers); + } + + /// Builder-style method to attach transport headers. + #[must_use] + pub fn with_transport_headers(mut self, headers: TransportHeaders) -> Self { + self.context.set_transport_headers(headers); + self + } } /* -------- Producer effect handler extensions (shared, local) -------- */ diff --git a/rust/otap-dataflow/crates/otap/src/transport_headers.rs b/rust/otap-dataflow/crates/otap/src/transport_headers.rs new file mode 100644 index 0000000000..9820f46d75 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/transport_headers.rs @@ -0,0 +1,220 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Protocol-neutral transport header abstraction for end-to-end header +//! propagation through the pipeline. +//! +//! Core types and engines are defined in [`otap_df_config::transport_headers`] +//! and re-exported here for backward compatibility and convenience. + +// Re-export all public items from the config crate's transport_headers module. +pub use otap_df_config::transport_headers::{TransportHeader, TransportHeaders, ValueKind}; +// Re-export policy types that include capture and propagation logic. 
+pub use otap_df_config::transport_headers_policy::{ + CaptureStats, HeaderCapturePolicy, HeaderPropagationPolicy, PropagatedHeader, +}; + +#[cfg(test)] +mod tests { + use super::*; + use otap_df_config::transport_headers_policy::{ + CaptureDefaults, CaptureRule, PropagationAction, PropagationDefault, PropagationMatch, + PropagationOverride, PropagationSelector, + }; + + // -- Helper functions for tests ------------------------------------------ + + fn make_capture_policy(rules: Vec) -> HeaderCapturePolicy { + HeaderCapturePolicy::new(CaptureDefaults::default(), rules) + } + + fn rule(names: &[&str], store_as: Option<&str>) -> CaptureRule { + CaptureRule { + match_names: names.iter().map(|s| s.to_string()).collect(), + store_as: store_as.map(|s| s.to_string()), + sensitive: false, + value_kind: None, + } + } + + // -- End-to-end integration tests (depend on OtapPdata) ------------------ + + /// End-to-end test demonstrating the full transport header lifecycle: + /// + /// 1. **Receiver extraction** — Simulate a receiver capturing headers from + /// inbound gRPC metadata using `HeaderCapturePolicy`. + /// 2. **Pdata context attachment** — Attach captured headers to `OtapPdata`. + /// 3. **Processor transparency** — Verify headers survive `clone_without_context()` + /// (what happens at pipeline boundaries / processor pass-through). + /// 4. **Exporter propagation** — Apply `HeaderPropagationPolicy` to filter headers + /// for egress, including dropping sensitive headers like `authorization`. 
+ /// This test exercises the scenario from the design spec: + /// - `otlp_ingest` captures `x-tenant-id`, `x-request-id`, `authorization` + /// - `batch` processor preserves headers unchanged + /// - `otap_export` propagates all except `authorization` (dropped by override) + #[test] + fn end_to_end_capture_preserve_propagate() { + // ========== Step 1: Simulate receiver header capture ========== + + let capture_policy = make_capture_policy(vec![ + rule(&["x-tenant-id"], Some("tenant_id")), + rule(&["x-request-id"], None), + rule(&["authorization"], None), + ]); + + let inbound_metadata: Vec<(&str, &[u8])> = vec![ + ("X-Tenant-Id", b"tenant-abc-123"), + ("X-Request-Id", b"req-xyz-789"), + ("Authorization", b"Bearer super-secret-token"), + ("X-Unrelated-Header", b"should-be-ignored"), + ]; + + let mut captured = TransportHeaders::new(); + let stats = capture_policy.capture_from_pairs(inbound_metadata.into_iter(), &mut captured); + assert!(stats.is_none()); + + assert_eq!( + captured.len(), + 3, + "should capture exactly 3 matching headers" + ); + assert_eq!(captured.as_slice()[0].name, "tenant_id"); + assert_eq!(captured.as_slice()[0].wire_name, "X-Tenant-Id"); + assert_eq!(captured.as_slice()[0].value, b"tenant-abc-123"); + assert_eq!(captured.as_slice()[1].name, "x-request-id"); + assert_eq!(captured.as_slice()[2].name, "authorization"); + + // ========== Step 2: Attach to OtapPdata context ========== + + let pdata = crate::testing::create_test_pdata().with_transport_headers(captured); + + assert!(pdata.transport_headers().is_some()); + assert_eq!(pdata.transport_headers().unwrap().len(), 3); + + // ========== Step 3: Simulate processor pass-through ========== + + let pdata_after_processor = pdata.clone_without_context(); + + assert!( + pdata_after_processor.transport_headers().is_some(), + "transport headers must survive clone_without_context()" + ); + let headers_after = pdata_after_processor.transport_headers().unwrap(); + assert_eq!(headers_after.len(), 3); + 
assert_eq!(headers_after.as_slice()[0].name, "tenant_id"); + assert_eq!(headers_after.as_slice()[1].name, "x-request-id"); + assert_eq!(headers_after.as_slice()[2].name, "authorization"); + + // ========== Step 4: Simulate exporter propagation ========== + + let propagation_policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + ..PropagationDefault::default() + }, + vec![PropagationOverride { + match_rule: PropagationMatch { + stored_names: vec!["authorization".to_string()], + }, + action: PropagationAction::Drop, + name: None, + on_error: None, + }], + ); + + let propagated: Vec<_> = propagation_policy.propagate(headers_after).collect(); + + assert_eq!( + propagated.len(), + 2, + "authorization should be dropped, leaving 2 headers" + ); + assert_eq!(propagated[0].header_name, "X-Tenant-Id"); + assert_eq!(propagated[0].value, b"tenant-abc-123"); + assert_eq!(propagated[1].header_name, "X-Request-Id"); + assert_eq!(propagated[1].value, b"req-xyz-789"); + + assert!( + propagated.iter().all(|h| h.header_name != "Authorization"), + "authorization header must not be propagated" + ); + } + + /// Test that demonstrates duplicate header names are preserved throughout + /// the entire pipeline flow (a key semantic requirement). 
+ #[test] + fn end_to_end_duplicate_headers_preserved() { + let capture_policy = make_capture_policy(vec![rule(&["x-forwarded-for"], None)]); + + let inbound: Vec<(&str, &[u8])> = vec![ + ("X-Forwarded-For", b"10.0.0.1"), + ("X-Forwarded-For", b"192.168.1.1"), + ("X-Forwarded-For", b"172.16.0.1"), + ]; + + let mut captured = TransportHeaders::new(); + let stats = capture_policy.capture_from_pairs(inbound.into_iter(), &mut captured); + assert!(stats.is_none()); + assert_eq!( + captured.len(), + 3, + "all duplicate headers should be captured" + ); + + let pdata = crate::testing::create_test_pdata().with_transport_headers(captured); + let pdata_after = pdata.clone_without_context(); + + let headers = pdata_after.transport_headers().unwrap(); + assert_eq!( + headers.len(), + 3, + "duplicates must survive clone_without_context" + ); + + let propagation_policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + ..PropagationDefault::default() + }, + vec![], + ); + let propagated: Vec<_> = propagation_policy.propagate(headers).collect(); + assert_eq!(propagated.len(), 3, "duplicates must survive propagation"); + + let values: Vec<&[u8]> = propagated.iter().map(|h| h.value).collect(); + let expected: Vec<&[u8]> = vec![b"10.0.0.1", b"192.168.1.1", b"172.16.0.1"]; + assert_eq!(values, expected); + } + + /// Test binary header preservation through the entire flow. 
+ #[test] + fn end_to_end_binary_headers_preserved() { + let capture_policy = make_capture_policy(vec![rule(&["trace-context-bin"], None)]); + + let binary_value: Vec = vec![0x00, 0x01, 0xFF, 0xFE, 0x80, 0x7F]; + let inbound: Vec<(&str, &[u8])> = vec![("trace-context-bin", &binary_value)]; + + let mut captured = TransportHeaders::new(); + let stats = capture_policy.capture_from_pairs(inbound.into_iter(), &mut captured); + assert!(stats.is_none()); + assert_eq!(captured.len(), 1); + assert_eq!(captured.as_slice()[0].value_kind, ValueKind::Binary); + assert_eq!(captured.as_slice()[0].value, binary_value); + + let pdata = crate::testing::create_test_pdata().with_transport_headers(captured); + let pdata_after = pdata.clone_without_context(); + + let headers = pdata_after.transport_headers().unwrap(); + let propagation_policy = HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector::AllCaptured, + ..PropagationDefault::default() + }, + vec![], + ); + let propagated: Vec<_> = propagation_policy.propagate(headers).collect(); + + assert_eq!(*propagated[0].value_kind, ValueKind::Binary); + assert_eq!(propagated[0].value, binary_value.as_slice()); + } +} diff --git a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs index 79e6d9d8c6..4e84e2091b 100644 --- a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs @@ -154,7 +154,8 @@ fn run_pipeline_with_condition( config, channel_capacity_policy.clone(), TelemetryPolicy::default(), - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build runtime pipeline"); diff --git a/rust/otap-dataflow/crates/otap/tests/durable_buffer_processor_tests.rs b/rust/otap-dataflow/crates/otap/tests/durable_buffer_processor_tests.rs index ae27d6b563..5e216bd530 100644 --- 
a/rust/otap-dataflow/crates/otap/tests/durable_buffer_processor_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/durable_buffer_processor_tests.rs @@ -475,7 +475,8 @@ where config.clone(), channel_capacity_policy.clone(), TelemetryPolicy::default(), - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build runtime pipeline"); @@ -761,7 +762,8 @@ where config.clone(), channel_capacity_policy.clone(), TelemetryPolicy::default(), - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build runtime pipeline"); diff --git a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs index 96639652f7..a50551a7da 100644 --- a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs @@ -68,7 +68,8 @@ fn test_telemetry_registries_cleanup() { config.clone(), channel_capacity_policy.clone(), telemetry_policy, - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build runtime pipeline"); @@ -155,7 +156,8 @@ fn test_pipeline_fan_in_builds() { config, ChannelCapacityPolicy::default(), telemetry_policy, - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build fan-in pipeline"); @@ -192,7 +194,8 @@ fn test_pipeline_mixed_receivers_shared_channel_builds() { config, ChannelCapacityPolicy::default(), telemetry_policy, - None, + None, // transport_headers_policy + None, // internal_telemetry ) .expect("failed to build mixed receiver pipeline"); diff --git a/rust/otap-dataflow/docs/configuration-model.md b/rust/otap-dataflow/docs/configuration-model.md index a87e1d69e9..62c55cfee1 100644 --- a/rust/otap-dataflow/docs/configuration-model.md +++ b/rust/otap-dataflow/docs/configuration-model.md @@ -209,8 +209,8 @@ Optional observability policies are supported at: ## Policy Hierarchy -Policies include channel 
capacity, health, runtime telemetry, and -resources controls: +Policies include channel capacity, health, runtime telemetry, resources +controls, and transport headers: ```yaml policies: @@ -229,8 +229,19 @@ policies: resources: core_allocation: type: all_cores + transport_headers: + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + header_propagation: + default: + selector: all_captured ``` +For full transport header policy documentation, see +[transport-headers.md](transport-headers.md). + Resolution order: - regular pipelines: @@ -249,6 +260,7 @@ Defaults at top-level: - `telemetry.tokio_metrics = true` - `telemetry.runtime_metrics = basic` - `resources.core_allocation = all_cores` +- `transport_headers` = not set (opt-in; no headers captured or propagated) Control channel keys: diff --git a/rust/otap-dataflow/docs/transport-headers.md b/rust/otap-dataflow/docs/transport-headers.md new file mode 100644 index 0000000000..9572aa5d29 --- /dev/null +++ b/rust/otap-dataflow/docs/transport-headers.md @@ -0,0 +1,402 @@ +# Transport Header Policy + +## Overview + +The transport header policy controls end-to-end forwarding of +request-scoped transport metadata (gRPC metadata, HTTP headers) +through the pipeline. It operates in two phases: + +1. **Capture** - Receiver nodes extract selected headers from inbound + requests and attach them to the pipeline message context. +2. **Propagation** - Exporter nodes filter the captured headers and + attach the approved subset to outbound requests. + +The feature is entirely **opt-in**. When no transport header policy is +configured, no headers are captured or forwarded and there is zero +runtime overhead. + +## Configuration Scope + +The transport header policy can be set at multiple levels of the +configuration hierarchy. Each level inherits from its parent and can +override it. 
+ +### Policy Inheritance + +```text +engine policies (broadest scope) + -> group policies + -> pipeline policies (narrowest scope before node overrides) +``` + +At each level, the `transport_headers` field is placed inside +`policies`: + +```yaml +version: otel_dataflow/v1 +policies: + transport_headers: + header_capture: + # ... + header_propagation: + # ... +groups: + my_group: + policies: + transport_headers: + # overrides engine-level policy for this group + pipelines: + my_pipeline: + policies: + transport_headers: + # overrides group-level policy for this pipeline +``` + +The most specific (narrowest) scope wins. If a pipeline defines its +own `transport_headers`, that policy is used for all nodes in that +pipeline regardless of what the group or engine level specifies. + +### Node-Level Overrides + +Individual receiver and exporter nodes can override the capture or +propagation policy independently. A node-level override **fully +replaces** the pipeline-level policy for that node. + +```yaml +nodes: + otlp/ingest: + type: receiver:otlp + config: + protocols: + grpc: + listening_addr: "0.0.0.0:4317" + # Override pipeline-level capture for this receiver. + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + + otlp/export: + type: exporter:otlp_grpc + config: + grpc_endpoint: "http://backend:4317" + # Override pipeline-level propagation for this exporter. + header_propagation: + default: + selector: all_captured + overrides: + - match: + stored_names: ["authorization"] + action: drop +``` + +- `header_capture` is only valid on **receiver** nodes. Setting it + on a processor or exporter is a configuration error. +- `header_propagation` is only valid on **exporter** nodes. Setting + it on a processor or receiver is a configuration error. + +## Header Capture + +The capture policy controls which inbound transport headers a +receiver extracts from each request. Only headers that match at +least one capture rule are extracted. 
+ +```yaml +header_capture: + defaults: + max_entries: 32 # default: 32 + max_name_bytes: 128 # default: 128 + max_value_bytes: 4096 # default: 4096 + on_error: drop # default: drop + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + - match_names: ["x-request-id"] + - match_names: ["authorization"] + sensitive: true + - match_names: ["x-trace-context-bin"] + value_kind: binary +``` + +### Capture Rules + +Each entry in `headers` is a capture rule. A header is captured +when its wire name matches any entry in `match_names` +(case-insensitive). + +- `match_names` (required): wire header names to match + (case-insensitive) +- `store_as` (optional): normalized name used for policy matching + and storage. Default: first matched name lowercased. +- `sensitive` (optional): marks the header as containing sensitive + data (e.g., auth tokens). Default: `false`. +- `value_kind` (optional): override auto-detected value kind + (`text` or `binary`). When omitted, headers ending in `-bin` + are treated as binary; all others as text. + +### Defaults + +The `defaults` block sets limits applied to all captured headers +in a single request. + +- `max_entries` (default `32`): maximum headers captured per + request. Matching headers beyond this limit are skipped. +- `max_name_bytes` (default `128`): maximum byte length of a + header name. Headers with longer names are skipped. +- `max_value_bytes` (default `4096`): maximum byte length of a + header value. Headers with longer values are skipped. +- `on_error` (default `drop`): action when a limit is violated. + Currently only `drop` is supported. + +When any matching header is skipped due to a limit, the runtime +reports statistics indicating how many headers were skipped and +why (max entries reached, name too long, or value too long). + +## Header Propagation + +The propagation policy controls which captured headers an exporter +includes on outbound requests. It operates in two stages: + +1. 
**Default selector** - Determines the baseline set of headers + eligible for propagation. +2. **Overrides** - Per-header rules that can force-propagate or + force-drop specific headers, regardless of the default. + +```yaml +header_propagation: + default: + selector: all_captured # default: none + action: propagate # default: propagate + name: preserve # default: preserve + on_error: drop # default: drop + overrides: + - match: + stored_names: ["authorization"] + action: drop + - match: + stored_names: ["x-internal-trace"] + action: propagate + name: stored_name +``` + +### Default Behavior + +- `selector` (default `none`): which captured headers are + candidates for propagation. See selector values below. +- `action` (default `propagate`): action applied to selected + headers (`propagate` or `drop`). +- `name` (default `preserve`): how the outbound wire name is + determined. See name strategy below. +- `on_error` (default `drop`): action on error. Currently only + `drop` is supported. + +### Selector Values + +| Value | Behavior | +| --- | --- | +| `all_captured` | Propagate all captured headers. | +| `none` | Propagate nothing by default (default). | +| `!named [list]` | Propagate only listed stored names. | + +When `none` is used, only headers explicitly matched by an override +with `action: propagate` are included on egress. + +Example of the `named` selector: + +```yaml +header_propagation: + default: + selector: !named + - tenant_id + - x-request-id +``` + +### Name Strategy + +| Value | Behavior | +| --- | --- | +| `preserve` | Use original wire name (default). | +| `stored_name` | Use the normalized stored name. | + +For example, if a header was captured from `X-Tenant-Id` and stored +as `tenant_id`, then `preserve` emits `X-Tenant-Id` on egress while +`stored_name` emits `tenant_id`. + +### Overrides + +Each override targets specific headers by their stored (normalized) +name and can force a different action or name strategy than the +default. 
+ +- `match.stored_names` (required): match headers whose stored name + appears in this list (case-insensitive). +- `action` (optional): action for matched headers. Default: + `propagate`. +- `name` (optional): override name strategy for matched headers. + Inherits from `default.name` when omitted. +- `on_error` (optional): override error action for matched headers. + Inherits from `default.on_error` when omitted. + +Overrides are evaluated before the default selector. If a header +matches an override, the override's action is used and the default +selector is not consulted. + +## Examples + +### Capture and Forward Tenant ID + +Capture a single header at the receiver and forward it through to +all exporters with the original wire name preserved. + +```yaml +version: otel_dataflow/v1 +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + header_propagation: + default: + selector: all_captured +groups: + default: + pipelines: + main: + nodes: + otlp/ingest: + type: receiver:otlp + config: + protocols: + grpc: + listening_addr: "0.0.0.0:4317" + batch: + type: processor:batch + config: {} + otlp/export: + type: exporter:otlp_grpc + config: + grpc_endpoint: "http://backend:4317" + connections: + - from: otlp/ingest + to: batch + - from: batch + to: otlp/export +``` + +### Drop Sensitive Headers on Egress + +Capture multiple headers including authorization, but strip the +authorization header before forwarding to downstream services. + +```yaml +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + - match_names: ["x-request-id"] + - match_names: ["authorization"] + sensitive: true + header_propagation: + default: + selector: all_captured + overrides: + - match: + stored_names: ["authorization"] + action: drop +``` + +### Allowlist: Propagate Only Named Headers + +Instead of forwarding all captured headers, propagate only an +explicit list. 
Any captured header not in the list is dropped on +egress. + +```yaml +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + - match_names: ["x-request-id"] + - match_names: ["authorization"] + sensitive: true + - match_names: ["x-debug-flags"] + header_propagation: + default: + selector: !named + - tenant_id + - x-request-id +``` + +In this example, `authorization` and `x-debug-flags` are captured +(available for internal processing or logging) but are not forwarded +to downstream services. + +### Per-Node Override + +A pipeline captures tenant ID globally, but one specific exporter +needs a different propagation policy that renames the wire name to +the stored name. + +```yaml +policies: + transport_headers: + header_capture: + headers: + - match_names: ["x-tenant-id"] + store_as: tenant_id + header_propagation: + default: + selector: all_captured + +groups: + default: + pipelines: + main: + nodes: + otlp/ingest: + type: receiver:otlp + config: + protocols: + grpc: + listening_addr: "0.0.0.0:4317" + batch: + type: processor:batch + config: {} + primary/export: + type: exporter:otlp_grpc + config: + grpc_endpoint: "http://primary:4317" + # Pipeline-level propagation (preserve wire name). + secondary/export: + type: exporter:otlp_grpc + config: + grpc_endpoint: "http://secondary:4317" + # Override: use stored name as wire name. + header_propagation: + default: + selector: all_captured + name: stored_name + connections: + - from: otlp/ingest + to: batch + - from: batch + to: [primary/export, secondary/export] +``` + +In this example, `primary/export` sends the header as `X-Tenant-Id` +(the original wire name), while `secondary/export` sends it as +`tenant_id` (the stored name). + +## Limitations + +1. **`on_error` only supports `drop`** - No `log` or `reject` + actions are available. Headers that violate limits are dropped + and counted in the runtime's skip statistics. +2. 
**Exact match only** - `match_names` requires exact header name + matches (case-insensitive). Regex and glob patterns are not + supported.