Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/otap-dataflow/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN apt-get update && apt-get install -y protobuf-compiler
COPY --from=otel-arrow /proto/opentelemetry/proto /build/proto/opentelemetry/proto
COPY --from=otel-arrow /proto/opentelemetry-proto /build/proto/opentelemetry-proto
COPY --from=otel-arrow /rust/experimental /build/rust/experimental
COPY --from=otel-arrow /THIRD_PARTY_NOTICES.txt /build/THIRD_PARTY_NOTICES.txt

COPY . /build/rust/dataflow/.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::receivers::fake_data_generator::config::{
use async_trait::async_trait;
use linkme::distributed_slice;
use metrics::FakeSignalReceiverMetrics;
use otap_df_channel::error::RecvError;
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::MessageSourceLocalEffectHandlerExtension;
use otap_df_engine::config::ReceiverConfig;
Expand Down Expand Up @@ -268,6 +269,37 @@ impl BatchCache {
}
}

/// Handle a control message received on the control channel.
///
/// Returns `Ok(Some(terminal_state))` when the receiver should exit,
/// `Ok(None)` when it should continue the event loop, or `Err` on a
/// channel error.
async fn handle_control_msg(
ctrl_msg: Result<NodeControlMsg<OtapPdata>, RecvError>,
effect_handler: &local::EffectHandler<OtapPdata>,
metrics: &mut MetricSet<FakeSignalReceiverMetrics>,
) -> Result<Option<TerminalState>, Error> {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(metrics);
Ok(None)
}
Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
otel_info!("fake_data_generator.drain_ingress");
effect_handler.notify_receiver_drained().await?;
Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Ok(NodeControlMsg::Shutdown { deadline, .. }) => {
otel_info!("fake_data_generator.shutdown");
Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Err(e) => Err(Error::ChannelRecvError(e)),
_ => Ok(None),
}
}

/// Implement the Receiver trait for the FakeGeneratorReceiver
#[async_trait(?Send)]
impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
Expand Down Expand Up @@ -377,24 +409,8 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
biased; //prioritize ctrl_msg over all other blocks
// Process internal event
ctrl_msg = ctrl_msg_recv.recv() => {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(&mut self.metrics);
}
Ok(NodeControlMsg::Shutdown {deadline, ..}) => {
otel_info!(
"fake_data_generator.shutdown"
);
return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
},
Err(e) => {
return Err(Error::ChannelRecvError(e));
}
_ => {
// unknown control message do nothing
}
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
// generate and send signal based on provided configuration
Expand Down Expand Up @@ -426,7 +442,23 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
sleep_duration_ms = remaining_time.as_millis() as u64,
"Sleeping to maintain configured signal rate"
);
sleep(remaining_time).await;
// Keep the original sleep deadline if non-terminal control
// messages arrive. Only DrainIngress/Shutdown should interrupt
// the rate-limit wait early.
let sleep_until = sleep(remaining_time);
tokio::pin!(sleep_until);

loop {
tokio::select! {
biased;
ctrl_msg = ctrl_msg_recv.recv() => {
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
_ = &mut sleep_until => break,
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current sleep is making DrainIngress/Shutdown responsive, but it also changes the rate-limiting behavior — any non-terminal control message handled as Ok(None) exits the sleep immediately, so the next batch can be sent before the original wait_till deadline. We should replace lines 445-456 above with:

// Keep the original sleep deadline even if non-terminal control
// messages arrive. Only DrainIngress/Shutdown should interrupt
// the rate-limit wait early.
let sleep_until = sleep(remaining_time);
tokio::pin!(sleep_until);

loop {
    tokio::select! {
        biased;
        ctrl_msg = ctrl_msg_recv.recv() => {
            if let Some(terminal) =
                handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await?
            {
                return Ok(terminal);
            }
        }
        _ = &mut sleep_until => break,
    }
}

Copy link
Copy Markdown
Contributor Author

@sjmsft sjmsft Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the new changes + new test.

}
// ToDo: Handle negative time, not able to keep up with specified rate limit
} else {
Expand Down Expand Up @@ -1352,6 +1384,151 @@ mod tests {
.run_validation(validation_procedure_pregenerated());
}

/// Regression test: verifies that the receiver handles DrainIngress
/// promptly instead of stalling until the drain deadline expires.
/// Without proper DrainIngress handling the receiver would sleep
/// through the entire rate-limit interval, causing DrainDeadlineReached.
#[test]
fn test_drain_ingress_exits_promptly() {
    let runtime = TestRuntime::new();

    // signals_per_second=1 means the receiver sleeps ~1s between sends.
    // DrainIngress must interrupt that sleep and exit promptly.
    let traffic = TrafficConfig::new(Some(1), None, 1, 0, 0, 1);
    let registry = VirtualDirectoryPath::GitRepo {
        url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(),
        sub_folder: Some("model".to_owned()),
        refspec: None,
    };
    let cfg = Config::new(traffic, registry).with_data_source(DataSource::Static);

    let registry_handle = TelemetryRegistryHandle::new();
    let pipeline_ctx = ControllerContext::new(registry_handle.clone())
        .pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);
    let user_cfg = Arc::new(NodeUserConfig::new_receiver_config(
        OTAP_FAKE_DATA_GENERATOR_URN,
    ));
    let receiver = ReceiverWrapper::local(
        FakeGeneratorReceiver::new(pipeline_ctx, cfg),
        test_node("fake_receiver_drain"),
        user_cfg,
        runtime.config(),
    );

    let scenario = move |ctx: TestContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
        Box::pin(async move {
            // Give the receiver time to start and enter its rate-limit sleep.
            sleep(Duration::from_millis(200)).await;
            ctx.send_control_msg(NodeControlMsg::DrainIngress {
                deadline: std::time::Instant::now() + Duration::from_secs(5),
                reason: "test drain".to_owned(),
            })
            .await
            .expect("Failed to send DrainIngress");
        })
    };

    // No pdata assertions are needed: success means the runtime completes
    // without hitting the drain deadline.
    let validation =
        |_ctx: NotSendValidateContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
            Box::pin(async {})
        };

    runtime
        .set_receiver(receiver)
        .run_test(scenario)
        .run_validation(validation);
}

