Skip to content
224 changes: 224 additions & 0 deletions rust/otap-dataflow/crates/config/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl EngineObservabilityPolicies {
health: self.health,
telemetry: self.telemetry,
resources: None,
transport_headers: None,
}
}

Expand Down Expand Up @@ -1452,6 +1453,229 @@ groups:
assert!(parsed.groups.contains_key("default"));
}

#[test]
fn resolve_transport_headers_pipeline_overrides_group() {
let yaml = r#"
version: otel_dataflow/v1
engine: {}
groups:
default:
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-group-header"]
pipelines:
with_override:
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-pipeline-header"]
nodes:
receiver:
type: "urn:test:receiver:example"
config: null
exporter:
type: "urn:test:exporter:example"
config: null
connections:
- from: receiver
to: exporter
"#;
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse");
let resolved = config.resolve();

let pipeline = resolved
.pipelines
.iter()
.find(|p| p.pipeline_id.as_ref() == "with_override")
.expect("pipeline should be resolved");

// Pipeline-level transport_headers should win over group-level.
let policy = pipeline
.policies
.transport_headers
.as_ref()
.expect("transport_headers should be resolved");
assert_eq!(policy.header_capture.headers.len(), 1);
assert_eq!(
policy.header_capture.headers[0].match_names,
vec!["x-pipeline-header"]
);
}

#[test]
fn resolve_transport_headers_group_overrides_engine() {
let yaml = r#"
version: otel_dataflow/v1
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-engine-header"]
engine: {}
groups:
default:
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-group-header"]
pipelines:
main:
nodes:
receiver:
type: "urn:test:receiver:example"
config: null
exporter:
type: "urn:test:exporter:example"
config: null
connections:
- from: receiver
to: exporter
"#;
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse");
let resolved = config.resolve();

let pipeline = resolved
.pipelines
.iter()
.find(|p| p.pipeline_id.as_ref() == "main")
.expect("pipeline should be resolved");

// Group-level transport_headers should win over engine-level.
let policy = pipeline
.policies
.transport_headers
.as_ref()
.expect("transport_headers should be resolved");
assert_eq!(policy.header_capture.headers.len(), 1);
assert_eq!(
policy.header_capture.headers[0].match_names,
vec!["x-group-header"]
);
}

#[test]
fn resolve_transport_headers_inherits_from_engine() {
let yaml = r#"
version: otel_dataflow/v1
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-engine-header"]
header_propagation:
default:
selector: all_captured
engine: {}
groups:
default:
pipelines:
main:
nodes:
receiver:
type: "urn:test:receiver:example"
config: null
exporter:
type: "urn:test:exporter:example"
config: null
connections:
- from: receiver
to: exporter
"#;
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse");
let resolved = config.resolve();

let pipeline = resolved
.pipelines
.iter()
.find(|p| p.pipeline_id.as_ref() == "main")
.expect("pipeline should be resolved");

// Pipeline and group don't define transport_headers, so engine-level
// should be inherited.
let policy = pipeline
.policies
.transport_headers
.as_ref()
.expect("transport_headers should be inherited from engine level");
assert_eq!(policy.header_capture.headers.len(), 1);
assert_eq!(
policy.header_capture.headers[0].match_names,
vec!["x-engine-header"]
);
assert_eq!(
policy.header_propagation.default.selector,
crate::transport_headers_policy::PropagationSelector::AllCaptured
);
}

#[test]
fn resolve_observability_pipeline_has_no_transport_headers() {
let yaml = r#"
version: otel_dataflow/v1
policies:
transport_headers:
header_capture:
headers:
- match_names: ["x-engine-header"]
engine:
observability:
pipeline:
nodes:
itr:
type: "urn:otel:receiver:internal_telemetry"
config: {}
sink:
type: "urn:otel:exporter:console"
config: {}
connections:
- from: itr
to: sink
groups:
default:
pipelines:
main:
nodes:
receiver:
type: "urn:test:receiver:example"
config: null
exporter:
type: "urn:test:exporter:example"
config: null
connections:
- from: receiver
to: exporter
"#;
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse");
let resolved = config.resolve();

// The observability pipeline should NOT inherit transport_headers from
// the engine level (it's explicitly set to None during resolution).
let obs = resolved
.pipelines
.iter()
.find(|p| p.role == ResolvedPipelineRole::ObservabilityInternal)
.expect("observability pipeline should be resolved");
assert!(
obs.policies.transport_headers.is_none(),
"observability pipeline should not have transport_headers"
);

// Regular pipelines should still inherit engine-level transport_headers.
let main = resolved
.pipelines
.iter()
.find(|p| p.pipeline_id.as_ref() == "main")
.expect("main pipeline should be resolved");
assert!(
main.policies.transport_headers.is_some(),
"regular pipelines should inherit transport_headers from engine level"
);
}

#[test]
fn bundled_configs_parse_as_engine_configs() {
let mut dirs = vec![PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../configs")];
Expand Down
4 changes: 3 additions & 1 deletion rust/otap-dataflow/crates/config/src/engine/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ impl OtelDataflowSpec {
.map(|p| p.clone().into_policies())
.unwrap_or_default();
let mut policies = Policies::resolve([&obs_as_policies, &self.policies]);
// Observability pipelines use default resources.
// Observability pipelines use default resources and do not
// capture/propagate transport headers.
policies.resources = ResourcesPolicy::default();
policies.transport_headers = None;
pipelines.push(ResolvedPipelineConfig {
pipeline_group_id: SYSTEM_PIPELINE_GROUP_ID.into(),
pipeline_id: SYSTEM_OBSERVABILITY_PIPELINE_ID.into(),
Expand Down
4 changes: 4 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub mod settings;
/// TLS configuration.
pub mod tls;
pub mod topic;
/// Transport header core types and capture/propagation engines.
pub mod transport_headers;
/// Transport header capture and propagation policy declarations.
pub mod transport_headers_policy;
pub use topic::{
SubscriptionGroupName, TopicAckPropagationMode, TopicAckPropagationPolicies, TopicBackendKind,
TopicBroadcastOnLagPolicy, TopicImplSelectionPolicy, TopicName,
Expand Down
Loading