`docs/specifications/tracing.md` (new file, 238 additions)
# OpenTelemetry Tracing Design for Numaflow [Proposal]

## 1. Objective
The goal is to enable distributed tracing in Numaflow to track message propagation from Source to Sink. This will provide visibility into:
- The path a message takes through the pipeline.
- Latency introduced by each component (Source, ISB, UDFs, Sink).
- Bottlenecks in the system.

## 2. Key Architectural Decisions

### 2.1 Format: W3C Trace Context
We will use the [**W3C Trace Context**](https://www.w3.org/TR/trace-context/) standard (`traceparent`, `tracestate`) for context propagation.
> **Review comment (Member):** hyperlink to the spec?
- **Rationale**: It is the industry standard, natively supported by OpenTelemetry, compact (1-2 headers), and vendor-neutral.
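To make the header format concrete, here is a self-contained sketch (not Numaflow code) that checks the four dash-separated fields of a `traceparent` header — version, trace-id, parent-id, and trace-flags — per the W3C format; full spec validation (e.g., rejecting version `ff`) is omitted:

```rust
/// Minimal structural check of a W3C `traceparent` header:
/// `{2-hex version}-{32-hex trace-id}-{16-hex parent-id}-{2-hex flags}`,
/// with all-zero trace-id / parent-id treated as invalid.
fn is_valid_traceparent(header: &str) -> bool {
    let parts: Vec<&str> = header.split('-').collect();
    let hex = |s: &str| s.chars().all(|c| c.is_ascii_hexdigit());
    parts.len() == 4
        && parts[0].len() == 2 && hex(parts[0])
        && parts[1].len() == 32 && hex(parts[1]) && parts[1] != "0".repeat(32)
        && parts[2].len() == 16 && hex(parts[2]) && parts[2] != "0".repeat(16)
        && parts[3].len() == 2 && hex(parts[3])
}

fn main() {
    // The example value used throughout this document.
    assert!(is_valid_traceparent(
        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
    ));
    assert!(!is_valid_traceparent("not-a-traceparent"));
    println!("ok");
}
```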

### 2.2 Carrier: System Metadata (`sys_metadata`)
Trace context will be stored in the `sys_metadata` field of the Numaflow `Message` protobuf.

- **Location**: `message.metadata.sys_metadata`
- **Key**: `"tracing"`
- **Value**: A `KeyValueGroup` containing the W3C headers.

**Message Structure:**
> **Review comment (Member):** we do not need to add anything to our proto spec, right? since this is already in place
>
> **Author:** yes

```protobuf
message Message {
  // User headers (Immutable)
  map<string, string> headers = 5;

  // System metadata (Mutable carrier for tracing)
  metadata.Metadata metadata = 6;
}

message Metadata {
  map<string, KeyValueGroup> sys_metadata = 2;
}
```

**Conceptual Data:**
```json
{
  "sys_metadata": {
    "tracing": {
      "key_value": {
        "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
        "tracestate": "pqrs=00f067aa0ba902b7,congo=t61rcWkgMzE"
      }
    }
  }
}
```

## 3. Implementation Plan: Numaflow Core

The Rust data plane (`rust/numaflow-core`) handles the heavy lifting of reading/writing to ISB and invoking UDFs.

### 3.1 Dependencies
Update `Cargo.toml` with:
- `opentelemetry`
- `opentelemetry_sdk`
- `opentelemetry-otlp`
- `tracing-opentelemetry`

### 3.2 Context Propagation Helper
We need a helper module (`numaflow-core/src/shared/otel.rs`) that implements `opentelemetry::propagation::Extractor` and `Injector` for the `KeyValueGroup` struct.

```rust
pub const TRACING_METADATA_KEY: &str = "tracing";

pub struct MetadataExtractor<'a>(pub &'a KeyValueGroup);
// Implements Extractor trait to read "traceparent" from KeyValueGroup

pub struct MetadataInjector<'a>(pub &'a mut KeyValueGroup);
// Implements Injector trait to write "traceparent" to KeyValueGroup
```
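A self-contained sketch of what these carriers might look like. The `Extractor`/`Injector` traits below are local stand-ins mirroring the signatures of `opentelemetry::propagation::{Extractor, Injector}` (the real module would implement the crate's traits), and the `KeyValueGroup` shape — a string map under `key_value` — is assumed from the conceptual JSON above:

```rust
use std::collections::HashMap;

// Local stand-ins with the same signatures as
// opentelemetry::propagation::{Extractor, Injector}.
pub trait Extractor {
    fn get(&self, key: &str) -> Option<&str>;
    fn keys(&self) -> Vec<&str>;
}
pub trait Injector {
    fn set(&mut self, key: &str, value: String);
}

// Assumed shape of the protobuf-generated KeyValueGroup.
#[derive(Default)]
pub struct KeyValueGroup {
    pub key_value: HashMap<String, String>,
}

pub struct MetadataExtractor<'a>(pub &'a KeyValueGroup);

impl<'a> Extractor for MetadataExtractor<'a> {
    // Read e.g. "traceparent" out of the message metadata.
    fn get(&self, key: &str) -> Option<&str> {
        self.0.key_value.get(key).map(String::as_str)
    }
    fn keys(&self) -> Vec<&str> {
        self.0.key_value.keys().map(String::as_str).collect()
    }
}

pub struct MetadataInjector<'a>(pub &'a mut KeyValueGroup);

impl<'a> Injector for MetadataInjector<'a> {
    // Write e.g. "traceparent" into the message metadata.
    fn set(&mut self, key: &str, value: String) {
        self.0.key_value.insert(key.to_owned(), value);
    }
}

fn main() {
    let mut kvg = KeyValueGroup::default();
    MetadataInjector(&mut kvg).set("traceparent", "00-abc-def-01".to_string());
    assert_eq!(MetadataExtractor(&kvg).get("traceparent"), Some("00-abc-def-01"));
    println!("ok");
}
```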

### 3.3 Initialization (Tracer Setup)
The tracer MUST be initialized at startup in `main`. It should be configured with a **Service Name** (typically `pipeline_name-vertex_name`) and use the **OTLP Exporter** for production.

**Initialization Snippet:**
```rust
use opentelemetry::sdk::{trace as sdktrace, Resource};
use opentelemetry::trace::TracerProvider as _; // brings `.tracer()` into scope
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;

pub fn init_tracing(
    service_name: &str,
) -> Result<sdktrace::Tracer, Box<dyn std::error::Error + Send + Sync + 'static>> {
    // Check if tracing is enabled via custom env var
    let enabled = std::env::var("NUMAFLOW_TRACING_ENABLED")
        .map(|v| v.to_lowercase() == "true")
        .unwrap_or(false);

    if !enabled {
        // Return a no-op tracer
        return Ok(sdktrace::TracerProvider::default().tracer("noop"));
    }

    // Configure Resource (Service Name is critical for filtering in UI)
    let resource = Resource::new(vec![
        KeyValue::new("service.name", service_name.to_string()),
        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
    ]);

    // Configure OTLP exporter over gRPC
    // - Automatically reads OTEL_EXPORTER_OTLP_ENDPOINT from the env
    // - Defaults to http://localhost:4317 if not set
    let exporter = opentelemetry_otlp::new_exporter().tonic();

    // Build the pipeline with a batch span processor on the Tokio runtime
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(sdktrace::config().with_resource(resource))
        .install_batch(opentelemetry::runtime::Tokio)?;

    Ok(tracer)
}
```

### 3.4 Component Instrumentation

#### A. Source (Producer)
- **Action**: Starts the trace.
- **Logic**:
1. Create a new "Root" Span (e.g., `kind=Producer`).
2. Inject the span context into the outgoing message's `sys_metadata["tracing"]`.
3. Write message to ISB.

#### B. Vertices (Map/Reduce/Sink)
- **Action**: Continues the trace.
- **Logic**:
1. **Read**: Read message from ISB.
2. **Extract**: Look for `sys_metadata["tracing"]`. Use `MetadataExtractor` to get the parent context.
3. **Start Span**: Create a new span (Span Kind to be decided) linked to the parent context.
* **Available Span Kinds**:
- `Server`: Covers the server-side handling of an RPC or other remote request.
- `Client`: Covers the client-side of an RPC or other remote request.
- `Producer`: Describes a child of an asynchronous request (e.g., sending to a queue).
- `Consumer`: Describes a child of an asynchronous request (e.g., receiving from a queue).
- `Internal`: Default. Represents an internal operation within an application.
4. **Process**:
- **Before UDF**: Create a child span (e.g., `kind=Client`) for the UDF call. Inject this context into a *copy* of the metadata passed to the UDF.
> **Review comment (Contributor):** Similarly, should we set kind=Producer and kind=Consumer when writing to ISB and reading from ISB?

- **After UDF**: Receive results.
5. **Write**:
> **Review comment (Contributor):** I think handling traces in Sink should be a separate section. We could add more details for retries, dropped, etc. E.g.:
>
> ```rust
> // Record a Span Event with details
> span.add_event("message.dropped", vec![
>     KeyValue::new("drop.reason", "fallback_strategy"),
>     KeyValue::new("drop.after_retries", 5),
> ]);
> ```
>
> The tracing backend will see something like:
>
> ```json
> {
>   "trace_id": "4bf92f...",
>   "span_id": "aabb...",
>   "name": "numaflow.my-pipeline.sink-vertex.process",
>   "events": [
>     {
>       "name": "message.dropped",
>       "timestamp": "2026-04-01T12:00:00Z",
>       "attributes": {
>         "drop.reason": "fallback_strategy",
>         "drop.after_retries": 5
>       }
>     }
>   ]
> }
> ```

- Inject the current context into the outgoing message's `sys_metadata`.
- Write to next ISB.

### 3.5 Handling Aggregation (Reduce Vertex)
Reduce vertices ingest multiple messages (Fan-In) and produce a single result. This requires a strategy for linking multiple parent contexts to a single resulting span.

#### Option 1: Trace Links
OpenTelemetry supports "Links" to associate a Span with multiple other Spans that are not its direct parent.
- **Ingest**: Store the trace context of incoming messages in the window state.
- **Aggregate**: When creating the `UDF Call` span:
- Set the **Parent** to the context of the *first* message in the window (or a representative message).
- Add **Links** for the contexts of all other messages in the window.
- **Limit**: To prevent span bloat, limit the number of links (e.g., max 50). If more, sample (e.g., first 10, last 10, random 30).
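The link cap above can be sketched as a small, self-contained selection routine. `TraceContext` is a stand-in for the stored span context, and an evenly spaced middle sample is used as a deterministic placeholder for the "random 30":

```rust
// Stand-in for a stored trace context (the real type would wrap
// the OpenTelemetry SpanContext kept in window state).
#[derive(Clone, Debug, PartialEq)]
struct TraceContext(String);

const MAX_LINKS: usize = 50;

/// Select which parent contexts become span Links when a window closes.
/// If the window holds more than MAX_LINKS contexts, keep the first 10,
/// the last 10, and 30 evenly spaced contexts from the middle.
fn select_links(contexts: &[TraceContext]) -> Vec<TraceContext> {
    if contexts.len() <= MAX_LINKS {
        return contexts.to_vec();
    }
    let mut links = Vec::with_capacity(MAX_LINKS);
    links.extend_from_slice(&contexts[..10]);
    // Deterministic stand-in for the "random 30" sample.
    let middle = &contexts[10..contexts.len() - 10];
    let step = (middle.len() / 30).max(1);
    links.extend(middle.iter().step_by(step).take(30).cloned());
    links.extend_from_slice(&contexts[contexts.len() - 10..]);
    links
}

fn main() {
    let contexts: Vec<TraceContext> =
        (0..200).map(|i| TraceContext(format!("ctx-{i}"))).collect();
    let links = select_links(&contexts);
    assert_eq!(links.len(), 50); // capped at MAX_LINKS
    println!("{}", links.len()); // 50
}
```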

#### Option 2: First-Wins (Simplified)
- **Ingest**: Keep the trace context of only the *first* message that opened the window.
- **Aggregate**: Use that single context as the Parent for the `UDF Call` span.
- **Pros**: Zero storage overhead, simple.
- **Cons**: Loss of lineage for other messages.


## 4. Implementation Plan: Numaflow SDKs (e.g., Go/Rust/Python)
> **Review comment (Member):** let's not worry about SDKs, we just need server spans. Adding dependencies on SDK might be an overkill.

The SDKs run user code (UDFs) in a separate container. They **MUST** also initialize a tracer to participate in the distributed trace.

### 4.1 Initialization
The SDK must run similar initialization code as the Core (see section 3.3).
- It should read `NUMAFLOW_TRACING_ENABLED` to decide whether to start.
- It should use `NUMAFLOW_PIPELINE_NAME` + `NUMAFLOW_VERTEX_NAME` (or similar) to construct `service.name`.
- It connects to the **same** OTLP endpoint as the Core sidecar.

### 4.2 Helper
Implement `propagation.TextMapCarrier` for the `KeyValueGroup` protobuf struct in Go/Rust.

### 4.3 Instrumentation
In the gRPC handler (e.g., `MapFn`):
1. **Extract**: Read `traceparent` from `request.Metadata.SysMetadata["tracing"]`.
2. **Start Span**: Start a new span (`name="UserFunction"`, `kind=Server`).
3. **Execute**: Pass the `context` (containing the span) to the user's function.
- This allows users to create their own child spans using standard OTel APIs: `otel.Tracer("my-udf").Start(ctx, "sub-task")`.
4. **End Span**: Close the span when the function returns.

## 5. End-to-End Workflow

1. **Source**:
- Generates data.
- Starts Trace `T1`, Span `S1`.
> **Review comment (Contributor):** Do we create the trace id in SDK after receiving messages from UDF? Only then we can propagate traceparent coming from external systems like Kafka, right?

- Writes Message `M1` with `sys_metadata.tracing.traceparent = T1-S1`.

2. **ISB (JetStream)**:
- Persists `M1`.

3. **Map Vertex (Core)**:
- Reads `M1`.
- Extracts context `T1-S1`.
- Starts Span `S2` (Parent: `S1`) -> **"Vertex Processing"**.
- Covers overhead (ISB read/write, serialization).
- Prepares UDF Request. Starts Span `S3` (Parent: `S2`) -> **"UDF RPC Call"**.
> **Review comment (Contributor):** Let's use predictable span names? E.g.:
>
> ```
> numaflow.{pipeline}.{vertex}.process
> numaflow.{pipeline}.{vertex}.udf
> numaflow.{pipeline}.{vertex}.sink.retry
> ```
- Covers gRPC roundtrip to UDF container.
- Injects `T1-S3` into UDF Request Metadata.

4. **UDF (SDK)**:
- Receives Request.
- Extracts `T1-S3`.
- Starts Span `S4` (Parent: `S3`), runs user code, then ends `S4`.
- **Crucial**: SDK sends Span `S4` data to Collector.
> **Review comment (Member):** Expecting the sdk to handle data sending?
>
> **Author (adarsh0728, Feb 5, 2026):** Yes, thought of both core and sdk to send data:
>
> ```
> Trace X:
>   Core Span: Vertex Processing
>     SDK will start Span for: UDF Execution
>       User starts Span (optional): some calculation
> ```
>
> For sdk/user spans to work, the user has to initiate tracing (we can provide a helper fn via the sdk to set up a global tracer for the udf process with the same OTEL endpoint env variable).

5. **Map Vertex (Resume)**:
- UDF returns. Ends `S3`.
- Writes Message `M2` to next ISB.
- Injects `T1-S2` (or a new child `S5`) into `M2.sys_metadata`.

## 6. Configuration & Deployment

### 6.1 Pipeline Spec [TBD]
Users enable tracing by adding environment variables to the pipeline spec.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline

container:
  env:
    - name: NUMAFLOW_TRACING_ENABLED
      value: "true"
    - name: OTEL_EXPORTER_OTLP_ENDPOINT
      value: "http://otel-collector.observability:4317"
```

### 6.2 Infrastructure
The user is responsible for deploying the observability stack:
1. **OTEL Collector**: Receives traces from Numaflow.
2. **Backend**: Jaeger, Datadog, etc.
