Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 213 additions & 10 deletions docs/user-guide/user-defined-functions/map/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,113 @@ spec:
readBatchSize: 1
```

Check the links below to see the UDF examples in streaming mode for different languages.
### Streaming Mode Examples

- [Python](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/mapstream/flatmap_stream/)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/examples/mapstreamer/flatmap_stream/)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/)
Below are examples showing how to implement map streaming for flat-map operations:

=== "Go"
```go
// FlatMap is a MapStreamer that splits each input message on commas and
// streams every resulting piece as its own message.
type FlatMap struct{}

// MapStream reads the datum's payload, splits it on "," and writes one
// message per piece to messageCh, closing the channel when it is done.
func (f *FlatMap) MapStream(ctx context.Context, keys []string, d mapstreamer.Datum, messageCh chan<- mapstreamer.Message) {
	// The channel must be closed to mark the end of the stream; otherwise the
	// client keeps waiting for more messages.
	defer close(messageCh)

	payload := d.Value()
	_ = d.EventTime() // Event time is available
	_ = d.Watermark() // Watermark is available

	// Emit one message for every comma-separated piece of the payload.
	for _, part := range strings.Split(string(payload), ",") {
		messageCh <- mapstreamer.NewMessage([]byte(part))
	}
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-go/tree/main/examples/mapstreamer/flatmap_stream/)

=== "Python"
```python
class FlatMapStream(MapStreamer):
    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        """
        A handler that splits the input datum value into multiple strings by `,` separator and
        emits them as a stream.

        An empty payload produces a single drop message instead of an empty output message.
        """
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark

        # `str.split(",")` never returns an empty list (an empty string splits
        # to `[""]`), so the payload itself is checked to decide whether to drop.
        if not val:
            yield Message.to_drop()
            return
        for s in val.decode("utf-8").split(","):
            yield Message(str.encode(s))


async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Message]:
    """
    A handler that splits the input datum value into multiple strings by `,` separator and
    emits them as a stream.

    An empty payload produces a single drop message instead of an empty output message.
    """
    val = datum.value
    _ = datum.event_time
    _ = datum.watermark

    # `str.split(",")` never returns an empty list (an empty string splits
    # to `[""]`), so the payload itself is checked to decide whether to drop.
    if not val:
        yield Message.to_drop()
        return
    for s in val.decode("utf-8").split(","):
        yield Message(str.encode(s))
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/mapstream/flatmap_stream)

=== "Java"
```java
/**
* This is a simple User Defined Function example which processes the input message
* and produces more than one output messages(flatMap) in a streaming mode
* example : if the input message is "dog,cat", it streams two output messages
* "dog" and "cat"
*/

public class FlatMapStreamFunction extends MapStreamer {
public void processMessage(String[] keys, Datum data, OutputObserver outputObserver) {
String msg = new String(data.getValue());
String[] strs = msg.split(",");

for (String str : strs) {
outputObserver.send(new Message(str.getBytes()));
}
}
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/)

=== "Rust"
```rust
/// A map-stream UDF that splits each payload on commas and streams every
/// piece as its own message.
struct Cat;

#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
    /// Splits `input.value` on `,` and sends one message per piece through `tx`,
    /// each carrying the keys of the input message and no tags.
    async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
        // A payload that is not valid UTF-8 is treated as an empty string.
        let payload_str = String::from_utf8(input.value).unwrap_or_default();

        // Iterate the split lazily; there is no need to collect it into a Vec first.
        for split in payload_str.split(',') {
            let message = Message::new(split.as_bytes().to_vec())
                .with_keys(input.keys.clone())
                .with_tags(vec![]);
            // Stop streaming as soon as a send fails.
            if tx.send(message).await.is_err() {
                break;
            }
        }
    }
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-rs/tree/main/examples/flatmap-stream)

### Batch Map Mode

Expand All @@ -64,12 +166,113 @@ Each Datum has a unique ID tag, which will be used by Numaflow to ensure correct
every input data item, there should be a corresponding response in the BatchResponses list.
- A batch can contain up to `readBatchSize` input data items.

Check the links below to see the UDF examples in batch mode for different languages.
### Batch Mode Examples

Below are examples showing how to implement batch map operations:

=== "Go"
```go
// batchMapFn drains the datums channel and, for each datum, builds a batch
// response keyed by the datum's ID that holds one message per comma-separated
// piece of the payload.
func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.BatchResponses {
	results := batchmapper.BatchResponsesBuilder()
	for datum := range datums {
		payload := datum.Value()
		_ = datum.EventTime() // Event time is available
		_ = datum.Watermark() // Watermark is available

		// Each response is tied to the ID of the datum it answers.
		response := batchmapper.NewBatchResponse(datum.Id())
		for _, part := range strings.Split(string(payload), ",") {
			response = response.Append(batchmapper.NewMessage([]byte(part)))
		}

		results = results.Append(response)
	}
	return results
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-go/tree/main/examples/batchmapper/)

