Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20125,6 +20125,23 @@
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.ExactlyOnce": {
"properties": {
"consistentAck": {
"description": "ConsistentAck enables consistent acknowledgement of offsets to ISB throughout the pipeline.",
"type": "boolean"
},
"dedupWindow": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "DedupWindow is the duration for which the deduplication will be enabled."
},
"enabled": {
"description": "Enabled enables exactly once processing.",
"type": "boolean"
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.FixedWindow": {
"description": "FixedWindow describes a fixed window",
"properties": {
Expand Down Expand Up @@ -21537,6 +21554,10 @@
},
"type": "array"
},
"exactlyOnce": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce",
"description": "ExactlyOnce is the exactly-once settings for the pipeline."
},
"interStepBuffer": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.InterStepBuffer",
"description": "InterStepBuffer configuration specific to this pipeline."
Expand Down Expand Up @@ -22918,6 +22939,10 @@
"description": "Set DNS policy for the pod. Defaults to \"ClusterFirst\". Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. To have DNS options set along with hostNetwork, you have to specify DNS policy explicitly to 'ClusterFirstWithHostNet'.",
"type": "string"
},
"exactlyOnce": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce",
"description": "ExactlyOnce indicates the exactly-once settings for the vertex, it's populated from the pipeline exactlyOnce settings."
},
"fromEdges": {
"items": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.CombinedEdge"
Expand Down
25 changes: 25 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20129,6 +20129,23 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.ExactlyOnce": {
"type": "object",
"properties": {
"consistentAck": {
"description": "ConsistentAck enables consistent acknowledgement of offsets to ISB throughout the pipeline.",
"type": "boolean"
},
"dedupWindow": {
"description": "DedupWindow is the duration for which the deduplication will be enabled.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"enabled": {
"description": "Enabled enables exactly once processing.",
"type": "boolean"
}
}
},
"io.numaproj.numaflow.v1alpha1.FixedWindow": {
"description": "FixedWindow describes a fixed window",
"type": "object",
Expand Down Expand Up @@ -21524,6 +21541,10 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Edge"
}
},
"exactlyOnce": {
"description": "ExactlyOnce is the exactly-once settings for the pipeline.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce"
},
"interStepBuffer": {
"description": "InterStepBuffer configuration specific to this pipeline.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.InterStepBuffer"
Expand Down Expand Up @@ -22900,6 +22921,10 @@
"description": "Set DNS policy for the pod. Defaults to \"ClusterFirst\". Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. To have DNS options set along with hostNetwork, you have to specify DNS policy explicitly to 'ClusterFirstWithHostNet'.",
"type": "string"
},
"exactlyOnce": {
"description": "ExactlyOnce indicates the exactly-once settings for the vertex, it's populated from the pipeline exactlyOnce settings.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce"
},
"fromEdges": {
"type": "array",
"items": {
Expand Down
10 changes: 10 additions & 0 deletions cmd/commands/isbsvc_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"go.uber.org/zap"
Expand All @@ -40,6 +41,7 @@ func NewISBSvcCreateCommand() *cobra.Command {
buckets []string
sideInputsStore string
servingSourceStore string
dedupWindow string
)

command := &cobra.Command{
Expand Down Expand Up @@ -79,6 +81,13 @@ func NewISBSvcCreateCommand() *cobra.Command {
return err
}
opts = append(opts, isbsvc.WithConfig(isbSvcConfig.JetStream.StreamConfig))
if dedupWindow != "" {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if ppl update the delivery semantics to an existing pipeline?

d, err := time.ParseDuration(dedupWindow)
if err != nil {
return fmt.Errorf("failed to parse dedup window duration %q, %w", dedupWindow, err)
}
opts = append(opts, isbsvc.WithDedupWindow(d))
}
default:
cmd.HelpFunc()(cmd, args)
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
Expand All @@ -98,5 +107,6 @@ func NewISBSvcCreateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to create") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringVar(&servingSourceStore, "serving-store", "", "Serving source streams to create") // --serving-store=a
command.Flags().StringVar(&dedupWindow, "dedup-window", "", "Deduplication window duration for exactly-once processing, e.g. 2m")
return command
}
12 changes: 12 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down
12 changes: 12 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,18 @@ spec:
type: object
dnsPolicy:
type: string
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
fromEdges:
items:
properties:
Expand Down
36 changes: 36 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9806,6 +9806,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down Expand Up @@ -22911,6 +22923,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down Expand Up @@ -37853,6 +37877,18 @@ spec:
type: object
dnsPolicy:
type: string
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
fromEdges:
items:
properties:
Expand Down
36 changes: 36 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9806,6 +9806,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down Expand Up @@ -22911,6 +22923,18 @@ spec:
- to
type: object
type: array
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
interStepBuffer:
properties:
compression:
Expand Down Expand Up @@ -37853,6 +37877,18 @@ spec:
type: object
dnsPolicy:
type: string
exactlyOnce:
properties:
consistentAck:
default: true
type: boolean
dedupWindow:
default: 2m
type: string
enabled:
default: false
type: boolean
type: object
fromEdges:
items:
properties:
Expand Down
Loading
Loading