Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/apis/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ components:
- type: array
items:
type: string
description: '"*" or an array of enabled topics.'
description: '"*" or an array of enabled topics. Topic strings can include "*" as a wildcard matching any run of characters. When available topics are configured, wildcard patterns must match at least one available topic.'
example: "*"

Filter:
Expand Down
4 changes: 4 additions & 0 deletions docs/content/features/topics.mdoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ curl '{% $OUTPOST_API_BASE_URL %}/tenants/<TENANT_ID>/destinations' \
}'
```

Destination topics can also use `*` inside a topic string as a wildcard that matches any run of characters. For example, `user.*` matches `user.created` and `user.profile.updated`, `*.created` matches `user.created` and `order.created`, and `order.*.completed` matches `order.payment.completed`.

When available topics are configured, wildcard patterns must match at least one available topic.

## Event Fanout

A single published event is independently delivered to every destination that matches its topic. Each delivery attempt is tracked separately.
Expand Down
31 changes: 31 additions & 0 deletions internal/apirouter/destination_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,37 @@ func TestAPI_Destinations(t *testing.T) {
require.Equal(t, http.StatusUnprocessableEntity, resp.Code)
})

t.Run("wildcard topic pattern matching configured topic is accepted", func(t *testing.T) {
h := newAPITest(t)
h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1")))

req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{
"type": "webhook",
"topics": []string{"user.*"},
"config": map[string]string{"url": "https://example.com/hook"},
})
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusCreated, resp.Code)
var dest destregistry.DestinationDisplay
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &dest))
assert.Equal(t, models.Topics{"user.*"}, dest.Topics)
})

t.Run("wildcard topic pattern not matching configured topic returns 422", func(t *testing.T) {
h := newAPITest(t)
h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1")))

req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{
"type": "webhook",
"topics": []string{"order.*"},
"config": map[string]string{"url": "https://example.com/hook"},
})
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusUnprocessableEntity, resp.Code)
})

t.Run("import timestamps", func(t *testing.T) {
t.Run("disabled_at preserved on create", func(t *testing.T) {
h := newAPITest(t)
Expand Down
61 changes: 60 additions & 1 deletion internal/models/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,15 @@ func (t *Topics) MatchesAll() bool {
}

func (t *Topics) MatchTopic(eventTopic string) bool {
return eventTopic == "" || eventTopic == "*" || t.MatchesAll() || slices.Contains(*t, eventTopic)
if eventTopic == "" || eventTopic == "*" || t.MatchesAll() {
return true
}
for _, topic := range *t {
if matchTopicPattern(topic, eventTopic) {
return true
}
}
return false
}

func (t *Topics) Validate(availableTopics []string) error {
Expand All @@ -166,13 +174,64 @@ func (t *Topics) Validate(availableTopics []string) error {
if topic == "*" {
return ErrInvalidTopics
}
if strings.Contains(topic, "*") {
if !topicPatternMatchesAny(topic, availableTopics) {
return ErrInvalidTopics
}
continue
}
if !slices.Contains(availableTopics, topic) {
return ErrInvalidTopics
}
}
return nil
}

func topicPatternMatchesAny(pattern string, topics []string) bool {
for _, topic := range topics {
if matchTopicPattern(pattern, topic) {
return true
}
}
return false
}

func matchTopicPattern(pattern, topic string) bool {
if pattern == topic {
return true
}
if !strings.Contains(pattern, "*") {
return false
}

patternIndex, topicIndex := 0, 0
starIndex, starTopicIndex := -1, 0
for topicIndex < len(topic) {
if patternIndex < len(pattern) && pattern[patternIndex] == topic[topicIndex] {
patternIndex++
topicIndex++
continue
}
if patternIndex < len(pattern) && pattern[patternIndex] == '*' {
starIndex = patternIndex
starTopicIndex = topicIndex
patternIndex++
continue
}
if starIndex != -1 {
patternIndex = starIndex + 1
starTopicIndex++
topicIndex = starTopicIndex
continue
}
return false
}
for patternIndex < len(pattern) && pattern[patternIndex] == '*' {
patternIndex++
}
return patternIndex == len(pattern)
}

func TopicsFromString(s string) Topics {
return Topics(strings.Split(s, ","))
}
111 changes: 111 additions & 0 deletions internal/models/entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ func TestDestinationTopics_Validate(t *testing.T) {
availableTopics: testutil.TestTopics,
validated: true,
},
{
topics: []string{"user.*"},
availableTopics: testutil.TestTopics,
validated: true,
},
{
topics: []string{"order.*"},
availableTopics: testutil.TestTopics,
validated: false,
},
{
topics: []string{"user.created", "order.*"},
availableTopics: testutil.TestTopics,
validated: false,
},
{
topics: []string{"order.*"},
availableTopics: []string{"order.created", "user.created"},
validated: true,
},
{
topics: []string{"*"},
availableTopics: testutil.TestTopics,
Expand Down Expand Up @@ -218,6 +238,54 @@ func TestTopics_MatchTopic(t *testing.T) {
eventTopic: "user.created",
expected: true,
},
{
name: "prefix wildcard matches topic family",
topics: []string{"user.*"},
eventTopic: "user.created",
expected: true,
},
{
name: "prefix wildcard matches nested topic family",
topics: []string{"user.*"},
eventTopic: "user.profile.updated",
expected: true,
},
{
name: "suffix wildcard matches topic family",
topics: []string{"*.created"},
eventTopic: "order.created",
expected: true,
},
{
name: "middle wildcard matches topic family",
topics: []string{"order.*.completed"},
eventTopic: "order.payment.completed",
expected: true,
},
{
name: "wildcard can match empty run of characters",
topics: []string{"user.*.created"},
eventTopic: "user..created",
expected: true,
},
{
name: "prefix wildcard does not match different prefix",
topics: []string{"user.*"},
eventTopic: "order.created",
expected: false,
},
{
name: "suffix wildcard does not match different suffix",
topics: []string{"*.created"},
eventTopic: "user.deleted",
expected: false,
},
{
name: "middle wildcard requires suffix",
topics: []string{"order.*.completed"},
eventTopic: "order.payment.failed",
expected: false,
},
// No match
{
name: "no topic match",
Expand Down Expand Up @@ -256,6 +324,49 @@ func TestTopics_MatchTopic(t *testing.T) {
}
}

func BenchmarkTopics_MatchTopic(b *testing.B) {
topicsExact := models.Topics{
"user.created",
"user.deleted",
"user.updated",
"order.created",
"order.deleted",
"order.updated",
}
topicsPattern := models.Topics{
"user.*",
"*.deleted",
"order.*.completed",
"invoice.created",
"invoice.deleted",
"invoice.updated",
}

b.Run("exact match", func(b *testing.B) {
for range b.N {
_ = topicsExact.MatchTopic("order.updated")
}
})

b.Run("exact miss", func(b *testing.B) {
for range b.N {
_ = topicsExact.MatchTopic("payment.created")
}
})

b.Run("pattern match", func(b *testing.B) {
for range b.N {
_ = topicsPattern.MatchTopic("order.payment.completed")
}
})

b.Run("pattern miss", func(b *testing.B) {
for range b.N {
_ = topicsPattern.MatchTopic("order.payment.failed")
}
})
}

func TestFilter_MarshalBinary(t *testing.T) {
t.Parallel()

Expand Down
69 changes: 69 additions & 0 deletions internal/tenantstore/drivertest/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,75 @@ func testMatch(t *testing.T, newHarness HarnessMaker) {
})
})

t.Run("MatchByWildcardTopic", func(t *testing.T) {
ctx := context.Background()
h, err := newHarness(ctx, t)
require.NoError(t, err)
t.Cleanup(h.Close)

store, err := h.MakeDriver(ctx)
require.NoError(t, err)

tenant := models.Tenant{ID: idgen.String()}
require.NoError(t, store.UpsertTenant(ctx, tenant))

destUserFamily := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithID("dest_user_family"),
testutil.DestinationFactory.WithTenantID(tenant.ID),
testutil.DestinationFactory.WithTopics([]string{"user.*"}),
)
destCreatedFamily := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithID("dest_created_family"),
testutil.DestinationFactory.WithTenantID(tenant.ID),
testutil.DestinationFactory.WithTopics([]string{"*.created"}),
)
destOrderCompletedFamily := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithID("dest_order_completed_family"),
testutil.DestinationFactory.WithTenantID(tenant.ID),
testutil.DestinationFactory.WithTopics([]string{"order.*.completed"}),
)
destExact := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithID("dest_exact"),
testutil.DestinationFactory.WithTenantID(tenant.ID),
testutil.DestinationFactory.WithTopics([]string{"user.created"}),
)

require.NoError(t, store.CreateDestination(ctx, destUserFamily))
require.NoError(t, store.CreateDestination(ctx, destCreatedFamily))
require.NoError(t, store.CreateDestination(ctx, destOrderCompletedFamily))
require.NoError(t, store.CreateDestination(ctx, destExact))

t.Run("matches prefix and suffix wildcard subscriptions", func(t *testing.T) {
event := testutil.EventFactory.Any(
testutil.EventFactory.WithTenantID(tenant.ID),
testutil.EventFactory.WithTopic("user.created"),
)
matched, err := store.MatchEvent(ctx, event)
require.NoError(t, err)
assert.ElementsMatch(t, []string{"dest_user_family", "dest_created_family", "dest_exact"}, matched)
})

t.Run("matches separator agnostic middle wildcard subscription", func(t *testing.T) {
event := testutil.EventFactory.Any(
testutil.EventFactory.WithTenantID(tenant.ID),
testutil.EventFactory.WithTopic("order.payment.completed"),
)
matched, err := store.MatchEvent(ctx, event)
require.NoError(t, err)
assert.ElementsMatch(t, []string{"dest_order_completed_family"}, matched)
})

t.Run("does not overmatch unrelated topic", func(t *testing.T) {
event := testutil.EventFactory.Any(
testutil.EventFactory.WithTenantID(tenant.ID),
testutil.EventFactory.WithTopic("order.payment.failed"),
)
matched, err := store.MatchEvent(ctx, event)
require.NoError(t, err)
assert.Empty(t, matched)
})
})

t.Run("MatchEventWithFilter", func(t *testing.T) {
ctx := context.Background()
h, err := newHarness(ctx, t)
Expand Down