=== "Python"
```python
class Flatmap(BatchMapper):
    """
    This is a class that inherits from the BatchMapper class.
    It implements a flatmap operation over a batch of input messages
    """

    async def handler(
        self,
        datums: AsyncIterable[Datum],
    ) -> BatchResponses:
        """
        Build one BatchResponse per input datum, keyed by the datum's id, holding
        one message per `,`-separated piece of the payload. An empty payload
        produces a single drop message.
        """
        batch_responses = BatchResponses()
        async for datum in datums:
            val = datum.value
            _ = datum.event_time
            _ = datum.watermark
            batch_response = BatchResponse.from_id(datum.id)
            # `str.split(",")` never returns an empty list (an empty string
            # splits to `[""]`), so the payload itself is checked to decide
            # whether to drop.
            if not val:
                batch_response.append(Message.to_drop())
            else:
                for s in val.decode("utf-8").split(","):
                    batch_response.append(Message(str.encode(s)))
            batch_responses.append(batch_response)

        return batch_responses
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/batchmap)

=== "Java"
```java
/**
 * Batch map example: for every input datum, splits the payload on "," and returns
 * a BatchResponse, keyed by the datum ID, holding one message per piece.
 */
public class BatchFlatMap extends BatchMapper {
    /**
     * Reads datums until the iterator returns null and builds one response per datum.
     *
     * @param datumStream iterator over the datums of the current batch
     * @return the responses collected for the batch
     */
    @Override
    public BatchResponses processMessage(DatumIterator datumStream) {
        BatchResponses batchResponses = new BatchResponses();
        while (true) {
            Datum datum;
            try {
                datum = datumStream.next();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop reading. Retrying here would
                // presumably make next() throw again straight away while the flag
                // is set, spinning in a busy loop.
                Thread.currentThread().interrupt();
                break;
            }
            // null means the iterator is closed so we are good to break the loop.
            if (datum == null) {
                break;
            }
            try {
                // Decode and encode explicitly as UTF-8 so the result does not
                // depend on the platform default charset.
                String msg = new String(datum.getValue(), java.nio.charset.StandardCharsets.UTF_8);
                String[] strs = msg.split(",");
                BatchResponse batchResponse = new BatchResponse(datum.getId());
                for (String str : strs) {
                    batchResponse.append(new Message(str.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
                }
                batchResponses.append(batchResponse);
            } catch (Exception e) {
                // Still answer this datum, with an empty response, so that it has
                // a corresponding entry in the result.
                batchResponses.append(new BatchResponse(datum.getId()));
            }
        }
        return batchResponses;
    }
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/batchmap/)

- [Python](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/batchmap/)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/examples/batchmapper/)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/batchmap/)
- [Rust](https://github.com/numaproj/numaflow-rs/tree/main/examples/batchmap-cat/)
=== "Rust"
```rust
#[tonic::async_trait]
impl batchmap::BatchMapper for Cat {
    /// Receives datums until the channel is closed and returns one response per
    /// datum, identified by the datum's id, that echoes the value and keys.
    async fn batchmap(&self, mut input: tokio::sync::mpsc::Receiver<Datum>) -> Vec<BatchResponse> {
        let mut responses: Vec<BatchResponse> = Vec::new();
        while let Some(datum) = input.recv().await {
            let mut response = BatchResponse::from_id(datum.id);
            // `datum` is owned here, so its value and keys are moved into the
            // message instead of being cloned.
            response.append(Message::new(datum.value).with_keys(datum.keys));
            responses.push(response);
        }
        responses
    }
}
```
[View full examples on GitHub](https://github.com/numaproj/numaflow-rs/tree/main/examples/batchmap-cat/)

### Available Environment Variables

Expand All @@ -89,4 +292,4 @@ Configuration data can be provided to the UDF container at runtime multiple ways
- `args`
- `command`
- [`volumes`](../../reference/configuration/volumes.md)
- [`init containers`](../../reference/configuration/init-containers.md)
- [`init containers`](../../reference/configuration/init-containers.md)
Loading