Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 114 additions & 124 deletions rust/numaflow-core/src/mapper/map.rs

Large diffs are not rendered by default.

51 changes: 36 additions & 15 deletions rust/numaflow-core/src/mapper/map/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use super::{
use crate::config::is_mono_vertex;
use crate::config::pipeline::VERTEX_TYPE_MAP_UDF;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::mark_success_batch;
use crate::message::{Message, MessageHandle};
use crate::monovertex::bypass_router::MvtxBypassRouter;
use crate::tracker::Tracker;
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
Expand Down Expand Up @@ -42,8 +43,8 @@ pub(in crate::mapper) struct BatchSenderMapState {
/// MapBatchTask encapsulates all the context needed to execute a batch map operation.
pub(in crate::mapper) struct MapBatchTask {
pub mapper: UserDefinedBatchMap,
pub batch: Vec<Message>,
pub output_tx: mpsc::Sender<Message>,
pub read_batch: Vec<MessageHandle>,
Copy link
Copy Markdown
Contributor

@vaibhavtiwari33 vaibhavtiwari33 Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename read_batch

pub output_tx: mpsc::Sender<MessageHandle>,
pub tracker: Tracker,
pub bypass_router: Option<MvtxBypassRouter>,
pub is_mono_vertex: bool,
Expand All @@ -55,10 +56,18 @@ impl MapBatchTask {
/// Returns an error if any message in the batch fails to be processed.
pub async fn execute(self) -> Result<()> {
// Store parent message info for each message before sending to UDF
let parent_infos: Vec<ParentMessageInfo> = self.batch.iter().map(|m| m.into()).collect();
let parent_infos: Vec<ParentMessageInfo> = self
.read_batch
.iter()
.map(|rm| rm.message().into())
.collect();

// Convert Messages to MapRequests
let requests: Vec<MapRequest> = self.batch.into_iter().map(|m| m.into()).collect();
let requests: Vec<MapRequest> = self
.read_batch
.iter()
.map(|rm| rm.message().clone().into())
.collect();

// Update read metrics for each request
for _ in &requests {
Expand Down Expand Up @@ -95,30 +104,42 @@ impl MapBatchTask {
)
.await?;

// Downstream messages are independent - they use a no-op AckHandle.
// The original read_batch is ACK'd via mark_success_batch! below.
for mapped_message in mapped_messages {
let bypassed = if let Some(ref bypass_router) = self.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
let read_msg: MessageHandle = mapped_message.into();
Copy link
Copy Markdown
Contributor

@vaibhavtiwari33 vaibhavtiwari33 Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here: why do the downstream messages get independent AckHandles?

// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(read_msg) for us to send downstream.
let read_msg = if let Some(ref bypass_router) = self.bypass_router {
match bypass_router
.try_bypass(read_msg)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => continue, // Message was bypassed, move to next
}
} else {
false
read_msg
};

if !bypassed {
self.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.output_tx
.send(read_msg)
.await
.expect("failed to send response");
}
}
Err(e) => {
error!(err=?e, "failed to map message");
// read_batch will be dropped without mark_success, causing NAK
return Err(e);
}
}
}

// we have successfully processed the batch, mark the batch as success.
mark_success_batch!(self.read_batch);

Ok(())
}
}
Expand Down
61 changes: 30 additions & 31 deletions rust/numaflow-core/src/mapper/map/stream.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::Ordering;

use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::mark_success;
use crate::message::{Message, MessageHandle};
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::{OwnedSemaphorePermit, mpsc};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(in crate::mapper) struct StreamSenderMapState {
pub(in crate::mapper) struct MapStreamTask {
pub mapper: UserDefinedStreamMap,
pub permit: OwnedSemaphorePermit,
pub message: Message,
pub read_message: MessageHandle,
pub shared_ctx: Arc<SharedMapTaskContext>,
}

Expand All @@ -64,9 +64,9 @@ impl MapStreamTask {

// Store parent message info before sending to UDF
// parent_info contains offset, so we don't need to clone it separately
let mut parent_info: ParentMessageInfo = (&self.message).into();
let mut parent_info: ParentMessageInfo = self.read_message.message().into();

let request: MapRequest = self.message.into();
let request: MapRequest = self.read_message.message().clone().into();
update_udf_read_metric(self.shared_ctx.is_mono_vertex);

// Call the UDF and get receiver for raw results
Expand Down Expand Up @@ -106,53 +106,52 @@ impl MapStreamTask {
.await
.expect("failed to update tracker");

let bypassed =
// Downstream messages are independent - they use a no-op AckHandle.
// The original read_message is ACK'd via mark_success! below.
let read_msg: MessageHandle = mapped_message.into();
Copy link
Copy Markdown
Contributor

@vaibhavtiwari33 vaibhavtiwari33 Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a no-op AckHandle? Shouldn't the final ACK/NAK for the message be triggered only once the downstream stage (ISB writer/sink) has finished processing it?


// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(read_msg) for us to send downstream.
let read_msg =
if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
match bypass_router
.try_bypass(read_msg)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => continue, // Message was bypassed, move to next
}
} else {
false
read_msg
};

if !bypassed {
self.shared_ctx
.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.shared_ctx
.output_tx
.send(read_msg)
.await
.expect("failed to send response");
}
}
Some(Err(e)) => {
error!(?e, "failed to map message");
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
// read_message will be dropped without mark_success, causing NAK
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
None => {
// Channel closed. If no results were ever sent (current_index == 0),
// this means the UDF stream may have closed unexpectedly (e.g., panic or gRPC
// stream error where the sender was dropped without delivering an error).
// Mark the message as failed so that it gets nacked.
if parent_info.current_index == 0 {
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
}
// read_message will be dropped without mark_success, causing NAK automatically.
break;
}
}
}

