
fix: handle DrainIngress in fake_data_generator to unblock graceful shutdown#2515

Open
sjmsft wants to merge 8 commits into open-telemetry:main from sjmsft:bug_2511

Conversation

@sjmsft
Contributor

@sjmsft sjmsft commented Apr 2, 2026

Change Summary

The "Ack nack redesign" PR (3dca283) introduced a two-phase DrainIngress/ReceiverDrained shutdown protocol but missed updating the fake_data_generator receiver. Without a DrainIngress handler, the message fell into the _ => {} catch-all: notify_receiver_drained() was never called, the pipeline controller never removed the receiver from its pending set, and after the deadline expired it emitted DrainDeadlineReached. This caused pipeline-perf-test-basic to fail consistently.
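To make the failure mode concrete, here is a minimal self-contained sketch of why a _ => {} catch-all silently swallows the drain request. The enum and handler below are toy stand-ins that mirror the names in the discussion (NodeControlMsg, notify_receiver_drained), not the real otap-dataflow types:

```rust
// Toy model of a receiver control loop; names mirror the PR discussion
// but the types are hypothetical stand-ins, not the real otap-dataflow API.
#[allow(dead_code)]
#[derive(Debug)]
enum NodeControlMsg {
    DrainIngress,
    Shutdown,
    CollectTelemetry,
}

/// Returns true if the receiver acknowledged the drain
/// (i.e. the equivalent of notify_receiver_drained() ran).
fn handle(msg: NodeControlMsg, has_drain_arm: bool) -> bool {
    match msg {
        // The fix: an explicit arm that acknowledges the drain.
        NodeControlMsg::DrainIngress if has_drain_arm => true,
        NodeControlMsg::Shutdown => false,
        // Before the fix, DrainIngress fell through here, so the pipeline
        // controller never removed the receiver from its pending set and
        // eventually emitted DrainDeadlineReached.
        _ => false,
    }
}

fn main() {
    assert!(!handle(NodeControlMsg::DrainIngress, false)); // bug: drain silently ignored
    assert!(handle(NodeControlMsg::DrainIngress, true)); // fix: drained ack sent
    println!("ok");
}
```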

What issue does this PR close?

pipeline-perf-test-basic unit test is failing.

How are these changes tested?

fake_data_generator and runtime_control_metrics tests were executed.

Are there any user-facing changes?

No, fake_data_generator is an internal test/load-generation receiver, not a user-facing component.

@sjmsft sjmsft requested a review from a team as a code owner April 2, 2026 16:46
@github-actions github-actions bot added the rust Pull requests that update Rust code label Apr 2, 2026
@codecov

codecov bot commented Apr 2, 2026

Codecov Report

❌ Patch coverage is 94.73684% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 88.37%. Comparing base (d8e64e0) to head (4862d79).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2515      +/-   ##
==========================================
+ Coverage   88.34%   88.37%   +0.02%     
==========================================
  Files         613      613              
  Lines      222675   222694      +19     
==========================================
+ Hits       196731   196805      +74     
+ Misses      25420    25365      -55     
  Partials      524      524              
Components            Coverage           Δ
otap-dataflow         90.27% <94.73%>    (+0.03%) ⬆️
query_abstraction     80.61% <ø>         (ø)
query_engine          90.74% <ø>         (ø)
syslog_cef_receivers  ∅ <ø>              (∅)
otel-arrow-go         52.45% <ø>         (ø)
quiver                91.92% <ø>         (ø)

@lalitb
Member

lalitb commented Apr 2, 2026

The sequencing here looks off. In graceful shutdown the runtime does:

DrainIngress -> ReceiverDrained -> downstream Shutdown.

This change makes fake_data_generator do

DrainIngress -> ReceiverDrained -> wait for Shutdown,

but that Shutdown is not part of the normal post-drain receiver path. For this receiver, once ingress is stopped there is no receiver-local work left to preserve, so it should exit directly on DrainIngress rather than report drained and then block waiting for another shutdown message.

Member

@lalitb lalitb left a comment

Please go through the comment here.

@lalitb
Member

lalitb commented Apr 2, 2026

The correct fix should be:

DrainIngress -> notify_receiver_drained() -> return TerminalState immediately

Something like (not tested):

Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
    otel_info!("fake_data_generator.drain_ingress");
    effect_handler.notify_receiver_drained().await?;
    return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
}

@lalitb
Member

lalitb commented Apr 2, 2026

The fix now looks correct. However, the CI failures suggest a shutdown race in fake_data_generator that is easy to hit on slower runners. The test config uses signals_per_second = 1, so the receiver can sleep for close to 1 second between sends, while the shutdown deadline in test_telemetry_registries_cleanup is only 200ms. That means DrainIngress can arrive while the receiver is asleep, the runtime can move into forced shutdown before the receiver handles it, and then notify_receiver_drained().await? can fail with Channel is closed.

One option could be to address this in two places:

  • make the rate-limit sleep interruptible, since that looks like the root cause here.
if signals_per_second.is_some() {
    let remaining_time = wait_till - Instant::now();
    if remaining_time.as_secs_f64() > 0.0 {
        tokio::select! {
            biased;

            ctrl_msg = ctrl_msg_recv.recv() => {
                // handle DrainIngress / Shutdown during the rate-limit wait
                // using the same control-message handling as the main loop
            }

            _ = sleep(remaining_time) => {}
        }
    }
}
  • make notify_receiver_drained() best-effort on the terminal DrainIngress path, so a late control-plane teardown does not turn shutdown into an error.
Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
    otel_info!("fake_data_generator.drain_ingress");
    let _ = effect_handler.notify_receiver_drained().await;
    return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
}

@sjmsft sjmsft requested a review from lalitb April 2, 2026 23:39
@lquerel
Contributor

lquerel commented Apr 4, 2026

@sjmsft

[In addition to the sequence described by @lalitb]

The exception is deadline-forced shutdown. If the drain deadline expires before the receiver reports drained, the runtime sends NodeControlMsg::Shutdown { deadline, reason } to any still-pending receivers.
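A receiver that has already acked the drain can therefore still see a late Shutdown when the deadline expires first. A minimal sketch of the two terminal paths, using toy stand-in types (TerminalState and NodeControlMsg here are hypothetical, not the real otap-dataflow definitions):

```rust
// Hypothetical stand-ins modeling the two terminal paths described above;
// not the real otap-dataflow types.
struct TerminalState {
    reason: String,
}

#[allow(dead_code)]
enum NodeControlMsg {
    DrainIngress,
    Shutdown { reason: String },
}

fn handle(msg: NodeControlMsg) -> TerminalState {
    match msg {
        // Normal path: ingress is drained, the receiver acks and exits.
        NodeControlMsg::DrainIngress => TerminalState {
            reason: "drained".into(),
        },
        // Forced path: the drain deadline expired before the receiver
        // reported drained, so the runtime escalates to Shutdown and the
        // receiver must exit immediately without a drained ack.
        NodeControlMsg::Shutdown { reason } => TerminalState { reason },
    }
}

fn main() {
    let forced = handle(NodeControlMsg::Shutdown {
        reason: "drain deadline reached".into(),
    });
    assert_eq!(forced.reason, "drain deadline reached");
    println!("ok");
}
```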


Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

pipeline-perf-test-basic unit test is failing

4 participants