Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/operations/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ These metrics are specific to map/UDF vertices.
| `forwarder_udf_error_total` | Counter | | Total number of UDF errors |
| `forwarder_udf_processing_time` | Histogram | | Processing times of User-Defined Functions (UDFs), in microseconds |

### Reduce Metrics

These metrics are specific to reduce (windowed aggregation) vertices.

| Metric name | Metric type | Additional Labels | Description |
|--------------------------------------|-------------|-------------------|----------------------------------------------------------------------|
| `reduce_active_windows` | Gauge | | Number of currently open reduce windows |
| `reduce_closed_windows` | Gauge | | Number of closed windows awaiting GC |
| `reduce_watermark_lag` | Gauge | | Difference between wall clock and watermark, in milliseconds |
| `reduce_window_processing_time` | Histogram | | Window open-to-close latency, in microseconds |
| `reduce_pnf_process_time` | Histogram | | UDF reduce function execution time per window, in microseconds |
| `reduce_pbq_write_total` | Counter | | Total number of data messages written to PBQ |

> **Note:** All reduce metrics carry the standard pipeline common labels (`pipeline`, `vertex`, `replica`) with `vertex_type` always set to `ReduceUDF`.

### Fallback Sink Metrics

These metrics are specific to sink vertices with a fallback sink configured.
Expand Down
80 changes: 80 additions & 0 deletions rust/numaflow-core/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx";
// Prefixes for the sub-registries
const SINK_REGISTRY_PREFIX: &str = "sink";
const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink";
const REDUCE_REGISTRY_PREFIX: &str = "reduce";
const ON_SUCCESS_SINK_REGISTRY_PREFIX: &str = "onsuccess_sink";
const TRANSFORMER_REGISTRY_PREFIX: &str = "transformer";
const UDF_REGISTRY_PREFIX: &str = "udf";
Expand Down Expand Up @@ -149,6 +150,14 @@ const SINK_TIME: &str = "time";
const FALLBACK_SINK_TIME: &str = "time";
const ON_SUCCESS_SINK_TIME: &str = "time";

// Reduce-specific metric names. They are registered in the sub-registry created with
// REDUCE_REGISTRY_PREFIX (see `register_reduce_metrics`).
// Gauges: window counts.
const REDUCE_ACTIVE_WINDOWS: &str = "active_windows";
const REDUCE_CLOSED_WINDOWS: &str = "closed_windows";
// Histograms: observations are recorded in microseconds.
const REDUCE_WINDOW_PROCESSING_TIME: &str = "window_processing_time";
const REDUCE_PNF_PROCESS_TIME: &str = "pnf_process_time";
// Gauge: wall clock minus watermark, set in milliseconds.
const REDUCE_WATERMARK_LAG: &str = "watermark_lag";
// Counter of data messages forwarded by the PBQ.
// NOTE(review): the registered name has no `_total` suffix; presumably the exposition
// encoder appends it for counters — confirm against the scraped output.
const REDUCE_PBQ_WRITE_TOTAL: &str = "pbq_write";

// jetstream isb processing times
const JETSTREAM_ISB_READ_TIME_TOTAL: &str = "read_time_total";
const JETSTREAM_ISB_WRITE_TIME_TOTAL: &str = "write_time_total";
Expand Down Expand Up @@ -307,6 +316,8 @@ pub(crate) struct PipelineMetrics {
pub(crate) sink_forwarder: SinkForwarderMetrics,
pub(crate) jetstream_isb: JetStreamISBMetrics,
pub(crate) pending_raw: Family<Vec<(String, String)>, Gauge>,
// reduce specific metrics
pub(crate) reduce: ReduceMetrics,
}

/// Family of metrics for the sink
Expand Down Expand Up @@ -342,6 +353,39 @@ pub(crate) struct UDFMetrics {
pub(crate) errors_total: Family<Vec<(String, String)>, Counter>,
}

/// Family of metrics for the Reduce vertex
pub(crate) struct ReduceMetrics {
// gauges
pub(crate) active_windows: Family<Vec<(String, String)>, Gauge>,
pub(crate) closed_windows: Family<Vec<(String, String)>, Gauge>,
pub(crate) watermark_lag: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
// histograms
pub(crate) window_processing_time: Family<Vec<(String, String)>, Histogram>,
pub(crate) pnf_process_time: Family<Vec<(String, String)>, Histogram>,
// counters
pub(crate) pbq_write_total: Family<Vec<(String, String)>, Counter>,
}

impl ReduceMetrics {
pub(crate) fn new() -> Self {
Self {
active_windows: Family::<Vec<(String, String)>, Gauge>::default(),
closed_windows: Family::<Vec<(String, String)>, Gauge>::default(),
watermark_lag: Family::<Vec<(String, String)>, Gauge<f64, AtomicU64>>::default(),
window_processing_time:
Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
// 1ms to 60 minutes in microseconds
|| Histogram::new(exponential_buckets_range(1000.0, 3_600_000_000.0, 10)),
),
pnf_process_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
// 1ms to 20 minutes in microseconds
|| Histogram::new(exponential_buckets_range(1000.0, 1_200_000_000.0, 10)),
),
pbq_write_total: Family::<Vec<(String, String)>, Counter>::default(),
}
}
}

