diff --git a/rust/otap-dataflow/benchmarks/benches/control_channel/main.rs b/rust/otap-dataflow/benchmarks/benches/control_channel/main.rs index 87e273c2c0..127ffad01d 100644 --- a/rust/otap-dataflow/benchmarks/benches/control_channel/main.rs +++ b/rust/otap-dataflow/benchmarks/benches/control_channel/main.rs @@ -141,7 +141,8 @@ async fn consume_current_local( NodeControlMsg::DrainIngress { .. } | NodeControlMsg::MemoryPressureChanged { .. } | NodeControlMsg::Shutdown { .. } - | NodeControlMsg::DelayedData { .. } => { + | NodeControlMsg::DelayedData { .. } + | NodeControlMsg::Wakeup { .. } => { panic!("unexpected message in benchmark current local receiver"); } } @@ -205,7 +206,8 @@ async fn consume_current_shared( NodeControlMsg::DrainIngress { .. } | NodeControlMsg::MemoryPressureChanged { .. } | NodeControlMsg::Shutdown { .. } - | NodeControlMsg::DelayedData { .. } => { + | NodeControlMsg::DelayedData { .. } + | NodeControlMsg::Wakeup { .. } => { panic!("unexpected message in benchmark current shared receiver"); } } diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs index 190467917b..8652b56030 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs @@ -1280,11 +1280,13 @@ mod tests { let (batch_received_tx, mut batch_received_rx) = tokio::sync::mpsc::channel(1); let (server_shutdown_tx, server_shutdown_rx) = tokio::sync::oneshot::channel(); + let (server_ready_tx, server_ready_rx) = tokio::sync::oneshot::channel(); // Start gRPC server that returns errors let listening_addr: SocketAddr = format!("{grpc_addr}:{grpc_port}").parse().unwrap(); let server_handle = tokio_rt.spawn(async move { let tcp_listener = TcpListener::bind(listening_addr).await.unwrap(); + let _ = server_ready_tx.send(()); let tcp_stream = TcpListenerStream::new(tcp_listener); let error_service = ArrowLogsServiceServer::new(ArrowLogsServiceGrpcErrorMock { sender: batch_received_tx, @@ -1316,6 +1318,10 @@ mod tests { }); tokio::join!(local_set, async { + server_ready_rx + .await + .expect("server should bind before exporter traffic starts"); + // Send a batch with ACK/NACK subscription let log_message = create_otap_batch(LOG_BATCH_ID, ArrowPayloadType::Logs); let pdata = OtapPdata::new_default(log_message.into()).test_subscribe_to( diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs index f180cfcffd..f0b7ee5b46 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs @@ -36,9 +36,10 @@ use otap_df_config::node::NodeUserConfig; use otap_df_config::{SignalFormat, SignalType}; use otap_df_engine::MessageSourceLocalEffectHandlerExtension; use otap_df_engine::{ - ConsumerEffectHandlerExtension, Interests, ProducerEffectHandlerExtension, + ConsumerEffectHandlerExtension, Interests, LocalWakeupRequirements, + ProcessorRuntimeRequirements, ProducerEffectHandlerExtension, config::ProcessorConfig, - control::{AckMsg, CallData, NackMsg, NodeControlMsg}, + control::{AckMsg, CallData, NackMsg, NodeControlMsg, WakeupSlot}, error::{Error as EngineError, ProcessorErrorKind}, local::processor as local, message::Message, @@ -78,6 +79,33 @@ pub const DEFAULT_MAX_BATCH_DURATION_MS: u64 = 200; const 
LOG_MSG_BATCHING_FAILED_PREFIX: &str = "OTAP batch processor: low-level batching failed for";
 const LOG_MSG_BATCHING_FAILED_SUFFIX: &str = "; dropping";
 
+// Encodes each supported (format, signal) pair into a distinct batch-local
+// wakeup slot.
+const fn wakeup_slot(format: SignalFormat, signal: SignalType) -> WakeupSlot {
+    let format_base = match format {
+        SignalFormat::OtapRecords => 0,
+        SignalFormat::OtlpBytes => 3,
+    };
+    let signal_offset = match signal {
+        SignalType::Logs => 0,
+        SignalType::Metrics => 1,
+        SignalType::Traces => 2,
+    };
+    WakeupSlot(format_base + signal_offset)
+}
+
+const fn signal_from_wakeup_slot(slot: WakeupSlot) -> Option<(SignalFormat, SignalType)> {
+    match slot.0 {
+        0 => Some((SignalFormat::OtapRecords, SignalType::Logs)),
+        1 => Some((SignalFormat::OtapRecords, SignalType::Metrics)),
+        2 => Some((SignalFormat::OtapRecords, SignalType::Traces)),
+        3 => Some((SignalFormat::OtlpBytes, SignalType::Logs)),
+        4 => Some((SignalFormat::OtlpBytes, SignalType::Metrics)),
+        5 => Some((SignalFormat::OtlpBytes, SignalType::Traces)),
+        _ => None,
+    }
+}
+
 /// How to size a batch.
 ///
 /// Note: these are not always supported. In the present code, the only
@@ -149,9 +177,9 @@ trait Batcher<T> {
         records: Vec<T>,
     ) -> Result<Vec<T>, PDataError>;
 
-    /// We are using an empty DelayData request as a one-shot
-    /// timer. This returns the appropriate empty request.
-    /// TODO: Add proper one-shot timer and cancellation, see #1472.
+    fn wakeup_slot(signal: SignalType) -> WakeupSlot;
+
+    /// Returns the appropriate empty request payload for this signal.
     fn empty(signal: SignalType) -> T;
 }
 
@@ -446,6 +474,12 @@ where
     metrics: &'a mut MetricSet,
 }
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum ActiveBatchProcessorFormatKind {
+    Otap,
+    Otlp,
+}
+
 /// There are three reasons to flush.
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
 enum FlushReason {
@@ -539,6 +573,20 @@ async fn log_batching_failed(
 }
 
 impl BatchProcessor {
+    fn no_active_format_error() -> EngineError {
+        EngineError::InternalError {
+            message: "batch processor has no active format state".to_owned(),
+        }
+    }
+
+    const fn local_wakeup_requirements(&self) -> LocalWakeupRequirements {
+        let live_slots = match self.config.format {
+            BatchingFormat::Otap | BatchingFormat::Otlp => 3,
+            BatchingFormat::Preserve => 6,
+        };
+        LocalWakeupRequirements::new(live_slots)
+    }
+
     /// Parse JSON config and build the processor instance with the provided metrics set.
     /// This function does not wrap the processor into a ProcessorWrapper so callers can
     /// preserve the original NodeUserConfig (including outputs/default_output).
@@ -632,6 +680,27 @@ impl BatchProcessor {
         })
     }
 
+    fn format_for_signal_format(
+        &self,
+        signal_format: SignalFormat,
+    ) -> Option<ActiveBatchProcessorFormatKind> {
+        match signal_format {
+            SignalFormat::OtapRecords if self.otap_signals.is_some() => {
+                Some(ActiveBatchProcessorFormatKind::Otap)
+            }
+            SignalFormat::OtapRecords if self.otlp_signals.is_some() => {
+                Some(ActiveBatchProcessorFormatKind::Otlp)
+            }
+            SignalFormat::OtlpBytes if self.otlp_signals.is_some() => {
+                Some(ActiveBatchProcessorFormatKind::Otlp)
+            }
+            SignalFormat::OtlpBytes if self.otap_signals.is_some() => {
+                Some(ActiveBatchProcessorFormatKind::Otap)
+            }
+            _ => None,
+        }
+    }
+
     /// Process one incoming batch. Immediately acks empty requests.
     /// If this input causes pending data to exceed the lower bound, it will
     /// flush at least one output.
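For reference, the slot layout the two const fns above produce (illustrative only; derived directly from wakeup_slot and signal_from_wakeup_slot as added in this patch):

    (OtapRecords, Logs)    -> WakeupSlot(0)
    (OtapRecords, Metrics) -> WakeupSlot(1)
    (OtapRecords, Traces)  -> WakeupSlot(2)
    (OtlpBytes,   Logs)    -> WakeupSlot(3)
    (OtlpBytes,   Metrics) -> WakeupSlot(4)
    (OtlpBytes,   Traces)  -> WakeupSlot(5)

Any other slot value decodes to None, which is also why local_wakeup_requirements() reserves 3 live slots for a single-format config (Otap or Otlp) and 6 for Preserve.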
@@ -670,33 +739,33 @@ impl BatchProcessor {
         match payload {
             OtapPayload::OtapArrowRecords(otap) => {
-                if self.otap_signals.is_some() {
-                    self.otap_format()
-                        .expect("some")
+                if let Some(mut otap_format) = self.otap_format() {
+                    otap_format
                         .for_signal(signal)
                         .accept_payload(effect, ctx, otap, items)
                         .await?
-                } else {
-                    self.otlp_format()
-                        .expect("some")
+                } else if let Some(mut otlp_format) = self.otlp_format() {
+                    otlp_format
                         .for_signal(signal)
                         .accept_payload(effect, ctx, otap.try_into()?, items)
                         .await?
+                } else {
+                    return Err(Self::no_active_format_error());
                 }
             }
             OtapPayload::OtlpBytes(otlp) => {
-                if self.otlp_signals.is_some() {
-                    self.otlp_format()
-                        .expect("some")
+                if let Some(mut otlp_format) = self.otlp_format() {
+                    otlp_format
                         .for_signal(signal)
                         .accept_payload(effect, ctx, otlp, items)
                         .await?
-                } else {
-                    self.otap_format()
-                        .expect("some")
+                } else if let Some(mut otap_format) = self.otap_format() {
+                    otap_format
                         .for_signal(signal)
                         .accept_payload(effect, ctx, otlp.try_into()?, items)
                         .await?
+                } else {
+                    return Err(Self::no_active_format_error());
                 }
             }
         };
@@ -743,6 +812,10 @@ impl Batcher<OtapArrowRecords> for SignalBuffer<OtapArrowRecords> {
             SignalType::Traces => OtapArrowRecords::Traces(otap_df_pdata::otap::Traces::default()),
         }
     }
+
+    fn wakeup_slot(signal: SignalType) -> WakeupSlot {
+        wakeup_slot(SignalFormat::OtapRecords, signal)
+    }
 }
 
 impl Batcher<OtlpProtoBytes> for SignalBuffer<OtlpProtoBytes> {
@@ -763,6 +836,10 @@ impl Batcher<OtlpProtoBytes> for SignalBuffer<OtlpProtoBytes> {
             SignalType::Traces => OtlpProtoBytes::ExportTracesRequest(Bytes::new()),
         }
     }
+
+    fn wakeup_slot(signal: SignalType) -> WakeupSlot {
+        wakeup_slot(SignalFormat::OtlpBytes, signal)
+    }
 }
 
 impl<'a, T: OtapPayloadHelpers> BatchProcessorSignal<'a, T>
@@ -848,6 +925,8 @@ where
             return Ok(());
         }
 
+        let _ = effect.cancel_wakeup(SignalBuffer::<T>::wakeup_slot(self.signal));
+
         // If this is a timer-based flush and we were called too soon,
         // skip. this may happen if the batch for which the timer was set
         // flushes for size before the timer.
@@ -1042,21 +1121,22 @@ impl BatchProcessor {
         }
 
         let signal = retdata.signal_type();
-        match retdata.signal_format() {
-            SignalFormat::OtapRecords => {
+        match self.format_for_signal_format(retdata.signal_format()) {
+            Some(ActiveBatchProcessorFormatKind::Otap) => {
                 self.otap_format()
-                    .expect("some")
+                    .expect("otap batch state must exist when otap format kind is selected")
                    .for_signal(signal)
                     .handle(signal, calldata, effect, res)
                     .await
             }
-            SignalFormat::OtlpBytes => {
+            Some(ActiveBatchProcessorFormatKind::Otlp) => {
                 self.otlp_format()
-                    .expect("some")
+                    .expect("otlp batch state must exist when otlp format kind is selected")
                     .for_signal(signal)
                     .handle(signal, calldata, effect, res)
                     .await
             }
+            None => Err(Self::no_active_format_error()),
        }
     }
 }
@@ -1080,53 +1160,90 @@ pub fn create_otap_batch_processor(
 #[async_trait(?Send)]
 impl local::Processor<OtapPdata> for BatchProcessor {
+    fn runtime_requirements(&self) -> ProcessorRuntimeRequirements {
+        ProcessorRuntimeRequirements {
+            local_wakeups: Some(self.local_wakeup_requirements()),
+        }
+    }
+
     async fn process(
         &mut self,
         msg: Message<OtapPdata>,
         effect: &mut local::EffectHandler<OtapPdata>,
     ) -> Result<(), EngineError> {
         match msg {
-            Message::Control(ctrl) => match ctrl {
-                NodeControlMsg::Config { .. } => Ok(()),
-                NodeControlMsg::Shutdown { ..
} => { - self.flush_shutdown(effect).await?; - Ok(()) - } - NodeControlMsg::CollectTelemetry { - mut metrics_reporter, - } => metrics_reporter.report(&mut self.metrics).map_err(|e| { - EngineError::InternalError { - message: e.to_string(), + Message::Control(ctrl) => { + match ctrl { + NodeControlMsg::Config { .. } => Ok(()), + NodeControlMsg::Shutdown { .. } => { + self.flush_shutdown(effect).await?; + Ok(()) } - }), - NodeControlMsg::DelayedData { data, when } => { - let signal = data.signal_type(); + NodeControlMsg::CollectTelemetry { + mut metrics_reporter, + } => metrics_reporter.report(&mut self.metrics).map_err(|e| { + EngineError::InternalError { + message: e.to_string(), + } + }), + NodeControlMsg::Wakeup { slot, when, .. } => { + let Some((format, signal)) = signal_from_wakeup_slot(slot) else { + return Ok(()); + }; + + match format { + SignalFormat::OtapRecords => { + if let Some(mut otap_format) = self.otap_format() { + otap_format + .for_signal(signal) + .flush_signal_impl(effect, when, FlushReason::Timer) + .await?; + } + } + SignalFormat::OtlpBytes => { + if let Some(mut otlp_format) = self.otlp_format() { + otlp_format + .for_signal(signal) + .flush_signal_impl(effect, when, FlushReason::Timer) + .await?; + } + } + }; - match data.signal_format() { - SignalFormat::OtapRecords => { - self.otap_format() - .expect("some") + Ok(()) + } + NodeControlMsg::DelayedData { data, when } => { + let signal = data.signal_type(); + + match self.format_for_signal_format(data.signal_format()) { + Some(ActiveBatchProcessorFormatKind::Otap) => self + .otap_format() + .expect( + "otap batch state must exist when otap format kind is selected", + ) .for_signal(signal) .flush_signal_impl(effect, when, FlushReason::Timer) - .await? - } - SignalFormat::OtlpBytes => { - self.otlp_format() - .expect("some") + .await?, + Some(ActiveBatchProcessorFormatKind::Otlp) => self + .otlp_format() + .expect( + "otlp batch state must exist when otlp format kind is selected", + ) .for_signal(signal) .flush_signal_impl(effect, when, FlushReason::Timer) - .await? - } - }; + .await?, + None => return Err(Self::no_active_format_error()), + }; - Ok(()) + Ok(()) + } + NodeControlMsg::Ack(ack) => self.handle_ack(effect, ack).await, + NodeControlMsg::Nack(nack) => self.handle_nack(effect, nack).await, + NodeControlMsg::DrainIngress { .. } => Ok(()), + NodeControlMsg::TimerTick { .. } => unreachable!(), + NodeControlMsg::MemoryPressureChanged { .. } => Ok(()), } - NodeControlMsg::Ack(ack) => self.handle_ack(effect, ack).await, - NodeControlMsg::Nack(nack) => self.handle_nack(effect, nack).await, - NodeControlMsg::MemoryPressureChanged { .. } => Ok(()), - NodeControlMsg::DrainIngress { .. } => Ok(()), - NodeControlMsg::TimerTick { .. 
} => unreachable!(), - }, + } Message::PData(request) => self.process_signal_impl(effect, request).await, } } @@ -1327,18 +1444,12 @@ where self.arrival = Some(now); effect - .delay_data( - now + timeout, - Box::new(OtapPdata::new( - Context::default(), - Self::empty(signal).into(), - )), - ) - .await + .set_wakeup(Self::wakeup_slot(signal), now + timeout) + .map(|_| ()) .map_err(|_| EngineError::ProcessorError { processor: effect.processor_id(), kind: ProcessorErrorKind::Other, - error: "could not set one-shot timer".into(), + error: "could not set wakeup".into(), source_detail: "".into(), }) } @@ -1368,12 +1479,12 @@ mod tests { use otap_df_engine::config::ProcessorConfig; use otap_df_engine::context::ControllerContext; use otap_df_engine::control::{ - NodeControlMsg, PipelineCompletionMsg, RuntimeControlMsg, pipeline_completion_msg_channel, + NodeControlMsg, PipelineCompletionMsg, pipeline_completion_msg_channel, runtime_ctrl_msg_channel, }; use otap_df_engine::message::Message; use otap_df_engine::node::Node; - use otap_df_engine::testing::liveness::{next_completion, next_runtime_control}; + use otap_df_engine::testing::liveness::next_completion; use otap_df_engine::testing::processor::TestRuntime; use otap_df_engine::testing::test_node; use otap_df_otap::pdata::OtapPdata; @@ -1621,7 +1732,7 @@ mod tests { #[derive(Clone)] enum TestEvent { Input(OtlpProtoMessage), - Elapsed, // Signal to deliver all pending DelayedData messages + Elapsed, // Signal to deliver due wakeups } /// Policy for acking or nacking an output @@ -1658,6 +1769,17 @@ mod tests { otap_to_otlp(&rec) } + const fn all_wakeup_slots() -> [WakeupSlot; 6] { + [ + wakeup_slot(SignalFormat::OtapRecords, SignalType::Logs), + wakeup_slot(SignalFormat::OtapRecords, SignalType::Metrics), + wakeup_slot(SignalFormat::OtapRecords, SignalType::Traces), + wakeup_slot(SignalFormat::OtlpBytes, SignalType::Logs), + wakeup_slot(SignalFormat::OtlpBytes, SignalType::Metrics), + wakeup_slot(SignalFormat::OtlpBytes, SignalType::Traces), + ] + } + fn run_batch_processor_test( events: impl Iterator, subscribe: bool, @@ -1687,10 +1809,8 @@ mod tests { phase .run_test(move |mut ctx| async move { - let (runtime_ctrl_tx, mut runtime_ctrl_rx) = runtime_ctrl_msg_channel(10); let (pipeline_completion_tx, mut pipeline_completion_rx) = pipeline_completion_msg_channel(10); - ctx.set_runtime_ctrl_sender(runtime_ctrl_tx); ctx.set_pipeline_completion_sender(pipeline_completion_tx); // Track outputs by event position @@ -1703,16 +1823,11 @@ mod tests { let mut received_acks: Vec = Vec::new(); let mut received_nacks: Vec = Vec::new(); - // Track latest DelayedData message - let mut pending_delay: Option<(Instant, Box)> = None; let mut input_idx = 0; let mut total_outputs = 0; // Process each event in sequence for (event_idx, event) in events.into_iter().enumerate() { - // Determine if this is an elapsed event - let is_elapsed = matches!(event, TestEvent::Elapsed); - // Process the event match event { TestEvent::Input(input_otlp) => { @@ -1745,20 +1860,16 @@ mod tests { input_idx += 1; } TestEvent::Elapsed => { - // Elapsed event - no input to process - } - } - - // If this is an Elapsed event, deliver the pending DelayedData if present - if is_elapsed { - if let Some((when, data)) = pending_delay.take() { - // Note we deliver "when" exactly as the DelayData requested, - // which is a future timestamp; however it's the deadline requested, - // and since "when" passes through, the comparison is succesful using - // the expected instant. 
- let delayed_msg = - Message::Control(NodeControlMsg::DelayedData { when, data }); - ctx.process(delayed_msg).await.expect("process delayed"); + let when = Instant::now() + Duration::from_secs(1); + for slot in all_wakeup_slots() { + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot, + when, + revision: 0, + })) + .await + .expect("process wakeup"); + } } } @@ -1800,22 +1911,6 @@ mod tests { } } - // Drain control channel for DelayData requests and acks/nacks - loop { - match runtime_ctrl_rx.try_recv() { - Ok(RuntimeControlMsg::DelayData { when, data, .. }) => { - looped += 1; - pending_delay = Some((when, data)); - } - Ok(_) => { - panic!("unexpected case"); - } - Err(_) => { - break; - } - } - } - loop { match pipeline_completion_rx.try_recv() { Ok(PipelineCompletionMsg::DeliverAck { ack }) => { @@ -2011,11 +2106,44 @@ mod tests { test_timer_flush(datagen.generate_logs().into(), true); } - // The processor schedules one-shot DelayedData wakeups without cancelling older - // ones. This test proves that a stale wakeup is ignored and that the current - // wakeup still flushes the buffered input later. + /// Scenario: the batch processor derives wakeup slots from the supported + /// `(format, signal)` pairs used by its internal timers. + /// Guarantees: every supported pair round-trips through the encoder/decoder, + /// and each pair maps to a distinct wakeup slot. #[test] - fn test_timer_flush_ignores_stale_delayed_wakeup() { + fn test_wakeup_slot_round_trip_and_uniqueness() { + let slots = [ + (SignalFormat::OtapRecords, SignalType::Logs), + (SignalFormat::OtapRecords, SignalType::Metrics), + (SignalFormat::OtapRecords, SignalType::Traces), + (SignalFormat::OtlpBytes, SignalType::Logs), + (SignalFormat::OtlpBytes, SignalType::Metrics), + (SignalFormat::OtlpBytes, SignalType::Traces), + ]; + + for (expected_format, expected_signal) in slots { + let slot = wakeup_slot(expected_format, expected_signal); + assert_eq!( + signal_from_wakeup_slot(slot), + Some((expected_format, expected_signal)) + ); + } + + let mut unique = std::collections::HashSet::new(); + for (format, signal) in slots { + assert!( + unique.insert(wakeup_slot(format, signal)), + "slot mapping should be unique for each format/signal pair" + ); + } + assert_eq!(unique.len(), 6); + } + + // The processor replaces wakeups per slot. This test proves that an early + // wakeup is ignored and that the current wakeup still flushes the buffered + // input later. + #[test] + fn test_timer_flush_ignores_stale_wakeup() { let (telemetry_registry, metrics_reporter, phase) = setup_test_runtime(json!({ "otap": { "min_size": 5, @@ -2027,9 +2155,6 @@ mod tests { phase .run_test(move |mut ctx| async move { - let (runtime_ctrl_tx, mut runtime_ctrl_rx) = runtime_ctrl_msg_channel(10); - ctx.set_runtime_ctrl_sender(runtime_ctrl_tx); - let mut datagen = DataGenerator::new(1); let first = datagen.generate_logs(); let second = datagen.generate_logs(); @@ -2044,20 +2169,6 @@ mod tests { "first input should remain buffered" ); - let RuntimeControlMsg::DelayData { - when: stale_when, - data: stale_data, - .. - } = next_runtime_control( - &mut runtime_ctrl_rx, - Duration::from_secs(1), - "initial batch timer wakeup", - ) - .await - else { - panic!("expected initial DelayData"); - }; - // The second input takes the buffer over the min size, so the processor flushes // before the original timer fires. 
let rec = encode_logs_otap_batch(&second).expect("encode logs"); @@ -2076,37 +2187,27 @@ mod tests { "new post-flush batch should remain buffered" ); - let RuntimeControlMsg::DelayData { - when: current_when, - data: current_data, - .. - } = next_runtime_control( - &mut runtime_ctrl_rx, - Duration::from_secs(1), - "replacement batch timer wakeup", - ) - .await - else { - panic!("expected replacement DelayData"); - }; - - ctx.process(Message::Control(NodeControlMsg::DelayedData { + let stale_when = Instant::now(); + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: wakeup_slot(SignalFormat::OtapRecords, SignalType::Logs), when: stale_when, - data: stale_data, + revision: 0, })) .await - .expect("process stale delayed data"); + .expect("process stale wakeup"); assert!( ctx.drain_pdata().await.is_empty(), "stale wakeup should be ignored" ); - ctx.process(Message::Control(NodeControlMsg::DelayedData { + let current_when = Instant::now() + Duration::from_secs(1); + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: wakeup_slot(SignalFormat::OtapRecords, SignalType::Logs), when: current_when, - data: current_data, + revision: 1, })) .await - .expect("process current delayed data"); + .expect("process current wakeup"); let final_flush = ctx.drain_pdata().await; assert_eq!( final_flush.len(), @@ -2126,6 +2227,142 @@ mod tests { }); } + /// Scenario: buffered input has armed a real batch wakeup, but the processor + /// first receives a foreign wakeup slot that does not decode to any local + /// `(format, signal)` timer. + /// Guarantees: the foreign wakeup is ignored without flushing or corrupting + /// state, and the real/current wakeup still flushes the buffered input later. + #[test] + fn test_unknown_wakeup_slot_is_ignored_without_side_effects() { + let (telemetry_registry, metrics_reporter, phase) = setup_test_runtime(json!({ + "otap": { + "min_size": 5, + "max_size": 10, + "sizer": "items", + }, + "max_batch_duration": "50ms" + })); + + phase + .run_test(move |mut ctx| async move { + let mut datagen = DataGenerator::new(1); + let input = datagen.generate_logs(); + + let rec = encode_logs_otap_batch(&input).expect("encode logs"); + ctx.process(Message::PData(OtapPdata::new_default(rec.into()))) + .await + .expect("process input"); + assert!( + ctx.drain_pdata().await.is_empty(), + "input should remain buffered until the real wakeup" + ); + + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: WakeupSlot(99), + when: Instant::now(), + revision: 0, + })) + .await + .expect("process unknown wakeup"); + assert!( + ctx.drain_pdata().await.is_empty(), + "foreign wakeup should be ignored" + ); + + let current_when = Instant::now() + Duration::from_secs(1); + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: wakeup_slot(SignalFormat::OtapRecords, SignalType::Logs), + when: current_when, + revision: 1, + })) + .await + .expect("process current wakeup"); + let flushed = ctx.drain_pdata().await; + assert_eq!(flushed.len(), 1, "real wakeup should flush buffered input"); + + ctx.process(Message::Control(NodeControlMsg::CollectTelemetry { + metrics_reporter, + })) + .await + .expect("collect telemetry"); + }) + .validate(move |_| async move { + tokio::time::sleep(Duration::from_millis(50)).await; + verify_item_metrics(&telemetry_registry, SignalType::Logs, 3); + }); + } + + /// Scenario: the batch processor runs in forced OTAP mode, has live + /// outbound completion state in its OTAP batch bookkeeping, and then + /// receives a downstream Ack whose returned 
payload format is OTLP bytes. + /// Guarantees: response handling falls back to the active OTAP batch state, + /// releases the outbound slot, and delivers the upstream Ack without + /// panicking on the returned payload format. + #[test] + fn test_ack_response_format_falls_back_to_active_batch_state() { + let (_telemetry_registry, _metrics_reporter, phase) = setup_test_runtime(json!({ + "format": "otap", + "otap": { + "min_size": 1, + "max_size": 10, + "sizer": "items", + }, + "max_batch_duration": "1s" + })); + + phase + .run_test(move |mut ctx| async move { + let (pipeline_completion_tx, mut pipeline_completion_rx) = + pipeline_completion_msg_channel(10); + ctx.set_pipeline_completion_sender(pipeline_completion_tx); + + let mut datagen = DataGenerator::new(1); + let input: OtlpProtoMessage = datagen.generate_logs().into(); + let input_bytes = otlp_message_to_bytes(&input); + + let pdata = OtapPdata::new_default(input_bytes.clone().into()).test_subscribe_to( + Interests::ACKS | Interests::NACKS, + TestCallData::default().into(), + 23, + ); + + ctx.process(Message::PData(pdata)) + .await + .expect("process input"); + + let mut outputs = ctx.drain_pdata().await; + assert_eq!(outputs.len(), 1, "size flush should emit one batch"); + + let output = outputs.remove(0); + let (output_ctx, _output_payload) = output.into_parts(); + let returned = OtapPdata::new(output_ctx, input_bytes.into()); + + let (_, ack) = + next_ack(AckMsg::new(returned)).expect("expected outbound ack subscriber"); + ctx.process(Message::Control(NodeControlMsg::Ack(ack))) + .await + .expect("process ack"); + + match next_completion( + &mut pipeline_completion_rx, + Duration::from_secs(1), + "batch processor upstream completion after format fallback ack", + ) + .await + { + PipelineCompletionMsg::DeliverAck { ack } => { + let (node_id, ack) = next_ack(ack).expect("expected ack subscriber"); + assert_eq!(node_id, 23); + let calldata: TestCallData = + ack.unwind.route.calldata.try_into().expect("calldata"); + assert_eq!(TestCallData::default(), calldata); + } + other => panic!("expected upstream ack after format fallback, got {other:?}"), + } + }) + .validate(|_| async move {}); + } + // A partial batch that never reached the size threshold must still flush on // Shutdown, and its downstream Ack must release the upstream completion state // rather than leaving correlated requests stuck. @@ -2692,9 +2929,6 @@ mod tests { phase .run_test(move |mut ctx| async move { - let (pipeline_tx, mut pipeline_rx) = runtime_ctrl_msg_channel(10); - ctx.set_runtime_ctrl_sender(pipeline_tx); - // Create test data let mut datagen = DataGenerator::new(1); let logs1: OtlpProtoMessage = datagen.generate_logs().into(); @@ -2705,8 +2939,6 @@ mod tests { let otap_message2 = otlp_to_otap(&logs2); let mut outputs = Vec::new(); - let mut pending_delays: Vec<(Instant, Box)> = Vec::new(); - // Send both ctx.process(Message::PData(OtapPdata::new_default(otlp_message1.into()))) .await @@ -2716,23 +2948,24 @@ mod tests { .await .expect("process otlp"); - // Drain control channel for DelayData - while let Ok(RuntimeControlMsg::DelayData { when, data, .. }) = - pipeline_rx.try_recv() - { - pending_delays.push((when, data)); - } - assert!( ctx.drain_pdata().await.is_empty(), "no outputs before timeout" ); - // Trigger timeout - for (when, data) in pending_delays { - ctx.process(Message::Control(NodeControlMsg::DelayedData { when, data })) - .await - .expect("process delayed"); + // Trigger timeout for both active batching slots. 
+ let when = Instant::now() + Duration::from_secs(1); + for slot in [ + wakeup_slot(SignalFormat::OtlpBytes, SignalType::Logs), + wakeup_slot(SignalFormat::OtapRecords, SignalType::Logs), + ] { + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot, + when, + revision: 0, + })) + .await + .expect("process wakeup"); } // Drain outputs after timeout diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/deferred_retry_state.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/deferred_retry_state.rs new file mode 100644 index 0000000000..d1a0b61eb2 --- /dev/null +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/deferred_retry_state.rs @@ -0,0 +1,554 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Deferred retry scheduling for `durable_buffer_processor`. +//! +//! Durable buffer needs local per-bundle retry state even though it only uses +//! one engine wakeup slot. This module owns that state and keeps the two layers +//! aligned: +//! +//! - the processor tracks every deferred bundle locally by bundle identity +//! - a local ordered index gives the next retry to resume +//! - the single engine wakeup slot is always armed to the earliest deferred +//! retry deadline +//! - wakeup revisions are used to ignore stale wakeups after re-arming +//! +//! This keeps retry scheduling on one mechanism under heavy NACK pressure +//! instead of splitting between per-bundle wakeups and a separate overflow +//! path. +//! +//! Guarantees: +//! +//! - a deferred bundle is held out of the normal poll loop until it is resumed +//! or explicitly re-deferred +//! - due retries are resumed in deadline order, with deterministic ordering for +//! equal deadlines +//! - this module does not introduce any growth path beyond the number of +//! deferred bundles: it keeps one authoritative map entry and one ordered +//! index entry per deferred bundle, plus at most one armed wakeup record +//! - durable buffer retry scheduling does not depend on having one engine +//! wakeup slot per deferred bundle + +use otap_df_engine::WakeupError; +use otap_df_engine::control::{WakeupRevision, WakeupSlot}; +use otap_df_engine::local::processor::EffectHandler; +use otap_df_otap::pdata::OtapPdata; +use quiver::subscriber::BundleRef; +use std::collections::{BTreeSet, HashMap}; +use std::time::{Duration, Instant}; + +/// Durable buffer uses one processor-local wakeup slot for "the earliest retry +/// currently pending in local state". +pub(super) const RETRY_WAKEUP_SLOT: WakeupSlot = WakeupSlot(0); + +/// Convert a Quiver bundle identity into the stable key used by retry state. +pub(super) fn retry_key(bundle_ref: BundleRef) -> (u64, u32) { + (bundle_ref.segment_seq.raw(), bundle_ref.bundle_index.raw()) +} + +/// Local deferred retry state for one bundle. +/// +/// Durable buffer keeps retry scheduling state locally and only uses the engine +/// wakeup API to re-arm the earliest pending retry deadline. 
+#[derive(Clone, Copy)]
+pub(super) struct DeferredRetry {
+    bundle_ref: BundleRef,
+    retry_count: u32,
+    retry_at: Instant,
+    sequence: u64,
+}
+
+impl DeferredRetry {
+    const fn new(
+        bundle_ref: BundleRef,
+        retry_count: u32,
+        retry_at: Instant,
+        sequence: u64,
+    ) -> Self {
+        Self {
+            bundle_ref,
+            retry_count,
+            retry_at,
+            sequence,
+        }
+    }
+
+    pub(super) const fn bundle_ref(self) -> BundleRef {
+        self.bundle_ref
+    }
+
+    pub(super) const fn retry_count(self) -> u32 {
+        self.retry_count
+    }
+}
+
+/// Tracks the engine wakeup currently armed for the earliest deferred retry.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+struct ArmedRetryWakeup {
+    when: Instant,
+    revision: WakeupRevision,
+}
+
+/// Ordering/index key for deferred retries.
+///
+/// Ordering is lexicographic by `(retry_at, sequence, key)`, which means:
+/// - earlier retry deadlines are resumed first
+/// - equal deadlines use insertion sequence as a deterministic tie-breaker
+/// - `key` keeps the ordering total and points back to the authoritative
+///   `DeferredRetry` stored in the map
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+struct DeferredRetryOrder {
+    retry_at: Instant,
+    sequence: u64,
+    key: (u64, u32),
+}
+
+/// Local deferred-retry scheduling state for durable buffer.
+///
+/// The processor keeps all retry deadlines locally and uses one engine wakeup
+/// slot for "the earliest retry currently pending". This keeps the heavy-NACK
+/// path on a single scheduling mechanism instead of splitting between many
+/// armed wakeups and a separate overflow queue.
+pub(super) struct DeferredRetryState {
+    /// Authoritative retry state keyed by bundle identity.
+    ///
+    /// Invariant: every deferred bundle appears exactly once here and exactly
+    /// once in `deferred_order`.
+    deferred: HashMap<(u64, u32), DeferredRetry>,
+
+    /// Due-order index for deferred retries.
+    deferred_order: BTreeSet<DeferredRetryOrder>,
+
+    /// Engine wakeup currently armed for the earliest deferred retry, if any.
+    ///
+    /// Invariant: when present, `when` equals the earliest currently armed
+    /// retry deadline as seen by the engine wakeup API, and `revision` is the
+    /// only wakeup revision allowed to trigger that arm.
+    armed_wakeup: Option<ArmedRetryWakeup>,
+
+    /// Monotonic tie-breaker for equal-deadline ordering.
+    next_sequence: u64,
+}
+
+impl DeferredRetryState {
+    pub(super) fn new() -> Self {
+        Self {
+            deferred: HashMap::new(),
+            deferred_order: BTreeSet::new(),
+            armed_wakeup: None,
+            next_sequence: 0,
+        }
+    }
+
+    pub(super) fn scheduled_len(&self) -> usize {
+        self.deferred.len()
+    }
+
+    pub(super) fn is_deferred_key(&self, key: (u64, u32)) -> bool {
+        self.deferred.contains_key(&key)
+    }
+
+    fn deferred_order(key: (u64, u32), retry: DeferredRetry) -> DeferredRetryOrder {
+        DeferredRetryOrder {
+            retry_at: retry.retry_at,
+            sequence: retry.sequence,
+            key,
+        }
+    }
+
+    fn remove_deferred(&mut self, key: (u64, u32)) -> Option<DeferredRetry> {
+        let retry = self.deferred.remove(&key)?;
+        let _ = self
+            .deferred_order
+            .remove(&Self::deferred_order(key, retry));
+        Some(retry)
+    }
+
+    fn insert_deferred(&mut self, bundle_ref: BundleRef, retry_count: u32, retry_at: Instant) {
+        let key = retry_key(bundle_ref);
+        let _ = self.remove_deferred(key);
+        let retry = DeferredRetry::new(bundle_ref, retry_count, retry_at, self.next_sequence);
+        self.next_sequence = self.next_sequence.saturating_add(1);
+        let _ = self.deferred.insert(key, retry);
+        let _ = self.deferred_order.insert(Self::deferred_order(key, retry));
+    }
+
+    fn desired_wakeup_at(&self, no_earlier_than: Option<Instant>) -> Option<Instant> {
+        let earliest = self.deferred_order.first().map(|order| order.retry_at)?;
+        Some(match no_earlier_than {
+            Some(not_before) if earliest < not_before => not_before,
+            _ => earliest,
+        })
+    }
+
+    fn sync_armed_wakeup(
+        &mut self,
+        effect_handler: &mut EffectHandler<OtapPdata>,
+        no_earlier_than: Option<Instant>,
+    ) -> Result<(), WakeupError> {
+        let Some(when) = self.desired_wakeup_at(no_earlier_than) else {
+            if self.armed_wakeup.is_some() {
+                let _ = effect_handler.cancel_wakeup(RETRY_WAKEUP_SLOT);
+                self.armed_wakeup = None;
+            }
+            return Ok(());
+        };
+
+        if self
+            .armed_wakeup
+            .is_some_and(|armed_wakeup| armed_wakeup.when == when)
+        {
+            return Ok(());
+        }
+
+        let revision = effect_handler
+            .set_wakeup(RETRY_WAKEUP_SLOT, when)?
+            .revision();
+        self.armed_wakeup = Some(ArmedRetryWakeup { when, revision });
+        Ok(())
+    }
+
+    /// Schedule or re-schedule retry deferral for a bundle.
+    ///
+    /// Guarantees:
+    /// - the bundle remains deferred in local state until retry resumption
+    /// - the single engine wakeup always tracks the earliest deferred retry
+    /// - returns `false` only when the wakeup could not be armed
+    pub(super) fn schedule_at(
+        &mut self,
+        bundle_ref: BundleRef,
+        retry_count: u32,
+        retry_at: Instant,
+        effect_handler: &mut EffectHandler<OtapPdata>,
+    ) -> bool {
+        let key = retry_key(bundle_ref);
+        self.insert_deferred(bundle_ref, retry_count, retry_at);
+        match self.sync_armed_wakeup(effect_handler, None) {
+            Ok(()) => true,
+            Err(error) => {
+                let _ = self.remove_deferred(key);
+                debug_assert_ne!(
+                    error,
+                    WakeupError::Capacity,
+                    "single-slot durable-buffer wakeup should not hit capacity"
+                );
+                false
+            }
+        }
+    }
+
+    /// Schedule or re-schedule retry deferral after a relative delay.
+    ///
+    /// Guarantees:
+    /// - equivalent to `schedule_at(now + delay)`
+    /// - keeps the delay-to-deadline conversion local to deferred retry state
+    pub(super) fn schedule_after(
+        &mut self,
+        bundle_ref: BundleRef,
+        retry_count: u32,
+        delay: Duration,
+        effect_handler: &mut EffectHandler<OtapPdata>,
+    ) -> bool {
+        self.schedule_at(
+            bundle_ref,
+            retry_count,
+            Instant::now() + delay,
+            effect_handler,
+        )
+    }
+
+    /// Accept one wakeup delivery only when it matches the currently armed
+    /// durable-buffer slot and revision.
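+    ///
+    /// Illustrative sequence (revision numbers assumed for the example): with
+    /// revision 5 armed for `RETRY_WAKEUP_SLOT`, a delivery of
+    /// `(RETRY_WAKEUP_SLOT, 4)` is ignored as stale, a delivery for any other
+    /// slot is ignored as unrelated, and `(RETRY_WAKEUP_SLOT, 5)` is accepted
+    /// exactly once.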
+ /// + /// Guarantees: + /// - unrelated slots are ignored + /// - stale revisions are ignored + /// - the matching wakeup clears the armed state exactly once + pub(super) fn accept_wakeup(&mut self, slot: WakeupSlot, revision: WakeupRevision) -> bool { + if slot != RETRY_WAKEUP_SLOT { + return false; + } + + let Some(armed_wakeup) = self.armed_wakeup else { + return false; + }; + + if armed_wakeup.revision != revision { + return false; + } + + self.armed_wakeup = None; + true + } + + /// Pop the next deferred retry only when its due time has arrived. + /// + /// Guarantee: returning a retry clears all local deferred bookkeeping for + /// that bundle so it can be resumed exactly once. + pub(super) fn take_due_retry(&mut self, now: Instant) -> Option { + let order = *self.deferred_order.first()?; + if order.retry_at > now { + return None; + } + + let _ = self.deferred_order.remove(&order); + let retry = self.deferred.remove(&order.key)?; + Some(retry) + } + + /// Drop all deferred retry gating at shutdown entry. + /// + /// Guarantees: + /// - no bundle remains blocked behind local retry backoff state + /// - the armed wakeup record is cleared + /// - previously deferred bundles can be drained through the normal Quiver + /// poll path during shutdown + pub(super) fn clear_for_shutdown(&mut self) { + self.deferred.clear(); + self.deferred_order.clear(); + self.armed_wakeup = None; + } + + /// Re-arm the single durable-buffer wakeup after retry processing. + /// + /// `no_earlier_than` lets the caller push the next retry attempt out when + /// retries are already due but resend is currently blocked by flow control. + pub(super) fn rearm_after_processing( + &mut self, + effect_handler: &mut EffectHandler, + no_earlier_than: Option, + ) -> bool { + match self.sync_armed_wakeup(effect_handler, no_earlier_than) { + Ok(()) => true, + Err(error) => { + debug_assert_ne!( + error, + WakeupError::Capacity, + "single-slot durable-buffer wakeup should not hit capacity" + ); + false + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use quiver::segment::SegmentSeq; + use quiver::subscriber::BundleIndex; + use std::time::Duration; + + /// Scenario: one deferred retry is currently the earliest retry, and the + /// processor computes the wakeup it should arm next. + /// Guarantees: the single durable-buffer wakeup targets that earliest retry. + #[test] + fn desired_wakeup_tracks_earliest_retry() { + let mut state = DeferredRetryState::new(); + let now = Instant::now(); + + state.insert_deferred( + BundleRef { + segment_seq: SegmentSeq::new(1), + bundle_index: BundleIndex::new(1), + }, + 1, + now + Duration::from_secs(3), + ); + state.insert_deferred( + BundleRef { + segment_seq: SegmentSeq::new(2), + bundle_index: BundleIndex::new(2), + }, + 2, + now + Duration::from_secs(1), + ); + + assert_eq!( + state.desired_wakeup_at(None), + Some(now + Duration::from_secs(1)) + ); + } + + /// Scenario: retries are already due, but resend is currently blocked and + /// the processor wants to defer the next retry attempt by one poll interval. + /// Guarantees: the next wakeup is not armed earlier than the supplied floor. 
+ #[test] + fn desired_wakeup_respects_retry_floor() { + let mut state = DeferredRetryState::new(); + let now = Instant::now(); + + state.insert_deferred( + BundleRef { + segment_seq: SegmentSeq::new(9), + bundle_index: BundleIndex::new(1), + }, + 1, + now - Duration::from_millis(1), + ); + + assert_eq!( + state.desired_wakeup_at(Some(now + Duration::from_secs(1))), + Some(now + Duration::from_secs(1)) + ); + } + + /// Scenario: the processor receives the exact wakeup revision currently + /// armed for the durable-buffer retry slot. + /// Guarantees: that wakeup is accepted and clears the armed state exactly once. + #[test] + fn accept_wakeup_clears_matching_arm() { + let mut state = DeferredRetryState::new(); + let now = Instant::now(); + state.armed_wakeup = Some(ArmedRetryWakeup { + when: now, + revision: 17, + }); + + assert!(state.accept_wakeup(RETRY_WAKEUP_SLOT, 17)); + assert!(state.armed_wakeup.is_none()); + assert!(!state.accept_wakeup(RETRY_WAKEUP_SLOT, 17)); + } + + /// Scenario: the processor receives a wakeup for the retry slot, but the + /// revision is stale relative to the currently armed wakeup. + /// Guarantees: the stale wakeup is ignored and the armed wakeup remains. + #[test] + fn accept_wakeup_ignores_stale_revision() { + let mut state = DeferredRetryState::new(); + let now = Instant::now(); + state.armed_wakeup = Some(ArmedRetryWakeup { + when: now, + revision: 5, + }); + + assert!(!state.accept_wakeup(RETRY_WAKEUP_SLOT, 4)); + assert_eq!( + state.armed_wakeup, + Some(ArmedRetryWakeup { + when: now, + revision: 5, + }) + ); + } + + /// Scenario: the processor receives a wakeup for some unrelated slot. + /// Guarantees: the unrelated wakeup is ignored and armed retry state remains. + #[test] + fn accept_wakeup_ignores_unrelated_slot() { + let mut state = DeferredRetryState::new(); + let now = Instant::now(); + state.armed_wakeup = Some(ArmedRetryWakeup { + when: now, + revision: 3, + }); + + assert!(!state.accept_wakeup(WakeupSlot(999), 3)); + assert!(state.armed_wakeup.is_some()); + } + + /// Scenario: one deferred retry becomes due and is popped for retry resumption. + /// Guarantees: taking that retry clears all local deferred bookkeeping. + #[test] + fn take_due_retry_clears_tracking() { + let mut state = DeferredRetryState::new(); + let bundle_ref = BundleRef { + segment_seq: SegmentSeq::new(321), + bundle_index: BundleIndex::new(7), + }; + let key = retry_key(bundle_ref); + let retry_at = Instant::now(); + + state.insert_deferred(bundle_ref, 4, retry_at); + + assert!(state.deferred.contains_key(&key)); + assert_eq!(state.deferred_order.len(), 1); + + let retry = state + .take_due_retry(retry_at + Duration::from_millis(1)) + .expect("retry should be due"); + + assert_eq!(retry.bundle_ref().segment_seq.raw(), 321); + assert_eq!(retry.bundle_ref().bundle_index.raw(), 7); + assert_eq!(retry.retry_count(), 4); + assert!(!state.deferred.contains_key(&key)); + assert!(state.deferred_order.is_empty()); + } + + /// Scenario: multiple retries become due at the same timestamp. + /// Guarantees: equal-deadline retries are resumed in insertion order via sequence. 
+ #[test] + fn equal_deadline_retries_follow_sequence_order() { + let mut state = DeferredRetryState::new(); + let retry_at = Instant::now(); + let first = BundleRef { + segment_seq: SegmentSeq::new(111), + bundle_index: BundleIndex::new(1), + }; + let second = BundleRef { + segment_seq: SegmentSeq::new(222), + bundle_index: BundleIndex::new(2), + }; + let third = BundleRef { + segment_seq: SegmentSeq::new(333), + bundle_index: BundleIndex::new(3), + }; + + state.insert_deferred(first, 1, retry_at); + state.insert_deferred(second, 2, retry_at); + state.insert_deferred(third, 3, retry_at); + + assert_eq!( + state + .take_due_retry(retry_at + Duration::from_millis(1)) + .expect("first retry") + .bundle_ref(), + first + ); + assert_eq!( + state + .take_due_retry(retry_at + Duration::from_millis(1)) + .expect("second retry") + .bundle_ref(), + second + ); + assert_eq!( + state + .take_due_retry(retry_at + Duration::from_millis(1)) + .expect("third retry") + .bundle_ref(), + third + ); + assert!(state.deferred.is_empty()); + assert!(state.deferred_order.is_empty()); + } + + /// Scenario: durable buffer starts shutdown while it still has deferred + /// retries tracked locally behind its single retry wakeup. + /// Guarantees: shutdown clearing removes all local retry gating and the + /// armed wakeup record so those bundles can be drained through the normal + /// poll path. + #[test] + fn clear_for_shutdown_drops_deferred_tracking() { + let mut state = DeferredRetryState::new(); + let retry_at = Instant::now() + Duration::from_secs(1); + state.insert_deferred( + BundleRef { + segment_seq: SegmentSeq::new(7), + bundle_index: BundleIndex::new(1), + }, + 1, + retry_at, + ); + state.armed_wakeup = Some(ArmedRetryWakeup { + when: retry_at, + revision: 9, + }); + + state.clear_for_shutdown(); + + assert!(state.deferred.is_empty()); + assert!(state.deferred_order.is_empty()); + assert!(state.armed_wakeup.is_none()); + } +} diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/mod.rs index 4a7496c44d..bf8e4e1aa4 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/mod.rs @@ -43,7 +43,7 @@ //! - `TimerTick`: Poll storage for bundles, send downstream //! - `Ack`: Extract BundleRef from calldata, call handle.ack() //! - `Nack (permanent)`: Call handle.reject() — no retry -//! - `Nack (transient)`: Call handle.defer() and schedule retry via delay_data() +//! - `Nack (transient)`: Call handle.defer() and schedule retry via a wakeup //! - `Shutdown`: Flush storage engine //! //! 
# Retry Behavior and Error Handling @@ -71,6 +71,7 @@ mod bundle_adapter; mod config; +mod deferred_retry_state; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; @@ -97,6 +98,9 @@ use bundle_adapter::{ OtapRecordBundleAdapter, OtlpBytesAdapter, convert_bundle_to_pdata, signal_type_from_slot_id, }; pub use config::{DurableBufferConfig, OtlpHandling, SizeCapPolicy}; +use deferred_retry_state::DeferredRetryState; +#[cfg(test)] +use deferred_retry_state::RETRY_WAKEUP_SLOT; use otap_df_config::SignalType; use otap_df_config::error::Error as ConfigError; @@ -104,14 +108,17 @@ use otap_df_config::node::NodeUserConfig; use otap_df_engine::config::ProcessorConfig; use otap_df_engine::context::PipelineContext; use otap_df_engine::control::Context8u8; -use otap_df_engine::control::{AckMsg, CallData, NackMsg, NodeControlMsg}; +use otap_df_engine::control::{ + AckMsg, CallData, NackMsg, NodeControlMsg, WakeupRevision, WakeupSlot, +}; use otap_df_engine::error::Error; use otap_df_engine::local::processor::EffectHandler; use otap_df_engine::message::Message; use otap_df_engine::node::NodeId; use otap_df_engine::processor::ProcessorWrapper; use otap_df_engine::{ - ConsumerEffectHandlerExtension, Interests, ProcessorFactory, ProducerEffectHandlerExtension, + ConsumerEffectHandlerExtension, Interests, LocalWakeupRequirements, ProcessorFactory, + ProcessorRuntimeRequirements, ProducerEffectHandlerExtension, }; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; use otap_df_telemetry::instrument::{Counter, Gauge, ObserveCounter}; @@ -325,38 +332,6 @@ fn decode_bundle_ref(calldata: &CallData) -> Option { }) } -/// Encode a retry ticket into CallData for DelayedData scheduling. -/// -/// Layout: [segment_seq (u64), bundle_index (u32), retry_count (u32) packed into u64] -fn encode_retry_ticket(bundle_ref: BundleRef, retry_count: u32) -> CallData { - // Pack bundle_index (low 32 bits) and retry_count (high 32 bits) into one u64 - let packed = (bundle_ref.bundle_index.raw() as u64) | ((retry_count as u64) << 32); - smallvec![ - Context8u8::from(bundle_ref.segment_seq.raw()), - Context8u8::from(packed), - ] -} - -/// Decode a retry ticket from CallData. -/// -/// Returns (BundleRef, retry_count) if valid. -fn decode_retry_ticket(calldata: &CallData) -> Option<(BundleRef, u32)> { - if calldata.len() < 2 { - return None; - } - let segment_seq = SegmentSeq::new(u64::from(calldata[0])); - let packed = u64::from(calldata[1]); - let bundle_index = BundleIndex::new((packed & 0xFFFF_FFFF) as u32); - let retry_count = (packed >> 32) as u32; - Some(( - BundleRef { - segment_seq, - bundle_index, - }, - retry_count, - )) -} - // ───────────────────────────────────────────────────────────────────────────── // Pending Bundle Tracking // ───────────────────────────────────────────────────────────────────────────── @@ -410,6 +385,16 @@ enum EngineState { Failed(String), } +/// Outcome of trying to resume one deferred retry. +enum RetryResumeOutcome { + /// The retry was re-claimed and sent downstream. + Sent, + /// The retry remains deferred because resend is blocked for now. + Deferred, + /// The retry no longer needs to be retried on this pass. + Skipped, +} + /// Cached per-segment signal classification for queued-item gauge computation. /// /// Populated once per segment (on first access after finalization) and never @@ -447,10 +432,9 @@ pub struct DurableBuffer { /// Key is the (segment_seq, bundle_index) pair encoded as a u128 for fast lookup. 
pending_bundles: HashMap<(u64, u32), PendingBundle>,
 
-    /// Bundles scheduled for retry via delay_data.
-    /// These are skipped by poll_next_bundle to enforce backoff.
-    /// Removed when the delay fires and claim_bundle is called.
-    retry_scheduled: HashSet<(u64, u32)>,
+    /// Processor-local retry deferral state, driven by one wakeup slot plus a
+    /// local ordered retry queue.
+    deferred_retry_state: DeferredRetryState,
 
     /// Configuration.
     config: DurableBufferConfig,
@@ -544,7 +528,7 @@ impl DurableBuffer {
         Ok(Self {
             engine_state: EngineState::Uninitialized,
             pending_bundles: HashMap::new(),
-            retry_scheduled: HashSet::new(),
+            deferred_retry_state: DeferredRetryState::new(),
             config,
             core_id,
             num_cores,
@@ -640,56 +624,97 @@ impl DurableBuffer {
         self.pending_bundles.len() < self.config.max_in_flight
     }
 
-    /// Schedule a retry for a bundle via delay_data.
-    ///
-    /// This is the single point of coordination between `delay_data` scheduling
-    /// and `retry_scheduled` tracking. Always use this method instead of calling
-    /// `delay_data` directly to ensure the two stay in sync.
+    /// Resumes one deferred retry, either by sending it downstream again or by
+    /// re-deferring it if downstream/backpressure constraints still apply.
     ///
-    /// Returns true if scheduling succeeded, false if it failed (caller should
-    /// let poll_next_bundle pick up the bundle instead).
-    async fn schedule_retry(
+    /// Guarantees:
+    /// - respects `max_in_flight`
+    /// - re-defers blocked retries with `poll_interval`
+    /// - returns enough outcome information for the caller to decide whether
+    ///   the current wakeup pass should keep resuming more due retries
+    fn resume_retry(
         &mut self,
         bundle_ref: BundleRef,
         retry_count: u32,
-        delay: Duration,
         effect_handler: &mut EffectHandler<OtapPdata>,
-    ) -> bool {
-        let key = (bundle_ref.segment_seq.raw(), bundle_ref.bundle_index.raw());
-
-        // Create a lightweight retry ticket
-        // TODO(#1472): Replace with proper timer support when available.
-        // Currently we abuse delay_data() with an empty payload as a workaround
-        // for the lack of a native "schedule callback" primitive.
-        let retry_ticket = OtapPdata::new(
-            Default::default(),
-            OtapPayload::empty(SignalType::Traces), // Signal type doesn't matter for empty payload
-        );
-        let calldata = encode_retry_ticket(bundle_ref, retry_count);
-        let mut retry_ticket = Box::new(retry_ticket);
-        effect_handler.subscribe_to(Interests::empty(), calldata, &mut retry_ticket);
+    ) -> Result<RetryResumeOutcome, Error> {
+        if !self.can_send_more() {
+            otel_debug!(
+                "durable_buffer.retry.deferred",
+                segment_seq = bundle_ref.segment_seq.raw(),
+                bundle_index = bundle_ref.bundle_index.raw(),
+                in_flight = self.pending_bundles.len(),
+                max_in_flight = self.config.max_in_flight
+            );
 
-        let retry_at = Instant::now() + delay;
-        if effect_handler
-            .delay_data(retry_at, retry_ticket)
-            .await
-            .is_ok()
-        {
-            // Track that this bundle is scheduled - poll_next_bundle will skip it
-            let _ = self.retry_scheduled.insert(key);
-            true
-        } else {
-            // Failed to schedule - don't add to retry_scheduled, poll will pick it up
-            false
+            if !self.deferred_retry_state.schedule_after(
+                bundle_ref,
+                retry_count,
+                self.config.poll_interval,
+                effect_handler,
+            ) {
+                otel_warn!("durable_buffer.retry.reschedule_failed");
+            }
+            return Ok(RetryResumeOutcome::Deferred);
         }
-    }
 
-    /// Remove a bundle from retry_scheduled tracking.
-    ///
-    /// Call this when the delay has fired and we're about to process the retry.
- fn unschedule_retry(&mut self, bundle_ref: BundleRef) { - let key = (bundle_ref.segment_seq.raw(), bundle_ref.bundle_index.raw()); - let _ = self.retry_scheduled.remove(&key); + let claim_result = { + let (engine, subscriber_id) = self.engine()?; + engine.claim_bundle(subscriber_id, bundle_ref) + }; + + match claim_result { + Ok(handle) => match self.try_process_bundle_handle_with_retry_count( + handle, + retry_count, + effect_handler, + ) { + ProcessBundleResult::Sent => { + otel_debug!( + "durable_buffer.retry.sent", + segment_seq = bundle_ref.segment_seq.raw(), + bundle_index = bundle_ref.bundle_index.raw(), + retry_count = retry_count + ); + Ok(RetryResumeOutcome::Sent) + } + ProcessBundleResult::Skipped => { + otel_warn!( + "durable_buffer.retry.skipped", + segment_seq = bundle_ref.segment_seq.raw(), + bundle_index = bundle_ref.bundle_index.raw() + ); + Ok(RetryResumeOutcome::Skipped) + } + ProcessBundleResult::Backpressure => { + otel_debug!( + "durable_buffer.retry.backpressure", + segment_seq = bundle_ref.segment_seq.raw(), + bundle_index = bundle_ref.bundle_index.raw() + ); + + if !self.deferred_retry_state.schedule_after( + bundle_ref, + retry_count, + self.config.poll_interval, + effect_handler, + ) { + otel_warn!("durable_buffer.retry.reschedule_failed"); + } + Ok(RetryResumeOutcome::Deferred) + } + ProcessBundleResult::Error(e) => Err(e), + }, + Err(e) => { + otel_debug!( + "durable_buffer.retry.claim_failed", + segment_seq = bundle_ref.segment_seq.raw(), + bundle_index = bundle_ref.bundle_index.raw(), + error = %e + ); + Ok(RetryResumeOutcome::Skipped) + } + } } /// Lazily initialize the Quiver engine on first use. @@ -1272,7 +1297,7 @@ impl DurableBuffer { "durable_buffer.drain.all_blocked", bundles_processed = bundles_processed, in_flight = self.pending_bundles.len(), - retry_scheduled = self.retry_scheduled.len() + retry_scheduled = self.deferred_retry_state.scheduled_len() ); break; } @@ -1366,10 +1391,10 @@ impl DurableBuffer { // Skip if this bundle is scheduled for retry (waiting for backoff). // This enforces the exponential backoff - poll_next_bundle() returns - // deferred bundles immediately, but we should wait for delay_data to fire. - if self.retry_scheduled.contains(&key) { + // deferred bundles immediately, but we should wait for the retry delay. + if self.deferred_retry_state.is_deferred_key(key) { // Bundle is waiting for backoff. Release the claim; it will be - // re-claimed when the delay_data retry ticket fires. + // re-claimed when the single durable-buffer retry wakeup resumes it. drop(handle); // Implicit defer return ProcessBundleResult::Skipped; } @@ -1509,10 +1534,8 @@ impl DurableBuffer { /// For permanent NACKs (e.g., malformed data that will never succeed), the bundle /// is rejected immediately without retry. /// - /// For transient NACKs, schedules a retry with exponential backoff using `delay_data()`. - /// The bundle is deferred in Quiver (releasing the claim) and a lightweight - /// retry ticket is scheduled. When the delay expires, `handle_delayed_retry` - /// will re-claim the bundle and attempt redelivery. + /// For transient NACKs, defers the bundle locally with exponential backoff + /// and ensures the single durable-buffer wakeup tracks the earliest retry. 
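+    ///
+    /// Illustrative backoff shape (example values, not defaults): with
+    /// `initial_retry_interval` = 100ms, `retry_multiplier` = 2.0 and
+    /// `max_retry_interval` = 5s, successive transient NACKs defer the same
+    /// bundle for roughly 100ms, 200ms, 400ms, ... capped at 5s.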
async fn handle_nack( &mut self, nack: NackMsg, @@ -1586,10 +1609,12 @@ impl DurableBuffer { drop(pending.handle); // Schedule the retry - if self - .schedule_retry(bundle_ref, retry_count, backoff, effect_handler) - .await - { + if self.deferred_retry_state.schedule_after( + bundle_ref, + retry_count, + backoff, + effect_handler, + ) { self.metrics.retries_scheduled.add(1); } else { otel_warn!( @@ -1609,123 +1634,45 @@ impl DurableBuffer { Ok(()) } - /// Handle a delayed retry ticket. + /// Handle a retry wakeup. /// /// Re-claims the bundle from Quiver and attempts redelivery downstream. - async fn handle_delayed_retry( + async fn handle_retry_wakeup( &mut self, - retry_ticket: Box, + slot: WakeupSlot, + revision: WakeupRevision, effect_handler: &mut EffectHandler, ) -> Result<(), Error> { - // Decode the retry ticket - let Some(calldata) = retry_ticket.source_route() else { - otel_warn!("durable_buffer.retry.missing_calldata"); - return Ok(()); - }; - - let Some((bundle_ref, retry_count)) = decode_retry_ticket(&calldata.calldata) else { - otel_warn!("durable_buffer.retry.invalid_calldata"); - return Ok(()); - }; - - // Check max_in_flight limit - if !self.can_send_more() { - // At capacity - re-schedule with a short delay. - // Bundle stays in retry_scheduled (wasn't removed yet). - otel_debug!( - "durable_buffer.retry.deferred", - segment_seq = bundle_ref.segment_seq.raw(), - bundle_index = bundle_ref.bundle_index.raw(), - in_flight = self.pending_bundles.len(), - max_in_flight = self.config.max_in_flight + if !self.deferred_retry_state.accept_wakeup(slot, revision) { + otel_warn!( + "durable_buffer.retry.unknown_wakeup", + wakeup_slot = slot.0.to_string(), + wakeup_revision = revision ); - - // Re-schedule - note: bundle is still in retry_scheduled, schedule_retry - // will just update it (insert is idempotent for HashSet) - if !self - .schedule_retry( - bundle_ref, - retry_count, - self.config.poll_interval, - effect_handler, - ) - .await - { - // Failed to re-schedule - remove from retry_scheduled so poll can pick it up - self.unschedule_retry(bundle_ref); - otel_warn!("durable_buffer.retry.reschedule_failed"); - } return Ok(()); } - // Backoff period has elapsed and we have capacity - remove from retry_scheduled. - // This allows poll_next_bundle to see it again if claim_bundle fails. - self.unschedule_retry(bundle_ref); - - // Re-claim the bundle from Quiver - let claim_result = { - let (engine, subscriber_id) = self.engine()?; - engine.claim_bundle(subscriber_id, bundle_ref) - }; - - match claim_result { - Ok(handle) => { - // Successfully re-claimed, now send downstream - match self.try_process_bundle_handle_with_retry_count( - handle, - retry_count, - effect_handler, - ) { - ProcessBundleResult::Sent => { - otel_debug!( - "durable_buffer.retry.sent", - segment_seq = bundle_ref.segment_seq.raw(), - bundle_index = bundle_ref.bundle_index.raw(), - retry_count = retry_count - ); - } - ProcessBundleResult::Skipped => { - // Shouldn't happen - we just claimed it and removed from retry_scheduled - otel_warn!( - "durable_buffer.retry.skipped", - segment_seq = bundle_ref.segment_seq.raw(), - bundle_index = bundle_ref.bundle_index.raw() - ); - } - ProcessBundleResult::Backpressure => { - // Channel full - the handle was dropped (deferred). - // Re-schedule retry with a short delay. 
- otel_debug!( - "durable_buffer.retry.backpressure", - segment_seq = bundle_ref.segment_seq.raw(), - bundle_index = bundle_ref.bundle_index.raw() - ); + let mut rearm_no_earlier_than = None; + loop { + let now = Instant::now(); + let Some(retry) = self.deferred_retry_state.take_due_retry(now) else { + break; + }; - // Short delay for backpressure (not exponential - this isn't a failure). - // If scheduling fails, poll will pick it up. - let _ = self - .schedule_retry( - bundle_ref, - retry_count, - self.config.poll_interval, - effect_handler, - ) - .await; - } - ProcessBundleResult::Error(e) => { - return Err(e); - } + match self.resume_retry(retry.bundle_ref(), retry.retry_count(), effect_handler)? { + RetryResumeOutcome::Sent | RetryResumeOutcome::Skipped => {} + RetryResumeOutcome::Deferred => { + rearm_no_earlier_than = Some(now + self.config.poll_interval); + break; } } - Err(e) => { - // Claim failed - bundle may have been resolved or segment dropped - otel_debug!( - "durable_buffer.retry.claim_failed", - segment_seq = bundle_ref.segment_seq.raw(), - bundle_index = bundle_ref.bundle_index.raw(), - error = %e - ); - } + } + + if !self + .deferred_retry_state + .rearm_after_processing(effect_handler, rearm_no_earlier_than) + { + otel_warn!("durable_buffer.retry.rearm_failed"); } Ok(()) @@ -1735,8 +1682,9 @@ impl DurableBuffer { /// /// The shutdown sequence is: /// 1. Flush to finalize any open segment (makes data visible to subscribers) - /// 2. Drain remaining bundles to downstream (best-effort, respects deadline) - /// 3. Engine shutdown (always attempted - also finalizes open segment if flush was skipped) + /// 2. Clear deferred-retry gating so parked retry bundles become drainable + /// 3. Drain remaining bundles to downstream (best-effort, respects deadline) + /// 4. Engine shutdown (always attempted - also finalizes open segment if flush was skipped) /// /// Note: Quiver's `shutdown()` internally calls `finalize_current_segment()`, so even /// if we skip the explicit flush due to deadline pressure, the engine shutdown will @@ -1757,6 +1705,12 @@ impl DurableBuffer { return Ok(()); } + // Shutdown is terminal for this processor instance, so retry backoff no + // longer matters. Clear local deferred-retry gating up front so bundles + // that were parked behind backoff become drainable through the normal + // Quiver poll loop below. + self.deferred_retry_state.clear_for_shutdown(); + // Check deadline before flush/drain sequence if Instant::now() >= deadline { otel_warn!("durable_buffer.shutdown.deadline_exceeded"); @@ -1842,6 +1796,12 @@ impl DurableBuffer { #[async_trait(?Send)] impl otap_df_engine::local::processor::Processor for DurableBuffer { + fn runtime_requirements(&self) -> ProcessorRuntimeRequirements { + ProcessorRuntimeRequirements { + local_wakeups: Some(LocalWakeupRequirements::new(1)), + } + } + async fn process( &mut self, msg: Message, @@ -1934,15 +1894,11 @@ impl otap_df_engine::local::processor::Processor for DurableBuffer { } NodeControlMsg::MemoryPressureChanged { .. } => Ok(()), NodeControlMsg::DrainIngress { .. } => Ok(()), - NodeControlMsg::DelayedData { data, .. 
} => { - // Check if this is a retry ticket (has BundleRef + retry_count in calldata) - if let Some(route) = data.source_route() { - if decode_retry_ticket(&route.calldata).is_some() { - // This is a retry ticket - handle retry - return self.handle_delayed_retry(data, effect_handler).await; - } - } - // Not a retry ticket - shouldn't happen, but handle gracefully + NodeControlMsg::Wakeup { slot, revision, .. } => { + self.handle_retry_wakeup(slot, revision, effect_handler) + .await + } + NodeControlMsg::DelayedData { .. } => { otel_warn!("durable_buffer.delayed_data.unexpected"); Ok(()) } @@ -2035,46 +1991,383 @@ mod tests { assert!(decode_bundle_ref(&calldata).is_none()); } + /// Scenario: one transient NACK arms a normal processor-local wakeup and the + /// wakeup control message is later delivered through the processor inbox. + /// Guarantees: the retry stays deferred until the wakeup arrives, and that + /// wakeup resumes normal downstream delivery exactly once. #[test] - fn test_retry_ticket_encoding_roundtrip() { - let bundle_ref = BundleRef { - segment_seq: SegmentSeq::new(98765), - bundle_index: BundleIndex::new(123), - }; - let retry_count = 7u32; + fn test_retry_wakeup_resumes_retry_logic() { + use otap_df_config::node::NodeUserConfig; + use otap_df_engine::config::ProcessorConfig; + use otap_df_engine::context::ControllerContext; + use otap_df_engine::control::pipeline_completion_msg_channel; + use otap_df_engine::message::Message; + use otap_df_engine::testing::processor::TestRuntime; + use otap_df_engine::testing::test_node; + use otap_df_otap::testing::next_nack; + use otap_df_pdata::encode::encode_logs_otap_batch; + use otap_df_pdata::testing::fixtures::DataGenerator; + use serde_json::json; + + let rt = TestRuntime::new(); + let controller = ControllerContext::new(rt.metrics_registry()); + let pipeline_ctx = controller.pipeline_context_with("grp".into(), "pipe".into(), 0, 1, 0); + let temp_dir = tempfile::tempdir().expect("tempdir"); - let calldata = encode_retry_ticket(bundle_ref, retry_count); - let decoded = decode_retry_ticket(&calldata); + let mut node_config = NodeUserConfig::new_processor_config(DURABLE_BUFFER_URN); + node_config.config = json!({ + "path": temp_dir.path(), + "retention_size_cap": "256 MiB", + "poll_interval": "100ms", + "max_segment_open_duration": "1s", + "initial_retry_interval": "100ms", + "max_retry_interval": "100ms", + "retry_multiplier": 2.0, + "max_in_flight": 1000 + }); + + let processor = create_durable_buffer( + pipeline_ctx, + test_node("durable-buffer-retry-wakeup"), + Arc::new(node_config), + &ProcessorConfig::new("durable-buffer-retry-wakeup"), + ) + .expect("create durable buffer"); + + rt.set_processor(processor) + .run_test(move |mut ctx| async move { + let (pipeline_completion_tx, _pipeline_completion_rx) = + pipeline_completion_msg_channel(10); + ctx.set_pipeline_completion_sender(pipeline_completion_tx); + + let mut datagen = DataGenerator::new(1); + let input = datagen.generate_logs(); + let rec = encode_logs_otap_batch(&input).expect("encode logs"); + ctx.process(Message::PData(OtapPdata::new_default(rec.into()))) + .await + .expect("process input"); + + ctx.process(Message::Control(NodeControlMsg::TimerTick {})) + .await + .expect("process timer tick"); + let mut outputs = ctx.drain_pdata().await; + assert_eq!(outputs.len(), 1, "timer tick should emit one bundle"); + + let sent = outputs.pop().expect("sent bundle"); + let (_, nack) = + next_nack(NackMsg::new("retry", sent)).expect("expected nack subscriber"); + 
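+                // Simulate a transient downstream failure: NACK the sent
+                // bundle so the processor parks it behind its retry wakeup.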
ctx.process(Message::Control(NodeControlMsg::Nack(nack))) + .await + .expect("process nack"); + assert!( + ctx.drain_pdata().await.is_empty(), + "nack should defer delivery until wakeup" + ); - assert!(decoded.is_some()); - let (decoded_ref, decoded_count) = decoded.unwrap(); - assert_eq!(decoded_ref.segment_seq.raw(), 98765); - assert_eq!(decoded_ref.bundle_index.raw(), 123); - assert_eq!(decoded_count, 7); + ctx.sleep(Duration::from_millis(200)).await; + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: RETRY_WAKEUP_SLOT, + when: Instant::now(), + revision: 0, + })) + .await + .expect("process retry wakeup"); + + let retried = ctx.drain_pdata().await; + assert_eq!(retried.len(), 1, "wakeup should resume retry delivery"); + assert_eq!(retried[0].signal_type(), SignalType::Logs); + }) + .validate(|_| async {}); } + /// Scenario: an unrelated wakeup arrives while durable-buffer has multiple + /// deferred retries pending behind its single retry wakeup slot. + /// Guarantees: the unrelated wakeup does not cause early redelivery or lose + /// deferred retries; the matching wakeup later resumes all due retries. #[test] - fn test_retry_ticket_encoding_max_values() { - let bundle_ref = BundleRef { - segment_seq: SegmentSeq::new(u64::MAX), - bundle_index: BundleIndex::new(u32::MAX), - }; - let retry_count = u32::MAX; + fn test_unknown_wakeup_does_not_lose_deferred_retries() { + use otap_df_config::node::NodeUserConfig; + use otap_df_engine::config::ProcessorConfig; + use otap_df_engine::context::ControllerContext; + use otap_df_engine::control::pipeline_completion_msg_channel; + use otap_df_engine::message::Message; + use otap_df_engine::testing::processor::TestRuntime; + use otap_df_engine::testing::test_node; + use otap_df_otap::testing::next_nack; + use otap_df_pdata::encode::encode_logs_otap_batch; + use otap_df_pdata::testing::fixtures::DataGenerator; + use serde_json::json; + + let rt = TestRuntime::new(); + let controller = ControllerContext::new(rt.metrics_registry()); + let pipeline_ctx = controller.pipeline_context_with("grp".into(), "pipe".into(), 0, 1, 0); + let temp_dir = tempfile::tempdir().expect("tempdir"); + + let mut node_config = NodeUserConfig::new_processor_config(DURABLE_BUFFER_URN); + node_config.config = json!({ + "path": temp_dir.path(), + "retention_size_cap": "256 MiB", + "poll_interval": "50ms", + "max_segment_open_duration": "1s", + "initial_retry_interval": "100ms", + "max_retry_interval": "100ms", + "retry_multiplier": 2.0, + "max_in_flight": 1000 + }); + + let processor = create_durable_buffer( + pipeline_ctx, + test_node("durable-buffer-unknown-wakeup"), + Arc::new(node_config), + &ProcessorConfig::with_channel_capacities("durable-buffer-unknown-wakeup", 1, 100), + ) + .expect("create durable buffer"); + + rt.set_processor(processor) + .run_test(move |mut ctx| async move { + let (pipeline_completion_tx, _pipeline_completion_rx) = + pipeline_completion_msg_channel(10); + ctx.set_pipeline_completion_sender(pipeline_completion_tx); + + let mut datagen = DataGenerator::new(2); + for _ in 0..2 { + let input = datagen.generate_logs(); + let rec = encode_logs_otap_batch(&input).expect("encode logs"); + ctx.process(Message::PData(OtapPdata::new_default(rec.into()))) + .await + .expect("process input"); + } - let calldata = encode_retry_ticket(bundle_ref, retry_count); - let decoded = decode_retry_ticket(&calldata); + ctx.process(Message::Control(NodeControlMsg::TimerTick {})) + .await + .expect("process timer tick"); + let mut outputs = 
ctx.drain_pdata().await; + assert_eq!(outputs.len(), 2, "timer tick should emit two bundles"); + for sent in outputs.drain(..) { + let (_, nack) = + next_nack(NackMsg::new("retry", sent)).expect("expected nack subscriber"); + ctx.process(Message::Control(NodeControlMsg::Nack(nack))) + .await + .expect("process nack"); + } - assert!(decoded.is_some()); - let (decoded_ref, decoded_count) = decoded.unwrap(); - assert_eq!(decoded_ref.segment_seq.raw(), u64::MAX); - assert_eq!(decoded_ref.bundle_index.raw(), u32::MAX); - assert_eq!(decoded_count, u32::MAX); + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: WakeupSlot(999), + when: Instant::now(), + revision: 0, + })) + .await + .expect("process unknown wakeup"); + assert!( + ctx.drain_pdata().await.is_empty(), + "unknown wakeup should not redeliver deferred retries" + ); + + ctx.sleep(Duration::from_millis(200)).await; + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: RETRY_WAKEUP_SLOT, + when: Instant::now(), + revision: 0, + })) + .await + .expect("process shared retry wakeup"); + let retried = ctx.drain_pdata().await; + assert_eq!( + retried.len(), + 2, + "matching wakeup should resume all due deferred retries" + ); + }) + .validate(|_| async {}); } + /// Scenario: two transient NACKs occur and both retries share the durable + /// buffer's single wakeup slot. + /// Guarantees: no retry is re-delivered before the shared wakeup fires, and + /// one matching wakeup resumes all due retries. #[test] - fn test_decode_retry_ticket_empty_calldata() { - let calldata: CallData = smallvec![]; - assert!(decode_retry_ticket(&calldata).is_none()); + fn test_multiple_retries_share_single_wakeup() { + use otap_df_config::node::NodeUserConfig; + use otap_df_engine::config::ProcessorConfig; + use otap_df_engine::context::ControllerContext; + use otap_df_engine::control::pipeline_completion_msg_channel; + use otap_df_engine::message::Message; + use otap_df_engine::testing::processor::TestRuntime; + use otap_df_engine::testing::test_node; + use otap_df_otap::testing::next_nack; + use otap_df_pdata::encode::encode_logs_otap_batch; + use otap_df_pdata::testing::fixtures::DataGenerator; + use serde_json::json; + + let rt = TestRuntime::new(); + let controller = ControllerContext::new(rt.metrics_registry()); + let pipeline_ctx = controller.pipeline_context_with("grp".into(), "pipe".into(), 0, 1, 0); + let temp_dir = tempfile::tempdir().expect("tempdir"); + + let mut node_config = NodeUserConfig::new_processor_config(DURABLE_BUFFER_URN); + node_config.config = json!({ + "path": temp_dir.path(), + "retention_size_cap": "256 MiB", + "poll_interval": "50ms", + "max_segment_open_duration": "1s", + "initial_retry_interval": "100ms", + "max_retry_interval": "100ms", + "retry_multiplier": 2.0, + "max_in_flight": 1000 + }); + + let processor = create_durable_buffer( + pipeline_ctx, + test_node("durable-buffer-shared-retry-wakeup"), + Arc::new(node_config), + &ProcessorConfig::with_channel_capacities("durable-buffer-shared-retry-wakeup", 1, 100), + ) + .expect("create durable buffer"); + + rt.set_processor(processor) + .run_test(move |mut ctx| async move { + let (pipeline_completion_tx, _pipeline_completion_rx) = + pipeline_completion_msg_channel(10); + ctx.set_pipeline_completion_sender(pipeline_completion_tx); + + let mut datagen = DataGenerator::new(2); + for _ in 0..2 { + let input = datagen.generate_logs(); + let rec = encode_logs_otap_batch(&input).expect("encode logs"); + ctx.process(Message::PData(OtapPdata::new_default(rec.into()))) + 
.await + .expect("process input"); + } + + ctx.process(Message::Control(NodeControlMsg::TimerTick {})) + .await + .expect("process timer tick"); + let mut outputs = ctx.drain_pdata().await; + assert_eq!(outputs.len(), 2, "timer tick should emit two bundles"); + for sent in outputs.drain(..) { + let (_, nack) = + next_nack(NackMsg::new("retry", sent)).expect("expected nack subscriber"); + ctx.process(Message::Control(NodeControlMsg::Nack(nack))) + .await + .expect("process nack"); + } + + ctx.process(Message::Control(NodeControlMsg::TimerTick {})) + .await + .expect("process immediate timer tick"); + assert!( + ctx.drain_pdata().await.is_empty(), + "shared wakeup retries should stay deferred until due" + ); + + ctx.sleep(Duration::from_millis(200)).await; + ctx.process(Message::Control(NodeControlMsg::Wakeup { + slot: RETRY_WAKEUP_SLOT, + when: Instant::now(), + revision: 0, + })) + .await + .expect("process shared retry wakeup"); + let wakeup_retry = ctx.drain_pdata().await; + assert_eq!( + wakeup_retry.len(), + 2, + "shared wakeup should resume all due retry deliveries" + ); + }) + .validate(|_| async {}); + } + + /// Scenario: a bundle is transiently NACKed, becomes deferred behind the + /// durable-buffer retry wakeup, and shutdown starts before that wakeup + /// fires. + /// Guarantees: shutdown clears deferred-retry gating so the existing drain + /// loop can forward that parked bundle instead of leaving it restart-dependent. + #[test] + fn test_shutdown_drains_deferred_retry_bundle() { + use otap_df_config::node::NodeUserConfig; + use otap_df_engine::config::ProcessorConfig; + use otap_df_engine::context::ControllerContext; + use otap_df_engine::control::pipeline_completion_msg_channel; + use otap_df_engine::message::Message; + use otap_df_engine::testing::processor::TestRuntime; + use otap_df_engine::testing::test_node; + use otap_df_otap::testing::next_nack; + use otap_df_pdata::encode::encode_logs_otap_batch; + use otap_df_pdata::testing::fixtures::DataGenerator; + use serde_json::json; + + let rt = TestRuntime::new(); + let controller = ControllerContext::new(rt.metrics_registry()); + let pipeline_ctx = controller.pipeline_context_with("grp".into(), "pipe".into(), 0, 1, 0); + let temp_dir = tempfile::tempdir().expect("tempdir"); + + let mut node_config = NodeUserConfig::new_processor_config(DURABLE_BUFFER_URN); + node_config.config = json!({ + "path": temp_dir.path(), + "retention_size_cap": "256 MiB", + "poll_interval": "100ms", + "max_segment_open_duration": "1s", + "initial_retry_interval": "10s", + "max_retry_interval": "10s", + "retry_multiplier": 2.0, + "max_in_flight": 1000 + }); + + let processor = create_durable_buffer( + pipeline_ctx, + test_node("durable-buffer-shutdown-drain-deferred"), + Arc::new(node_config), + &ProcessorConfig::new("durable-buffer-shutdown-drain-deferred"), + ) + .expect("create durable buffer"); + + rt.set_processor(processor) + .run_test(move |mut ctx| async move { + let (pipeline_completion_tx, _pipeline_completion_rx) = + pipeline_completion_msg_channel(10); + ctx.set_pipeline_completion_sender(pipeline_completion_tx); + + let mut datagen = DataGenerator::new(1); + let input = datagen.generate_logs(); + let rec = encode_logs_otap_batch(&input).expect("encode logs"); + ctx.process(Message::PData(OtapPdata::new_default(rec.into()))) + .await + .expect("process input"); + + ctx.process(Message::Control(NodeControlMsg::TimerTick {})) + .await + .expect("process timer tick"); + let mut outputs = ctx.drain_pdata().await; + assert_eq!(outputs.len(), 
1, "timer tick should emit one bundle"); + + let sent = outputs.pop().expect("sent bundle"); + let (_, nack) = + next_nack(NackMsg::new("retry", sent)).expect("expected nack subscriber"); + ctx.process(Message::Control(NodeControlMsg::Nack(nack))) + .await + .expect("process nack"); + assert!( + ctx.drain_pdata().await.is_empty(), + "nack should defer delivery until either wakeup or shutdown drain" + ); + + ctx.process(Message::Control(NodeControlMsg::Shutdown { + deadline: Instant::now() + Duration::from_secs(1), + reason: "shutdown".to_owned(), + })) + .await + .expect("process shutdown"); + + let drained = ctx.drain_pdata().await; + assert_eq!( + drained.len(), + 1, + "shutdown drain should forward the deferred retry bundle" + ); + assert_eq!(drained[0].signal_type(), SignalType::Logs); + }) + .validate(|_| async {}); } #[test] diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/log_sampling_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/log_sampling_processor/mod.rs index 526a02721c..24cc237a1c 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/log_sampling_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/log_sampling_processor/mod.rs @@ -200,6 +200,7 @@ impl local::Processor for LogSamplingProcessor { | NodeControlMsg::Nack(_) | NodeControlMsg::MemoryPressureChanged { .. } | NodeControlMsg::DrainIngress { .. } + | NodeControlMsg::Wakeup { .. } | NodeControlMsg::DelayedData { .. } => Ok(()), }, } diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/retry_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/retry_processor/mod.rs index 5a0ba4c14c..a95f9ad450 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/retry_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/retry_processor/mod.rs @@ -651,6 +651,7 @@ impl Processor for RetryProcessor { NodeControlMsg::TimerTick { .. } => { unreachable!("unused"); } + NodeControlMsg::Wakeup { .. } => Ok(()), NodeControlMsg::MemoryPressureChanged { .. } => Ok(()), NodeControlMsg::DrainIngress { .. } => Ok(()), NodeControlMsg::Shutdown { .. } => Ok(()), diff --git a/rust/otap-dataflow/crates/engine/README.md b/rust/otap-dataflow/crates/engine/README.md index 59180340c7..1989460fd3 100644 --- a/rust/otap-dataflow/crates/engine/README.md +++ b/rust/otap-dataflow/crates/engine/README.md @@ -231,7 +231,7 @@ closed normal admission. The current message families are: - **Node control messages**: `Ack`, `Nack`, `Config`, `TimerTick`, - `CollectTelemetry`, `DelayedData`, `DrainIngress`, `Shutdown` + `CollectTelemetry`, `Wakeup`, `DelayedData`, `DrainIngress`, `Shutdown` - **Runtime control messages**: `StartTimer`, `CancelTimer`, `StartTelemetryTimer`, `CancelTelemetryTimer`, `DelayData`, `ReceiverDrained`, `Shutdown` @@ -274,6 +274,54 @@ behavior: empty and close. The later sections on Ack/Nack delivery and graceful shutdown cover this in more detail. 
+### Processor-Local Wakeups + +Processors can schedule local wakeups through the processor effect handler: + +- `set_wakeup(slot, when)` schedules or replaces the wakeup for `slot` and + returns whether that slot was inserted or replaced, along with the accepted + wakeup revision +- `cancel_wakeup(slot)` removes the wakeup for `slot` if one is live + +This API is intentionally processor-local: + +- `WakeupSlot` is scoped to one processor instance, not globally across the + pipeline +- a processor can define its own slot constants such as `WakeupSlot(0)` +- a processor can also encode compact structured local identifiers directly in + the widened `WakeupSlot(pub u128)` payload when that is more natural +- the engine does not interpret slot meaning; it only routes the slot back to + the originating processor + +Wakeups are delivered through `ProcessorInbox` as +`NodeControlMsg::Wakeup { slot, when, revision }`. They therefore participate +in the same receive loop and the same bounded fairness policy as other control +traffic. + +The current runtime properties and guarantees are: + +- **Keyed replacement:** there is at most one live wakeup per slot; scheduling + the same slot again replaces the previous due time +- **Revisioned delivery:** every accepted schedule gets a scheduler-assigned + revision; re-scheduling a live slot gives it a new revision so processors can + ignore stale wakeups for reused slots +- **Cancellation:** canceling a live slot prevents that wakeup from being + delivered later +- **Bounded live state:** scheduler state is bounded by the number of live + wakeup slots accepted for the processor +- **Deterministic ordering:** if two wakeups have the same due time, they are + delivered in schedule order +- **No payload retention:** wakeups carry only `(slot, when, revision)` and do + not retain deferred `pdata` +- **Shutdown rejection and drop:** once processor shutdown is latched, new + wakeups are rejected and pending wakeups are dropped immediately +- **No flush-on-shutdown guarantee:** pending wakeups are not drained or forced + through during shutdown + +Wakeups are best-effort runtime scheduling signals. They are not durable work +items and are not part of the runtime-control delayed-data mechanism that is +still used by retry-oriented flows outside this API. + ## Runtime Properties The runtime is organized around a small set of guarantees: @@ -316,6 +364,7 @@ In practice, effect handlers are how nodes: - subscribe to Ack/Nack interests on the forward path - emit Ack/Nack outcomes onto the pipeline-completion channel - schedule or cancel timers on the runtime-control channel +- schedule or cancel processor-local wakeups - return delayed data - report `ReceiverDrained` - create listeners and sockets with engine-defined socket options diff --git a/rust/otap-dataflow/crates/engine/src/control.rs b/rust/otap-dataflow/crates/engine/src/control.rs index f1335bc1a7..da153f335b 100644 --- a/rust/otap-dataflow/crates/engine/src/control.rs +++ b/rust/otap-dataflow/crates/engine/src/control.rs @@ -76,6 +76,26 @@ impl From for f64 { /// numbers, deadline, num_items, etc. pub type CallData = SmallVec<[Context8u8; 3]>; +/// Opaque key used to identify a processor-local scheduled wakeup. +/// +/// Slots are scoped to a single processor instance. They do not need to be +/// globally unique across the pipeline, so processors can define local +/// constants such as `WakeupSlot(0)` for their own internal timers. 
+/// +/// Re-scheduling the same slot replaces the previous wakeup for that slot. +/// The widened `u128` payload lets processors encode compact structured local +/// identifiers directly when that is more natural than allocating slot numbers. +#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct WakeupSlot(pub u128); + +/// Monotonic wakeup revision assigned by the scheduler each time a slot is set. +/// +/// Re-scheduling an existing slot gives it a new revision. Processors can use +/// the revision carried back in [`NodeControlMsg::Wakeup`] to distinguish a +/// current wakeup from a stale delivery for the same slot. +pub type WakeupRevision = u64; + /// Engine-managed call data envelope. Wraps the CallData with an envelope /// containing timestamp. Lives on the forward path (in context stack frames). #[derive(Clone, Debug, Default, PartialEq)] @@ -223,6 +243,26 @@ pub enum NodeControlMsg { metrics_reporter: MetricsReporter, }, + /// A processor-local wakeup scheduled by the processor effect handler. + /// + /// This is delivered back through the processor inbox as normal control + /// traffic. The slot identifies which logical wakeup fired; processors are + /// expected to interpret the slot according to their own local namespace. + /// The revision changes every time the slot is (re-)scheduled and allows + /// processors to ignore stale wakeups for a reused slot. + /// + /// Wakeups are best-effort runtime signals rather than durable work items: + /// once processor shutdown is latched, pending wakeups are dropped and no + /// further wakeups are accepted. + Wakeup { + /// Scheduled wakeup slot. + slot: WakeupSlot, + /// Scheduled due time currently associated with this slot. + when: Instant, + /// Scheduler-assigned revision for this slot schedule. + revision: WakeupRevision, + }, + /// Delayed data returning to the node which delayed it. DelayedData { /// When resumed diff --git a/rust/otap-dataflow/crates/engine/src/effect_handler.rs b/rust/otap-dataflow/crates/engine/src/effect_handler.rs index d93060af4d..5b714ceb15 100644 --- a/rust/otap-dataflow/crates/engine/src/effect_handler.rs +++ b/rust/otap-dataflow/crates/engine/src/effect_handler.rs @@ -7,10 +7,12 @@ use crate::Interests; use crate::completion_emission_metrics::CompletionEmissionMetricsHandle; use crate::control::{ AckMsg, NackMsg, PipelineCompletionMsg, PipelineCompletionMsgSender, RuntimeControlMsg, - RuntimeCtrlMsgSender, + RuntimeCtrlMsgSender, WakeupSlot, }; use crate::error::Error; use crate::node::NodeId; +use crate::node_local_scheduler::NodeLocalSchedulerHandle; +use crate::{WakeupError, WakeupSetOutcome}; use otap_df_channel::error::SendError; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; @@ -58,6 +60,8 @@ pub(crate) struct EffectHandlerCore { pub(crate) source_tag: SourceTagging, /// Precomputed node interests derived from metric level. node_interests: Interests, + /// Optional processor-local wakeup scheduler. + local_scheduler: Option, } impl EffectHandlerCore { @@ -71,6 +75,7 @@ impl EffectHandlerCore { completion_emission_metrics: None, source_tag: SourceTagging::Disabled, node_interests: Interests::empty(), + local_scheduler: None, } } @@ -103,6 +108,11 @@ impl EffectHandlerCore { self.completion_emission_metrics = completion_emission_metrics; } + /// Sets the processor-local wakeup scheduler for this effect handler. 
+ pub(crate) fn set_local_scheduler(&mut self, local_scheduler: NodeLocalSchedulerHandle) { + self.local_scheduler = Some(local_scheduler); + } + /// Returns outgoing messages source tagging mode. #[must_use] pub const fn source_tagging(&self) -> SourceTagging { @@ -397,6 +407,48 @@ impl EffectHandlerCore { }) } + /// Set or replace a processor-local wakeup. + /// + /// Wakeups are keyed by [`WakeupSlot`]. Scheduling the same slot again + /// replaces the previous due time for that slot and assigns a new + /// scheduler revision for that slot. + /// + /// The returned [`WakeupSetOutcome`] tells the caller whether the slot was + /// newly inserted or whether an existing live wakeup was replaced, and + /// carries the accepted revision that will later be returned in + /// `NodeControlMsg::Wakeup`. + /// + /// # Errors + /// + /// Returns [`WakeupError::Unsupported`] when the processor runtime did not + /// enable processor-local wakeups. Returns [`WakeupError::ShuttingDown`] + /// once processor shutdown has been latched. Returns + /// [`WakeupError::Capacity`] if the processor has reached its configured + /// live wakeup-slot capacity. + pub fn set_wakeup( + &self, + slot: WakeupSlot, + when: Instant, + ) -> Result { + self.local_scheduler + .as_ref() + .ok_or(WakeupError::Unsupported)? + .set_wakeup(slot, when) + } + + /// Cancel a previously scheduled processor-local wakeup. + /// + /// Returns `true` when a live wakeup for `slot` was removed. Returns + /// `false` when the slot was not scheduled or when shutdown has already + /// been latched for the processor. + #[must_use] + pub fn cancel_wakeup(&self, slot: WakeupSlot) -> bool { + self.local_scheduler + .as_ref() + .map(|scheduler| scheduler.cancel_wakeup(slot)) + .unwrap_or(false) + } + /// Notifies the runtime control manager that this receiver has completed /// ingress drain. pub async fn notify_receiver_drained(&self) -> Result<(), Error> { diff --git a/rust/otap-dataflow/crates/engine/src/lib.rs b/rust/otap-dataflow/crates/engine/src/lib.rs index bec1a4896d..b2d4e77177 100644 --- a/rust/otap-dataflow/crates/engine/src/lib.rs +++ b/rust/otap-dataflow/crates/engine/src/lib.rs @@ -18,7 +18,7 @@ use crate::{ local::message::{LocalReceiver, LocalSender}, message::{Receiver, Sender}, node::{Node, NodeDefs, NodeId, NodeName, NodeType}, - processor::ProcessorWrapper, + processor::{ProcessorWrapper, validate_local_wakeup_requirements}, receiver::ReceiverWrapper, runtime_pipeline::{PipeNode, RuntimePipeline}, shared::message::{SharedReceiver, SharedSender}, @@ -72,6 +72,7 @@ pub mod entity_context; pub mod local; pub mod memory_limiter; pub mod node; +mod node_local_scheduler; pub mod output_router; pub mod pipeline_ctrl; mod pipeline_metrics; @@ -82,6 +83,8 @@ pub mod terminal_state; pub mod testing; pub mod topic; pub mod wiring_contract; +pub use node_local_scheduler::{WakeupError, WakeupSetOutcome}; +pub use processor::{LocalWakeupRequirements, ProcessorRuntimeRequirements}; /// Trait for factory types that expose a name. 
/// @@ -1463,6 +1466,8 @@ impl PipelineFactory { ) .map_err(|e| Error::ConfigError(Box::new(e)))?; + validate_local_wakeup_requirements(&node_id, processor.runtime_requirements())?; + otel_debug!( "processor.create.complete", pipeline_group_id = pipeline_group_id.as_ref(), diff --git a/rust/otap-dataflow/crates/engine/src/local/processor.rs b/rust/otap-dataflow/crates/engine/src/local/processor.rs index f78dc66665..a1dc10233c 100644 --- a/rust/otap-dataflow/crates/engine/src/local/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/local/processor.rs @@ -33,7 +33,7 @@ //! in parallel on different cores, each with its own processor instance. use crate::Interests; -use crate::control::{AckMsg, NackMsg, RuntimeCtrlMsgSender}; +use crate::control::{AckMsg, NackMsg, RuntimeCtrlMsgSender, WakeupSlot}; use crate::effect_handler::{ EffectHandlerCore, SourceTagging, TelemetryTimerCancelHandle, TimerCancelHandle, }; @@ -42,6 +42,8 @@ use crate::message::{Message, Sender}; use crate::node::NodeId; use crate::output_router::OutputRouter; use crate::process_duration::ComputeDuration; +use crate::processor::ProcessorRuntimeRequirements; +use crate::{WakeupError, WakeupSetOutcome}; use async_trait::async_trait; use otap_df_config::PortName; use otap_df_telemetry::error::Error as TelemetryError; @@ -68,7 +70,13 @@ pub trait Processor { /// - Transform the message and return a new message /// - Filter the message by returning None /// - Split the message into multiple messages by returning a vector - /// - Handle control messages (e.g., Config, TimerTick, Shutdown) + /// - Handle control messages (e.g., Config, TimerTick, Wakeup, Shutdown) + /// + /// Processor-local wakeups are scheduled through + /// [`EffectHandler::set_wakeup`]. They are delivered back to the processor + /// as `Message::Control(NodeControlMsg::Wakeup { .. })` through the normal + /// inbox path and participate in the same control-vs-pdata fairness rules + /// as other control traffic. /// /// # Parameters /// @@ -97,6 +105,15 @@ pub trait Processor { fn accept_pdata(&self) -> bool { true } + + /// Returns optional runtime services that this processor needs from the engine. + /// + /// This is the single source of truth for runtime wiring. For example, + /// `local_wakeups: Some(...)` both enables processor-local wakeups and + /// declares the live slot count the engine must provision. + fn runtime_requirements(&self) -> ProcessorRuntimeRequirements { + ProcessorRuntimeRequirements::none() + } } /// A `!Send` implementation of the EffectHandler. @@ -262,6 +279,21 @@ impl EffectHandler { self.core.delay_data(when, data).await } + /// Set or replace a processor-local wakeup. + pub fn set_wakeup( + &self, + slot: WakeupSlot, + when: Instant, + ) -> Result { + self.core.set_wakeup(slot, when) + } + + /// Cancel a previously scheduled processor-local wakeup. + #[must_use] + pub fn cancel_wakeup(&self, slot: WakeupSlot) -> bool { + self.core.cancel_wakeup(slot) + } + /// Reports metrics collected by the processor. #[allow(dead_code)] // Will be used in the future. ToDo report metrics from channel and messages. 
pub(crate) fn report_metrics( @@ -317,12 +349,13 @@ mod tests { use crate::completion_emission_metrics::make_completion_emission_metrics; use crate::context::ControllerContext; use crate::control::{ - AckMsg, Frame, NackMsg, PipelineCompletionMsg, RouteData, pipeline_completion_msg_channel, + AckMsg, Frame, NackMsg, PipelineCompletionMsg, RouteData, WakeupSlot, + pipeline_completion_msg_channel, }; use crate::entity_context::NodeTelemetryHandle; use crate::local::message::LocalSender; use crate::testing::test_node; - use crate::{Interests, Unwindable}; + use crate::{Interests, Unwindable, WakeupError}; use otap_df_channel::error::SendError; use otap_df_channel::mpsc; use otap_df_config::{MetricLevel, node::NodeKind}; @@ -454,6 +487,25 @@ mod tests { ); } + /// Scenario: a processor effect handler has not been wired with the + /// processor-local wakeup runtime capability and attempts to schedule a + /// wakeup anyway. + /// Guarantees: the call fails with `WakeupError::Unsupported` instead of + /// panicking, so non-opting processors do not require the wakeup runtime + /// machinery to exist. + #[test] + fn effect_handler_set_wakeup_without_runtime_support_returns_unsupported() { + let (_metrics_rx, metrics_reporter) = MetricsReporter::create_new_and_receiver(1); + let eh = + EffectHandler::::new(test_node("proc"), HashMap::new(), None, metrics_reporter); + + assert_eq!( + eh.set_wakeup(WakeupSlot(0), Instant::now()), + Err(WakeupError::Unsupported) + ); + assert!(!eh.cancel_wakeup(WakeupSlot(0))); + } + #[tokio::test] async fn effect_handler_send_message_ambiguous_without_default() { let (a_tx, a_rx) = channel::(10); diff --git a/rust/otap-dataflow/crates/engine/src/message.rs b/rust/otap-dataflow/crates/engine/src/message.rs index 884e3fa5ac..9957fedabf 100644 --- a/rust/otap-dataflow/crates/engine/src/message.rs +++ b/rust/otap-dataflow/crates/engine/src/message.rs @@ -6,6 +6,7 @@ use crate::clock; use crate::control::{AckMsg, NackMsg, NodeControlMsg}; use crate::local::message::{LocalReceiver, LocalSender}; +use crate::node_local_scheduler::NodeLocalSchedulerHandle; use crate::shared::message::{SharedReceiver, SharedSender}; use crate::{Interests, ReceivedAtNode}; use otap_df_channel::error::{RecvError, SendError}; @@ -244,6 +245,7 @@ impl ChannelReceiver for SharedReceiver { /// /// This enum lets the shared core express that difference explicitly without /// forking the whole receive loop. +#[derive(Clone, Copy)] enum DrainPolicy { /// Respect the caller's admission flag even after shutdown has been /// latched. @@ -257,6 +259,7 @@ enum DrainPolicy { struct InboxCore { control_rx: Option, pdata_rx: Option, + local_scheduler: Option, /// Once a Shutdown is seen, this is set to `Some(instant)` representing the drain deadline. shutting_down_deadline: Option, /// Holds the ControlMsg::Shutdown until after we’ve drained pdata. 
@@ -270,10 +273,17 @@ struct InboxCore { } impl InboxCore { - fn new(control_rx: ControlRx, pdata_rx: PDataRx, node_id: usize, interests: Interests) -> Self { + fn new( + control_rx: ControlRx, + pdata_rx: PDataRx, + local_scheduler: Option, + node_id: usize, + interests: Interests, + ) -> Self { Self { control_rx: Some(control_rx), pdata_rx: Some(pdata_rx), + local_scheduler, shutting_down_deadline: None, pending_shutdown: None, node_id, @@ -285,6 +295,9 @@ impl InboxCore { fn shutdown(&mut self) { self.shutting_down_deadline = None; self.consecutive_control = 0; + if let Some(local_scheduler) = &self.local_scheduler { + local_scheduler.begin_shutdown(); + } drop(self.control_rx.take().expect("control_rx must exist")); drop(self.pdata_rx.take().expect("pdata_rx must exist")); } @@ -330,6 +343,39 @@ where accept_pdata || matches!(policy, DrainPolicy::ForceDrainDuringShutdown) } + fn shutdown_drain_complete(&self) -> bool { + self.pdata_rx + .as_ref() + .expect("pdata_rx must exist") + .is_empty() + && self + .local_scheduler + .as_ref() + .map(NodeLocalSchedulerHandle::is_drained) + .unwrap_or(true) + } + + fn pop_local_due(&mut self, now: Instant) -> Option> { + self.local_scheduler + .as_ref() + .and_then(|scheduler| scheduler.pop_due(now)) + .map(|(slot, when, revision)| { + self.control_message(NodeControlMsg::Wakeup { + slot, + when, + revision, + }) + }) + } + + fn next_local_expiry_sleep(&self, now: Instant) -> Option { + self.local_scheduler + .as_ref() + .and_then(NodeLocalSchedulerHandle::next_expiry) + .filter(|when| *when > now) + .map(clock::sleep_until) + } + async fn recv_with_policy( &mut self, accept_pdata: bool, @@ -373,12 +419,7 @@ where // only after the bounded pdata backlog is empty. This keeps the // channel-level drain contract explicit: upstream work that was // already accepted into the channel gets a chance to run first. - if self - .pdata_rx - .as_ref() - .expect("pdata_rx must exist") - .is_empty() - { + if self.shutdown_drain_complete() { let shutdown = self .pending_shutdown .take() @@ -392,6 +433,9 @@ where sleep_until_deadline = Some(clock::sleep_until(dl)); } + let now = clock::now(); + let mut sleep_until_local = self.next_local_expiry_sleep(now); + // Even while draining we cap control preference. This prevents a // sustained Ack/Nack or shutdown-control burst from starving the // already buffered pdata that shutdown is trying to drain. @@ -415,6 +459,28 @@ where } } + if !self + .control_rx + .as_ref() + .expect("control_rx must exist") + .is_empty() + { + match self + .control_rx + .as_mut() + .expect("control_rx must exist") + .try_recv() + { + Ok(msg) => return Ok(self.control_message(msg)), + Err(RecvError::Empty) => {} + Err(e) => return Err(e), + } + } + + if let Some(msg) = self.pop_local_due(now) { + return Ok(msg); + } + // Drain pdata (gated by accept_pdata) and deliver control messages. // Honoring accept_pdata during draining lets stateful processors // receive Ack/Nack to reduce in-flight state and reopen capacity. @@ -445,6 +511,22 @@ where Ok(msg) => return Ok(self.control_message(msg)), Err(e) => return Err(e), }, + + _ = async { + if let Some(delay) = sleep_until_local.as_mut() { + delay.await; + } + }, if sleep_until_local.is_some() => { + continue; + }, + + _ = async { + if let Some(local_scheduler) = self.local_scheduler.as_ref() { + local_scheduler.wait_for_change().await; + } + }, if self.local_scheduler.is_some() => { + continue; + }, } } else { tokio::select! 
{ @@ -473,11 +555,30 @@ where return Ok(Message::Control(shutdown)); } }, + + _ = async { + if let Some(delay) = sleep_until_local.as_mut() { + delay.await; + } + }, if sleep_until_local.is_some() => { + continue; + }, + + _ = async { + if let Some(local_scheduler) = self.local_scheduler.as_ref() { + local_scheduler.wait_for_change().await; + } + }, if self.local_scheduler.is_some() => { + continue; + }, } } } // Normal mode: no shutdown yet + let now = clock::now(); + let mut sleep_until_local = self.next_local_expiry_sleep(now); + if accept_pdata && self.consecutive_control >= CONTROL_BURST_LIMIT { match self .pdata_rx @@ -491,6 +592,43 @@ where } } + if !self + .control_rx + .as_ref() + .expect("control_rx must exist") + .is_empty() + { + match self + .control_rx + .as_mut() + .expect("control_rx must exist") + .try_recv() + { + Ok(NodeControlMsg::Shutdown { deadline, reason }) => { + if deadline <= clock::now() { + self.shutdown(); + return Ok(Message::Control(NodeControlMsg::Shutdown { + deadline, + reason, + })); + } + if let Some(local_scheduler) = &self.local_scheduler { + local_scheduler.begin_shutdown(); + } + self.shutting_down_deadline = Some(deadline); + self.pending_shutdown = Some(NodeControlMsg::Shutdown { deadline, reason }); + continue; + } + Ok(msg) => return Ok(self.control_message(msg)), + Err(RecvError::Empty) => {} + Err(e) => return Err(e), + } + } + + if let Some(msg) = self.pop_local_due(now) { + return Ok(msg); + } + if accept_pdata && self.consecutive_control >= CONTROL_BURST_LIMIT { tokio::select! { biased; @@ -514,6 +652,9 @@ where self.shutdown(); return Ok(Message::Control(NodeControlMsg::Shutdown { deadline, reason })); } + if let Some(local_scheduler) = &self.local_scheduler { + local_scheduler.begin_shutdown(); + } self.shutting_down_deadline = Some(deadline); self.pending_shutdown = Some(NodeControlMsg::Shutdown { deadline, reason }); continue; @@ -521,6 +662,22 @@ where Ok(msg) => return Ok(self.control_message(msg)), Err(e) => return Err(e), }, + + _ = async { + if let Some(delay) = sleep_until_local.as_mut() { + delay.await; + } + }, if sleep_until_local.is_some() => { + continue; + }, + + _ = async { + if let Some(local_scheduler) = self.local_scheduler.as_ref() { + local_scheduler.wait_for_change().await; + } + }, if self.local_scheduler.is_some() => { + continue; + }, } } else { tokio::select! { @@ -536,6 +693,9 @@ where self.shutdown(); return Ok(Message::Control(NodeControlMsg::Shutdown { deadline, reason })); } + if let Some(local_scheduler) = &self.local_scheduler { + local_scheduler.begin_shutdown(); + } self.shutting_down_deadline = Some(deadline); self.pending_shutdown = Some(NodeControlMsg::Shutdown { deadline, reason }); continue; @@ -550,6 +710,22 @@ where Err(RecvError::Closed) => return Ok(self.closed_pdata_shutdown()), Err(e) => return Err(e), } + }, + + _ = async { + if let Some(delay) = sleep_until_local.as_mut() { + delay.await; + } + }, if sleep_until_local.is_some() => { + continue; + }, + + _ = async { + if let Some(local_scheduler) = self.local_scheduler.as_ref() { + local_scheduler.wait_for_change().await; + } + }, if self.local_scheduler.is_some() => { + continue; } } } @@ -576,7 +752,27 @@ impl ProcessorInbox { interests: Interests, ) -> Self { Self { - core: InboxCore::new(control_rx, pdata_rx, node_id, interests), + core: InboxCore::new(control_rx, pdata_rx, None, node_id, interests), + } + } + + /// Creates a new processor inbox with an explicit processor-local scheduler. 
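+    ///
+    /// The scheduler handle is intended to be shared with the processor's
+    /// effect handler, so `set_wakeup`/`cancel_wakeup` calls and due-wakeup
+    /// delivery in this inbox observe one bounded slot state.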
+ #[must_use] + pub(crate) fn new_with_local_scheduler( + control_rx: Receiver>, + pdata_rx: Receiver, + local_scheduler: NodeLocalSchedulerHandle, + node_id: usize, + interests: Interests, + ) -> Self { + Self { + core: InboxCore::new( + control_rx, + pdata_rx, + Some(local_scheduler), + node_id, + interests, + ), } } } @@ -613,7 +809,7 @@ impl ExporterInbox { interests: Interests, ) -> Self { Self { - core: InboxCore::new(control_rx, pdata_rx, node_id, interests), + core: InboxCore::new(control_rx, pdata_rx, None, node_id, interests), } } } @@ -668,3 +864,235 @@ impl ExporterInbox { /// Send-friendly exporter inbox type for shared exporter runtimes. pub(crate) type SharedExporterInbox = ExporterInbox>, SharedReceiver>; + +#[cfg(test)] +mod tests { + use super::*; + use crate::WakeupError; + use crate::local::message::LocalReceiver; + use crate::testing::TestMsg; + use otap_df_channel::mpsc; + use std::time::Duration; + + fn local_processor_inbox( + wakeup_capacity: usize, + ) -> ( + mpsc::Sender>, + mpsc::Sender, + NodeLocalSchedulerHandle, + ProcessorInbox, + ) { + let (control_tx, control_rx) = mpsc::Channel::>::new(64); + let (pdata_tx, pdata_rx) = mpsc::Channel::::new(64); + let scheduler = NodeLocalSchedulerHandle::new(wakeup_capacity); + let inbox = ProcessorInbox::new_with_local_scheduler( + Receiver::Local(LocalReceiver::mpsc(control_rx)), + Receiver::Local(LocalReceiver::mpsc(pdata_rx)), + scheduler.clone(), + 7, + Interests::empty(), + ); + (control_tx, pdata_tx, scheduler, inbox) + } + + /// Scenario: a processor-local wakeup is scheduled for immediate delivery + /// while the processor inbox is otherwise idle. + /// Guarantees: the inbox surfaces the due wakeup as + /// `NodeControlMsg::Wakeup` with the scheduled slot, deadline, and + /// accepted revision. + #[tokio::test] + async fn processor_inbox_emits_due_wakeup_as_control_message() { + let (_control_tx, _pdata_tx, scheduler, mut inbox) = local_processor_inbox(4); + let when = Instant::now(); + let outcome = scheduler + .set_wakeup(crate::control::WakeupSlot(0), when) + .expect("wakeup should schedule"); + let revision = outcome.revision(); + + let message = tokio::time::timeout(Duration::from_millis(50), inbox.recv_when(true)) + .await + .expect("inbox should wake") + .expect("message should arrive"); + assert!(matches!( + message, + Message::Control(NodeControlMsg::Wakeup { + slot: crate::control::WakeupSlot(0), + when: observed, + revision: observed_revision, + }) if observed == when && observed_revision == revision + )); + } + + /// Scenario: a processor inbox has both pending pdata and a burst of due + /// processor-local wakeups. + /// Guarantees: wakeups still count as ordinary control traffic for the + /// existing fairness policy, so pdata is eventually delivered instead of + /// starving behind an unbounded wakeup burst. 
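+    /// (`CONTROL_BURST_LIMIT` is the engine's existing cap on consecutive
+    /// control deliveries while pdata is waiting.)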
+ #[tokio::test] + async fn processor_inbox_wakeup_preserves_control_fairness() { + let (_control_tx, pdata_tx, scheduler, mut inbox) = local_processor_inbox(64); + pdata_tx + .send_async(TestMsg::new("pdata")) + .await + .expect("pdata should enqueue"); + let when = Instant::now(); + for slot in 0..40 { + let _ = scheduler + .set_wakeup(crate::control::WakeupSlot(slot), when) + .expect("wakeup should schedule"); + } + + let mut wakeups = 0usize; + let mut saw_pdata = false; + while wakeups <= CONTROL_BURST_LIMIT { + match inbox.recv_when(true).await.expect("message should arrive") { + Message::PData(TestMsg(value)) => { + assert_eq!(value, "pdata"); + saw_pdata = true; + break; + } + Message::Control(NodeControlMsg::Wakeup { .. }) => { + wakeups += 1; + } + other => panic!("unexpected message {other:?}"), + } + } + + assert!( + saw_pdata, + "pdata should not starve behind processor-local wakeups" + ); + } + + /// Scenario: a normal control message is already buffered in the processor + /// inbox when a processor-local wakeup also becomes due. + /// Guarantees: the buffered control message is delivered first, and the + /// due wakeup follows as ordinary control traffic rather than bypassing the + /// existing control queue. + #[tokio::test] + async fn processor_inbox_keeps_buffered_control_ahead_of_due_wakeups() { + let (control_tx, _pdata_tx, scheduler, mut inbox) = local_processor_inbox(4); + let when = Instant::now(); + + control_tx + .send_async(NodeControlMsg::Config { + config: serde_json::json!({"mode": "keep-control-order"}), + }) + .await + .expect("config should enqueue"); + let outcome = scheduler + .set_wakeup(crate::control::WakeupSlot(0), when) + .expect("wakeup should schedule"); + let revision = outcome.revision(); + + let first = inbox.recv_when(true).await.expect("message should arrive"); + assert!(matches!( + first, + Message::Control(NodeControlMsg::Config { .. }) + )); + + let second = inbox.recv_when(true).await.expect("message should arrive"); + assert!(matches!( + second, + Message::Control(NodeControlMsg::Wakeup { + slot: crate::control::WakeupSlot(0), + when: observed, + revision: observed_revision, + }) if observed == when && observed_revision == revision + )); + } + + /// Scenario: shutdown has been latched and the processor-local scheduler + /// receives a new wakeup request while the inbox is draining buffered + /// messages. + /// Guarantees: new wakeup requests are rejected with + /// `WakeupError::ShuttingDown` once shutdown has been latched. + #[tokio::test] + async fn processor_inbox_rejects_wakeups_after_shutdown_latch() { + let (control_tx, pdata_tx, scheduler, mut inbox) = local_processor_inbox(4); + pdata_tx + .send_async(TestMsg::new("buffered")) + .await + .expect("pdata should enqueue"); + control_tx + .send_async(NodeControlMsg::Shutdown { + deadline: Instant::now() + Duration::from_secs(1), + reason: "shutdown".to_owned(), + }) + .await + .expect("shutdown should enqueue"); + control_tx + .send_async(NodeControlMsg::Config { + config: serde_json::json!({"mode": "draining"}), + }) + .await + .expect("config should enqueue"); + + let first = inbox + .recv_when(false) + .await + .expect("control should arrive after shutdown latch"); + assert!(matches!( + first, + Message::Control(NodeControlMsg::Config { .. 
}) + )); + assert_eq!( + scheduler.set_wakeup(crate::control::WakeupSlot(1), Instant::now()), + Err(WakeupError::ShuttingDown) + ); + } + + /// Scenario: a processor-local wakeup is pending when shutdown is latched + /// and the inbox still has buffered pdata to drain. + /// Guarantees: pending wakeups are dropped immediately on shutdown latch, + /// buffered pdata still drains according to the inbox contract, and the + /// latched shutdown is delivered after draining completes. + #[tokio::test] + async fn processor_inbox_drops_pending_wakeups_on_shutdown_latch() { + let (control_tx, pdata_tx, scheduler, mut inbox) = local_processor_inbox(4); + pdata_tx + .send_async(TestMsg::new("buffered")) + .await + .expect("pdata should enqueue"); + let _ = scheduler + .set_wakeup(crate::control::WakeupSlot(2), Instant::now()) + .expect("wakeup should schedule"); + control_tx + .send_async(NodeControlMsg::Shutdown { + deadline: Instant::now() + Duration::from_secs(1), + reason: "shutdown".to_owned(), + }) + .await + .expect("shutdown should enqueue"); + control_tx + .send_async(NodeControlMsg::Config { + config: serde_json::json!({"drop": true}), + }) + .await + .expect("config should enqueue"); + + let first = inbox + .recv_when(false) + .await + .expect("control should arrive after shutdown latch"); + assert!(matches!( + first, + Message::Control(NodeControlMsg::Config { .. }) + )); + + let drained = inbox + .recv_when(true) + .await + .expect("buffered pdata should drain"); + assert!(matches!(drained, Message::PData(TestMsg(ref value)) if value == "buffered")); + + let shutdown = inbox + .recv_when(true) + .await + .expect("shutdown should follow drain"); + assert!(matches!( + shutdown, + Message::Control(NodeControlMsg::Shutdown { .. }) + )); + } +} diff --git a/rust/otap-dataflow/crates/engine/src/node_local_scheduler.rs b/rust/otap-dataflow/crates/engine/src/node_local_scheduler.rs new file mode 100644 index 0000000000..b93e0714fb --- /dev/null +++ b/rust/otap-dataflow/crates/engine/src/node_local_scheduler.rs @@ -0,0 +1,549 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Node-local wakeup scheduling for processor inboxes. + +use crate::control::{WakeupRevision, WakeupSlot}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; +use tokio::sync::Notify; + +/// Error returned when a wakeup request cannot be accepted. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum WakeupError { + /// Processor-local wakeups were not enabled for this processor runtime. + Unsupported, + /// The processor has already latched shutdown. + ShuttingDown, + /// The bounded live wakeup slot set is full. + Capacity, +} + +/// Outcome of setting a processor-local wakeup slot. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum WakeupSetOutcome { + /// A new live slot was inserted into the scheduler. + Inserted { + /// Scheduler-assigned revision for the live wakeup now stored in the slot. + revision: WakeupRevision, + }, + /// An existing live slot was updated in place. + Replaced { + /// Scheduler-assigned revision for the replacement wakeup now stored in the slot. + revision: WakeupRevision, + }, +} + +impl WakeupSetOutcome { + /// Returns the scheduler-assigned revision for the accepted wakeup. 
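+    ///
+    /// A hypothetical staleness check (constant and field names are
+    /// illustrative only):
+    ///
+    /// ```ignore
+    /// let outcome = effect_handler.set_wakeup(RETRY_SLOT, when)?;
+    /// self.live_retry_revision = Some(outcome.revision());
+    /// // ...later, when NodeControlMsg::Wakeup { slot, revision, .. } fires:
+    /// if slot == RETRY_SLOT && Some(revision) == self.live_retry_revision {
+    ///     // Current wakeup: run the deferred work.
+    /// } else {
+    ///     // Stale delivery for a re-scheduled slot: ignore it.
+    /// }
+    /// ```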
+ #[must_use] + pub const fn revision(self) -> WakeupRevision { + match self { + Self::Inserted { revision } | Self::Replaced { revision } => revision, + } + } +} + +#[derive(Clone, Copy, Debug)] +struct ScheduledWakeup { + slot: WakeupSlot, + when: Instant, + revision: WakeupRevision, +} + +struct NodeLocalScheduler { + wakeup_capacity: usize, + next_revision: WakeupRevision, + wakeups: Vec, + wakeup_indices: HashMap, + shutting_down: bool, +} + +impl NodeLocalScheduler { + fn new(wakeup_capacity: usize) -> Self { + Self { + wakeup_capacity, + next_revision: 0, + wakeups: Vec::new(), + wakeup_indices: HashMap::new(), + shutting_down: false, + } + } + + fn next_revision(&mut self) -> WakeupRevision { + let next = self.next_revision; + self.next_revision = self.next_revision.saturating_add(1); + next + } + + fn wakeup_precedes(left: &ScheduledWakeup, right: &ScheduledWakeup) -> bool { + left.when < right.when || (left.when == right.when && left.revision < right.revision) + } + + fn swap_entries(&mut self, left: usize, right: usize) { + if left == right { + return; + } + + self.wakeups.swap(left, right); + + let left_slot = self.wakeups[left].slot; + let right_slot = self.wakeups[right].slot; + let _ = self + .wakeup_indices + .insert(left_slot, left) + .expect("left slot index should exist"); + let _ = self + .wakeup_indices + .insert(right_slot, right) + .expect("right slot index should exist"); + } + + fn sift_up(&mut self, mut index: usize) { + while index > 0 { + let parent = (index - 1) / 2; + if !Self::wakeup_precedes(&self.wakeups[index], &self.wakeups[parent]) { + break; + } + self.swap_entries(index, parent); + index = parent; + } + } + + fn sift_down(&mut self, mut index: usize) { + let len = self.wakeups.len(); + loop { + let left = index * 2 + 1; + if left >= len { + break; + } + + let right = left + 1; + let mut smallest = left; + if right < len && Self::wakeup_precedes(&self.wakeups[right], &self.wakeups[left]) { + smallest = right; + } + + if !Self::wakeup_precedes(&self.wakeups[smallest], &self.wakeups[index]) { + break; + } + + self.swap_entries(index, smallest); + index = smallest; + } + } + + fn repair_heap_at(&mut self, index: usize) { + if index > 0 { + let parent = (index - 1) / 2; + if Self::wakeup_precedes(&self.wakeups[index], &self.wakeups[parent]) { + self.sift_up(index); + return; + } + } + self.sift_down(index); + } + + fn remove_heap_entry(&mut self, index: usize) -> ScheduledWakeup { + let last = self + .wakeups + .len() + .checked_sub(1) + .expect("heap entry removal requires a non-empty heap"); + + if index == last { + return self.wakeups.pop().expect("last wakeup should exist"); + } + + self.wakeups.swap(index, last); + let removed = self.wakeups.pop().expect("removed wakeup should exist"); + + let moved_slot = self.wakeups[index].slot; + let _ = self + .wakeup_indices + .insert(moved_slot, index) + .expect("moved slot index should exist"); + self.repair_heap_at(index); + removed + } + + fn set_wakeup( + &mut self, + slot: WakeupSlot, + when: Instant, + ) -> Result { + if self.shutting_down { + return Err(WakeupError::ShuttingDown); + } + + if let Some(&index) = self.wakeup_indices.get(&slot) { + let revision = self.next_revision(); + self.wakeups[index].when = when; + self.wakeups[index].revision = revision; + self.repair_heap_at(index); + Ok(WakeupSetOutcome::Replaced { revision }) + } else { + if self.wakeup_indices.len() >= self.wakeup_capacity { + return Err(WakeupError::Capacity); + } + let revision = self.next_revision(); + let index = 
self.wakeups.len(); + self.wakeups.push(ScheduledWakeup { + slot, + when, + revision, + }); + assert!( + self.wakeup_indices.insert(slot, index).is_none(), + "new wakeup slot should not already exist" + ); + self.sift_up(index); + Ok(WakeupSetOutcome::Inserted { revision }) + } + } + + fn cancel_wakeup(&mut self, slot: WakeupSlot) -> bool { + if self.shutting_down { + return false; + } + + let Some(index) = self.wakeup_indices.remove(&slot) else { + return false; + }; + + let removed = self.remove_heap_entry(index); + debug_assert_eq!(removed.slot, slot); + true + } + + #[cfg(debug_assertions)] + fn assert_consistent(&self) { + assert_eq!(self.wakeups.len(), self.wakeup_indices.len()); + + for (index, wakeup) in self.wakeups.iter().enumerate() { + assert_eq!( + self.wakeup_indices.get(&wakeup.slot).copied(), + Some(index), + "heap index must match map entry" + ); + + if index > 0 { + let parent = (index - 1) / 2; + assert!( + !Self::wakeup_precedes(&self.wakeups[index], &self.wakeups[parent]), + "heap child must not precede parent" + ); + } + } + } + + fn next_expiry(&mut self) -> Option { + #[cfg(debug_assertions)] + self.assert_consistent(); + self.wakeups.first().map(|wakeup| wakeup.when) + } + + fn pop_due(&mut self, now: Instant) -> Option<(WakeupSlot, Instant, WakeupRevision)> { + #[cfg(debug_assertions)] + self.assert_consistent(); + + let next_due = self.wakeups.first().map(|wakeup| wakeup.when)?; + if next_due > now { + return None; + } + + let slot = self.wakeups.first().expect("due wakeup should exist").slot; + let removed_index = self + .wakeup_indices + .remove(&slot) + .expect("due wakeup slot index should exist"); + debug_assert_eq!(removed_index, 0); + let wakeup = self.remove_heap_entry(0); + Some((wakeup.slot, wakeup.when, wakeup.revision)) + } + + fn begin_shutdown(&mut self) { + if self.shutting_down { + return; + } + self.shutting_down = true; + self.wakeup_indices.clear(); + self.wakeups.clear(); + } + + fn is_drained(&self) -> bool { + self.wakeup_indices.is_empty() + } +} + +/// Shared handle used by the processor inbox and the processor effect handler. 
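+///
+/// Scheduler state lives behind a `Mutex`, and a `Notify` wakes the inbox
+/// receive loop whenever a set or cancel may have moved the earliest expiry,
+/// or when shutdown drops all pending wakeups.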
+pub(crate) struct NodeLocalSchedulerHandle {
+    inner: Arc<Mutex<NodeLocalScheduler>>,
+    notify: Arc<Notify>,
+}
+
+impl Clone for NodeLocalSchedulerHandle {
+    fn clone(&self) -> Self {
+        Self {
+            inner: Arc::clone(&self.inner),
+            notify: Arc::clone(&self.notify),
+        }
+    }
+}
+
+impl NodeLocalSchedulerHandle {
+    pub(crate) fn new(wakeup_capacity: usize) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(NodeLocalScheduler::new(wakeup_capacity))),
+            notify: Arc::new(Notify::new()),
+        }
+    }
+
+    fn with_scheduler<R>(&self, f: impl FnOnce(&mut NodeLocalScheduler) -> R) -> R {
+        let mut guard = self
+            .inner
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+        f(&mut guard)
+    }
+
+    pub(crate) fn set_wakeup(
+        &self,
+        slot: WakeupSlot,
+        when: Instant,
+    ) -> Result<WakeupSetOutcome, WakeupError> {
+        let result = self.with_scheduler(|scheduler| scheduler.set_wakeup(slot, when));
+        if result.is_ok() {
+            self.notify.notify_one();
+        }
+        result
+    }
+
+    #[must_use]
+    pub(crate) fn cancel_wakeup(&self, slot: WakeupSlot) -> bool {
+        let changed = self.with_scheduler(|scheduler| scheduler.cancel_wakeup(slot));
+        if changed {
+            self.notify.notify_one();
+        }
+        changed
+    }
+
+    pub(crate) fn next_expiry(&self) -> Option<Instant> {
+        self.with_scheduler(NodeLocalScheduler::next_expiry)
+    }
+
+    pub(crate) fn pop_due(&self, now: Instant) -> Option<(WakeupSlot, Instant, WakeupRevision)> {
+        self.with_scheduler(|scheduler| scheduler.pop_due(now))
+    }
+
+    pub(crate) fn begin_shutdown(&self) {
+        self.with_scheduler(NodeLocalScheduler::begin_shutdown);
+        self.notify.notify_waiters();
+    }
+
+    pub(crate) fn is_drained(&self) -> bool {
+        self.with_scheduler(|scheduler| scheduler.is_drained())
+    }
+
+    pub(crate) async fn wait_for_change(&self) {
+        self.notify.notified().await;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::time::Duration;
+
+    fn assert_heap_bound(scheduler: &NodeLocalScheduler) {
+        assert_eq!(
+            scheduler.wakeups.len(),
+            scheduler.wakeup_indices.len(),
+            "scheduler should keep exactly one heap entry per live slot"
+        );
+        #[cfg(debug_assertions)]
+        scheduler.assert_consistent();
+    }
+
+    #[test]
+    fn set_wakeup_schedules_a_wakeup() {
+        let mut scheduler = NodeLocalScheduler::new(2);
+        let now = Instant::now();
+        let when = now + Duration::from_secs(1);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(7), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_heap_bound(&scheduler);
+        assert_eq!(scheduler.next_expiry(), Some(when));
+        assert_eq!(scheduler.pop_due(now), None);
+        assert_eq!(scheduler.pop_due(when), Some((WakeupSlot(7), when, 0)));
+        assert_heap_bound(&scheduler);
+        assert_eq!(scheduler.next_expiry(), None);
+    }
+
+    #[test]
+    fn setting_same_slot_replaces_previous_due_time() {
+        let mut scheduler = NodeLocalScheduler::new(2);
+        let now = Instant::now();
+        let later = now + Duration::from_secs(10);
+        let sooner = now + Duration::from_secs(1);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(3), later),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(3), sooner),
+            Ok(WakeupSetOutcome::Replaced { revision: 1 })
+        );
+        assert_heap_bound(&scheduler);
+        assert_eq!(scheduler.wakeups.len(), 1);
+        assert_eq!(scheduler.next_expiry(), Some(sooner));
+        assert_eq!(scheduler.pop_due(sooner), Some((WakeupSlot(3), sooner, 1)));
+        assert_heap_bound(&scheduler);
+        assert_eq!(scheduler.pop_due(later), None);
+    }
+
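+    /// Scenario: a pending wakeup is canceled before its deadline.
+    /// Guarantees: the slot is removed exactly once, repeat cancellation is a
+    /// no-op, and no wakeup remains due afterwards.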
+    #[test]
+    fn cancel_wakeup_removes_pending_wakeup() {
+        let mut scheduler = NodeLocalScheduler::new(2);
+        let when = Instant::now() + Duration::from_secs(1);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(5), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_heap_bound(&scheduler);
+        assert!(scheduler.cancel_wakeup(WakeupSlot(5)));
+        assert_heap_bound(&scheduler);
+        assert!(!scheduler.cancel_wakeup(WakeupSlot(5)));
+        assert_eq!(scheduler.next_expiry(), None);
+        assert_eq!(scheduler.pop_due(when), None);
+    }
+
+    /// Scenario: a wakeup is rescheduled after heap reordering and then
+    /// canceled while it is tracked at a moved, non-root heap index.
+    /// Guarantees: cancellation removes the correct slot, preserves heap/index
+    /// consistency, and leaves the remaining wakeups due in the expected order.
+    #[test]
+    fn cancel_after_reschedule_removes_the_moved_entry() {
+        let mut scheduler = NodeLocalScheduler::new(4);
+        let now = Instant::now();
+        let first = now + Duration::from_secs(1);
+        let second = now + Duration::from_secs(10);
+        let third = now + Duration::from_secs(20);
+        let fourth = now + Duration::from_secs(30);
+        let moved = now + Duration::from_secs(2);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(1), first),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(2), second),
+            Ok(WakeupSetOutcome::Inserted { revision: 1 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(3), third),
+            Ok(WakeupSetOutcome::Inserted { revision: 2 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(4), fourth),
+            Ok(WakeupSetOutcome::Inserted { revision: 3 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(3), moved),
+            Ok(WakeupSetOutcome::Replaced { revision: 4 })
+        );
+
+        let moved_index = scheduler
+            .wakeup_indices
+            .get(&WakeupSlot(3))
+            .copied()
+            .expect("rescheduled slot should still be tracked");
+        assert!(
+            moved_index > 0,
+            "rescheduled slot should be a non-root entry"
+        );
+
+        assert!(scheduler.cancel_wakeup(WakeupSlot(3)));
+        assert_heap_bound(&scheduler);
+        assert_eq!(scheduler.pop_due(first), Some((WakeupSlot(1), first, 0)));
+        assert_eq!(scheduler.pop_due(moved), None);
+        assert_eq!(scheduler.pop_due(second), Some((WakeupSlot(2), second, 1)));
+        assert_eq!(scheduler.pop_due(fourth), Some((WakeupSlot(4), fourth, 3)));
+        assert_eq!(scheduler.next_expiry(), None);
+    }
+
+    #[test]
+    fn capacity_is_enforced_on_distinct_live_slots() {
+        let mut scheduler = NodeLocalScheduler::new(1);
+        let when = Instant::now() + Duration::from_secs(1);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(0), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(1), when),
+            Err(WakeupError::Capacity)
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(0), when + Duration::from_secs(1)),
+            Ok(WakeupSetOutcome::Replaced { revision: 1 })
+        );
+        assert_heap_bound(&scheduler);
+    }
+
+    #[test]
+    fn repeated_reschedules_keep_single_heap_entry() {
+        let mut scheduler = NodeLocalScheduler::new(2);
+        let now = Instant::now();
+        for offset in (1..=32).rev() {
+            let when = now + Duration::from_secs(offset);
+            let outcome = scheduler
+                .set_wakeup(WakeupSlot(9), when)
+                .expect("wakeup should schedule");
+            let expected_revision: WakeupRevision = 32 - offset;
+            assert_eq!(outcome.revision(), expected_revision);
+            assert_heap_bound(&scheduler);
+            assert_eq!(scheduler.wakeups.len(), 1);
+            assert_eq!(scheduler.next_expiry(), Some(when));
+        }
+
+        let expected = now + Duration::from_secs(1);
+        assert_eq!(
+            scheduler.pop_due(expected),
+            Some((WakeupSlot(9), expected, 31))
+        );
+        assert_eq!(scheduler.next_expiry(), None);
+    }
+
+    #[test]
+    fn 
equal_deadlines_follow_schedule_sequence() {
+        let mut scheduler = NodeLocalScheduler::new(4);
+        let when = Instant::now() + Duration::from_secs(1);
+
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(1), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 0 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(2), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 1 })
+        );
+        assert_eq!(
+            scheduler.set_wakeup(WakeupSlot(3), when),
+            Ok(WakeupSetOutcome::Inserted { revision: 2 })
+        );
+        assert_heap_bound(&scheduler);
+
+        assert_eq!(scheduler.pop_due(when), Some((WakeupSlot(1), when, 0)));
+        assert_eq!(scheduler.pop_due(when), Some((WakeupSlot(2), when, 1)));
+        assert_eq!(scheduler.pop_due(when), Some((WakeupSlot(3), when, 2)));
+        assert_heap_bound(&scheduler);
+    }
+}
diff --git a/rust/otap-dataflow/crates/engine/src/processor.rs b/rust/otap-dataflow/crates/engine/src/processor.rs
index c0fbc9d97d..55d514765e 100644
--- a/rust/otap-dataflow/crates/engine/src/processor.rs
+++ b/rust/otap-dataflow/crates/engine/src/processor.rs
@@ -24,6 +24,7 @@ use crate::local::message::{LocalReceiver, LocalSender};
 use crate::local::processor as local;
 use crate::message::{Message, ProcessorInbox, Receiver, Sender};
 use crate::node::{Node, NodeId, NodeWithPDataReceiver, NodeWithPDataSender};
+use crate::node_local_scheduler::NodeLocalSchedulerHandle;
 use crate::shared::message::{SharedReceiver, SharedSender};
 use crate::shared::processor as shared;
 use otap_df_channel::error::SendError;
@@ -35,6 +36,55 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
+/// Processor-local wakeup requirements declared by a processor implementation.
+///
+/// `live_slots` is the maximum number of distinct wakeup slots that can be
+/// live at the same time for one processor instance.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct LocalWakeupRequirements {
+    /// Maximum number of concurrently live wakeup slots.
+    pub live_slots: usize,
+}
+
+impl LocalWakeupRequirements {
+    /// Create local wakeup requirements for a processor.
+    #[must_use]
+    pub const fn new(live_slots: usize) -> Self {
+        Self { live_slots }
+    }
+}
+
+/// Optional runtime services requested by a processor implementation.
+///
+/// This is the single source of truth for processor runtime wiring. For
+/// example, `local_wakeups: Some(...)` both enables processor-local wakeups and
+/// declares the live slot count that the runtime must provision.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub struct ProcessorRuntimeRequirements {
+    /// Processor-local wakeup requirements, if the processor uses the local
+    /// wakeup API.
+    pub local_wakeups: Option<LocalWakeupRequirements>,
+}
+
+impl ProcessorRuntimeRequirements {
+    /// Runtime requirements for a processor that does not need any optional
+    /// engine services.
+    #[must_use]
+    pub const fn none() -> Self {
+        Self {
+            local_wakeups: None,
+        }
+    }
+
+    /// Runtime requirements for a processor that uses local wakeups.
+    #[must_use]
+    pub const fn with_local_wakeups(live_slots: usize) -> Self {
+        Self {
+            local_wakeups: Some(LocalWakeupRequirements::new(live_slots)),
+        }
+    }
+}
+
 /// A wrapper for the processor that allows for both `Send` and `!Send` effect handlers.
 ///
 /// Note: This is useful for creating a single interface for the processor regardless of the effect
@@ -233,6 +283,13 @@ impl ProcessorWrapper {
         }
     }
 
+    pub(crate) fn runtime_requirements(&self) -> ProcessorRuntimeRequirements {
+        match self {
+            ProcessorWrapper::Local { processor, .. 
} => processor.runtime_requirements(), + ProcessorWrapper::Shared { processor, .. } => processor.runtime_requirements(), + } + } + pub(crate) fn with_control_channel_metrics( self, pipeline_ctx: &PipelineContext, @@ -325,6 +382,7 @@ impl ProcessorWrapper { match self { ProcessorWrapper::Local { node_id, + runtime_config: _, processor, control_receiver, pdata_senders, @@ -333,17 +391,33 @@ impl ProcessorWrapper { source_tag, .. } => { - let inbox = ProcessorInbox::new( - Receiver::Local(control_receiver), - pdata_receiver.ok_or_else(|| Error::ProcessorError { - processor: node_id.clone(), - kind: ProcessorErrorKind::Configuration, - error: "The pdata receiver must be defined at this stage".to_owned(), - source_detail: String::new(), - })?, - node_id.index, - node_interests, - ); + let runtime_requirements = processor.runtime_requirements(); + let pdata_receiver = pdata_receiver.ok_or_else(|| Error::ProcessorError { + processor: node_id.clone(), + kind: ProcessorErrorKind::Configuration, + error: "The pdata receiver must be defined at this stage".to_owned(), + source_detail: String::new(), + })?; + validate_local_wakeup_requirements(&node_id, runtime_requirements)?; + let maybe_local_scheduler = runtime_requirements + .local_wakeups + .map(|requirements| NodeLocalSchedulerHandle::new(requirements.live_slots)); + let inbox = if let Some(local_scheduler) = maybe_local_scheduler.clone() { + ProcessorInbox::new_with_local_scheduler( + Receiver::Local(control_receiver), + pdata_receiver, + local_scheduler, + node_id.index, + node_interests, + ) + } else { + ProcessorInbox::new( + Receiver::Local(control_receiver), + pdata_receiver, + node_id.index, + node_interests, + ) + }; let default_port = user_config.default_output.clone(); let mut effect_handler = local::EffectHandler::new( node_id, @@ -352,6 +426,9 @@ impl ProcessorWrapper { metrics_reporter, ); effect_handler.set_source_tagging(source_tag); + if let Some(local_scheduler) = maybe_local_scheduler { + effect_handler.core.set_local_scheduler(local_scheduler); + } Ok(ProcessorWrapperRuntime::Local { processor, effect_handler, @@ -360,6 +437,7 @@ impl ProcessorWrapper { } ProcessorWrapper::Shared { node_id, + runtime_config: _, processor, control_receiver, pdata_senders, @@ -368,17 +446,34 @@ impl ProcessorWrapper { source_tag, .. 
} => { - let inbox = ProcessorInbox::new( - Receiver::Shared(control_receiver), + let runtime_requirements = processor.runtime_requirements(); + let pdata_receiver = Receiver::Shared(pdata_receiver.ok_or_else(|| Error::ProcessorError { processor: node_id.clone(), kind: ProcessorErrorKind::Configuration, error: "The pdata receiver must be defined at this stage".to_owned(), source_detail: String::new(), - })?), - node_id.index, - node_interests, - ); + })?); + validate_local_wakeup_requirements(&node_id, runtime_requirements)?; + let maybe_local_scheduler = runtime_requirements + .local_wakeups + .map(|requirements| NodeLocalSchedulerHandle::new(requirements.live_slots)); + let inbox = if let Some(local_scheduler) = maybe_local_scheduler.clone() { + ProcessorInbox::new_with_local_scheduler( + Receiver::Shared(control_receiver), + pdata_receiver, + local_scheduler, + node_id.index, + node_interests, + ) + } else { + ProcessorInbox::new( + Receiver::Shared(control_receiver), + pdata_receiver, + node_id.index, + node_interests, + ) + }; let default_port = user_config.default_output.clone(); let mut effect_handler = shared::EffectHandler::new( node_id, @@ -387,6 +482,9 @@ impl ProcessorWrapper { metrics_reporter, ); effect_handler.set_source_tagging(source_tag); + if let Some(local_scheduler) = maybe_local_scheduler { + effect_handler.core.set_local_scheduler(local_scheduler); + } Ok(ProcessorWrapperRuntime::Shared { processor, effect_handler, @@ -559,6 +657,27 @@ impl Node for ProcessorWrapper { } } +pub(crate) fn validate_local_wakeup_requirements( + node_id: &NodeId, + requirements: ProcessorRuntimeRequirements, +) -> Result<(), Error> { + let Some(local_wakeups) = requirements.local_wakeups else { + return Ok(()); + }; + + if local_wakeups.live_slots == 0 { + return Err(Error::ProcessorError { + processor: node_id.clone(), + kind: ProcessorErrorKind::Configuration, + error: "processor-local wakeup requirement must declare at least one live slot" + .to_owned(), + source_detail: String::new(), + }); + } + + Ok(()) +} + #[async_trait::async_trait(?Send)] impl Controllable for ProcessorWrapper { /// Returns the control message sender for the processor. @@ -635,7 +754,9 @@ mod tests { use crate::control::NodeControlMsg::{Config, Shutdown, TimerTick}; use crate::local::processor as local; use crate::message::Message; - use crate::processor::{Error, ProcessorWrapper}; + use crate::processor::{ + Error, ProcessorRuntimeRequirements, ProcessorWrapper, validate_local_wakeup_requirements, + }; use crate::shared::processor as shared; use crate::testing::processor::TestRuntime; use crate::testing::processor::{TestContext, ValidateContext}; @@ -807,4 +928,55 @@ mod tests { .run_test(scenario()) .validate(validation_procedure()); } + + /// Scenario: a processor does not request any processor-local wakeup + /// service from the runtime. + /// Guarantees: validation succeeds without requiring any local wakeup + /// capacity, so processors that do not use wakeups do not pay configuration + /// or startup costs for that service. + #[test] + fn validate_local_wakeup_requirements_accepts_processors_without_wakeups() { + assert!( + validate_local_wakeup_requirements( + &test_node("test_processor"), + ProcessorRuntimeRequirements::none(), + ) + .is_ok() + ); + } + + /// Scenario: a processor declares local wakeups but reports an invalid live + /// slot requirement of zero. + /// Guarantees: validation rejects the configuration before startup, so the + /// runtime never provisions an unusable local wakeup service. 
+ #[test] + fn validate_local_wakeup_requirements_rejects_zero_live_slots() { + let err = validate_local_wakeup_requirements( + &test_node("test_processor"), + ProcessorRuntimeRequirements::with_local_wakeups(0), + ) + .expect_err("zero live slots must be rejected"); + + let Error::ProcessorError { error, .. } = err else { + panic!("expected processor configuration error"); + }; + assert_eq!( + error, + "processor-local wakeup requirement must declare at least one live slot" + ); + } + + /// Scenario: a processor declares a positive local wakeup live slot count. + /// Guarantees: validation succeeds so the declared slot count can act as + /// the single source of truth for local wakeup runtime provisioning. + #[test] + fn validate_local_wakeup_requirements_accepts_positive_live_slots() { + assert!( + validate_local_wakeup_requirements( + &test_node("test_processor"), + ProcessorRuntimeRequirements::with_local_wakeups(6), + ) + .is_ok() + ); + } } diff --git a/rust/otap-dataflow/crates/engine/src/shared/processor.rs b/rust/otap-dataflow/crates/engine/src/shared/processor.rs index 247714b0f3..ae3a873aab 100644 --- a/rust/otap-dataflow/crates/engine/src/shared/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/shared/processor.rs @@ -32,7 +32,7 @@ //! in parallel on different cores, each with its own processor instance. use crate::Interests; -use crate::control::{AckMsg, NackMsg, RuntimeCtrlMsgSender}; +use crate::control::{AckMsg, NackMsg, RuntimeCtrlMsgSender, WakeupSlot}; use crate::effect_handler::{ EffectHandlerCore, SourceTagging, TelemetryTimerCancelHandle, TimerCancelHandle, }; @@ -40,7 +40,9 @@ use crate::error::{Error, TypedError}; use crate::message::Message; use crate::node::NodeId; use crate::output_router::OutputRouter; +use crate::processor::ProcessorRuntimeRequirements; use crate::shared::message::SharedSender; +use crate::{WakeupError, WakeupSetOutcome}; use async_trait::async_trait; use otap_df_config::PortName; use otap_df_telemetry::error::Error as TelemetryError; @@ -67,7 +69,13 @@ pub trait Processor { /// - Transform the message and return a new message /// - Filter the message by returning None /// - Split the message into multiple messages by returning a vector - /// - Handle control messages (e.g., Config, TimerTick, Shutdown) + /// - Handle control messages (e.g., Config, TimerTick, Wakeup, Shutdown) + /// + /// Processor-local wakeups are scheduled through + /// [`EffectHandler::set_wakeup`]. They are delivered back to the processor + /// as `Message::Control(NodeControlMsg::Wakeup { .. })` through the normal + /// inbox path and participate in the same control-vs-pdata fairness rules + /// as other control traffic. /// /// # Parameters /// @@ -96,6 +104,15 @@ pub trait Processor { fn accept_pdata(&self) -> bool { true } + + /// Returns optional runtime services that this processor needs from the engine. + /// + /// This is the single source of truth for runtime wiring. For example, + /// `local_wakeups: Some(...)` both enables processor-local wakeups and + /// declares the live slot count the engine must provision. + fn runtime_requirements(&self) -> ProcessorRuntimeRequirements { + ProcessorRuntimeRequirements::none() + } } /// A `Send` implementation of the EffectHandler. @@ -235,6 +252,21 @@ impl EffectHandler { self.core.delay_data(when, data).await } + /// Set or replace a processor-local wakeup. 
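+    ///
+    /// A sketch of the intended pattern, assuming a processor that reserved
+    /// one slot via `ProcessorRuntimeRequirements::with_local_wakeups(1)`; the
+    /// slot number and the `max_batch_duration` delay below are illustrative
+    /// only:
+    ///
+    /// ```ignore
+    /// const FLUSH_SLOT: WakeupSlot = WakeupSlot(0);
+    /// let _ = effect_handler.set_wakeup(FLUSH_SLOT, Instant::now() + max_batch_duration);
+    /// // The engine later delivers NodeControlMsg::Wakeup { .. } through the
+    /// // normal inbox, unless cancel_wakeup(FLUSH_SLOT) runs first.
+    /// ```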
+    pub fn set_wakeup(
+        &self,
+        slot: WakeupSlot,
+        when: Instant,
+    ) -> Result<WakeupSetOutcome, WakeupError> {
+        self.core.set_wakeup(slot, when)
+    }
+
+    /// Cancel a previously scheduled processor-local wakeup.
+    #[must_use]
+    pub fn cancel_wakeup(&self, slot: WakeupSlot) -> bool {
+        self.core.cancel_wakeup(slot)
+    }
+
     /// Reports metrics collected by the processor.
     #[allow(dead_code)] // Will be used in the future. ToDo report metrics from channel and messages.
     pub(crate) fn report_metrics(
diff --git a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
index 0ce4b565de..e0c45127dd 100644
--- a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
+++ b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
@@ -29,6 +29,7 @@ use otap_df_otap::OTAP_PIPELINE_FACTORY;
 use otap_df_state::store::ObservedStateStore;
 use otap_df_telemetry::InternalTelemetrySystem;
 use serde_json::json;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::time::{Duration, Instant};
@@ -52,6 +53,27 @@ fn fake_receiver_config(
     })
 }
 
+fn rate_limited_fake_receiver_config(
+    max_signal_count: u64,
+    max_batch_size: usize,
+    signals_per_second: usize,
+    enable_ack_nack: bool,
+) -> serde_json::Value {
+    json!({
+        "traffic_config": {
+            "signals_per_second": signals_per_second,
+            "max_signal_count": max_signal_count,
+            "max_batch_size": max_batch_size,
+            "metric_weight": 0,
+            "trace_weight": 0,
+            "log_weight": 100
+        },
+        "data_source": "static",
+        "generation_strategy": "pre_generated",
+        "enable_ack_nack": enable_ack_nack
+    })
+}
+
 fn build_retry_pipeline_config(
     pipeline_group_id: &PipelineGroupId,
     pipeline_id: &PipelineId,
@@ -126,6 +148,44 @@ fn build_batch_pipeline_config(
         .expect("failed to build batch liveness pipeline config")
 }
 
+fn build_otlp_batch_local_wakeup_pipeline_config(
+    pipeline_group_id: &PipelineGroupId,
+    pipeline_id: &PipelineId,
+    counter_id: &str,
+) -> PipelineConfig {
+    PipelineConfigBuilder::new()
+        .add_receiver(
+            "fake_receiver",
+            OTAP_FAKE_DATA_GENERATOR_URN,
+            Some(rate_limited_fake_receiver_config(5, 1, 1, true)),
+        )
+        .add_processor(
+            "batch",
+            OTAP_BATCH_PROCESSOR_URN,
+            Some(json!({
+                "format": "otlp",
+                "otlp": {
+                    "min_size": 262144,
+                    "sizer": "bytes"
+                },
+                "max_batch_duration": "250ms"
+            })),
+        )
+        .add_exporter(
+            "counting_exporter",
+            COUNTING_EXPORTER_URN,
+            Some(json!({"counter_id": counter_id})),
+        )
+        .one_of("fake_receiver", ["batch"])
+        .one_of("batch", ["counting_exporter"])
+        .build(
+            PipelineType::Otap,
+            pipeline_group_id.clone(),
+            pipeline_id.clone(),
+        )
+        .expect("failed to build local wakeup batch liveness pipeline config")
+}
+
 fn run_pipeline_with_condition(
     config: PipelineConfig,
     pipeline_group_id: &PipelineGroupId,
@@ -223,6 +283,153 @@ fn run_pipeline_with_condition(
     );
 }
 
+#[derive(Debug, Default)]
+struct BatchMetricsSnapshot {
+    fields: HashMap<String, u64>,
+}
+
+impl BatchMetricsSnapshot {
+    fn get(&self, field: &str) -> u64 {
+        self.fields.get(field).copied().unwrap_or(u64::MAX)
+    }
+
+    fn assert_eq(&self, field: &str, expected: u64) {
+        let actual = self.get(field);
+        assert_eq!(
+            actual, expected,
+            "{field}: expected {expected}, got {actual}"
+        );
+    }
+}
+
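+/// Copies every field of the batch processor's `otap.processor.batch` metric
+/// set out of the telemetry registry into a plain snapshot for assertions.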
"otap.processor.batch" { + for (field, value) in iter { + let _ = snapshot + .fields + .insert(field.name.to_owned(), value.to_u64_lossy()); + } + } + }); + snapshot +} + +fn run_pipeline_and_capture_batch_metrics( + config: PipelineConfig, + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + max_duration: Duration, + shutdown_deadline: Duration, + shutdown_condition: F, +) -> BatchMetricsSnapshot +where + F: Fn() -> bool + Send + 'static, +{ + let telemetry_system = InternalTelemetrySystem::default(); + let registry = telemetry_system.registry(); + let collector = telemetry_system.collector(); + let controller_ctx = ControllerContext::new(registry.clone()); + let pipeline_ctx = controller_ctx.pipeline_context_with( + pipeline_group_id.clone(), + pipeline_id.clone(), + 0, + 1, + 0, + ); + let pipeline_entity_key = pipeline_ctx.register_pipeline_entity(); + let channel_capacity_policy = ChannelCapacityPolicy::default(); + let runtime_pipeline = OTAP_PIPELINE_FACTORY + .build( + pipeline_ctx.clone(), + config, + channel_capacity_policy.clone(), + TelemetryPolicy::default(), + None, // transport_headers_policy + None, // internal_telemetry + ) + .expect("failed to build runtime pipeline"); + + let (runtime_ctrl_tx, runtime_ctrl_rx) = + runtime_ctrl_msg_channel(channel_capacity_policy.control.pipeline); + let (pipeline_completion_tx, pipeline_completion_rx) = + pipeline_completion_msg_channel(channel_capacity_policy.control.completion); + let runtime_ctrl_tx_for_shutdown = runtime_ctrl_tx.clone(); + + let observed_state_store = + ObservedStateStore::new(&ObservedStateSettings::default(), registry.clone()); + let pipeline_key = DeployedPipelineKey { + pipeline_group_id: pipeline_group_id.clone(), + pipeline_id: pipeline_id.clone(), + core_id: 0, + }; + let metrics_reporter = telemetry_system.reporter(); + let event_reporter = observed_state_store.reporter(SendPolicy::default()); + + let capture_registry = registry.clone(); + let capture_collector = collector.clone(); + let shutdown_handle = std::thread::spawn(move || { + let start = Instant::now(); + let poll_interval = Duration::from_millis(10); + loop { + if start.elapsed() >= max_duration || shutdown_condition() { + break; + } + capture_collector.collect_pending(); + std::thread::sleep(poll_interval); + } + + let telemetry_wait = Duration::from_millis(1500); + let telemetry_start = Instant::now(); + while telemetry_start.elapsed() < telemetry_wait { + capture_collector.collect_pending(); + std::thread::sleep(poll_interval); + } + capture_collector.collect_pending(); + + let snapshot = capture_batch_metrics(&capture_registry); + + let deadline = Instant::now() + shutdown_deadline; + let _ = runtime_ctrl_tx_for_shutdown.try_send(RuntimeControlMsg::Shutdown { + deadline, + reason: "batch metrics capture shutdown".to_owned(), + }); + + snapshot + }); + + let run_result = { + let _pipeline_entity_guard = + set_pipeline_entity_key(pipeline_ctx.metrics_registry(), pipeline_entity_key); + let (_memory_pressure_tx, memory_pressure_rx) = tokio::sync::watch::channel( + otap_df_engine::memory_limiter::MemoryPressureChanged::initial(), + ); + runtime_pipeline.run_forever( + pipeline_key, + pipeline_ctx, + event_reporter, + metrics_reporter, + Duration::from_secs(1), + memory_pressure_rx, + runtime_ctrl_tx, + runtime_ctrl_rx, + pipeline_completion_tx, + pipeline_completion_rx, + ) + }; + let snapshot = shutdown_handle + .join() + .expect("batch metrics capture thread should succeed"); + assert!( + run_result.is_ok(), + "pipeline failed to 
shut down cleanly: {run_result:?}"
+    );
+    snapshot
+}
+
 // This pipeline starts with a downstream exporter that transiently Nacks every
 // request. Once retries are demonstrably happening, the exporter flips to Ack
 // mode and the pipeline must eventually drain all admitted work.
@@ -310,3 +517,57 @@ fn test_batch_pipeline_eventually_flushes_partial_batch() {
     );
     counting_exporter::unregister_counter(test_id);
 }
+
+// This test exercises a batch pipeline where:
+// - the traffic generator emits 5 single-item OTLP log batches at 1 signal/sec
+// - the batch processor uses byte sizing with a 256 KiB minimum size
+// - each generated batch is intentionally tiny, so a size-triggered flush is
+//   impossible under this setup
+//
+// The test waits for all 5 items to reach the downstream counting exporter,
+// then captures the batch processor metrics from the in-process telemetry
+// registry. Under these inputs, the runtime guarantees we expect are:
+// - the pipeline makes forward progress without any size-based flushes
+// - every generated item is eventually exported downstream
+// - every flush is attributed to `flushes.timer`, which means the processor's
+//   node-local wakeup path fired and delivered the timeout back through the
+//   real inbox/runtime path
+// - the processor emits 5 output log batches after consuming 5 input log
+//   batches, so the wakeup-triggered flushes produce real downstream pdata
+//   batches rather than being dropped internally
+#[test]
+fn test_batch_pipeline_uses_timer_wakeup_metrics_with_otlp_bytes_config() {
+    let pipeline_group_id: PipelineGroupId = "liveness-group".into();
+    let pipeline_id: PipelineId = "batch-pipeline-local-wakeup".into();
+    let test_id = "batch-pipeline-local-wakeup";
+    let delivered_items = Arc::new(AtomicU64::new(0));
+    counting_exporter::register_counter(test_id, delivered_items.clone());
+
+    let config =
+        build_otlp_batch_local_wakeup_pipeline_config(&pipeline_group_id, &pipeline_id, test_id);
+    let metrics = run_pipeline_and_capture_batch_metrics(
+        config,
+        &pipeline_group_id,
+        &pipeline_id,
+        Duration::from_secs(8),
+        Duration::from_secs(2),
+        {
+            let delivered_items = delivered_items.clone();
+            move || delivered_items.load(Ordering::Acquire) >= 5
+        },
+    );
+
+    assert_eq!(
+        delivered_items.load(Ordering::Acquire),
+        5,
+        "the local wakeup pipeline should export every generated item"
+    );
+    metrics.assert_eq("consumed.items.logs", 5);
+    metrics.assert_eq("consumed.batches.logs", 5);
+    metrics.assert_eq("produced.items.logs", 5);
+    metrics.assert_eq("produced.batches.logs", 5);
+    metrics.assert_eq("flushes.size", 0);
+    metrics.assert_eq("flushes.timer", 5);
+
+    counting_exporter::unregister_counter(test_id);
+}
diff --git a/rust/otap-dataflow/crates/pdata/src/validation/collector.rs b/rust/otap-dataflow/crates/pdata/src/validation/collector.rs
index f5d67d3e39..b40a7c72de 100644
--- a/rust/otap-dataflow/crates/pdata/src/validation/collector.rs
+++ b/rust/otap-dataflow/crates/pdata/src/validation/collector.rs
@@ -40,6 +40,10 @@ pub static COLLECTOR_PATH: LazyLock<String> = LazyLock::new(|| {
     path
 });
 
+pub(super) fn collector_available() -> bool {
+    Path::new(COLLECTOR_PATH.as_str()).exists()
+}
+
 /// Helper function to spawn an async task that reads lines from a buffer and logs them with a prefix.
 /// Optionally checks for a message substring and sends a signal when it matches.
 async fn spawn_line_reader(
diff --git a/rust/otap-dataflow/crates/pdata/src/validation/scenarios.rs b/rust/otap-dataflow/crates/pdata/src/validation/scenarios.rs
index 2b6404cf66..018b4a66a9 100644
--- a/rust/otap-dataflow/crates/pdata/src/validation/scenarios.rs
+++ b/rust/otap-dataflow/crates/pdata/src/validation/scenarios.rs
@@ -28,6 +28,14 @@ pub async fn run_single_round_trip_test(
     I::Response: std::fmt::Debug + PartialEq + Default,
     F: FnOnce() -> I::Request + 'static,
 {
+    if !super::collector::collector_available() {
+        eprintln!(
+            "Skipping validation test because collector binary is unavailable at '{}'.",
+            super::collector::COLLECTOR_PATH.as_str()
+        );
+        return;
+    }
+
     match run_single_round_trip::<I, F>(create_request, expected_error).await {
         Ok(_) => {}
         Err(err) => {