/// Regression test: verifies that a non-terminal control message
/// (CollectTelemetry) arriving during the rate-limit sleep does NOT
/// break the sleep early – the receiver should still respect the
/// original wait_till deadline.
#[test]
fn test_non_terminal_ctrl_msg_does_not_break_rate_limit_sleep() {
    let runtime = TestRuntime::new();

    // signals_per_second=1 with a single log per iteration means the
    // receiver will sleep ~1s between sends. If the non-terminal control
    // message breaks the sleep, we'd see more than 2 batches in 1.5s.
    let traffic = TrafficConfig::new(Some(1), None, 1, 0, 0, 1);
    let registry = VirtualDirectoryPath::GitRepo {
        url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(),
        sub_folder: Some("model".to_owned()),
        refspec: None,
    };
    let cfg = Config::new(traffic, registry).with_data_source(DataSource::Static);

    let registry_handle = TelemetryRegistryHandle::new();
    let pipeline_ctx = ControllerContext::new(registry_handle.clone())
        .pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);
    let user_cfg = Arc::new(NodeUserConfig::new_receiver_config(
        OTAP_FAKE_DATA_GENERATOR_URN,
    ));
    let receiver = ReceiverWrapper::local(
        FakeGeneratorReceiver::new(pipeline_ctx, cfg),
        test_node("fake_receiver_ctrl_sleep"),
        user_cfg,
        runtime.config(),
    );

    let scenario = move |ctx: TestContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
        Box::pin(async move {
            // Give the receiver time to start and enter its rate-limit sleep.
            sleep(Duration::from_millis(200)).await;

            // Fire a CollectTelemetry message mid-sleep. This is a
            // non-terminal control message and must NOT break the
            // rate-limit sleep.
            let (_rx, metrics_reporter) =
                otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1);
            ctx.send_control_msg(NodeControlMsg::CollectTelemetry { metrics_reporter })
                .await
                .expect("Failed to send CollectTelemetry");

            // Wait long enough for the first sleep to expire plus a
            // small margin, but NOT long enough for a third iteration.
            sleep(Duration::from_millis(1300)).await;

            ctx.send_shutdown(std::time::Instant::now(), "Test")
                .await
                .expect("Failed to send Shutdown");
        })
    };

    let validation =
        |mut ctx: NotSendValidateContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
            Box::pin(async move {
                // Count every batch that reached the test output channel.
                let mut received_batches: u64 = 0;
                while ctx.recv().await.is_ok() {
                    received_batches += 1;
                }

                // With 1 signal/sec and ~1.5s total runtime we expect at
                // most 2 batches. If the non-terminal control message
                // broke the sleep, we would see 3+.
                assert!(
                    received_batches <= 2,
                    "Non-terminal control message broke the rate-limit sleep: \
                    expected at most 2 batches, got {received_batches}"
                );
            })
        };

    runtime
        .set_receiver(receiver)
        .run_test(scenario)
        .run_validation(validation);
}

#[test]
fn test_resource_attribute_rotation_across_batches() {
use crate::receivers::fake_data_generator::config::ResourceAttributeSet;
Expand Down
Loading