diff --git a/rust/otap-dataflow/Dockerfile b/rust/otap-dataflow/Dockerfile index 56586d8b3e..d6b25b8bde 100644 --- a/rust/otap-dataflow/Dockerfile +++ b/rust/otap-dataflow/Dockerfile @@ -20,6 +20,7 @@ RUN apt-get update && apt-get install -y protobuf-compiler COPY --from=otel-arrow /proto/opentelemetry/proto /build/proto/opentelemetry/proto COPY --from=otel-arrow /proto/opentelemetry-proto /build/proto/opentelemetry-proto COPY --from=otel-arrow /rust/experimental /build/rust/experimental +COPY --from=otel-arrow /THIRD_PARTY_NOTICES.txt /build/THIRD_PARTY_NOTICES.txt COPY . /build/rust/dataflow/. diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs index 0d73a4d816..ece14b8fe1 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs @@ -11,6 +11,7 @@ use crate::receivers::fake_data_generator::config::{ use async_trait::async_trait; use linkme::distributed_slice; use metrics::FakeSignalReceiverMetrics; +use otap_df_channel::error::RecvError; use otap_df_config::node::NodeUserConfig; use otap_df_engine::MessageSourceLocalEffectHandlerExtension; use otap_df_engine::config::ReceiverConfig; @@ -268,6 +269,37 @@ impl BatchCache { } } +/// Handle a control message received on the control channel. +/// +/// Returns `Ok(Some(terminal_state))` when the receiver should exit, +/// `Ok(None)` when it should continue the event loop, or `Err` on a +/// channel error. +async fn handle_control_msg( + ctrl_msg: Result, RecvError>, + effect_handler: &local::EffectHandler, + metrics: &mut MetricSet, +) -> Result, Error> { + match ctrl_msg { + Ok(NodeControlMsg::CollectTelemetry { + mut metrics_reporter, + }) => { + _ = metrics_reporter.report(metrics); + Ok(None) + } + Ok(NodeControlMsg::DrainIngress { deadline, .. }) => { + otel_info!("fake_data_generator.drain_ingress"); + effect_handler.notify_receiver_drained().await?; + Ok(Some(TerminalState::new(deadline, [metrics.snapshot()]))) + } + Ok(NodeControlMsg::Shutdown { deadline, .. }) => { + otel_info!("fake_data_generator.shutdown"); + Ok(Some(TerminalState::new(deadline, [metrics.snapshot()]))) + } + Err(e) => Err(Error::ChannelRecvError(e)), + _ => Ok(None), + } +} + /// Implement the Receiver trait for the FakeGeneratorReceiver #[async_trait(?Send)] impl local::Receiver for FakeGeneratorReceiver { @@ -377,24 +409,8 @@ impl local::Receiver for FakeGeneratorReceiver { biased; //prioritize ctrl_msg over all other blocks // Process internal event ctrl_msg = ctrl_msg_recv.recv() => { - match ctrl_msg { - Ok(NodeControlMsg::CollectTelemetry { - mut metrics_reporter, - }) => { - _ = metrics_reporter.report(&mut self.metrics); - } - Ok(NodeControlMsg::Shutdown {deadline, ..}) => { - otel_info!( - "fake_data_generator.shutdown" - ); - return Ok(TerminalState::new(deadline, [self.metrics.snapshot()])); - }, - Err(e) => { - return Err(Error::ChannelRecvError(e)); - } - _ => { - // unknown control message do nothing - } + if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? { + return Ok(terminal); } } // generate and send signal based on provided configuration @@ -426,7 +442,23 @@ impl local::Receiver for FakeGeneratorReceiver { sleep_duration_ms = remaining_time.as_millis() as u64, "Sleeping to maintain configured signal rate" ); - sleep(remaining_time).await; + // Keep the original sleep deadline if non-terminal control + // messages arrive. Only DrainIngress/Shutdown should interrupt + // the rate-limit wait early. + let sleep_until = sleep(remaining_time); + tokio::pin!(sleep_until); + + loop { + tokio::select! { + biased; + ctrl_msg = ctrl_msg_recv.recv() => { + if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? { + return Ok(terminal); + } + } + _ = &mut sleep_until => break, + } + } } // ToDo: Handle negative time, not able to keep up with specified rate limit } else { @@ -1352,6 +1384,151 @@ mod tests { .run_validation(validation_procedure_pregenerated()); } + /// Regression test: verifies that the receiver handles DrainIngress + /// promptly instead of stalling until the drain deadline expires. + /// Without proper DrainIngress handling the receiver would sleep + /// through the entire rate-limit interval, causing DrainDeadlineReached. + #[test] + fn test_drain_ingress_exits_promptly() { + let test_runtime = TestRuntime::new(); + + let registry_path = VirtualDirectoryPath::GitRepo { + url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(), + sub_folder: Some("model".to_owned()), + refspec: None, + }; + + // signals_per_second=1 means the receiver sleeps ~1s between sends. + // DrainIngress must interrupt that sleep and exit promptly. + let traffic_config = TrafficConfig::new(Some(1), None, 1, 0, 0, 1); + let config = + Config::new(traffic_config, registry_path).with_data_source(DataSource::Static); + + let node_config = Arc::new(NodeUserConfig::new_receiver_config( + OTAP_FAKE_DATA_GENERATOR_URN, + )); + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle.clone()); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + let receiver = ReceiverWrapper::local( + FakeGeneratorReceiver::new(pipeline_ctx, config), + test_node("fake_receiver_drain"), + node_config, + test_runtime.config(), + ); + + let drain_scenario = + move |ctx: TestContext| -> Pin>> { + Box::pin(async move { + // Let the receiver start and enter its rate-limit sleep. + sleep(Duration::from_millis(200)).await; + let deadline = std::time::Instant::now() + Duration::from_secs(5); + ctx.send_control_msg(NodeControlMsg::DrainIngress { + deadline, + reason: "test drain".to_owned(), + }) + .await + .expect("Failed to send DrainIngress"); + }) + }; + + let drain_validation = + |_ctx: NotSendValidateContext| -> Pin>> { + Box::pin(async {}) + }; + + test_runtime + .set_receiver(receiver) + .run_test(drain_scenario) + .run_validation(drain_validation); + } + + /// Regression test: verifies that a non-terminal control message + /// (CollectTelemetry) arriving during the rate-limit sleep does NOT + /// break the sleep early – the receiver should still respect the + /// original wait_till deadline. + #[test] + fn test_non_terminal_ctrl_msg_does_not_break_rate_limit_sleep() { + let test_runtime = TestRuntime::new(); + + let registry_path = VirtualDirectoryPath::GitRepo { + url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(), + sub_folder: Some("model".to_owned()), + refspec: None, + }; + + // signals_per_second=1 with a single log per iteration means the + // receiver will sleep ~1s between sends. If the non-terminal control + // message breaks the sleep, we'd see more than 2 batches in 1.5s. + let traffic_config = TrafficConfig::new(Some(1), None, 1, 0, 0, 1); + let config = + Config::new(traffic_config, registry_path).with_data_source(DataSource::Static); + + let node_config = Arc::new(NodeUserConfig::new_receiver_config( + OTAP_FAKE_DATA_GENERATOR_URN, + )); + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle.clone()); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + let receiver = ReceiverWrapper::local( + FakeGeneratorReceiver::new(pipeline_ctx, config), + test_node("fake_receiver_ctrl_sleep"), + node_config, + test_runtime.config(), + ); + + let ctrl_scenario = + move |ctx: TestContext| -> Pin>> { + Box::pin(async move { + // Let the receiver start and enter its rate-limit sleep. + sleep(Duration::from_millis(200)).await; + // Fire a CollectTelemetry message mid-sleep. This is a + // non-terminal control message and must NOT break the + // rate-limit sleep. + let (_rx, metrics_reporter) = + otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1); + ctx.send_control_msg(NodeControlMsg::CollectTelemetry { metrics_reporter }) + .await + .expect("Failed to send CollectTelemetry"); + + // Wait long enough for the first sleep to expire plus a + // small margin, but NOT long enough for a third iteration. + sleep(Duration::from_millis(1300)).await; + + ctx.send_shutdown(std::time::Instant::now(), "Test") + .await + .expect("Failed to send Shutdown"); + }) + }; + + let ctrl_validation = + |mut ctx: NotSendValidateContext| -> Pin>> { + Box::pin(async move { + let mut received_batches: u64 = 0; + + while let Ok(_received_signal) = ctx.recv().await { + received_batches += 1; + } + + // With 1 signal/sec and ~1.5s total runtime we expect at + // most 2 batches. If the non-terminal control message + // broke the sleep, we would see 3+. + assert!( + received_batches <= 2, + "Non-terminal control message broke the rate-limit sleep: \ + expected at most 2 batches, got {received_batches}" + ); + }) + }; + + test_runtime + .set_receiver(receiver) + .run_test(ctrl_scenario) + .run_validation(ctrl_validation); + } + #[test] fn test_resource_attribute_rotation_across_batches() { use crate::receivers::fake_data_generator::config::ResourceAttributeSet;