// Mark the original read message as success (decrement its ref_count).
// The message will only be ACK'd when all downstream messages also call mark_success().
mark_success!(self.read_message);
}
}

Expand Down
52 changes: 29 additions & 23 deletions rust/numaflow-core/src/mapper/map/unary.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::Ordering;

use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::mark_success;
use crate::message::{Message, MessageHandle};
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub(in crate::mapper) struct MapUnaryTask {
/// Permit to achieve structured concurrency by ensuring we do not exceed the concurrency limit
/// and all the tasks are cleaned up when the component is shutting down.
pub permit: OwnedSemaphorePermit,
pub message: Message,
pub read_message: MessageHandle,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider naming this field `msg_handle`:
msg_handle: MessageHandle

pub shared_ctx: Arc<SharedMapTaskContext>,
}

Expand All @@ -65,9 +65,9 @@ impl MapUnaryTask {

// Store parent message info before sending to UDF
// parent_info contains offset, so we don't need to clone it separately
let parent_info: ParentMessageInfo = (&self.message).into();
let parent_info: ParentMessageInfo = self.read_message.message().into();

let request: MapRequest = self.message.into();
let request: MapRequest = self.read_message.message().clone().into();
update_udf_read_metric(self.shared_ctx.is_mono_vertex);

// Call the UDF and get raw results
Expand All @@ -79,12 +79,7 @@ impl MapUnaryTask {
Ok(results) => results,
Err(e) => {
error!(?e, offset = ?parent_info.offset, "failed to map message");
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
// read_message will be dropped without mark_success, causing NAK
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
Expand Down Expand Up @@ -115,25 +110,36 @@ impl MapUnaryTask {
.await
.expect("failed to update tracker");

// Send messages downstream
for mapped_message in mapped_messages {
let bypassed = if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
// Downstream messages are independent - they use a no-op AckHandle.
// The original read_message is ACK'd via mark_success! below.
let read_msg: MessageHandle = mapped_message.into();

// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(read_msg) for us to send downstream.
let read_msg = if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
match bypass_router
.try_bypass(read_msg)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => continue, // Message was bypassed, move to next
}
} else {
false
read_msg
};

if !bypassed {
self.shared_ctx
.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.shared_ctx
.output_tx
.send(read_msg)
.await
.expect("failed to send response");
}

// Mark the original read message as success (decrement its ref_count).
// The message will only be ACK'd when all downstream messages also call mark_success().
mark_success!(self.read_message);
}
}

Expand Down
Loading
Loading