Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions rust/otap-dataflow/configs/fake-with-extension.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: otel_dataflow/v1

policies:
channel_capacity:
control:
node: 100
pipeline: 100
pdata: 100

engine: {}

groups:
default:
pipelines:
main:
extensions:
sample_kv_store:
type: "urn:otap:extension:sample_shared_key_value_store"

nodes:
generator:
type: "urn:otel:receiver:traffic_generator"
config:
traffic_config:
max_batch_size: 10
signals_per_second: 10
log_weight: 100

exporter:
type: "urn:otel:exporter:noop"
capabilities:
key_value_store: "sample_kv_store"

connections:
- from: generator
to: exporter
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub type PipelineId = Cow<'static, str>;
/// The id of a node in the pipeline.
pub type NodeId = Cow<'static, str>;

/// The id of a capability binding (e.g., "bearer_token_provider").
pub type CapabilityId = Cow<'static, str>;

/// The URN of a node type.
pub use node_urn::NodeUrn;

Expand Down
103 changes: 102 additions & 1 deletion rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,53 @@
use crate::error::Error;
use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute};
use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy};
use crate::{Description, NodeUrn, PortName};
use crate::{CapabilityId, Description, NodeId, NodeUrn, PortName};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashMap;

/// Deserializes a `HashMap<String, String>` while rejecting duplicate keys.
///
/// Standard serde deserialization into `HashMap` silently overwrites earlier
/// entries when keys are duplicated in the source. This function detects that
/// and returns an error so the user gets immediate feedback.
fn deserialize_no_dup_keys<'de, D>(
deserializer: D,
) -> Result<HashMap<CapabilityId, NodeId>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{MapAccess, Visitor};
use std::fmt;

struct NoDupVisitor;

impl<'de> Visitor<'de> for NoDupVisitor {
type Value = HashMap<CapabilityId, NodeId>;

fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("a map with no duplicate keys")
}

fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error> {
let mut result = HashMap::new();
while let Some((key, value)) = map.next_entry::<String, String>()? {
if result.contains_key(key.as_str()) {
return Err(serde::de::Error::custom(format!(
"duplicate capability key '{key}'"
)));
}
let _ = result.insert(CapabilityId::from(key), NodeId::from(value));
}
Ok(result)
}
}

deserializer.deserialize_map(NoDupVisitor)
}

/// User configuration for a node in the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -60,6 +100,23 @@ pub struct NodeUserConfig {
#[schemars(extend("x-kubernetes-preserve-unknown-fields" = true))]
pub config: Value,

/// Capability bindings mapping capability names to extension instance names.
///
/// Each entry maps a capability (e.g., `bearer_token_provider`) to the name
/// of an extension instance declared in the pipeline's `extensions` section.
///
/// Example:
/// ```yaml
/// capabilities:
/// bearer_token_provider: azure_auth
/// ```
#[serde(
default,
skip_serializing_if = "HashMap::is_empty",
deserialize_with = "deserialize_no_dup_keys"
)]
pub capabilities: HashMap<CapabilityId, NodeId>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This enforces one extension per capability per node. Is this intentional? The design doc doesn't state this explicitly.

If an exporter needs header_setter functionality from two independent extensions (e.g., one for correlation headers, one for custom metadata headers), the only workaround would be for the user to write a custom extension that combines both. That seems like a heavy ask for what might be a common use case.


/// Entity configuration for the node.
///
/// Currently, we support entity::extend::identity_attributes, for example:
Expand Down Expand Up @@ -112,6 +169,8 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// A provider of shared capabilities (e.g., auth, service discovery).
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
Expand All @@ -126,6 +185,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
Expand All @@ -145,6 +205,7 @@ impl NodeUserConfig {
default_output: None,
entity: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
Expand All @@ -163,6 +224,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
Expand All @@ -181,6 +243,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: Value::Null,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
Expand All @@ -196,6 +259,7 @@ impl NodeUserConfig {
outputs: Vec::new(),
default_output: None,
config: user_config,
capabilities: HashMap::new(),
header_capture: None,
header_propagation: None,
}
Expand Down Expand Up @@ -228,6 +292,7 @@ impl NodeUserConfig {
NodeKind::Processor => "processor",
NodeKind::Exporter => "exporter",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Receiver => unreachable!(),
}
),
Expand All @@ -243,6 +308,7 @@ impl NodeUserConfig {
NodeKind::Receiver => "receiver",
NodeKind::Processor => "processor",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Exporter => unreachable!(),
}
),
Expand Down Expand Up @@ -518,6 +584,23 @@ config:
);
}

#[test]
fn capabilities_rejects_duplicate_keys_yaml() {
let yaml = r#"
type: "urn:otel:exporter:test"
capabilities:
bearer_token_provider: ext_a
bearer_token_provider: ext_b
"#;
let result: Result<NodeUserConfig, _> = serde_yaml::from_str(yaml);
let err = result.expect_err("should reject duplicate capability keys");
let msg = err.to_string();
assert!(
msg.contains("duplicate"),
"error should mention duplicate: {msg}"
);
}

#[test]
fn header_capture_on_processor_is_rejected() {
let mut cfg = NodeUserConfig::new_processor_config("processor:batch");
Expand Down Expand Up @@ -560,4 +643,22 @@ config:
cfg.validate_transport_header_fields("test", &mut errors);
assert!(errors.is_empty());
}

#[test]
fn capabilities_rejects_duplicate_keys_json() {
let json = r#"{
"type": "urn:otel:exporter:test",
"capabilities": {
"bearer_token_provider": "ext_a",
"bearer_token_provider": "ext_b"
}
}"#;
let result: Result<NodeUserConfig, _> = serde_json::from_str(json);
let err = result.expect_err("should reject duplicate capability keys");
let msg = err.to_string();
assert!(
msg.contains("duplicate"),
"error should mention duplicate: {msg}"
);
}
}
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

Expand All @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
format!(
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
),
)),
}
}
Expand Down
Loading
Loading