Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions bindings/zeebe/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type ClientMetadata struct {
GatewayKeepAlive time.Duration `json:"gatewayKeepAlive" mapstructure:"gatewayKeepAlive"`
CaCertificatePath string `json:"caCertificatePath" mapstructure:"caCertificatePath"`
UsePlaintextConnection bool `json:"usePlainTextConnection,string" mapstructure:"usePlainTextConnection"`
ClientID string `json:"clientId" mapstructure:"clientId"`
ClientSecret string `json:"clientSecret" mapstructure:"clientSecret"`
AuthorizationServerURL string `json:"authorizationServerUrl" mapstructure:"authorizationServerUrl"`
TokenAudience string `json:"tokenAudience" mapstructure:"tokenAudience"`
TokenScope string `json:"tokenScope" mapstructure:"tokenScope"`
ClientConfigPath string `json:"clientConfigPath" mapstructure:"clientConfigPath"`
}

// NewClientFactoryImpl returns a new ClientFactory instance.
Expand All @@ -54,11 +60,17 @@ func (c *ClientFactoryImpl) Get(metadata bindings.Metadata) (zbc.Client, error)
return nil, err
}

credentialsProvider, err := meta.newCredentialsProvider()
if err != nil {
return nil, err
}

client, err := zbc.NewClient(&zbc.ClientConfig{
GatewayAddress: meta.GatewayAddr,
UsePlaintextConnection: meta.UsePlaintextConnection,
CaCertificatePath: meta.CaCertificatePath,
KeepAlive: meta.GatewayKeepAlive,
CredentialsProvider: credentialsProvider,
})
if err != nil {
return nil, err
Expand All @@ -80,3 +92,37 @@ func (c *ClientFactoryImpl) parseMetadata(meta bindings.Metadata) (*ClientMetada

return &m, nil
}

func (m *ClientMetadata) oauthConfigured() bool {
return m.ClientID != "" ||
m.ClientSecret != "" ||
m.AuthorizationServerURL != "" ||
m.TokenAudience != "" ||
m.TokenScope != "" ||
m.ClientConfigPath != ""
}

func (m *ClientMetadata) newCredentialsProvider() (zbc.CredentialsProvider, error) {
if !m.oauthConfigured() {
return nil, nil
}

providerConfig := &zbc.OAuthProviderConfig{
ClientID: m.ClientID,
ClientSecret: m.ClientSecret,
Audience: m.TokenAudience,
Scope: m.TokenScope,
AuthorizationServerURL: m.AuthorizationServerURL,
}

if m.ClientConfigPath != "" {
cache, err := zbc.NewOAuthYamlCredentialsCache(m.ClientConfigPath)
if err != nil {
return nil, err
}

providerConfig.Cache = cache
}

return zbc.NewOAuthCredentialsProvider(providerConfig)
}
58 changes: 58 additions & 0 deletions bindings/zeebe/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package zeebe

import (
"path/filepath"
"testing"
"time"

Expand All @@ -31,6 +32,12 @@ func TestParseMetadata(t *testing.T) {
"gatewayKeepAlive": "5s",
"caCertificatePath": "/cert/path",
"usePlaintextConnection": "true",
"clientId": "zeebe-client",
"clientSecret": "zeebe-secret",
"authorizationServerUrl": "https://issuer.example.com/oauth/token",
"tokenAudience": "zeebe-api",
"tokenScope": "read write",
"clientConfigPath": "/tmp/zeebe-cache.yaml",
}}}
client := ClientFactoryImpl{logger: logger.NewLogger("test")}
meta, err := client.parseMetadata(m)
Expand All @@ -39,6 +46,12 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, 5*time.Second, meta.GatewayKeepAlive)
assert.Equal(t, "/cert/path", meta.CaCertificatePath)
assert.True(t, meta.UsePlaintextConnection)
assert.Equal(t, "zeebe-client", meta.ClientID)
assert.Equal(t, "zeebe-secret", meta.ClientSecret)
assert.Equal(t, "https://issuer.example.com/oauth/token", meta.AuthorizationServerURL)
assert.Equal(t, "zeebe-api", meta.TokenAudience)
assert.Equal(t, "read write", meta.TokenScope)
assert.Equal(t, "/tmp/zeebe-cache.yaml", meta.ClientConfigPath)
}