/// Generic forwarder metrics
pub(crate) struct PipelineForwarderMetrics {
// read counters
Expand Down Expand Up @@ -763,13 +807,15 @@ impl PipelineMetrics {
sink_forwarder: SinkForwarderMetrics::new(),
jetstream_isb: JetStreamISBMetrics::new(),
pending_raw: Family::<Vec<(String, String)>, Gauge>::default(),
reduce: ReduceMetrics::new(),
};
let mut registry = global_registry().registry.lock();
Self::register_forwarder_metrics(&metrics, &mut registry);
Self::register_source_forwarder_metrics(&metrics, &mut registry);
Self::register_sink_forwarder_metrics(&metrics, &mut registry);
Self::register_jetstream_isb_metrics(&metrics, &mut registry);
Self::register_vertex_metrics(&metrics, &mut registry);
Self::register_reduce_metrics(&metrics, &mut registry);
metrics
}

Expand Down Expand Up @@ -1047,6 +1093,40 @@ impl PipelineMetrics {
metrics.pending_raw.clone(),
);
}

/// Registers every reduce metric family in the sub-registry prefixed with
/// `REDUCE_REGISTRY_PREFIX`.
///
/// The registry receives clones of the families held in `metrics.reduce`.
fn register_reduce_metrics(metrics: &Self, registry: &mut Registry) {
    let reduce = &metrics.reduce;
    let sub_registry = registry.sub_registry_with_prefix(REDUCE_REGISTRY_PREFIX);

    // window count gauges
    sub_registry.register(
        REDUCE_ACTIVE_WINDOWS,
        "Number of currently open reduce windows",
        reduce.active_windows.clone(),
    );
    sub_registry.register(
        REDUCE_CLOSED_WINDOWS,
        "Number of closed reduce windows awaiting GC",
        reduce.closed_windows.clone(),
    );

    // watermark lag gauge
    sub_registry.register(
        REDUCE_WATERMARK_LAG,
        "Difference between current wall clock and watermark in milliseconds",
        reduce.watermark_lag.clone(),
    );

    // latency histograms
    sub_registry.register(
        REDUCE_WINDOW_PROCESSING_TIME,
        "Time from window open to window close in microseconds (1ms to 60 minutes)",
        reduce.window_processing_time.clone(),
    );
    sub_registry.register(
        REDUCE_PNF_PROCESS_TIME,
        "Time for UDF reduce function to complete per window in microseconds (1ms to 20 minutes)",
        reduce.pnf_process_time.clone(),
    );

    // PBQ write counter
    sub_registry.register(
        REDUCE_PBQ_WRITE_TOTAL,
        "Total number of messages written to PBQ",
        reduce.pbq_write_total.clone(),
    );
}
}

/// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics
Expand Down
31 changes: 29 additions & 2 deletions rust/numaflow-core/src/reduce/pbq.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF;
use crate::error::Result;
use crate::mark_success;
use crate::message::Message;
use crate::message::{Message, MessageType};
use crate::metrics::{pipeline_metric_labels, pipeline_metrics};
use crate::pipeline::isb::reader::ISBReaderOrchestrator;
use crate::reduce::wal::WalMessage;
use crate::reduce::wal::segment::append::{AppendOnlyWal, SegmentWriteMessage};
Expand Down Expand Up @@ -114,9 +116,18 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
let msg: WalMessage = msg.try_into().map_err(|e| {
crate::error::Error::Reduce(format!("Failed to parse WAL message: {e}"))
})?;
messages_tx.send(msg.into()).await.map_err(|_| {
let message: Message = msg.into();
let is_data = message.typ == MessageType::Data;
messages_tx.send(message).await.map_err(|_| {
crate::error::Error::Reduce("PBQ WAL replay: receiver dropped".to_string())
})?;
if is_data {
pipeline_metrics()
.reduce
.pbq_write_total
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.inc();
}
replayed_count += 1;
}

Expand Down Expand Up @@ -153,11 +164,19 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
})?;

// Send cloned message downstream
let is_data = message.typ == MessageType::Data;
tx.send(message).await.map_err(|_| {
crate::error::Error::Reduce(
"PBQ ISB reader: downstream receiver dropped".to_string(),
)
})?;
if is_data {
pipeline_metrics()
.reduce
.pbq_write_total
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.inc();
}
}

