Skip to content

Commit 18e9f65

Browse files
committed
feat(config): extensions section, capabilities bindings, and config-level validation
1 parent ce41c95 commit 18e9f65

File tree

9 files changed

+687
-7
lines changed

9 files changed

+687
-7
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
version: otel_dataflow/v1
2+
3+
policies:
4+
channel_capacity:
5+
control:
6+
node: 100
7+
pipeline: 100
8+
pdata: 100
9+
10+
engine: {}
11+
12+
groups:
13+
default:
14+
pipelines:
15+
main:
16+
extensions:
17+
sample_kv_store:
18+
type: "urn:otap:extension:sample_shared_key_value_store"
19+
20+
nodes:
21+
generator:
22+
type: "urn:otel:receiver:traffic_generator"
23+
config:
24+
traffic_config:
25+
max_batch_size: 10
26+
signals_per_second: 10
27+
log_weight: 100
28+
29+
exporter:
30+
type: "urn:otel:exporter:noop"
31+
capabilities:
32+
key_value_store: "sample_kv_store"
33+
34+
connections:
35+
- from: generator
36+
to: exporter

rust/otap-dataflow/crates/config/src/node.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,44 @@ use serde_json::Value;
1616
use std::borrow::Cow;
1717
use std::collections::HashMap;
1818

19+
/// Deserializes a `HashMap<String, String>` while rejecting duplicate keys.
20+
///
21+
/// Standard serde deserialization into `HashMap` silently overwrites earlier
22+
/// entries when keys are duplicated in the source. This function detects that
23+
/// and returns an error so the user gets immediate feedback.
24+
fn deserialize_no_dup_keys<'de, D>(deserializer: D) -> Result<HashMap<String, String>, D::Error>
25+
where
26+
D: serde::Deserializer<'de>,
27+
{
28+
use serde::de::{MapAccess, Visitor};
29+
use std::fmt;
30+
31+
struct NoDupVisitor;
32+
33+
impl<'de> Visitor<'de> for NoDupVisitor {
34+
type Value = HashMap<String, String>;
35+
36+
fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37+
f.write_str("a map with no duplicate keys")
38+
}
39+
40+
fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error> {
41+
let mut result = HashMap::new();
42+
while let Some((key, value)) = map.next_entry::<String, String>()? {
43+
if result.contains_key(&key) {
44+
return Err(serde::de::Error::custom(format!(
45+
"duplicate capability key '{key}'"
46+
)));
47+
}
48+
let _ = result.insert(key, value);
49+
}
50+
Ok(result)
51+
}
52+
}
53+
54+
deserializer.deserialize_map(NoDupVisitor)
55+
}
56+
1957
/// User configuration for a node in the pipeline.
2058
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
2159
#[serde(deny_unknown_fields)]
@@ -58,6 +96,23 @@ pub struct NodeUserConfig {
5896
#[schemars(extend("x-kubernetes-preserve-unknown-fields" = true))]
5997
pub config: Value,
6098

99+
/// Capability bindings mapping capability names to extension instance names.
100+
///
101+
/// Each entry maps a capability (e.g., `bearer_token_provider`) to the name
102+
/// of an extension instance declared in the pipeline's `extensions` section.
103+
///
104+
/// Example:
105+
/// ```yaml
106+
/// capabilities:
107+
/// bearer_token_provider: azure_auth
108+
/// ```
109+
#[serde(
110+
default,
111+
skip_serializing_if = "HashMap::is_empty",
112+
deserialize_with = "deserialize_no_dup_keys"
113+
)]
114+
pub capabilities: HashMap<String, String>,
115+
61116
/// Entity configuration for the node.
62117
///
63118
/// Currently, we support entity::extend::identity_attributes, for example:
@@ -88,6 +143,8 @@ pub enum NodeKind {
88143
Processor,
89144
/// A sink of signals
90145
Exporter,
146+
/// A provider of shared capabilities (e.g., auth, service discovery).
147+
Extension,
91148

92149
// ToDo(LQ) : Add more node kinds as needed.
93150
// A connector between two pipelines
@@ -102,6 +159,7 @@ impl From<NodeKind> for Cow<'static, str> {
102159
NodeKind::Receiver => "receiver".into(),
103160
NodeKind::Processor => "processor".into(),
104161
NodeKind::Exporter => "exporter".into(),
162+
NodeKind::Extension => "extension".into(),
105163
NodeKind::ProcessorChain => "processor_chain".into(),
106164
}
107165
}
@@ -121,6 +179,7 @@ impl NodeUserConfig {
121179
default_output: None,
122180
entity: None,
123181
config: Value::Null,
182+
capabilities: HashMap::new(),
124183
}
125184
}
126185

@@ -137,6 +196,7 @@ impl NodeUserConfig {
137196
outputs: Vec::new(),
138197
default_output: None,
139198
config: Value::Null,
199+
capabilities: HashMap::new(),
140200
}
141201
}
142202

@@ -153,6 +213,7 @@ impl NodeUserConfig {
153213
outputs: Vec::new(),
154214
default_output: None,
155215
config: Value::Null,
216+
capabilities: HashMap::new(),
156217
}
157218
}
158219

@@ -166,6 +227,7 @@ impl NodeUserConfig {
166227
outputs: Vec::new(),
167228
default_output: None,
168229
config: user_config,
230+
capabilities: HashMap::new(),
169231
}
170232
}
171233

@@ -386,4 +448,39 @@ config: {}
386448
assert!(cfg.entity.is_some());
387449
assert!(cfg.identity_attributes().is_empty());
388450
}
451+
452+
#[test]
453+
fn capabilities_rejects_duplicate_keys_yaml() {
454+
let yaml = r#"
455+
type: "urn:otel:exporter:test"
456+
capabilities:
457+
bearer_token_provider: ext_a
458+
bearer_token_provider: ext_b
459+
"#;
460+
let result: Result<NodeUserConfig, _> = serde_yaml::from_str(yaml);
461+
let err = result.expect_err("should reject duplicate capability keys");
462+
let msg = err.to_string();
463+
assert!(
464+
msg.contains("duplicate"),
465+
"error should mention duplicate: {msg}"
466+
);
467+
}
468+
469+
#[test]
470+
fn capabilities_rejects_duplicate_keys_json() {
471+
let json = r#"{
472+
"type": "urn:otel:exporter:test",
473+
"capabilities": {
474+
"bearer_token_provider": "ext_a",
475+
"bearer_token_provider": "ext_b"
476+
}
477+
}"#;
478+
let result: Result<NodeUserConfig, _> = serde_json::from_str(json);
479+
let err = result.expect_err("should reject duplicate capability keys");
480+
let msg = err.to_string();
481+
assert!(
482+
msg.contains("duplicate"),
483+
"error should mention duplicate: {msg}"
484+
);
485+
}
389486
}

rust/otap-dataflow/crates/config/src/node_urn.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
208208
NodeKind::Receiver => "receiver",
209209
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
210210
NodeKind::Exporter => "exporter",
211+
NodeKind::Extension => "extension",
211212
}
212213
}
213214

@@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
228229
"receiver" => Ok(NodeKind::Receiver),
229230
"processor" => Ok(NodeKind::Processor),
230231
"exporter" => Ok(NodeKind::Exporter),
232+
"extension" => Ok(NodeKind::Extension),
231233
_ => Err(invalid_plugin_urn(
232234
raw,
233-
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
235+
format!(
236+
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
237+
),
234238
)),
235239
}
236240
}

0 commit comments

Comments
 (0)