Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions bindings/zeebe/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package zeebe

import (
"errors"
"fmt"
"strings"
"time"

"github.com/camunda/zeebe/clients/go/v8/pkg/zbc"
Expand All @@ -26,6 +28,8 @@ import (

var ErrMissingGatewayAddr = errors.New("gatewayAddr is a required attribute")

var ErrInvalidOAuthMetadata = errors.New("invalid OAuth metadata")

// ClientFactory enables injection for testing.
type ClientFactory interface {
Get(metadata bindings.Metadata) (zbc.Client, error)
Expand All @@ -41,6 +45,12 @@ type ClientMetadata struct {
GatewayKeepAlive time.Duration `json:"gatewayKeepAlive" mapstructure:"gatewayKeepAlive"`
CaCertificatePath string `json:"caCertificatePath" mapstructure:"caCertificatePath"`
UsePlaintextConnection bool `json:"usePlainTextConnection,string" mapstructure:"usePlainTextConnection"`
ClientID string `json:"clientId" mapstructure:"clientId"`
ClientSecret string `json:"clientSecret" mapstructure:"clientSecret"`
AuthorizationServerURL string `json:"authorizationServerUrl" mapstructure:"authorizationServerUrl"`
TokenAudience string `json:"tokenAudience" mapstructure:"tokenAudience"`
TokenScope string `json:"tokenScope" mapstructure:"tokenScope"`
ClientConfigPath string `json:"clientConfigPath" mapstructure:"clientConfigPath"`
}

// NewClientFactoryImpl returns a new ClientFactory instance.
Expand All @@ -54,11 +64,17 @@ func (c *ClientFactoryImpl) Get(metadata bindings.Metadata) (zbc.Client, error)
return nil, err
}

credentialsProvider, err := meta.newCredentialsProvider()
if err != nil {
return nil, err
}

client, err := zbc.NewClient(&zbc.ClientConfig{
GatewayAddress: meta.GatewayAddr,
UsePlaintextConnection: meta.UsePlaintextConnection,
CaCertificatePath: meta.CaCertificatePath,
KeepAlive: meta.GatewayKeepAlive,
CredentialsProvider: credentialsProvider,
})
if err != nil {
return nil, err
Expand All @@ -80,3 +96,67 @@ func (c *ClientFactoryImpl) parseMetadata(meta bindings.Metadata) (*ClientMetada

return &m, nil
}

func (m *ClientMetadata) oauthConfigured() bool {
return m.ClientID != "" ||
m.ClientSecret != "" ||
m.AuthorizationServerURL != "" ||
m.TokenAudience != "" ||
m.TokenScope != "" ||
m.ClientConfigPath != ""
}

func (m *ClientMetadata) validateOAuthMetadata() error {
if !m.oauthConfigured() {
return nil
}

missing := make([]string, 0, 4)
if m.ClientID == "" {
missing = append(missing, "clientId")
}
if m.ClientSecret == "" {
missing = append(missing, "clientSecret")
}
if m.AuthorizationServerURL == "" {
missing = append(missing, "authorizationServerUrl")
}
if m.TokenAudience == "" {
missing = append(missing, "tokenAudience")
}

if len(missing) > 0 {
return fmt.Errorf("%w: when OAuth is configured, clientId, clientSecret, authorizationServerUrl, and tokenAudience must all be provided; missing: %s", ErrInvalidOAuthMetadata, strings.Join(missing, ", "))
}

return nil
}

func (m *ClientMetadata) newCredentialsProvider() (zbc.CredentialsProvider, error) {
if err := m.validateOAuthMetadata(); err != nil {
return nil, err
}

if !m.oauthConfigured() {
return nil, nil
}

providerConfig := &zbc.OAuthProviderConfig{
ClientID: m.ClientID,
ClientSecret: m.ClientSecret,
Audience: m.TokenAudience,
Scope: m.TokenScope,
AuthorizationServerURL: m.AuthorizationServerURL,
}

if m.ClientConfigPath != "" {
cache, err := zbc.NewOAuthYamlCredentialsCache(m.ClientConfigPath)
if err != nil {
return nil, err
}

providerConfig.Cache = cache
}

return zbc.NewOAuthCredentialsProvider(providerConfig)
}
75 changes: 75 additions & 0 deletions bindings/zeebe/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package zeebe

import (
"path/filepath"
"testing"
"time"

Expand All @@ -31,6 +32,12 @@ func TestParseMetadata(t *testing.T) {
"gatewayKeepAlive": "5s",
"caCertificatePath": "/cert/path",
"usePlaintextConnection": "true",
"clientId": "zeebe-client",
"clientSecret": "zeebe-secret",
"authorizationServerUrl": "https://issuer.example.com/oauth/token",
"tokenAudience": "zeebe-api",
"tokenScope": "read write",
"clientConfigPath": "/tmp/zeebe-cache.yaml",
}}}
client := ClientFactoryImpl{logger: logger.NewLogger("test")}
meta, err := client.parseMetadata(m)
Expand All @@ -39,6 +46,12 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, 5*time.Second, meta.GatewayKeepAlive)
assert.Equal(t, "/cert/path", meta.CaCertificatePath)
assert.True(t, meta.UsePlaintextConnection)
assert.Equal(t, "zeebe-client", meta.ClientID)
assert.Equal(t, "zeebe-secret", meta.ClientSecret)
assert.Equal(t, "https://issuer.example.com/oauth/token", meta.AuthorizationServerURL)
assert.Equal(t, "zeebe-api", meta.TokenAudience)
assert.Equal(t, "read write", meta.TokenScope)
assert.Equal(t, "/tmp/zeebe-cache.yaml", meta.ClientConfigPath)
}