isb_handle
Expand Down Expand Up @@ -194,11 +213,19 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
while let Some(read_msg) = isb_stream.next().await {
// Extract the message and forward to output channel
let message = read_msg.message().clone();
let is_data = message.typ == MessageType::Data;
tx.send(message).await.map_err(|_| {
crate::error::Error::Reduce(
"PBQ ISB reader: downstream receiver dropped".to_string(),
)
})?;
if is_data {
pipeline_metrics()
.reduce
.pbq_write_total
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.inc();
}
// Mark the read message as success after processing
mark_success!(read_msg);
}
Expand Down
64 changes: 62 additions & 2 deletions rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF;
use crate::config::{get_vertex_name, get_vertex_replica};
use crate::error::Error;
use crate::message::{Message, MessageType};
use crate::metrics::{pipeline_drop_metric_labels, pipeline_metrics};
use crate::metrics::{pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics};
use crate::pipeline::isb::writer::ISBWriterOrchestrator;
use crate::reduce::reducer::aligned::user_defined::UserDefinedAlignedReduce;
use crate::reduce::reducer::aligned::windower::{
Expand All @@ -15,7 +16,7 @@ use numaflow_pb::objects::wal::GcEvent;
use std::collections::HashMap;
use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -44,6 +45,7 @@ struct ReduceTask<C: NumaflowTypeConfig> {
error_tx: mpsc::Sender<Error>,
window: Window,
window_manager: AlignedWindowManager,
window_open_time: Instant,
}

impl<C: NumaflowTypeConfig> ReduceTask<C> {
Expand All @@ -63,6 +65,7 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
error_tx,
window,
window_manager,
window_open_time: Instant::now(),
}
}

Expand Down Expand Up @@ -92,10 +95,18 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
// Call the reduce function. This is a blocking call and will return only once the window
// is closed, cancellation is detected, or on error. The output is sent to the result_tx
// channel which is consumed by the writer task and published to JetStream.
let udf_start = Instant::now();
let result = self
.client
.reduce_fn(message_stream, result_tx, cln_token)
.await;
if result.is_ok() {
pipeline_metrics()
.reduce
.pnf_process_time
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.observe(udf_start.elapsed().as_micros() as f64);
}

if let Err(e) = result {
// Check if this is a cancellation error
Expand Down Expand Up @@ -126,10 +137,24 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
.oldest_window()
.expect("no oldest window found");

// Record window processing time (open to results-written)
pipeline_metrics()
.reduce
.window_processing_time
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.observe(self.window_open_time.elapsed().as_micros() as f64);

// we can safely delete the window from the window manager since the results are
// successfully written to jetstream and watermark is published.
self.window_manager.gc_window(self.window.clone());

// Update closed windows gauge after GC removes the window
pipeline_metrics()
.reduce
.closed_windows
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(self.window_manager.closed_window_count() as i64);

// now that the processing is done, we can add this window to the GC WAL.
let Some(gc_wal_tx) = &self.gc_wal_tx else {
// return if the GC WAL is not configured
Expand Down Expand Up @@ -281,6 +306,13 @@ impl<C: NumaflowTypeConfig> AlignedReduceActor<C> {
},
);

// Update active windows gauge
pipeline_metrics()
.reduce
.active_windows
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(self.window_manager.active_window_count() as i64);

// Send the open command with the first message
if let Err(e) = message_tx.send(window_msg).await
&& !self.cln_token.is_cancelled()
Expand Down Expand Up @@ -340,6 +372,18 @@ impl<C: NumaflowTypeConfig> AlignedReduceActor<C> {
return;
};

// Update active and closed windows gauges
pipeline_metrics()
.reduce
.active_windows
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(self.window_manager.active_window_count() as i64);
pipeline_metrics()
.reduce
.closed_windows
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(self.window_manager.closed_window_count() as i64);

// we don't need to write the close message to the client, stream closing
// is considered as close for aligned windows.
// Drop the sender to signal completion
Expand Down Expand Up @@ -475,6 +519,12 @@ impl<C: NumaflowTypeConfig> AlignedReducer<C> {
// Only close windows if the idle watermark is greater than current watermark
if idle_watermark > self.current_watermark {
self.current_watermark = idle_watermark;
let lag = Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis();
pipeline_metrics()
.reduce
.watermark_lag
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(lag as f64);
self.close_windows_with_watermark(idle_watermark, &actor_tx).await;
}
}
Expand Down Expand Up @@ -579,6 +629,16 @@ impl<C: NumaflowTypeConfig> AlignedReducer<C> {
.unwrap_or(DateTime::from_timestamp_millis(-1).expect("Invalid timestamp")),
);

// Update watermark lag gauge (skip -1 sentinel)
if self.current_watermark.timestamp_millis() != -1 {
let lag = Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis();
pipeline_metrics()
.reduce
.watermark_lag
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
.set(lag as f64);
}

// Handle late messages
if msg.is_late {
if self.current_watermark.timestamp_millis() == -1 {
Expand Down
16 changes: 16 additions & 0 deletions rust/numaflow-core/src/reduce/reducer/aligned/windower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ impl AlignedWindowManager {
AlignedWindowManager::Sliding(manager) => manager.oldest_window(),
}
}

/// Returns the number of currently active windows.
pub(crate) fn active_window_count(&self) -> usize {
match self {
AlignedWindowManager::Fixed(manager) => manager.active_window_count(),
AlignedWindowManager::Sliding(manager) => manager.active_window_count(),
}
}

/// Returns the number of closed windows awaiting GC.
pub(crate) fn closed_window_count(&self) -> usize {
match self {
AlignedWindowManager::Fixed(manager) => manager.closed_window_count(),
AlignedWindowManager::Sliding(manager) => manager.closed_window_count(),
}
}
}

/// A Window is represented by its start and end time. All the data which event time falls within
Expand Down
Loading
Loading