Merged
Changes from 4 commits
36 changes: 36 additions & 0 deletions rust/otap-dataflow/configs/fake-with-extension.yaml
@@ -0,0 +1,36 @@
version: otel_dataflow/v1

policies:
  channel_capacity:
    control:
      node: 100
      pipeline: 100
    pdata: 100

engine: {}

groups:
  default:
    pipelines:
      main:
        extensions:
          sample_kv_store:
            type: "urn:otap:extension:sample_shared_key_value_store"

        nodes:
          generator:
            type: "urn:otel:receiver:traffic_generator"
            config:
              traffic_config:
                max_batch_size: 10
                signals_per_second: 10
                log_weight: 100

          exporter:
            type: "urn:otel:exporter:noop"
            capabilities:
              key_value_store: "sample_kv_store"

        connections:
          - from: generator
            to: exporter
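
The config above binds the exporter's `key_value_store` capability to the `sample_kv_store` extension by name. As a minimal sketch of the kind of check this implies, the following std-only Rust finds bindings that point at undeclared extensions. The `Bindings` alias and the `unresolved_bindings` function are hypothetical names used for illustration; they are not taken from the PR, and the engine's actual validation may differ.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical, simplified view of one node's `capabilities` section:
/// capability name -> extension instance name.
pub type Bindings = BTreeMap<String, String>;

/// Returns `(node, capability, extension)` for every binding whose target
/// extension is not declared in the pipeline's `extensions` section.
pub fn unresolved_bindings(
    extensions: &BTreeSet<String>,
    nodes: &BTreeMap<String, Bindings>,
) -> Vec<(String, String, String)> {
    let mut missing = Vec::new();
    for (node, bindings) in nodes {
        for (capability, extension) in bindings {
            if !extensions.contains(extension) {
                missing.push((node.clone(), capability.clone(), extension.clone()));
            }
        }
    }
    missing
}
```

For the YAML above, the declared set would contain `sample_kv_store` and the `exporter` node would carry one binding, so the returned list would be empty.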
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
@@ -78,6 +78,9 @@ pub type PipelineId = Cow<'static, str>;
/// The id of a node in the pipeline.
pub type NodeId = Cow<'static, str>;

/// The id of a capability binding (e.g., "bearer_token_provider").
pub type CapabilityId = Cow<'static, str>;

/// The URN of a node type.
pub use node_urn::NodeUrn;
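
Because `CapabilityId` is a `Cow<'static, str>` like the other id aliases, a capability name that is known at compile time can be stored without allocating, while a name read from configuration is stored as an owned `String`. A minimal sketch, where the two aliases mirror the diff and `single_binding` is a hypothetical helper added only for illustration:

```rust
use std::borrow::Cow;
use std::collections::HashMap;

/// Mirrors the aliases in the diff; everything else here is illustrative.
pub type CapabilityId = Cow<'static, str>;
pub type NodeId = Cow<'static, str>;

/// Builds a one-entry binding map from a compile-time capability name and a
/// runtime-provided extension name (hypothetical helper).
pub fn single_binding(
    capability: &'static str,
    extension: String,
) -> HashMap<CapabilityId, NodeId> {
    let mut bindings = HashMap::new();
    // The static name is borrowed (no allocation); the runtime name is owned.
    let _ = bindings.insert(Cow::Borrowed(capability), Cow::Owned(extension));
    bindings
}
```

Lookups can still use a plain `&str` key, since `Cow<'static, str>` implements `Borrow<str>`.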

103 changes: 102 additions & 1 deletion rust/otap-dataflow/crates/config/src/node.rs
@@ -11,13 +11,53 @@
use crate::error::Error;
use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute};
use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy};
-use crate::{Description, NodeUrn, PortName};
+use crate::{CapabilityId, Description, NodeId, NodeUrn, PortName};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashMap;

/// Deserializes a `HashMap<CapabilityId, NodeId>` while rejecting duplicate keys.
///
/// Standard serde deserialization into `HashMap` silently overwrites earlier
/// entries when keys are duplicated in the source. This function detects that
/// and returns an error so the user gets immediate feedback.
fn deserialize_no_dup_keys<'de, D>(
    deserializer: D,
) -> Result<HashMap<CapabilityId, NodeId>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::{MapAccess, Visitor};
    use std::fmt;

    struct NoDupVisitor;

    impl<'de> Visitor<'de> for NoDupVisitor {
        type Value = HashMap<CapabilityId, NodeId>;

        fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("a map with no duplicate keys")
        }

        fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error> {
            let mut result = HashMap::new();
            // Read entries one at a time so a repeated key can be detected
            // before it overwrites the earlier binding.
            while let Some((key, value)) = map.next_entry::<String, String>()? {
                if result.contains_key(key.as_str()) {
                    return Err(serde::de::Error::custom(format!(
                        "duplicate capability key '{key}'"
                    )));
                }
                let _ = result.insert(CapabilityId::from(key), NodeId::from(value));
            }
            Ok(result)
        }
    }

    deserializer.deserialize_map(NoDupVisitor)
}
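
The core of the visitor is an insert loop that stops at the first repeated key. The sketch below isolates that loop without serde so it can be run on its own; `collect_no_dup_keys` is a hypothetical name, and it uses plain `String` keys and a `String` error rather than the crate's types.

```rust
use std::collections::HashMap;

/// Collects `(key, value)` pairs into a map, failing on the first repeated key
/// instead of letting the later entry overwrite the earlier one.
pub fn collect_no_dup_keys<I>(entries: I) -> Result<HashMap<String, String>, String>
where
    I: IntoIterator<Item = (String, String)>,
{
    let mut result = HashMap::new();
    for (key, value) in entries {
        if result.contains_key(&key) {
            return Err(format!("duplicate capability key '{}'", key));
        }
        let _ = result.insert(key, value);
    }
    Ok(result)
}
```

By contrast, collecting the same pairs with `Iterator::collect::<HashMap<_, _>>()` would keep only the last value for a repeated key, which is the silent-overwrite behavior the doc comment describes.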

/// User configuration for a node in the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
@@ -60,6 +100,23 @@ pub struct NodeUserConfig {
#[schemars(extend("x-kubernetes-preserve-unknown-fields" = true))]
pub config: Value,

    /// Capability bindings mapping capability names to extension instance names.
    ///
    /// Each entry maps a capability (e.g., `bearer_token_provider`) to the name
    /// of an extension instance declared in the pipeline's `extensions` section.
    ///
    /// Example:
    /// ```yaml
    /// capabilities:
    ///   bearer_token_provider: azure_auth
    /// ```
    #[serde(
        default,
        skip_serializing_if = "HashMap::is_empty",
        deserialize_with = "deserialize_no_dup_keys"
    )]
    pub capabilities: HashMap<CapabilityId, NodeId>,
Contributor

This enforces one extension per capability per node. Is this intentional? The design doc doesn't state this explicitly.

If an exporter needs header_setter functionality from two independent extensions (e.g., one for correlation headers, one for custom metadata headers), the only workaround would be for the user to write a custom extension that combines both. That seems like a heavy ask for what might be a common use case.

Contributor

I noticed this at some point. I feel that if a node needs more than one instance of a capability, we should follow the Collector's naming convention and use names like bearer_token/left and bearer_token/right. I figure we should tackle this when it happens.

Contributor

I'm okay with tackling this later. However, one thing to keep in mind: the design doc states that capabilities are "known by the engine version and validated during configuration loading," and that external extensions cannot introduce new capability interfaces. This is different from the Go Collector, where any extension can expose any interface and components look them up by ID at runtime.

So bearer_token/left and bearer_token/right would each need to be pre-registered as known capabilities in the engine core. It's not purely a user naming convention like it would be in the Go Collector. Each new variant requires an engine core change, so this may not be as straightforward to address when the time comes. Just wanted to flag this so it's not overlooked.

Contributor Author

@gouslu gouslu Apr 8, 2026

Depends on how you resolve them. /left and /right might just mean that there are two different instances and that you get provided with a map that has both in it, or something like that. What would need to be known is basically the capability id, same as today.
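
One possible reading of the reply above, sketched in std-only Rust: the part before `/` is the capability id the engine already knows, and the part after it only names an instance, so a node could be handed a map of instances per capability. The functions `split_binding_key` and `group_by_capability` are hypothetical and are not part of this PR or of the design doc; this is an illustration of the idea under discussion, not a proposal for the final shape.

```rust
use std::collections::BTreeMap;

/// Splits a binding key such as `bearer_token/left` into the capability id
/// (`bearer_token`) and an optional instance name (`left`).
pub fn split_binding_key(key: &str) -> (&str, Option<&str>) {
    match key.split_once('/') {
        Some((capability, instance)) => (capability, Some(instance)),
        None => (key, None),
    }
}

/// Groups bindings by capability id: capability -> (instance name -> extension).
/// Bindings without a `/name` suffix are stored under the empty instance name.
pub fn group_by_capability<'a>(
    bindings: &[(&'a str, &'a str)],
) -> BTreeMap<&'a str, BTreeMap<&'a str, &'a str>> {
    let mut grouped: BTreeMap<&'a str, BTreeMap<&'a str, &'a str>> = BTreeMap::new();
    for &(key, extension) in bindings {
        let (capability, instance) = split_binding_key(key);
        let _ = grouped
            .entry(capability)
            .or_default()
            .insert(instance.unwrap_or(""), extension);
    }
    grouped
}
```

Under this reading only `bearer_token` would have to be registered in the engine core, which is the point of the last comment; whether the engine would accept the suffix at all is left open in the thread.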


/// Entity configuration for the node.
///
/// Currently, we support entity::extend::identity_attributes, for example:
@@ -112,6 +169,8 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// A provider of shared capabilities (e.g., auth, service discovery).
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
@@ -126,6 +185,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
@@ -145,6 +205,7 @@ impl NodeUserConfig {
default_output: None,
entity: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
@@ -163,6 +224,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
@@ -181,6 +243,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
@@ -196,6 +259,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: user_config,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
@@ -228,6 +292,7 @@ impl NodeUserConfig {
NodeKind::Processor => "processor",
NodeKind::Exporter => "exporter",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Receiver => unreachable!(),
}
),
@@ -243,6 +308,7 @@ impl NodeUserConfig {
NodeKind::Receiver => "receiver",
NodeKind::Processor => "processor",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Exporter => unreachable!(),
}
),
@@ -518,6 +584,23 @@ config:
);
}

    #[test]
    fn capabilities_rejects_duplicate_keys_yaml() {
        let yaml = r#"
type: "urn:otel:exporter:test"
capabilities:
  bearer_token_provider: ext_a
  bearer_token_provider: ext_b
"#;
        let result: Result<NodeUserConfig, _> = serde_yaml::from_str(yaml);
        let err = result.expect_err("should reject duplicate capability keys");
        let msg = err.to_string();
        assert!(
            msg.contains("duplicate"),
            "error should mention duplicate: {msg}"
        );
    }

#[test]
fn header_capture_on_processor_is_rejected() {
let mut cfg = NodeUserConfig::new_processor_config("processor:batch");
@@ -560,4 +643,22 @@ config:
cfg.validate_transport_header_fields("test", &mut errors);
assert!(errors.is_empty());
}

    #[test]
    fn capabilities_rejects_duplicate_keys_json() {
        let json = r#"{
            "type": "urn:otel:exporter:test",
            "capabilities": {
                "bearer_token_provider": "ext_a",
                "bearer_token_provider": "ext_b"
            }
        }"#;
        let result: Result<NodeUserConfig, _> = serde_json::from_str(json);
        let err = result.expect_err("should reject duplicate capability keys");
        let msg = err.to_string();
        assert!(
            msg.contains("duplicate"),
            "error should mention duplicate: {msg}"
        );
    }
}
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
@@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

@@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
-            format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
+            format!(
+                "expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
+            ),
)),
}
}
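
To show how the new `extension` kind flows through URN parsing, here is a simplified, self-contained sketch. It assumes URNs of the form `urn:<namespace>:<kind>:<id>`, as in the YAML config in this PR. The `NodeKind` enum here is a reduced stand-in, the error type is a plain `String` instead of the crate's `Error`, and `kind_of_urn` is a hypothetical helper; the crate's real parser may accept other forms.

```rust
/// Simplified stand-in for the crate's `NodeKind` (only the four URN kinds).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    Receiver,
    Processor,
    Exporter,
    Extension,
}

/// Returns the kind segment used in a URN for `kind`.
pub const fn kind_suffix(kind: NodeKind) -> &'static str {
    match kind {
        NodeKind::Receiver => "receiver",
        NodeKind::Processor => "processor",
        NodeKind::Exporter => "exporter",
        NodeKind::Extension => "extension",
    }
}

/// Parses the kind segment of a URN; unknown kinds produce an error message
/// listing every accepted kind, including `extension`.
pub fn parse_kind(kind: &str) -> Result<NodeKind, String> {
    match kind {
        "receiver" => Ok(NodeKind::Receiver),
        "processor" => Ok(NodeKind::Processor),
        "exporter" => Ok(NodeKind::Exporter),
        "extension" => Ok(NodeKind::Extension),
        _ => Err(format!(
            "expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{}`",
            kind
        )),
    }
}

/// Extracts the kind from a URN of the assumed form `urn:<namespace>:<kind>:<id>`.
pub fn kind_of_urn(urn: &str) -> Result<NodeKind, String> {
    let mut parts = urn.split(':');
    match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
        (Some("urn"), Some(_namespace), Some(kind), Some(_id), None) => parse_kind(kind),
        _ => Err(format!(
            "expected `urn:<namespace>:<kind>:<id>`, found `{}`",
            urn
        )),
    }
}
```

With this sketch, `urn:otap:extension:sample_shared_key_value_store` resolves to `NodeKind::Extension`, and `kind_suffix` maps it back to `"extension"`.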