func TestGatewayAddrMetadataIsMandatory(t *testing.T) {
Expand All @@ -58,4 +71,66 @@ func TestParseMetadataDefaultValues(t *testing.T) {
assert.Equal(t, time.Duration(0), meta.GatewayKeepAlive)
assert.Equal(t, "", meta.CaCertificatePath)
assert.False(t, meta.UsePlaintextConnection)
assert.Equal(t, "", meta.ClientID)
assert.Equal(t, "", meta.ClientSecret)
assert.Equal(t, "", meta.AuthorizationServerURL)
assert.Equal(t, "", meta.TokenAudience)
assert.Equal(t, "", meta.TokenScope)
assert.Equal(t, "", meta.ClientConfigPath)
}

func TestNewCredentialsProviderSkipsWhenOAuthNotConfigured(t *testing.T) {
meta := &ClientMetadata{}

provider, err := meta.newCredentialsProvider()

require.NoError(t, err)
assert.Nil(t, provider)
}

func TestNewCredentialsProviderReturnsErrorOnInvalidOAuthMetadata(t *testing.T) {
meta := &ClientMetadata{
AuthorizationServerURL: "https://issuer.example.com/oauth/token",
TokenAudience: "zeebe-api",
ClientID: "zeebe-client",
}

provider, err := meta.newCredentialsProvider()

assert.Nil(t, provider)
require.ErrorIs(t, err, ErrInvalidOAuthMetadata)
assert.Contains(t, err.Error(), "missing: clientSecret")
}

func TestNewCredentialsProviderReturnsErrorWhenOnlyOptionalOAuthFieldsProvided(t *testing.T) {
meta := &ClientMetadata{
TokenScope: "scopeA",
}

provider, err := meta.newCredentialsProvider()

assert.Nil(t, provider)
require.ErrorIs(t, err, ErrInvalidOAuthMetadata)
errMsg := err.Error()
assert.Contains(t, errMsg, "missing:")
assert.Contains(t, errMsg, "clientId")
assert.Contains(t, errMsg, "clientSecret")
assert.Contains(t, errMsg, "authorizationServerUrl")
assert.Contains(t, errMsg, "tokenAudience")
}

func TestNewCredentialsProviderCreatesOAuthProviderWithCustomCachePath(t *testing.T) {
meta := &ClientMetadata{
ClientID: "zeebe-client",
ClientSecret: "zeebe-secret",
AuthorizationServerURL: "https://issuer.example.com/oauth/token",
TokenAudience: "zeebe-api",
TokenScope: "scopeA",
ClientConfigPath: filepath.Join(t.TempDir(), "zeebe-credentials.yaml"),
}

provider, err := meta.newCredentialsProvider()

require.NoError(t, err)
assert.NotNil(t, provider)
}
31 changes: 31 additions & 0 deletions bindings/zeebe/command/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,35 @@ metadata:
required: false
description: The path to the CA cert
example: "/path/to/ca-cert"
type: string
- name: clientId
required: false
description: The OAuth client ID used to request an access token. Required when OAuth is configured, and must be set together with clientSecret, authorizationServerUrl, and tokenAudience.
example: "zeebe-client"
type: string
- name: clientSecret
required: false
sensitive: true
description: The OAuth client secret used to request an access token. Required when OAuth is configured, and must be set together with clientId, authorizationServerUrl, and tokenAudience.
example: "zeebe-secret"
type: string
- name: authorizationServerUrl
required: false
description: The OAuth authorization server URL used to obtain access tokens. Required when OAuth is configured, and must be set together with clientId, clientSecret, and tokenAudience.
example: "https://issuer.example.com/oauth/token"
type: string
- name: tokenAudience
required: false
description: The token audience for Zeebe API access. Required when OAuth is configured, and must be set together with clientId, clientSecret, and authorizationServerUrl.
example: "zeebe-api"
type: string
- name: tokenScope
required: false
description: Optional OAuth scope to request in the access token when OAuth is configured.
example: "read write"
type: string
- name: clientConfigPath
required: false
description: Optional path to the OAuth credentials cache file when OAuth is configured.
example: "/tmp/zeebe-credentials.yaml"
type: string
10 changes: 9 additions & 1 deletion bindings/zeebe/jobworker/jobworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ type jobWorkerMetadata struct {
RetryBackOff kitmd.Duration `mapstructure:"retryBackOff"`
}

// JobWorkerMetadata is an exported mirror type used by metadata reflection.
type JobWorkerMetadata jobWorkerMetadata

type componentMetadata struct {
zeebe.ClientMetadata `mapstructure:",squash"`
JobWorkerMetadata `mapstructure:",squash"`
}

type jobHandler struct {
callback bindings.Handler
logger logger.Logger
Expand Down Expand Up @@ -273,7 +281,7 @@ func (h *jobHandler) failJob(ctx context.Context, client worker.JobClient, job e

// GetComponentMetadata returns the metadata of the component.
func (z *ZeebeJobWorker) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadataStruct := jobWorkerMetadata{}
metadataStruct := componentMetadata{}
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
return
}
13 changes: 13 additions & 0 deletions bindings/zeebe/jobworker/jobworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,16 @@ func TestInit(t *testing.T) {
require.NoError(t, jobWorker.Close())
})
}

