diff --git a/docs/operations/metrics/metrics.md b/docs/operations/metrics/metrics.md index 57bc8c3e2d..466c4b415e 100644 --- a/docs/operations/metrics/metrics.md +++ b/docs/operations/metrics/metrics.md @@ -55,6 +55,21 @@ These metrics are specific to map/UDF vertices. | `forwarder_udf_error_total` | Counter | | Total number of UDF errors | | `forwarder_udf_processing_time` | Histogram | | Processing times of User-Defined Functions (UDFs), in microseconds | +### Reduce Metrics + +These metrics are specific to reduce (windowed aggregation) vertices. + +| Metric name | Metric type | Additional Labels | Description | +|--------------------------------------|-------------|-------------------|----------------------------------------------------------------------| +| `reduce_active_windows` | Gauge | | Number of currently open reduce windows | +| `reduce_closed_windows` | Gauge | | Number of closed windows awaiting GC | +| `reduce_watermark_lag` | Gauge | | Difference between wall clock and watermark, in milliseconds | +| `reduce_window_processing_time` | Histogram | | Window open-to-close latency, in microseconds | +| `reduce_pnf_process_time` | Histogram | | UDF reduce function execution time per window, in microseconds | +| `reduce_pbq_write` | Counter | | Total data messages written to PBQ | + +> **Note:** All reduce metrics carry the standard pipeline common labels (`pipeline`, `vertex`, `replica`) with `vertex_type` always set to `ReduceUDF`. + ### Fallback Sink Metrics These metrics are specific to sink vertices with a fallback sink configured. diff --git a/rust/numaflow-core/src/metrics/mod.rs b/rust/numaflow-core/src/metrics/mod.rs index 033b9a3532..c7d50c902e 100644 --- a/rust/numaflow-core/src/metrics/mod.rs +++ b/rust/numaflow-core/src/metrics/mod.rs @@ -62,6 +62,7 @@ const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx"; // Prefixes for the sub-registries const SINK_REGISTRY_PREFIX: &str = "sink"; const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink"; +const REDUCE_REGISTRY_PREFIX: &str = "reduce"; const ON_SUCCESS_SINK_REGISTRY_PREFIX: &str = "onsuccess_sink"; const TRANSFORMER_REGISTRY_PREFIX: &str = "transformer"; const UDF_REGISTRY_PREFIX: &str = "udf"; @@ -149,6 +150,14 @@ const SINK_TIME: &str = "time"; const FALLBACK_SINK_TIME: &str = "time"; const ON_SUCCESS_SINK_TIME: &str = "time"; +// reduce specific metrics +const REDUCE_ACTIVE_WINDOWS: &str = "active_windows"; +const REDUCE_CLOSED_WINDOWS: &str = "closed_windows"; +const REDUCE_WINDOW_PROCESSING_TIME: &str = "window_processing_time"; +const REDUCE_PNF_PROCESS_TIME: &str = "pnf_process_time"; +const REDUCE_WATERMARK_LAG: &str = "watermark_lag"; +const REDUCE_PBQ_WRITE_TOTAL: &str = "pbq_write"; + // jetstream isb processing times const JETSTREAM_ISB_READ_TIME_TOTAL: &str = "read_time_total"; const JETSTREAM_ISB_WRITE_TIME_TOTAL: &str = "write_time_total"; @@ -307,6 +316,8 @@ pub(crate) struct PipelineMetrics { pub(crate) sink_forwarder: SinkForwarderMetrics, pub(crate) jetstream_isb: JetStreamISBMetrics, pub(crate) pending_raw: Family, Gauge>, + // reduce specific metrics + pub(crate) reduce: ReduceMetrics, } /// Family of metrics for the sink @@ -342,6 +353,39 @@ pub(crate) struct UDFMetrics { pub(crate) errors_total: Family, Counter>, } +/// Family of metrics for the Reduce vertex +pub(crate) struct ReduceMetrics { + // gauges + pub(crate) active_windows: Family, Gauge>, + pub(crate) closed_windows: Family, Gauge>, + pub(crate) watermark_lag: Family, Gauge>, + // histograms + pub(crate) window_processing_time: Family, Histogram>, + pub(crate) pnf_process_time: Family, Histogram>, + // counters + pub(crate) pbq_write_total: Family, Counter>, +} + +impl ReduceMetrics { + pub(crate) fn new() -> Self { + Self { + active_windows: Family::, Gauge>::default(), + closed_windows: Family::, Gauge>::default(), + watermark_lag: Family::, Gauge>::default(), + window_processing_time: + Family::, Histogram>::new_with_constructor( + // 1ms to 60 minutes in microseconds + || Histogram::new(exponential_buckets_range(1000.0, 3_600_000_000.0, 10)), + ), + pnf_process_time: Family::, Histogram>::new_with_constructor( + // 1ms to 20 minutes in microseconds + || Histogram::new(exponential_buckets_range(1000.0, 1_200_000_000.0, 10)), + ), + pbq_write_total: Family::, Counter>::default(), + } + } +} + /// Generic forwarder metrics pub(crate) struct PipelineForwarderMetrics { // read counters @@ -763,6 +807,7 @@ impl PipelineMetrics { sink_forwarder: SinkForwarderMetrics::new(), jetstream_isb: JetStreamISBMetrics::new(), pending_raw: Family::, Gauge>::default(), + reduce: ReduceMetrics::new(), }; let mut registry = global_registry().registry.lock(); Self::register_forwarder_metrics(&metrics, &mut registry); @@ -770,6 +815,7 @@ impl PipelineMetrics { Self::register_sink_forwarder_metrics(&metrics, &mut registry); Self::register_jetstream_isb_metrics(&metrics, &mut registry); Self::register_vertex_metrics(&metrics, &mut registry); + Self::register_reduce_metrics(&metrics, &mut registry); metrics } @@ -1047,6 +1093,40 @@ impl PipelineMetrics { metrics.pending_raw.clone(), ); } + + fn register_reduce_metrics(metrics: &Self, registry: &mut Registry) { + let reduce_registry = registry.sub_registry_with_prefix(REDUCE_REGISTRY_PREFIX); + reduce_registry.register( + REDUCE_ACTIVE_WINDOWS, + "Number of currently open reduce windows", + metrics.reduce.active_windows.clone(), + ); + reduce_registry.register( + REDUCE_CLOSED_WINDOWS, + "Number of closed reduce windows awaiting GC", + metrics.reduce.closed_windows.clone(), + ); + reduce_registry.register( + REDUCE_WATERMARK_LAG, + "Difference between current wall clock and watermark in milliseconds", + metrics.reduce.watermark_lag.clone(), + ); + reduce_registry.register( + REDUCE_WINDOW_PROCESSING_TIME, + "Time from window open to window close in microseconds (1ms to 60 minutes)", + metrics.reduce.window_processing_time.clone(), + ); + reduce_registry.register( + REDUCE_PNF_PROCESS_TIME, + "Time for UDF reduce function to complete per window in microseconds (1ms to 20 minutes)", + metrics.reduce.pnf_process_time.clone(), + ); + reduce_registry.register( + REDUCE_PBQ_WRITE_TOTAL, + "Total number of messages written to PBQ", + metrics.reduce.pbq_write_total.clone(), + ); + } } /// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics diff --git a/rust/numaflow-core/src/reduce/pbq.rs b/rust/numaflow-core/src/reduce/pbq.rs index 8d6787a4f4..5ac971620d 100644 --- a/rust/numaflow-core/src/reduce/pbq.rs +++ b/rust/numaflow-core/src/reduce/pbq.rs @@ -1,6 +1,8 @@ +use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF; use crate::error::Result; use crate::mark_success; -use crate::message::Message; +use crate::message::{Message, MessageType}; +use crate::metrics::{pipeline_metric_labels, pipeline_metrics}; use crate::pipeline::isb::reader::ISBReaderOrchestrator; use crate::reduce::wal::WalMessage; use crate::reduce::wal::segment::append::{AppendOnlyWal, SegmentWriteMessage}; @@ -114,9 +116,18 @@ impl PBQ { let msg: WalMessage = msg.try_into().map_err(|e| { crate::error::Error::Reduce(format!("Failed to parse WAL message: {e}")) })?; - messages_tx.send(msg.into()).await.map_err(|_| { + let message: Message = msg.into(); + let is_data = message.typ == MessageType::Data; + messages_tx.send(message).await.map_err(|_| { crate::error::Error::Reduce("PBQ WAL replay: receiver dropped".to_string()) })?; + if is_data { + pipeline_metrics() + .reduce + .pbq_write_total + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .inc(); + } replayed_count += 1; } @@ -153,11 +164,19 @@ impl PBQ { })?; // Send cloned message downstream + let is_data = message.typ == MessageType::Data; tx.send(message).await.map_err(|_| { crate::error::Error::Reduce( "PBQ ISB reader: downstream receiver dropped".to_string(), ) })?; + if is_data { + pipeline_metrics() + .reduce + .pbq_write_total + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .inc(); + } } isb_handle @@ -194,11 +213,19 @@ impl PBQ { while let Some(read_msg) = isb_stream.next().await { // Extract the message and forward to output channel let message = read_msg.message().clone(); + let is_data = message.typ == MessageType::Data; tx.send(message).await.map_err(|_| { crate::error::Error::Reduce( "PBQ ISB reader: downstream receiver dropped".to_string(), ) })?; + if is_data { + pipeline_metrics() + .reduce + .pbq_write_total + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .inc(); + } // Mark the read message as success after processing mark_success!(read_msg); } diff --git a/rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs b/rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs index 150d2491b2..28cc50bebe 100644 --- a/rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs +++ b/rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs @@ -1,7 +1,8 @@ +use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF; use crate::config::{get_vertex_name, get_vertex_replica}; use crate::error::Error; use crate::message::{Message, MessageType}; -use crate::metrics::{pipeline_drop_metric_labels, pipeline_metrics}; +use crate::metrics::{pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics}; use crate::pipeline::isb::writer::ISBWriterOrchestrator; use crate::reduce::reducer::aligned::user_defined::UserDefinedAlignedReduce; use crate::reduce::reducer::aligned::windower::{ @@ -15,7 +16,7 @@ use numaflow_pb::objects::wal::GcEvent; use std::collections::HashMap; use std::ops::Sub; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -44,6 +45,7 @@ struct ReduceTask { error_tx: mpsc::Sender, window: Window, window_manager: AlignedWindowManager, + window_open_time: Instant, } impl ReduceTask { @@ -63,6 +65,7 @@ impl ReduceTask { error_tx, window, window_manager, + window_open_time: Instant::now(), } } @@ -92,10 +95,17 @@ impl ReduceTask { // Call the reduce function. This is a blocking call and will return only once the window // is closed, cancellation is detected, or on error. The output is sent to the result_tx // channel which is consumed by the writer task and published to JetStream. + let udf_start = Instant::now(); let result = self .client .reduce_fn(message_stream, result_tx, cln_token) .await; + // recorded unconditionally; error/cancellation paths return below before window_processing_time + pipeline_metrics() + .reduce + .pnf_process_time + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .observe(udf_start.elapsed().as_micros() as f64); if let Err(e) = result { // Check if this is a cancellation error @@ -126,10 +136,24 @@ impl ReduceTask { .oldest_window() .expect("no oldest window found"); + // Record window processing time (open to results-written) + pipeline_metrics() + .reduce + .window_processing_time + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .observe(self.window_open_time.elapsed().as_micros() as f64); + // we can safely delete the window from the window manager since the results are // successfully written to jetstream and watermark is published. self.window_manager.gc_window(self.window.clone()); + // Update closed windows gauge after GC removes the window + pipeline_metrics() + .reduce + .closed_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.closed_window_count() as i64); + // now that the processing is done, we can add this window to the GC WAL. let Some(gc_wal_tx) = &self.gc_wal_tx else { // return if the GC WAL is not configured @@ -281,6 +305,13 @@ impl AlignedReduceActor { }, ); + // Update active windows gauge + pipeline_metrics() + .reduce + .active_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.active_window_count() as i64); + // Send the open command with the first message if let Err(e) = message_tx.send(window_msg).await && !self.cln_token.is_cancelled() @@ -340,6 +371,18 @@ impl AlignedReduceActor { return; }; + // Update active and closed windows gauges + pipeline_metrics() + .reduce + .active_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.active_window_count() as i64); + pipeline_metrics() + .reduce + .closed_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.closed_window_count() as i64); + // we don't need to write the close message to the client, stream closing // is considered as close for aligned windows. // Drop the sender to signal completion @@ -475,6 +518,12 @@ impl AlignedReducer { // Only close windows if the idle watermark is greater than current watermark if idle_watermark > self.current_watermark { self.current_watermark = idle_watermark; + let lag = (Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0); + pipeline_metrics() + .reduce + .watermark_lag + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(lag as f64); self.close_windows_with_watermark(idle_watermark, &actor_tx).await; } } @@ -579,6 +628,17 @@ impl AlignedReducer { .unwrap_or(DateTime::from_timestamp_millis(-1).expect("Invalid timestamp")), ); + // Update watermark lag gauge (skip -1 sentinel) + if self.current_watermark.timestamp_millis() != -1 { + let lag = + (Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0); + pipeline_metrics() + .reduce + .watermark_lag + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(lag as f64); + } + // Handle late messages if msg.is_late { if self.current_watermark.timestamp_millis() == -1 { diff --git a/rust/numaflow-core/src/reduce/reducer/aligned/windower.rs b/rust/numaflow-core/src/reduce/reducer/aligned/windower.rs index 845687b834..a8a7cf8430 100644 --- a/rust/numaflow-core/src/reduce/reducer/aligned/windower.rs +++ b/rust/numaflow-core/src/reduce/reducer/aligned/windower.rs @@ -62,6 +62,22 @@ impl AlignedWindowManager { AlignedWindowManager::Sliding(manager) => manager.oldest_window(), } } + + /// Returns the number of currently active windows. + pub(crate) fn active_window_count(&self) -> usize { + match self { + AlignedWindowManager::Fixed(manager) => manager.active_window_count(), + AlignedWindowManager::Sliding(manager) => manager.active_window_count(), + } + } + + /// Returns the number of closed windows awaiting GC. + pub(crate) fn closed_window_count(&self) -> usize { + match self { + AlignedWindowManager::Fixed(manager) => manager.closed_window_count(), + AlignedWindowManager::Sliding(manager) => manager.closed_window_count(), + } + } } /// A Window is represented by its start and end time. All the data which event time falls within diff --git a/rust/numaflow-core/src/reduce/reducer/aligned/windower/fixed.rs b/rust/numaflow-core/src/reduce/reducer/aligned/windower/fixed.rs index 7110820265..bdc3dad4c2 100644 --- a/rust/numaflow-core/src/reduce/reducer/aligned/windower/fixed.rs +++ b/rust/numaflow-core/src/reduce/reducer/aligned/windower/fixed.rs @@ -151,6 +151,22 @@ impl FixedWindowManager { .next() .cloned() } + + /// Returns the number of currently active windows. + pub(crate) fn active_window_count(&self) -> usize { + self.active_windows + .read() + .expect("Poisoned lock for active_windows") + .len() + } + + /// Returns the number of closed windows awaiting GC. + pub(crate) fn closed_window_count(&self) -> usize { + self.closed_windows + .read() + .expect("Poisoned lock for closed_windows") + .len() + } } #[cfg(test)] diff --git a/rust/numaflow-core/src/reduce/reducer/aligned/windower/sliding.rs b/rust/numaflow-core/src/reduce/reducer/aligned/windower/sliding.rs index 142cc7b87b..c46d7de551 100644 --- a/rust/numaflow-core/src/reduce/reducer/aligned/windower/sliding.rs +++ b/rust/numaflow-core/src/reduce/reducer/aligned/windower/sliding.rs @@ -223,6 +223,22 @@ impl SlidingWindowManager { .cloned() } + /// Returns the number of currently active windows. + pub(crate) fn active_window_count(&self) -> usize { + self.active_windows + .read() + .expect("Poisoned lock for active_windows") + .len() + } + + /// Returns the number of closed windows awaiting GC. + pub(crate) fn closed_window_count(&self) -> usize { + self.closed_windows + .read() + .expect("Poisoned lock for closed_windows") + .len() + } + /// Helper method to format sorted window information for logging. fn format_windows_for_log(windows: &[ProtoWindow]) -> String { let formatted_windows: String = windows diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/reducer.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/reducer.rs index 4396e9a76d..3c063497f3 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/reducer.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/reducer.rs @@ -10,9 +10,10 @@ use crate::reduce::reducer::unaligned::windower::{ use crate::reduce::wal::segment::append::{AppendOnlyWal, SegmentWriteMessage}; use crate::typ::NumaflowTypeConfig; +use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF; use crate::config::{get_vertex_name, get_vertex_replica}; use crate::jh_abort_guard; -use crate::metrics::{pipeline_drop_metric_labels, pipeline_metrics}; +use crate::metrics::{pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics}; use chrono::{DateTime, Utc}; use numaflow_pb::clients::accumulator::AccumulatorRequest; use numaflow_pb::clients::sessionreduce::SessionReduceRequest; @@ -371,6 +372,17 @@ impl ReduceTask { for (_keys, window) in self.tracked_windows.drain() { self.window_manager.delete_window(window); } + // update gauges immediately so the metric reflects GC without waiting for the next message + pipeline_metrics() + .reduce + .active_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.active_window_count() as i64); + pipeline_metrics() + .reduce + .closed_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.closed_window_count() as i64); } } @@ -625,7 +637,23 @@ impl UnalignedReducer { // Only close windows if the idle watermark is greater than current watermark if idle_watermark > self.current_watermark { self.current_watermark = idle_watermark; + let lag = (Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0); + pipeline_metrics() + .reduce + .watermark_lag + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(lag as f64); self.close_windows_by_wm(idle_watermark, &actor_tx).await; + pipeline_metrics() + .reduce + .active_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.active_window_count() as i64); + pipeline_metrics() + .reduce + .closed_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.closed_window_count() as i64); } } continue; // Skip further processing for WMB messages @@ -638,6 +666,16 @@ impl UnalignedReducer { } self.assign_and_close_windows(msg, &actor_tx).await; + pipeline_metrics() + .reduce + .active_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.active_window_count() as i64); + pipeline_metrics() + .reduce + .closed_windows + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(self.window_manager.closed_window_count() as i64); } None => { // Stream ended @@ -724,6 +762,17 @@ impl UnalignedReducer { .unwrap_or(DateTime::from_timestamp_millis(-1).expect("Invalid timestamp")), ); + // Update watermark lag gauge (skip -1 sentinel) + if self.current_watermark.timestamp_millis() != -1 { + let lag = + (Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0); + pipeline_metrics() + .reduce + .watermark_lag + .get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF)) + .set(lag as f64); + } + // Handle late messages if msg.is_late { if self.current_watermark.timestamp_millis() == -1 { diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/windower.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/windower.rs index 78476c7d3d..0e694a77e8 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/windower.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/windower.rs @@ -194,6 +194,23 @@ impl UnalignedWindowManager { UnalignedWindowManager::Session(manager) => manager.oldest_window_end_time(), } } + + /// Returns the number of currently active windows. + pub(crate) fn active_window_count(&self) -> usize { + match self { + UnalignedWindowManager::Accumulator(manager) => manager.active_window_count(), + UnalignedWindowManager::Session(manager) => manager.active_window_count(), + } + } + + /// Returns the number of closed windows awaiting GC. + /// Accumulator doesn't track closed windows separately, so returns 0. + pub(crate) fn closed_window_count(&self) -> usize { + match self { + UnalignedWindowManager::Accumulator(_) => 0, + UnalignedWindowManager::Session(manager) => manager.closed_window_count(), + } + } } /// Combines keys into a single string for use as a map key diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/windower/accumulator.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/windower/accumulator.rs index 5e359096d7..9ac2c54cb6 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/windower/accumulator.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/windower/accumulator.rs @@ -211,6 +211,11 @@ impl AccumulatorWindowManager { .filter_map(|window_state| window_state.oldest_timestamp()) .min() } + + /// Returns the number of currently active windows (unique key slots). + pub(crate) fn active_window_count(&self) -> usize { + self.active_windows.read().expect("Poisoned lock").len() + } } #[cfg(test)] diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/windower/session.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/windower/session.rs index 4263af38de..48e4d08795 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/windower/session.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/windower/session.rs @@ -419,6 +419,21 @@ impl SessionWindowManager { .map(|window| window.end_time) .min() } + + /// Returns the total number of currently active windows across all keys. + pub(crate) fn active_window_count(&self) -> usize { + self.active_windows + .read() + .expect("Poisoned lock") + .values() + .map(|s| s.len()) + .sum() + } + + /// Returns the number of closed windows awaiting GC. + pub(crate) fn closed_window_count(&self) -> usize { + self.closed_windows.read().expect("Poisoned lock").len() + } } #[cfg(test)]