feat: introduce concurrency env for udf invocation #3317
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Codecov Report
@@ Coverage Diff @@
## main #3317 +/- ##
==========================================
- Coverage 82.63% 82.55% -0.09%
==========================================
Files 307 307
Lines 77544 77618 +74
==========================================
- Hits 64079 64076 -3
- Misses 12907 12985 +78
+ Partials 558 557 -1
- true => MAX_ACK_PENDING / self.read_batch_size,
+ true => std::cmp::max(1, self.concurrency / self.read_batch_size.max(1)),
This drop is significant. We're effectively dropping max ack pending from 10000 -> 500. It should be clearly noted in release notes.
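To make the size of this drop concrete, here is a rough Go sketch of the two expressions from the diff above. It assumes `MAX_ACK_PENDING` was 10000 (the figure quoted in this comment) and a read batch size of 500. `oldLimit` and `newLimit` are hypothetical names, and reading the result as "batches in flight" is an inference from the numbers, not something the diff states.

```go
package main

import "fmt"

// legacyMaxAckPending mirrors the 10000 figure quoted in the review comment.
// Assumption: the real constant is not shown in this thread.
const legacyMaxAckPending uint64 = 10000

// oldLimit mirrors `MAX_ACK_PENDING / self.read_batch_size`.
// Like the original expression, it does not guard against a zero batch size.
func oldLimit(readBatchSize uint64) uint64 {
	return legacyMaxAckPending / readBatchSize
}

// newLimit mirrors `max(1, concurrency / max(1, read_batch_size))`.
func newLimit(concurrency, readBatchSize uint64) uint64 {
	if readBatchSize < 1 {
		readBatchSize = 1 // avoid dividing by zero
	}
	q := concurrency / readBatchSize // integer division truncates
	if q < 1 {
		return 1 // never drop below one
	}
	return q
}

func main() {
	const batch uint64 = 500 // assumed read batch size
	oldN := oldLimit(batch)
	newN := newLimit(batch, batch) // concurrency defaulting to the batch size
	fmt.Printf("old: %d (x%d = %d messages)\n", oldN, batch, oldN*batch)
	fmt.Printf("new: %d (x%d = %d messages)\n", newN, batch, newN*batch)
}
```

Under these assumptions the old expression allows 20 batches (10000 messages) and the new one allows a single batch (500 messages), which matches the 10000 -> 500 figure above.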
- max_ack_pending,
+ // Cap inflight messages on this ISB reader to the vertex's `concurrency`. The
+ // reader holds a semaphore of this size so that, even with read-ahead, we never
+ // have more than `concurrency + read_batch_size` messages in flight.
+ max_ack_pending: concurrency,
We're dropping the default max ack pending from 25k -> 500 for ISB reader. This should also be noted as part of release notes since the change is quite significant.
if vCopy.IsOrdered() && (vCopy.IsMapUDF() || vCopy.IsASink()) {
	if vCopy.Limits == nil {
		vCopy.Limits = &dfv1.VertexLimits{}
	}
	vCopy.Limits.Concurrency = ptr.To[uint64](1)
}
Should we throw an error when concurrency > 1 for ordered processing instead of overwriting?
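For comparison, the "error instead of overwrite" option could look roughly like the sketch below. It is not the PR's code: `vertexSpec`, `effectiveConcurrency`, and `errOrderedConcurrency` are hypothetical stand-ins, and the behavior for an unset value (fall back to 1 when ordered) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// vertexSpec is a hypothetical stand-in for the fields the real check reads.
type vertexSpec struct {
	Ordered     bool    // ordered processing requested
	MapOrSink   bool    // vertex is a Map UDF or a Sink
	Concurrency *uint64 // nil when the user did not set it
}

var errOrderedConcurrency = errors.New("ordered processing requires concurrency of 1")

// effectiveConcurrency rejects an explicit concurrency > 1 on an ordered
// vertex instead of silently overwriting it. An unset value still resolves
// to 1 for ordered vertices and to fallback otherwise.
func effectiveConcurrency(v vertexSpec, fallback uint64) (uint64, error) {
	ordered := v.Ordered && v.MapOrSink
	if v.Concurrency == nil {
		if ordered {
			return 1, nil
		}
		return fallback, nil
	}
	if ordered && *v.Concurrency > 1 {
		return 0, fmt.Errorf("%w: got %d", errOrderedConcurrency, *v.Concurrency)
	}
	return *v.Concurrency, nil
}

func main() {
	four := uint64(4)
	_, err := effectiveConcurrency(vertexSpec{Ordered: true, MapOrSink: true, Concurrency: &four}, 500)
	fmt.Println("explicit 4 on ordered vertex:", err)

	n, _ := effectiveConcurrency(vertexSpec{Ordered: true, MapOrSink: true}, 500)
	fmt.Println("unset on ordered vertex:", n)
}
```

The trade-off is the usual one: overwriting keeps existing specs working, while an error surfaces a conflicting setting to the user at validation time.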
value: "true"
```

With read-ahead enabled the upper bound on in-flight messages becomes **`concurrency + readBatchSize`** (the data plane keeps reading until the in-flight count hits `concurrency`, then may pre-fetch one more batch).
I don't think this is correct. On a source with read-ahead enabled, we cap at min(concurrency, readBatchSize) in this implementation.
// GetConcurrency returns the maximum number of in-flight (read-but-not-acked) messages allowed
// at any time. It defaults to the read batch size when unset, which preserves the historical
// behavior where concurrency was implicitly bounded by the batch size.
func (v VertexLimits) GetConcurrency() uint64 {
	if v.Concurrency != nil {
		return *v.Concurrency
	}
	return v.GetReadBatchSize()
}
nit: is this getting used anywhere?
Not used, but still want to keep it because of the default value.
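As a small illustration of the default being discussed, the sketch below re-creates the getter on a local stand-in type. `vertexLimits` here is not the real `dfv1.VertexLimits`, and the 500 default for the read batch size is an assumption made for the example.

```go
package main

import "fmt"

// defaultReadBatchSize is an assumed default, used only for this example.
const defaultReadBatchSize uint64 = 500

// vertexLimits is a local stand-in with just the two fields the getter needs.
type vertexLimits struct {
	ReadBatchSize *uint64
	Concurrency   *uint64
}

// GetReadBatchSize returns the configured batch size, or the assumed default.
func (v vertexLimits) GetReadBatchSize() uint64 {
	if v.ReadBatchSize != nil {
		return *v.ReadBatchSize
	}
	return defaultReadBatchSize
}

// GetConcurrency follows the getter in the diff: an explicit value wins,
// otherwise it falls back to the read batch size.
func (v vertexLimits) GetConcurrency() uint64 {
	if v.Concurrency != nil {
		return *v.Concurrency
	}
	return v.GetReadBatchSize()
}

func main() {
	var limits vertexLimits
	fmt.Println("nothing set:", limits.GetConcurrency())

	batch := uint64(100)
	limits.ReadBatchSize = &batch
	fmt.Println("only batch size set:", limits.GetConcurrency())

	conc := uint64(8)
	limits.Concurrency = &conc
	fmt.Println("concurrency set:", limits.GetConcurrency())
}
```

Keeping the fallback in one getter means callers never need to repeat the nil check, which is the value of retaining it even before it has a caller.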
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Summary
- Adds `concurrency` to `MonoVertexLimits`, `PipelineLimits`, and `VertexLimits`, replacing the undocumented `NUMAFLOW_UDF_CONCURRENCY` and `MAX_ACK_PENDING` env vars.
- Moves `READ_AHEAD` to a controller-injected env var with sensible defaults: false for source vertices and MonoVertex (cheap re-reads, source ordering preserved), true for Map/Sink/Reduce (keeps ISBs full). Operators can override it on the container template.

Behavior
With read-ahead enabled, the maximum in-flight count per vertex is `concurrency + readBatchSize` (the data plane keeps reading until the in-flight count hits `concurrency`, then may pre-fetch one more batch). With read-ahead disabled, it is `min(concurrency, readBatchSize)`.
To force strictly sequential processing, set `concurrency: 1` (and disable `READ_AHEAD` for non-source vertices, or rely on the source-vertex default).
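The two bounds stated under Behavior can be written as a small helper. This is only a sketch of the description as worded here: `maxInFlight` is a hypothetical name, and a review comment in this thread questions whether the read-ahead formula holds for source vertices, so the actual data-plane behavior may differ.

```go
package main

import "fmt"

// maxInFlight encodes the bounds as stated in the PR description:
//   read-ahead on:  concurrency + readBatchSize
//   read-ahead off: min(concurrency, readBatchSize)
func maxInFlight(concurrency, readBatchSize uint64, readAhead bool) uint64 {
	if readAhead {
		return concurrency + readBatchSize
	}
	if concurrency < readBatchSize {
		return concurrency
	}
	return readBatchSize
}

func main() {
	// Example values only; 500 is used here as an assumed batch size.
	fmt.Println(maxInFlight(500, 500, true))  // read-ahead on
	fmt.Println(maxInFlight(500, 500, false)) // read-ahead off
	fmt.Println(maxInFlight(1, 500, true))    // concurrency 1, read-ahead still on
	fmt.Println(maxInFlight(1, 500, false))   // concurrency 1, read-ahead off
}
```

The last two calls show why `concurrency: 1` alone is not enough for strictly sequential processing under this description: with read-ahead left on, the stated bound still includes a pre-fetched batch.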