func TestGetComponentMetadataIncludesClientMetadata(t *testing.T) {
jobWorker := ZeebeJobWorker{}
metadataInfo := jobWorker.GetComponentMetadata()

assert.Contains(t, metadataInfo, "clientId")
assert.Contains(t, metadataInfo, "clientSecret")
assert.Contains(t, metadataInfo, "authorizationServerUrl")
assert.Contains(t, metadataInfo, "tokenAudience")
assert.Contains(t, metadataInfo, "tokenScope")
assert.Contains(t, metadataInfo, "clientConfigPath")
assert.Contains(t, metadataInfo, "jobType")
}
31 changes: 31 additions & 0 deletions bindings/zeebe/jobworker/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,37 @@ metadata:
description: The path to the CA cert
example: "/path/to/ca-cert"
type: string
- name: clientId
required: false
description: The OAuth client ID used to request an access token. Required when OAuth is configured, and must be set together with clientSecret, authorizationServerUrl, and tokenAudience.
example: "zeebe-client"
type: string
- name: clientSecret
required: false
sensitive: true
description: The OAuth client secret used to request an access token. Required when OAuth is configured, and must be set together with clientId, authorizationServerUrl, and tokenAudience.
example: "zeebe-secret"
type: string
- name: authorizationServerUrl
required: false
description: The OAuth authorization server URL used to obtain access tokens. Required when OAuth is configured, and must be set together with clientId, clientSecret, and tokenAudience.
example: "https://issuer.example.com/oauth/token"
type: string
- name: tokenAudience
required: false
description: The token audience for Zeebe API access. Required when OAuth is configured, and must be set together with clientId, clientSecret, and authorizationServerUrl.
example: "zeebe-api"
type: string
- name: tokenScope
required: false
description: Optional OAuth scope to request in the access token when OAuth is configured.
example: "read write"
type: string
- name: clientConfigPath
required: false
description: Optional path to the OAuth credentials cache file when OAuth is configured.
example: "/tmp/zeebe-credentials.yaml"
type: string
- name: workerName
required: false
description: The name of the worker activating the jobs, mostly used for logging purposes
Expand Down
Loading
Loading