Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -21136,12 +21136,17 @@
},
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
"properties": {
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. With read-ahead disabled (the default for MonoVertex, since the MonoVertex always reads from a source), the data plane drains the current batch fully before the next read, so the upper bound becomes `min(concurrency, readBatchSize)`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. To force strictly sequential processing, set `concurrency` to 1 (read-ahead is already off by default for MonoVertex).",
"format": "int64",
"type": "integer"
},
"rateLimit": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
"description": "RateLimit for MonoVertex defines how many messages can be read from Source. This is computed by number of `read` calls per second multiplied by the `readBatchSize`. This is how RateLimit is calculated for MonoVertex and for Source vertices."
},
"readBatchSize": {
"description": "Read batch size from the source.",
"description": "Read batch size from the source. ReadBatchSize controls only how many messages are fetched in a single read call from the source; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"format": "int64",
"type": "integer"
},
Expand Down Expand Up @@ -21530,12 +21535,17 @@
"format": "int64",
"type": "integer"
},
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time across each vertex of the pipeline. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count per vertex is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. By default, read-ahead is disabled on source vertices (so re-reads on failure stay cheap and source ordering is preserved) and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `NUMAFLOW_READ_AHEAD` environment variable on the vertex's container template. Can be overridden by the vertex's limit settings.",
"format": "int64",
"type": "integer"
},
"rateLimit": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings. For source vertices, it will be set to rate divided by readBatchSize because for source vertices, the rate limit is defined by how many times the `Read` is called per second. Reduce does not support RateLimit."
},
"readBatchSize": {
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"format": "int64",
"type": "integer"
},
Expand Down Expand Up @@ -22902,12 +22912,17 @@
"format": "int64",
"type": "integer"
},
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. It overrides the settings from pipeline limits. By default, read-ahead is disabled on source vertices and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `NUMAFLOW_READ_AHEAD` environment variable on the vertex's container template.",
"format": "int64",
"type": "integer"
},
"rateLimit": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits. For Source vertices, the rate limit is defined by how many times the `Read` is called per second multiplied by the `readBatchSize`. Pipeline level rate limit is not applied to Source vertices."
},
"readBatchSize": {
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"format": "int64",
"type": "integer"
},
Expand Down
21 changes: 18 additions & 3 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -21132,12 +21132,17 @@
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
"type": "object",
"properties": {
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. With read-ahead disabled (the default for MonoVertex, since the MonoVertex always reads from a source), the data plane drains the current batch fully before the next read, so the upper bound becomes `min(concurrency, readBatchSize)`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. To force strictly sequential processing, set `concurrency` to 1 (read-ahead is already off by default for MonoVertex).",
"type": "integer",
"format": "int64"
},
"rateLimit": {
"description": "RateLimit for MonoVertex defines how many messages can be read from Source. This is computed by number of `read` calls per second multiplied by the `readBatchSize`. This is how RateLimit is calculated for MonoVertex and for Source vertices.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
},
"readBatchSize": {
"description": "Read batch size from the source.",
"description": "Read batch size from the source. ReadBatchSize controls only how many messages are fetched in a single read call from the source; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"type": "integer",
"format": "int64"
},
Expand Down Expand Up @@ -21517,12 +21522,17 @@
"type": "integer",
"format": "int64"
},
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time across each vertex of the pipeline. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count per vertex is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. By default, read-ahead is disabled on source vertices (so re-reads on failure stay cheap and source ordering is preserved) and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `NUMAFLOW_READ_AHEAD` environment variable on the vertex's container template. Can be overridden by the vertex's limit settings.",
"type": "integer",
"format": "int64"
},
"rateLimit": {
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings. For source vertices, it will be set to rate divided by readBatchSize because for source vertices, the rate limit is defined by how many times the `Read` is called per second. Reduce does not support RateLimit.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
},
"readBatchSize": {
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"type": "integer",
"format": "int64"
},
Expand Down Expand Up @@ -22880,12 +22890,17 @@
"type": "integer",
"format": "int64"
},
"concurrency": {
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. It overrides the settings from pipeline limits. By default, read-ahead is disabled on source vertices and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `NUMAFLOW_READ_AHEAD` environment variable on the vertex's container template.",
"type": "integer",
"format": "int64"
},
"rateLimit": {
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits. For Source vertices, the rate limit is defined by how many times the `Read` is called per second multiplied by the `readBatchSize`. Pipeline level rate limit is not applied to Source vertices.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
},
"readBatchSize": {
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
"type": "integer",
"format": "int64"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2609,6 +2609,11 @@ spec:
type: object
limits:
properties:
concurrency:
default: 500
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -2840,6 +2845,7 @@ spec:
readBatchSize:
default: 500
format: int64
minimum: 1
type: integer
readTimeout:
default: 1s
Expand Down
11 changes: 11 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ spec:
default: 80
format: int32
type: integer
concurrency:
default: 500
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -383,6 +388,7 @@ spec:
readBatchSize:
default: 500
format: int64
minimum: 1
type: integer
readTimeout:
default: 1s
Expand Down Expand Up @@ -7239,6 +7245,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -7469,6 +7479,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ spec:
default: 80
format: int32
type: integer
concurrency:
default: 500
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -362,6 +367,7 @@ spec:
readBatchSize:
default: 500
format: int64
minimum: 1
type: integer
readTimeout:
default: 1s
Expand Down Expand Up @@ -7218,6 +7224,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -7448,6 +7458,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand Down
25 changes: 25 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -1016,6 +1020,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand All @@ -1040,6 +1045,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -1270,6 +1279,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand Down Expand Up @@ -2250,6 +2260,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -2480,6 +2494,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand Down Expand Up @@ -6907,6 +6922,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -7137,6 +7156,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand All @@ -7161,6 +7181,10 @@ spec:
bufferUsageLimit:
format: int32
type: integer
concurrency:
format: int64
minimum: 1
type: integer
rateLimit:
properties:
max:
Expand Down Expand Up @@ -7391,6 +7415,7 @@ spec:
type: object
readBatchSize:
format: int64
minimum: 1
type: integer
readTimeout:
type: string
Expand Down
Loading
Loading