Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 65 additions & 78 deletions rust/numaflow-core/src/mapper/map.rs

Large diffs are not rendered by default.

59 changes: 43 additions & 16 deletions rust/numaflow-core/src/mapper/map/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use super::{
use crate::config::is_mono_vertex;
use crate::config::pipeline::VERTEX_TYPE_MAP_UDF;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::message::{Message, MessageHandle};
use crate::monovertex::bypass_router::MvtxBypassRouter;
use crate::tracker::Tracker;
use crate::{mark_failed, mark_success};
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -42,8 +43,8 @@ pub(in crate::mapper) struct BatchSenderMapState {
/// MapBatchTask encapsulates all the context needed to execute a batch map operation.
pub(in crate::mapper) struct MapBatchTask {
pub mapper: UserDefinedBatchMap,
pub batch: Vec<Message>,
pub output_tx: mpsc::Sender<Message>,
pub read_batch: Vec<MessageHandle>,
Copy link
Copy Markdown
Contributor

@vaibhavtiwari33 vaibhavtiwari33 Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename read_batch

pub output_tx: mpsc::Sender<MessageHandle>,
pub tracker: Tracker,
pub bypass_router: Option<MvtxBypassRouter>,
pub is_mono_vertex: bool,
Expand All @@ -55,10 +56,18 @@ impl MapBatchTask {
/// Returns an error if any message in the batch fails to be processed.
pub async fn execute(self) -> Result<()> {
// Store parent message info for each message before sending to UDF
let parent_infos: Vec<ParentMessageInfo> = self.batch.iter().map(|m| m.into()).collect();
let parent_infos: Vec<ParentMessageInfo> = self
.read_batch
.iter()
.map(|rm| rm.message().into())
.collect();

// Convert Messages to MapRequests
let requests: Vec<MapRequest> = self.batch.into_iter().map(|m| m.into()).collect();
let requests: Vec<MapRequest> = self
.read_batch
.iter()
.map(|rm| rm.message().clone().into())
.collect();

// Update read metrics for each request
for _ in &requests {
Expand All @@ -68,7 +77,10 @@ impl MapBatchTask {
// Call the UDF and get results directly
let results = self.mapper.batch(requests, self.cln_token).await;

for (result, parent_info) in results.into_iter().zip(parent_infos.into_iter()) {
for (result, (msg_handle, parent_info)) in results
.into_iter()
.zip(self.read_batch.into_iter().zip(parent_infos.into_iter()))
{
match result {
Ok(results) => {
// Convert raw results to Messages using parent info
Expand Down Expand Up @@ -96,29 +108,44 @@ impl MapBatchTask {
.await?;

for mapped_message in mapped_messages {
let bypassed = if let Some(ref bypass_router) = self.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
// Each downstream handle shares the original ack tracking — ACK is
// deferred until all mapped messages are written to ISB/sink.
let downstream_handle = msg_handle.with_message(mapped_message);

// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(downstream_handle) for us to send downstream.
let downstream_handle = if let Some(ref bypass_router) = self.bypass_router
{
match bypass_router
.try_bypass(downstream_handle)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => continue, // Message was bypassed, move to next
}
} else {
false
downstream_handle
};

if !bypassed {
self.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.output_tx
.send(downstream_handle)
.await
.expect("failed to send response");
}

// Decrement the original ref_count for this message now that all downstream
// handles have been created and sent.
mark_success!(msg_handle);
}
Err(e) => {
error!(err=?e, "failed to map message");
mark_failed!(msg_handle, &e);
return Err(e);
}
}
}

Ok(())
}
}
Expand Down
64 changes: 30 additions & 34 deletions rust/numaflow-core/src/mapper/map/stream.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::Ordering;

use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::message::{Message, MessageHandle};
use crate::{mark_failed, mark_success};
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::{OwnedSemaphorePermit, mpsc};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(in crate::mapper) struct StreamSenderMapState {
pub(in crate::mapper) struct MapStreamTask {
pub mapper: UserDefinedStreamMap,
pub permit: OwnedSemaphorePermit,
pub message: Message,
pub msg_handle: MessageHandle,
pub shared_ctx: Arc<SharedMapTaskContext>,
}

Expand All @@ -64,9 +64,9 @@ impl MapStreamTask {

// Store parent message info before sending to UDF
// parent_info contains offset, so we don't need to clone it separately
let mut parent_info: ParentMessageInfo = (&self.message).into();
let mut parent_info: ParentMessageInfo = self.msg_handle.message().into();

let request: MapRequest = self.message.into();
let request: MapRequest = self.msg_handle.message().clone().into();
update_udf_read_metric(self.shared_ctx.is_mono_vertex);

// Call the UDF and get receiver for raw results
Expand Down Expand Up @@ -106,53 +106,49 @@ impl MapStreamTask {
.await
.expect("failed to update tracker");

let bypassed =
// Each downstream handle shares the original ack tracking — ACK is
// deferred until all mapped messages are written to ISB/sink.
let msg_handle = self.msg_handle.with_message(mapped_message);

// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(msg_handle) for us to send downstream.
let msg_handle =
if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
match bypass_router
.try_bypass(msg_handle)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => continue, // Message was bypassed, move to next
}
} else {
false
msg_handle
};

if !bypassed {
self.shared_ctx
.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.shared_ctx
.output_tx
.send(msg_handle)
.await
.expect("failed to send response");
}
}
Some(Err(e)) => {
error!(?e, "failed to map message");
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
mark_failed!(self.msg_handle, &e);
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
None => {
// Channel closed. If no results were ever sent (current_index == 0),
// this means the UDF stream may have closed unexpectedly (e.g., panic or gRPC
// stream error where the sender was dropped without delivering an error).
// Mark the message as failed so that it gets nacked.
if parent_info.current_index == 0 {
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
}
// Channel closed — stream ended cleanly (e.g., UDF returned empty results or
// finished after sending all results). Fall through to mark_success below.
break;
}
}
}

// Decrement the original ref_count now that we've accounted for all downstream messages.
mark_success!(self.msg_handle);
}
}

Expand Down
56 changes: 33 additions & 23 deletions rust/numaflow-core/src/mapper/map/unary.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::Ordering;

use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use crate::message::{Message, MessageHandle};
use crate::{mark_failed, mark_success};
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub(in crate::mapper) struct MapUnaryTask {
/// Permit to achieve structured concurrency by ensuring we do not exceed the concurrency limit
/// and all the tasks are cleaned up when the component is shutting down.
pub permit: OwnedSemaphorePermit,
pub message: Message,
pub msg_handle: MessageHandle,
pub shared_ctx: Arc<SharedMapTaskContext>,
}

Expand All @@ -65,9 +65,9 @@ impl MapUnaryTask {

// Store parent message info before sending to UDF
// parent_info contains offset, so we don't need to clone it separately
let parent_info: ParentMessageInfo = (&self.message).into();
let parent_info: ParentMessageInfo = self.msg_handle.message().into();

let request: MapRequest = self.message.into();
let request: MapRequest = self.msg_handle.message().clone().into();
update_udf_read_metric(self.shared_ctx.is_mono_vertex);

// Call the UDF and get raw results
Expand All @@ -79,12 +79,7 @@ impl MapUnaryTask {
Ok(results) => results,
Err(e) => {
error!(?e, offset = ?parent_info.offset, "failed to map message");
parent_info
.ack_handle
.as_ref()
.expect("ack handle should be present")
.is_failed
.store(true, Ordering::Relaxed);
mark_failed!(self.msg_handle, &e);
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
Expand Down Expand Up @@ -115,25 +110,40 @@ impl MapUnaryTask {
.await
.expect("failed to update tracker");

// Send messages downstream
for mapped_message in mapped_messages {
let bypassed = if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
bypass_router
.try_bypass(mapped_message.clone())
// Each downstream handle shares the original ack tracking — ACK is deferred until
// all mapped messages are written to ISB/sink.
let msg_handle = self.msg_handle.with_message(mapped_message);

// Try to bypass the message. If bypassed, try_bypass takes ownership and returns None.
// If not bypassed, it returns Some(msg_handle) for us to send downstream.
let msg_handle = if let Some(ref bypass_router) = self.shared_ctx.bypass_router {
match bypass_router
.try_bypass(msg_handle)
.await
.expect("failed to send message to bypass channel")
{
Some(msg) => msg,
None => {
// Message was bypassed (already acked by bypass_router), move to next.
continue;
}
}
} else {
false
msg_handle
};

if !bypassed {
self.shared_ctx
.output_tx
.send(mapped_message)
.await
.expect("failed to send response");
}
self.shared_ctx
.output_tx
.send(msg_handle)
.await
.expect("failed to send response");
}

// Decrement the original ref_count now that we've accounted for all downstream messages.
// The original msg_handle held ref_count=1; mark_success brings it to 0 contribution,
// and the downstream handles will each call mark_success when written to ISB/sink.
mark_success!(self.msg_handle);
}
}

Expand Down
Loading
Loading