refactor: Nack Messages by default to avoid acking failed messages#3342
refactor: Nack Messages by default to avoid acking failed messages#3342
Conversation
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3342 +/- ##
==========================================
+ Coverage 82.55% 82.57% +0.01%
==========================================
Files 306 306
Lines 74445 74510 +65
==========================================
+ Hits 61460 61528 +68
Misses 12427 12427
+ Partials 558 555 -3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| fn clone(&self) -> Self { | ||
| self.ack_handle | ||
| .ref_count | ||
| .fetch_add(1, std::sync::atomic::Ordering::SeqCst); |
There was a problem hiding this comment.
Why do we need sequential consistency for this? Cloning would get expensive. Especially since this is for each message.
| if self.is_failed.load(std::sync::atomic::Ordering::Relaxed) { | ||
| ack_handle.send(ReadAck::Nak).expect("Failed to send nak"); | ||
| // NAK if ref_count is not 0 (meaning not all references were marked as success) | ||
| if self.ref_count.load(std::sync::atomic::Ordering::SeqCst) != 0 { |
There was a problem hiding this comment.
Similarly, here, IMO we don't need sequential consistency here. Wrapping AckHandle in Arc guarantees that at the time of drop, there wouldn't exist another thread holding this AckHandle for it to increase the value of the ref_count.
| pub(crate) fn mark_success(&self) { | ||
| self.ack_handle | ||
| .ref_count | ||
| .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); |
There was a problem hiding this comment.
Ok, I see why we're probably using SeqCst, to try to avoid scenarios where ref_count becomes negative. But I think, at the end of the day, we still can get away with Relaxed ordering since AckHandle is wrapped in Arc which already uses Release/Acquire, so the final count during Drop will be correct.
Also, similar to how there can be bugs because of missing out on marking a message as success, we can also create bugs by marking a message as success too many times (ref_count < 0). Should we limit the ref_count to only be decreased till 0?
| message.ack_handle = Some(Arc::new(AckHandle::new(resp_ack_tx))); | ||
| let ack_handle = Arc::new(AckHandle::new(resp_ack_tx)); | ||
|
|
||
| // insert the offset and the ack one shot in the tracker. |
There was a problem hiding this comment.
| // insert the offset and the ack one shot in the tracker. | |
| // insert the message (with offset) into the tracker |
| ack_handle.send(ReadAck::Ack).expect("Failed to send ack"); | ||
| let _ = ack_handle.send(ReadAck::Ack); |
There was a problem hiding this comment.
I agree that we shouldn't panic if sending the ack/nack fails, but since the tokio tasks spawned for listening to these ack/nacks aren't structured, we should have a warn log here to notify that the receiver task exit before receiving ack/nack.
| /// Permit to achieve structured concurrency by ensuring we do not exceed the concurrency limit | ||
| /// and all the tasks are cleaned up when the component is shutting down. | ||
| permit: OwnedSemaphorePermit, | ||
| read_message: MessageHandle, |
There was a problem hiding this comment.
nit:
msg_handle: MessageHandle
| /// and all the tasks are cleaned up when the component is shutting down. | ||
| pub permit: OwnedSemaphorePermit, | ||
| pub message: Message, | ||
| pub read_message: MessageHandle, |
There was a problem hiding this comment.
nit:
msg_handle: MessageHandle
| // Downstream messages are independent - they use a no-op AckHandle. | ||
| // The original read_message is ACK'd via mark_success! below. | ||
| let read_msg: MessageHandle = mapped_message.into(); |
There was a problem hiding this comment.
Why a no-op AckHandle? Shouldn't the final ack/nack for the message only be triggered when downstream (isb writer/sink) finishes processing?
| let bypassed = if let Some(ref bypass_router) = self.bypass_router { | ||
| bypass_router | ||
| .try_bypass(mapped_message.clone()) | ||
| let read_msg: MessageHandle = mapped_message.into(); |
There was a problem hiding this comment.
Same here, why independent ackHandles?
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
| pub(crate) struct SourceWatermarkEntry { | ||
| pub(crate) partition_id: u16, | ||
| pub(crate) event_time_ms: i64, | ||
| } | ||
|
|
||
| impl From<&MessageHandle> for SourceWatermarkEntry { | ||
| fn from(handle: &MessageHandle) -> Self { | ||
| let msg = handle.message(); | ||
| let partition_id = match &msg.offset { | ||
| Offset::Int(o) => o.partition_idx, | ||
| Offset::String(o) => o.partition_idx, | ||
| }; | ||
| Self { | ||
| partition_id, | ||
| event_time_ms: msg.event_time.timestamp_millis(), | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
I don't think we need to introduce this tbh. We're abstracting away the partition idx extraction logic which is only being used once.
Let's remove this.
There was a problem hiding this comment.
Keeping SourceWatermarkEntry so the watermark code doesn't need to know about MessageHandle. Without it we'd pass &[(u16, i64)] which is unclear. From is the standard way to do conversions in Rust.
There was a problem hiding this comment.
I think without it, we were directly passing the Message around earlier, but fair that we want to limit the scope of info available for watermark code.
| /// ref_count is not decremented, so the message will be NAK'd when the AckHandle is dropped. | ||
| /// The error is logged at NAK time. | ||
| pub(crate) fn mark_failed(self, reason: impl fmt::Display) { | ||
| *self.ack_handle.failure_reason.lock().unwrap() = Some(reason.to_string()); |
There was a problem hiding this comment.
Failure reasons would be overwritten everytime we call mark_failed for the same ack handle.
Also, can we remove unwrap here?
| /// mark_success is called. On drop: NAK if ref_count != 0, ACK if ref_count == 0. | ||
| ref_count: AtomicUsize, | ||
| /// Set by mark_failed to record why the message is being nacked. | ||
| failure_reason: Mutex<Option<String>>, |
There was a problem hiding this comment.
I'm not sure about the Mutex here.
Our final goal is to only capture the error message, it should not require locking overhead.
I see that we're overwriting the error message every time anyways. Let's use OnceLock to only capture the first mark_failure message. This will avoid Mutex usage.
| @@ -519,23 +518,21 @@ impl<C: crate::typ::NumaflowTypeConfig> Source<C> { | |||
|
|
|||
| let mut ack_handles = vec![]; | |||
There was a problem hiding this comment.
Can we change this to msg_handles instead, since that is what we're tracking in this vector. Will avoid confusions with actual ack_handle
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
# Conflicts: # rust/numaflow-core/src/mapper/map/batch.rs # rust/numaflow-core/src/reduce/pbq.rs # rust/numaflow-core/src/reduce/reducer/aligned/user_defined.rs
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
| let mut fallback_messages: Vec<Message> = vec![]; | ||
| let mut on_success_messages: Vec<Message> = vec![]; | ||
| let mut ack_handles = vec![]; | ||
| let mut read_messages: Vec<MessageHandle> = vec![]; |
There was a problem hiding this comment.
nit: rename to msg_handles
| pub mapper: UserDefinedBatchMap, | ||
| pub batch: Vec<Message>, | ||
| pub output_tx: mpsc::Sender<Message>, | ||
| pub read_batch: Vec<MessageHandle>, |
There was a problem hiding this comment.
nit: rename read_batch
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Previously, there was a potential for messages to be accidentally acked if a panic or error occurred mid-processing. This could happen because the default behavior was to ack messages when dropped, so any message that didn't get explicitly marked as failed could be silently acked and lost.
This PR introduces a
MessageHandletype that wraps messages and tracks acknowledgment state throughout the pipeline. The default behavior is now to nack, and a message is only acked after it has been fully processed and all downstream messages have been successfully written. Explicitmark_success()andmark_failed()calls replace the previous manual isFailed flag manipulation, making the ack/nack logic clearer and safer.This ensures that in panic or error scenarios, messages are correctly nacked and retried rather than being silently lost.