func TestGatewayAddrMetadataIsMandatory(t *testing.T) {
Expand All @@ -58,4 +71,49 @@ func TestParseMetadataDefaultValues(t *testing.T) {
assert.Equal(t, time.Duration(0), meta.GatewayKeepAlive)
assert.Equal(t, "", meta.CaCertificatePath)
assert.False(t, meta.UsePlaintextConnection)
assert.Equal(t, "", meta.ClientID)
assert.Equal(t, "", meta.ClientSecret)
assert.Equal(t, "", meta.AuthorizationServerURL)
assert.Equal(t, "", meta.TokenAudience)
assert.Equal(t, "", meta.TokenScope)
assert.Equal(t, "", meta.ClientConfigPath)
}

func TestNewCredentialsProviderSkipsWhenOAuthNotConfigured(t *testing.T) {
meta := &ClientMetadata{}

provider, err := meta.newCredentialsProvider()

require.NoError(t, err)
assert.Nil(t, provider)
}

func TestNewCredentialsProviderReturnsErrorOnInvalidOAuthMetadata(t *testing.T) {
meta := &ClientMetadata{
AuthorizationServerURL: "https://issuer.example.com/oauth/token",
TokenAudience: "zeebe-api",
ClientID: "zeebe-client",
}

provider, err := meta.newCredentialsProvider()

assert.Nil(t, provider)
require.Error(t, err)
assert.Contains(t, err.Error(), "non-empty client secret")
}

func TestNewCredentialsProviderCreatesOAuthProviderWithCustomCachePath(t *testing.T) {
meta := &ClientMetadata{
ClientID: "zeebe-client",
ClientSecret: "zeebe-secret",
AuthorizationServerURL: "https://issuer.example.com/oauth/token",
TokenAudience: "zeebe-api",
TokenScope: "scopeA",
ClientConfigPath: filepath.Join(t.TempDir(), "zeebe-credentials.yaml"),
}

provider, err := meta.newCredentialsProvider()

require.NoError(t, err)
assert.NotNil(t, provider)
}
31 changes: 31 additions & 0 deletions bindings/zeebe/command/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,35 @@ metadata:
required: false
description: The path to the CA cert
example: "/path/to/ca-cert"
type: string
- name: clientId
required: false
description: The OAuth client ID used to request an access token.
example: "zeebe-client"
type: string
- name: clientSecret
required: false
sensitive: true
description: The OAuth client secret used to request an access token.
example: "zeebe-secret"
type: string
- name: authorizationServerUrl
required: false
description: The OAuth authorization server URL used to obtain access tokens.
example: "https://issuer.example.com/oauth/token"
type: string
- name: tokenAudience
required: false
description: The token audience for Zeebe API access.
example: "zeebe-api"
type: string
- name: tokenScope
required: false
description: Optional OAuth scope to request in the access token.
example: "read write"
type: string
- name: clientConfigPath
required: false
description: Optional path to the OAuth credentials cache file.
example: "/tmp/zeebe-credentials.yaml"
type: string
31 changes: 31 additions & 0 deletions bindings/zeebe/jobworker/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,37 @@ metadata:
description: The path to the CA cert
example: "/path/to/ca-cert"
type: string
- name: clientId
required: false
description: The OAuth client ID used to request an access token.
example: "zeebe-client"
type: string
- name: clientSecret
required: false
sensitive: true
description: The OAuth client secret used to request an access token.
example: "zeebe-secret"
type: string
- name: authorizationServerUrl
required: false
description: The OAuth authorization server URL used to obtain access tokens.
example: "https://issuer.example.com/oauth/token"
type: string
- name: tokenAudience
required: false
description: The token audience for Zeebe API access.
example: "zeebe-api"
type: string
- name: tokenScope
required: false
description: Optional OAuth scope to request in the access token.
example: "read write"
type: string
- name: clientConfigPath
required: false
description: Optional path to the OAuth credentials cache file.
example: "/tmp/zeebe-credentials.yaml"
type: string
- name: workerName
required: false
description: The name of the worker activating the jobs, mostly used for logging purposes
Expand Down
148 changes: 148 additions & 0 deletions tests/certification/bindings/zeebe/command/topology_oauth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command_test

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/camunda/zeebe/clients/go/v8/pkg/pb"
bindings_zeebe_command "github.com/dapr/components-contrib/bindings/zeebe/command"
zeebe_test "github.com/dapr/components-contrib/tests/certification/bindings/zeebe"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
dapr_testing "github.com/dapr/dapr/pkg/testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTopologyOperationWithOAuthMetadata(t *testing.T) {
ports, _ := dapr_testing.GetFreePorts(2)
grpcPort := ports[0]
httpPort := ports[1]

var oauthRequests int64
oauthSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&oauthRequests, 1)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"access_token":"cert-test-token","token_type":"Bearer","expires_in":3600}`))
}))
defer oauthSrv.Close()

resourcesPath := createOAuthTestResources(t, oauthSrv.URL)

testInvokeTopology := func(ctx flow.Context) error {
client := zeebe_test.GetDaprClient(grpcPort)
defer client.Close()

res, err := zeebe_test.ExecCommandOperation(
ctx,
client,
bindings_zeebe_command.TopologyOperation,
nil,
map[string]string{},
)
require.NoError(t, err)

topology := &pb.TopologyResponse{}
err = json.Unmarshal(res.Data, topology)
require.NoError(t, err)
require.NotEmpty(t, topology.Brokers)

res, err = zeebe_test.ExecCommandOperation(
ctx,
client,
bindings_zeebe_command.TopologyOperation,
nil,
map[string]string{},
)
require.NoError(t, err)

topology = &pb.TopologyResponse{}
err = json.Unmarshal(res.Data, topology)
require.NoError(t, err)
require.NotEmpty(t, topology.Brokers)

assert.Equal(t, int64(1), atomic.LoadInt64(&oauthRequests))
return nil
}

flow.New(t, "Test topology operation with OAuth metadata").
Step(dockercompose.Run("zeebe", zeebe_test.DockerComposeYaml)).
Step("Waiting for Zeebe Readiness...", retry.Do(time.Second*3, 10, zeebe_test.CheckZeebeConnection)).
Step(sidecar.Run(zeebe_test.SidecarName,
append(componentRuntimeOptions(),
embedded.WithoutApp(),
embedded.WithResourcesPath(resourcesPath),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
)...,
)).
Step("Waiting for the component to start", flow.Sleep(10*time.Second)).
Step("Invoke topology operation", testInvokeTopology).
Run()
}

func createOAuthTestResources(t *testing.T, oauthServerURL string) string {
t.Helper()

resourcesPath := filepath.Join(t.TempDir(), "components")
err := os.MkdirAll(resourcesPath, 0o755)
require.NoError(t, err)

componentPath := filepath.Join(resourcesPath, "zeebe-command.yaml")
componentYAML := fmt.Sprintf(`apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: zeebe-command
spec:
type: bindings.zeebe.command
version: v1
metadata:
- name: gatewayAddr
value: localhost:26500
- name: gatewayKeepAlive
value: 45s
- name: usePlainTextConnection
value: true
- name: clientId
value: cert-test-client
- name: clientSecret
value: cert-test-secret
- name: authorizationServerUrl
value: "%s"
- name: tokenAudience
value: localhost
- name: tokenScope
value: test.scope
- name: clientConfigPath
value: "%s"
`, oauthServerURL, filepath.Join(resourcesPath, "zeebe-credentials.yaml"))

err = os.WriteFile(componentPath, []byte(componentYAML), 0o600)
require.NoError(t, err)

return resourcesPath
}
Loading
Loading