
[WIP] Add node-local wakeups to ProcessorInbox #2470

Draft
lquerel wants to merge 18 commits into open-telemetry:main from lquerel:feature/node-local-wakeup

Conversation

Contributor

@lquerel lquerel commented Mar 31, 2026

Not ready for review. #2469 must be merged first to reduce the scope of this PR.

Change Summary

Add a node-local wakeup API for processors and integrate it into ProcessorInbox.

This PR introduces keyed, cancelable wakeups that let a processor schedule local wakeup work without sending dummy pdata back through the old delayed data path. Due wakeups are surfaced through ProcessorInbox as control
messages.

This is the next step in the redesign tracked in #2465. The goal is to split two different behaviors that were previously mixed together:

  • wakeup-style local scheduling
  • delayed pdata resume

This PR handles the first one only. It migrates the wakeup-style usages in batch_processor and durable_buffer_processor, while intentionally leaving retry and the old global delayed-data path in place for the next PRs.
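To make the replacement semantics concrete, here is a minimal sketch of what a keyed, cancelable wakeup scheduler could look like. All names (`WakeupScheduler`, `set_wakeup`, `due_wakeups`, etc.) are illustrative, not the PR's exact API; the point is the slot/revision contract: re-scheduling a slot replaces its due time and bumps the revision.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical slot/revision aliases, mirroring the PR's description.
pub type WakeupSlot = u64;
pub type WakeupRevision = u64;

/// Minimal keyed, cancelable wakeup scheduler sketch.
#[derive(Default)]
pub struct WakeupScheduler {
    next_revision: WakeupRevision,
    pending: HashMap<WakeupSlot, (Instant, WakeupRevision)>,
}

impl WakeupScheduler {
    /// Schedule (or replace) the wakeup for `slot`; returns the new revision.
    pub fn set_wakeup(&mut self, slot: WakeupSlot, due: Instant) -> WakeupRevision {
        self.next_revision += 1;
        let rev = self.next_revision;
        // Replaces any previous due time for the same slot.
        let _ = self.pending.insert(slot, (due, rev));
        rev
    }

    /// Cancel a pending wakeup for `slot`, if any.
    pub fn cancel_wakeup(&mut self, slot: WakeupSlot) {
        let _ = self.pending.remove(&slot);
    }

    /// Drain all wakeups whose due time has passed; these would be surfaced
    /// through ProcessorInbox as control messages.
    pub fn due_wakeups(&mut self, now: Instant) -> Vec<(WakeupSlot, WakeupRevision)> {
        let due: Vec<_> = self
            .pending
            .iter()
            .filter(|(_, (t, _))| *t <= now)
            .map(|(s, (_, r))| (*s, *r))
            .collect();
        for (slot, _) in &due {
            let _ = self.pending.remove(slot);
        }
        due
    }
}
```

Scheduling slot 1 twice yields a single pending wakeup carrying the later revision, which is how a processor can tell a current delivery from a stale one.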

What issue does this PR close?

How are these changes tested?

  • Added/updated engine tests covering:
    • local wakeup scheduling
    • wakeup replacement and cancellation
    • ProcessorInbox delivery of due wakeups
    • shutdown behavior for pending wakeups
  • Added/updated processor tests covering:
    • batch wakeup-driven flush behavior
    • durable-buffer wakeup-driven retry/resume behavior
  • Verified with:
    • cargo xtask check

Are there any user-facing changes?

No user-facing changes.

This PR is an internal runtime change only.

@github-actions github-actions bot added the rust Pull requests that update Rust code label Mar 31, 2026
@codecov

codecov bot commented Mar 31, 2026

Codecov Report

❌ Patch coverage is 91.71447% with 130 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.38%. Comparing base (d8e64e0) to head (4a0338d).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2470      +/-   ##
==========================================
+ Coverage   88.34%   88.38%   +0.03%     
==========================================
  Files         613      615       +2     
  Lines      222675   224019    +1344     
==========================================
+ Hits       196731   198005    +1274     
- Misses      25420    25490      +70     
  Partials      524      524              
Components            Coverage         Δ
otap-dataflow         90.28% <91.71%>  (+0.03%) ⬆️
query_abstraction     80.61% <ø>       (ø)
query_engine          90.74% <ø>       (ø)
syslog_cef_receivers  ∅ <ø>            (∅)
otel-arrow-go         52.45% <ø>       (ø)
quiver                91.92% <ø>       (ø)

@lquerel lquerel force-pushed the feature/node-local-wakeup branch from 651b659 to 44d64d4 on April 2, 2026 17:52
Comment on lines +82 to +144
fn swap_entries(&mut self, left: usize, right: usize) {
    if left == right {
        return;
    }

    self.wakeups.swap(left, right);

    let left_slot = self.wakeups[left].slot;
    let right_slot = self.wakeups[right].slot;
    let _ = self
        .wakeup_indices
        .insert(left_slot, left)
        .expect("left slot index should exist");
    let _ = self
        .wakeup_indices
        .insert(right_slot, right)
        .expect("right slot index should exist");
}

fn sift_up(&mut self, mut index: usize) {
    while index > 0 {
        let parent = (index - 1) / 2;
        if !Self::wakeup_precedes(&self.wakeups[index], &self.wakeups[parent]) {
            break;
        }
        self.swap_entries(index, parent);
        index = parent;
    }
}

fn sift_down(&mut self, mut index: usize) {
    let len = self.wakeups.len();
    loop {
        let left = index * 2 + 1;
        if left >= len {
            break;
        }

        let right = left + 1;
        let mut smallest = left;
        if right < len && Self::wakeup_precedes(&self.wakeups[right], &self.wakeups[left]) {
            smallest = right;
        }

        if !Self::wakeup_precedes(&self.wakeups[smallest], &self.wakeups[index]) {
            break;
        }

        self.swap_entries(index, smallest);
        index = smallest;
    }
}

fn repair_heap_at(&mut self, index: usize) {
    if index > 0 {
        let parent = (index - 1) / 2;
        if Self::wakeup_precedes(&self.wakeups[index], &self.wakeups[parent]) {
            self.sift_up(index);
            return;
        }
    }
    self.sift_down(index);
}
Contributor

Does Rust really not have a crate for this? Hmm (thinking of Go's https://pkg.go.dev/container/heap). Don't get me wrong: I love heaps! I would put this heap-helper code in a new, data-structure-only file.
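For context on why this is hand-rolled: std's `BinaryHeap` has no decrease-key or remove-by-key, so replacing a slot's due time requires either an index map (as in the diff above) or lazy deletion. A hedged sketch of the lazy-deletion alternative, with illustrative names and integer ticks standing in for `Instant`:

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Lazy-deletion min-heap over (due_ticks, slot, revision). Entries whose
/// revision no longer matches the latest one for their slot are skipped on pop.
#[derive(Default)]
struct LazyWakeupHeap {
    heap: BinaryHeap<Reverse<(u64, u64, u64)>>, // (due_ticks, slot, revision)
    latest: HashMap<u64, u64>,                  // slot -> latest revision
    next_rev: u64,
}

impl LazyWakeupHeap {
    /// Schedule (or replace) the wakeup for `slot`; returns the new revision.
    fn set(&mut self, slot: u64, due_ticks: u64) -> u64 {
        self.next_rev += 1;
        let _ = self.latest.insert(slot, self.next_rev);
        self.heap.push(Reverse((due_ticks, slot, self.next_rev)));
        self.next_rev
    }

    /// Cancel a slot; its stale heap entries are dropped lazily on pop.
    fn cancel(&mut self, slot: u64) {
        let _ = self.latest.remove(&slot);
    }

    /// Pop the next due wakeup, discarding stale/canceled entries along the way.
    fn pop_due(&mut self, now_ticks: u64) -> Option<(u64, u64)> {
        while let Some(Reverse((due, slot, rev))) = self.heap.peek().copied() {
            if self.latest.get(&slot) != Some(&rev) {
                let _ = self.heap.pop(); // stale or canceled entry
                continue;
            }
            if due > now_ticks {
                return None; // earliest live entry is not due yet
            }
            let _ = self.heap.pop();
            let _ = self.latest.remove(&slot);
            return Some((slot, rev));
        }
        None
    }
}
```

The trade-off versus the indexed heap in the diff: lazy deletion keeps the code to std containers, but superseded entries occupy heap memory until they bubble to the top, whereas `swap_entries`/`repair_heap_at` supports true in-place removal.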

}
}

/// Retry state for a bundle that could not acquire an engine wakeup slot yet.
Contributor

I know this is still a WIP, so not leaving detailed comments yet. Just wanted to chime in early on the approach to hopefully reduce overwork.

My general concern in the durable_buffer changes specifically (not the general mechanism) is that we'd be doing something relatively complex with the 'many wakeup slots + OverflowRetry' approach at a time when we can least afford to do so (i.e. under heavy NACK pressure from a network outage).

Instead of one wakeup slot per bundle, could we simplify by maintaining a single WakeupSlot that tracks the earliest pending retry deadline? When a NACK arrives (or the wakeup fires and we need to reschedule), we compute the backoff time and call set_wakeup(slot, min(current_scheduled, new_retry_at)). When the wakeup fires, we scan Quiver for all bundles whose backoff has elapsed and resend them.

We still get exact timing for the next retry without per-bundle slot management. The state the durable_buffer_processor maintains reduces to one Instant and one wakeup slot, the OverflowRetry mechanism goes away in the durable buffer (no longer needed), and we don't need to coordinate between multiple scheduling methods.
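The suggestion above boils down to one comparison per NACK. A minimal sketch, with hypothetical names (`DurableRetryState`, `on_retry_needed`; the actual engine call would be set_wakeup):

```rust
use std::time::Instant;

/// Sketch of single-slot retry scheduling: the processor tracks only the
/// earliest pending retry deadline instead of one wakeup slot per bundle.
struct DurableRetryState {
    next_retry_at: Option<Instant>, // earliest pending retry deadline
}

impl DurableRetryState {
    /// Called on NACK, or when rescheduling after a wakeup fires.
    /// Returns Some(deadline) when the single wakeup slot must be (re)armed.
    fn on_retry_needed(&mut self, retry_at: Instant) -> Option<Instant> {
        match self.next_retry_at {
            // Slot is already armed at or before the new deadline: nothing to do.
            Some(current) if current <= retry_at => None,
            // Otherwise move the deadline earlier (or arm it for the first time);
            // the caller would issue set_wakeup(slot, retry_at).
            _ => {
                self.next_retry_at = Some(retry_at);
                Some(retry_at)
            }
        }
    }

    /// Called when the wakeup fires: clear the deadline, then scan durable
    /// storage for all bundles whose backoff has elapsed and resend them.
    fn on_wakeup(&mut self) {
        self.next_retry_at = None;
    }
}
```

Under this scheme the only per-processor state is one `Option<Instant>` and one slot, regardless of how many bundles are awaiting retry.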

Contributor Author

That sounds reasonable, I'll take a look at what I can do.

Contributor Author

Please take a look at the updated code.

@lquerel
Contributor Author

lquerel commented Apr 3, 2026

@AaronRM and @JakeDern, could you each review my integration of the new wakeup API (in the durable buffer processor and the batch processor, respectively), which replaces the delayed-data control message used before?

@lquerel
Contributor Author

lquerel commented Apr 4, 2026

@AaronRM At the durable buffer retry state level, there is still a potential issue with unbounded heap memory usage. With the new wakeup API, I think the situation is better than before, but the heap footprint can still grow with the number of deferred bundles.

During a prolonged transient outage with small bundles, the durable backlog can become large in terms of in-memory retry metadata. I did not try to fix this because it goes well beyond the integration of the wakeup API, but I think a long-term solution would be to move the retry_at metadata directly into durable storage so that retry bookkeeping remains bounded in memory.

That said, I don't have your full design in mind, and this may not be the best approach. In any case, I'll leave it to you to take a look.

Contributor

@JakeDern JakeDern left a comment

Batch processor changes look reasonable to me. I assume we'll remove the DelayedData handling code before merge, since it's obsolete now.

Comment on lines +412 to +414
/// Wakeups are keyed by [`WakeupSlot`]. Scheduling the same slot again
/// replaces the previous due time for that slot and assigns a new
/// scheduler revision for that slot.
Contributor

Very nice!

/// Re-scheduling an existing slot gives it a new revision. Processors can use
/// the revision carried back in [`NodeControlMsg::Wakeup`] to distinguish a
/// current wakeup from a stale delivery for the same slot.
pub type WakeupRevision = u64;
Contributor

I'm glad this is part of the engine! I was wondering if I'd have to do a similar thing at the processor level
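On the processor side, the revision check the doc comment describes could look roughly like this. Everything here is a sketch: `Processor`, `handle_wakeup`, and the map of latest revisions are illustrative, while the slot/revision aliases mirror the quoted doc comment.

```rust
use std::collections::HashMap;

/// Aliases mirroring the engine doc comment above.
type WakeupSlot = u64;
type WakeupRevision = u64;

/// Hypothetical processor-side state for filtering stale wakeup deliveries.
struct Processor {
    /// slot -> revision returned by the most recent scheduling call.
    current: HashMap<WakeupSlot, WakeupRevision>,
}

impl Processor {
    /// True if this delivery matches the latest scheduled revision for `slot`.
    fn is_current(&self, slot: WakeupSlot, rev: WakeupRevision) -> bool {
        self.current.get(&slot) == Some(&rev)
    }

    /// Handle a NodeControlMsg::Wakeup-style delivery; returns whether the
    /// wakeup was acted on (false means it was stale and ignored).
    fn handle_wakeup(&mut self, slot: WakeupSlot, rev: WakeupRevision) -> bool {
        if !self.is_current(slot, rev) {
            // The slot was rescheduled or canceled after this wakeup fired.
            return false;
        }
        let _ = self.current.remove(&slot);
        true // perform the actual wakeup work here
    }
}
```

Having the engine carry the revision back means every processor gets this staleness check for free instead of reimplementing it per node.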

@AaronRM
Contributor

AaronRM commented Apr 6, 2026

> @AaronRM At the durable buffer retry state level, there is still a potential issue with unbounded heap memory usage. With the new wakeup API, I think the situation is better than before, but the heap footprint can still grow with the number of deferred bundles.

Thanks @lquerel. The changes with the single slot redesign now look good to me from the durable_buffer perspective. 👍

To your concern re: unbounded heap growth, I filed #2552 to track that separately. I agree that it's not something your PR should address, although as you note, your changes already improve the situation slightly.
