Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::receivers::fake_data_generator::config::{
use async_trait::async_trait;
use linkme::distributed_slice;
use metrics::FakeSignalReceiverMetrics;
use otap_df_channel::error::RecvError;
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::MessageSourceLocalEffectHandlerExtension;
use otap_df_engine::config::ReceiverConfig;
Expand Down Expand Up @@ -264,6 +265,37 @@ impl BatchCache {
}
}

/// Handle a control message received on the control channel.
///
/// Returns `Ok(Some(terminal_state))` when the receiver should exit,
/// `Ok(None)` when it should continue the event loop, or `Err` on a
/// channel error.
async fn handle_control_msg(
ctrl_msg: Result<NodeControlMsg<OtapPdata>, RecvError>,
effect_handler: &local::EffectHandler<OtapPdata>,
metrics: &mut MetricSet<FakeSignalReceiverMetrics>,
) -> Result<Option<TerminalState>, Error> {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(metrics);
Ok(None)
}
Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
otel_info!("fake_data_generator.drain_ingress");
let _ = effect_handler.notify_receiver_drained().await;
Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Ok(NodeControlMsg::Shutdown { deadline, .. }) => {
otel_info!("fake_data_generator.shutdown");
Ok(Some(TerminalState::new(deadline, [metrics.snapshot()])))
}
Err(e) => Err(Error::ChannelRecvError(e)),
_ => Ok(None),
}
}

/// Implement the Receiver trait for the FakeGeneratorReceiver
#[async_trait(?Send)]
impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
Expand Down Expand Up @@ -372,24 +404,8 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
biased; //prioritize ctrl_msg over all other blocks
// Process internal event
ctrl_msg = ctrl_msg_recv.recv() => {
match ctrl_msg {
Ok(NodeControlMsg::CollectTelemetry {
mut metrics_reporter,
}) => {
_ = metrics_reporter.report(&mut self.metrics);
}
Ok(NodeControlMsg::Shutdown {deadline, ..}) => {
otel_info!(
"fake_data_generator.shutdown"
);
return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
},
Err(e) => {
return Err(Error::ChannelRecvError(e));
}
_ => {
// unknown control message do nothing
}
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
// generate and send signal based on provided configuration
Expand Down Expand Up @@ -421,7 +437,18 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
sleep_duration_ms = remaining_time.as_millis() as u64,
"Sleeping to maintain configured signal rate"
);
sleep(remaining_time).await;
// Listen for control messages during the rate-limit
// sleep so that DrainIngress/Shutdown are handled
// promptly instead of waiting for the full interval.
tokio::select! {
biased;
ctrl_msg = ctrl_msg_recv.recv() => {
if let Some(terminal) = handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await? {
return Ok(terminal);
}
}
_ = sleep(remaining_time) => {}
}
}
// ToDo: Handle negative time, not able to keep up with specified rate limit
} else {
Expand Down
Loading