Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/otap-dataflow/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN apt-get update && apt-get install -y protobuf-compiler
COPY --from=otel-arrow /proto/opentelemetry/proto /build/proto/opentelemetry/proto
COPY --from=otel-arrow /proto/opentelemetry-proto /build/proto/opentelemetry-proto
COPY --from=otel-arrow /rust/experimental /build/rust/experimental
COPY --from=otel-arrow /THIRD_PARTY_NOTICES.txt /build/THIRD_PARTY_NOTICES.txt

COPY . /build/rust/dataflow/.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::receivers::fake_data_generator::config::{
use async_trait::async_trait;
use linkme::distributed_slice;
use metrics::FakeSignalReceiverMetrics;
use otap_df_channel::error::RecvError;
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::MessageSourceLocalEffectHandlerExtension;
use otap_df_engine::config::ReceiverConfig;
Expand Down Expand Up @@ -268,6 +269,37 @@ impl BatchCache {
}
}

/// Handle a control message received on the control channel.
///
/// Returns `Ok(Some(terminal_state))` when the receiver should exit,
/// `Ok(None)` when it should continue the event loop, or `Err` on a
/// channel error.
async fn handle_control_msg(
ctrl_msg: Result<NodeControlMsg<OtapPdata>, RecvError>,
effect_handler: &local::EffectHandler<OtapPdata>,
metrics: &mut MetricSet<FakeSignalReceiverMetrics>,
) -> Result<Option<TerminalState>, Error> {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(metrics);
Ok(None)
}
Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
otel_info!("fake_data_generator.drain_ingress");
let _ = effect_handler.notify_receiver_drained().await;
Copy link
Copy Markdown
Member

@lalitb lalitb Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let _ = effect_handler.notify_receiver_drained().await;
effect_handler.notify_receiver_drained().await?;

Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Ok(NodeControlMsg::Shutdown { deadline, .. }) => {
otel_info!("fake_data_generator.shutdown");
Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Err(e) => Err(Error::ChannelRecvError(e)),
_ => Ok(None),
}
}

/// Implement the Receiver trait for the FakeGeneratorReceiver
#[async_trait(?Send)]
impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
Expand Down Expand Up @@ -377,24 +409,8 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
biased; //prioritize ctrl_msg over all other blocks
// Process internal event
ctrl_msg = ctrl_msg_recv.recv() => {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(&mut self.metrics);
}
Ok(NodeControlMsg::Shutdown {deadline, ..}) => {
otel_info!(
"fake_data_generator.shutdown"
);
return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
},
Err(e) => {
return Err(Error::ChannelRecvError(e));
}
_ => {
// unknown control message do nothing
}
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
// generate and send signal based on provided configuration
Expand Down Expand Up @@ -426,7 +442,18 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
sleep_duration_ms = remaining_time.as_millis() as u64,
"Sleeping to maintain configured signal rate"
);
sleep(remaining_time).await;
// Listen for control messages during the rate-limit
// sleep so that DrainIngress/Shutdown are handled
// promptly instead of waiting for the full interval.
tokio::select! {
biased;
ctrl_msg = ctrl_msg_recv.recv() => {
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
_ = sleep(remaining_time) => {}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current sleep is making the DrainIngress/Shutdown responsible , but it is also changing the rate-limiting behavior - so any non-terminal control message handled as Ok(None) exist the sleep immediately and next batch can be sent before the original wait_till. We should replace the line 445-456 above with:

// Keep the original sleep deadline even if non-terminal control
// messages arrive. Only DrainIngress/Shutdown should interrupt
// the rate-limit wait early.
let sleep_until = sleep(remaining_time);
tokio::pin!(sleep_until);

loop {
    tokio::select! {
        biased;
        ctrl_msg = ctrl_msg_recv.recv() => {
            if let Some(terminal) =
                handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await?
            {
                return Ok(terminal);
            }
        }
        _ = &mut sleep_until => break,
    }
}

Copy link
Copy Markdown
Contributor Author

@sjmsft sjmsft Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the new changes + new test.

}
// ToDo: Handle negative time, not able to keep up with specified rate limit
} else {
Expand Down Expand Up @@ -1352,6 +1379,66 @@ mod tests {
.run_validation(validation_procedure_pregenerated());
}

/// Regression test: verifies that the receiver handles DrainIngress
/// promptly instead of stalling until the drain deadline expires.
/// Without proper DrainIngress handling the receiver would sleep
/// through the entire rate-limit interval, causing DrainDeadlineReached.
#[test]
fn test_drain_ingress_exits_promptly() {
let test_runtime = TestRuntime::new();

let registry_path = VirtualDirectoryPath::GitRepo {
url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(),
sub_folder: Some("model".to_owned()),
refspec: None,
};

// signals_per_second=1 means the receiver sleeps ~1s between sends.
// DrainIngress must interrupt that sleep and exit promptly.
let traffic_config = TrafficConfig::new(Some(1), None, 1, 0, 0, 1);
let config =
Config::new(traffic_config, registry_path).with_data_source(DataSource::Static);

let node_config = Arc::new(NodeUserConfig::new_receiver_config(
OTAP_FAKE_DATA_GENERATOR_URN,
));
let telemetry_registry_handle = TelemetryRegistryHandle::new();
let controller_ctx = ControllerContext::new(telemetry_registry_handle.clone());
let pipeline_ctx =
controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);
let receiver = ReceiverWrapper::local(
FakeGeneratorReceiver::new(pipeline_ctx, config),
test_node("fake_receiver_drain"),
node_config,
test_runtime.config(),
);

let drain_scenario =
move |ctx: TestContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async move {
// Let the receiver start and enter its rate-limit sleep.
sleep(Duration::from_millis(200)).await;
let deadline = std::time::Instant::now() + Duration::from_secs(5);
ctx.send_control_msg(NodeControlMsg::DrainIngress {
deadline,
reason: "test drain".to_owned(),
})
.await
.expect("Failed to send DrainIngress");
})
};

let drain_validation =
|_ctx: NotSendValidateContext<OtapPdata>| -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async {})
};

test_runtime
.set_receiver(receiver)
.run_test(drain_scenario)
.run_validation(drain_validation);
}

#[test]
fn test_resource_attribute_rotation_across_batches() {
use crate::receivers::fake_data_generator::config::ResourceAttributeSet;
Expand Down
Loading