Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,109 @@ func (f *TestFilter) IsAllowed(msg []byte) bool {
* Change the config.toml to add the filter provider config.
* Run kaf-relay with the filter plugin. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`

## Using as a library

The `pkg/relay` and `pkg/kafkatarget` packages can be imported to build custom relay daemons. The builtin Kafka target can be swapped for any backend by implementing the `relay.Target` interface.

Here is an example that shows a Redis target.

```go
package main

import (
"context"
"fmt"
"log/slog"
"strconv"

"github.com/redis/go-redis/v9"
"github.com/zerodha/kaf-relay/pkg/relay"
)

const watermarkKey = "_watermark" // Redis hash: topic -> partition -> offset

// RedisTarget implements relay.Target, writing messages to Redis streams
// and tracking offsets in a Redis hash for resumption.
type RedisTarget struct {
client *redis.Client
topic string
log *slog.Logger
stopCh chan struct{}
doneCh chan struct{}
}

func NewRedisTarget(addr, topic string, log *slog.Logger) *RedisTarget {
return &RedisTarget{
client: redis.NewClient(&redis.Options{Addr: addr}),
topic: topic,
log: log,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}

// GetHighWatermark reads persisted offsets from a Redis hash for the relay to resume from the last checkpoint.
func (r *RedisTarget) GetHighWatermark(ctx context.Context) (relay.Offsets, error) {
vals, err := r.client.HGetAll(ctx, watermarkKey).Result()
if err != nil {
return nil, err
}

offsets := make(map[int32]int64, len(vals))
for field, val := range vals {
p, _ := strconv.Atoi(field)
o, _ := strconv.ParseInt(val, 10, 64)
offsets[int32(p)] = o
}

return relay.Offsets{r.topic: offsets}, nil
}

// Start blocks until Close() is called.
func (r *RedisTarget) Start() error {
defer close(r.doneCh)
<-r.stopCh
return nil
}

// Write writes a message to Redis and updates the watermark offset.
func (r *RedisTarget) Write(ctx context.Context, msg relay.Message) error {
pipe := r.client.Pipeline()

pipe.XAdd(ctx, &redis.XAddArgs{
Stream: msg.Topic,
Values: map[string]interface{}{
"key": string(msg.Key),
"value": string(msg.Value),
},
})

// Persist the source offset for GetHighWatermark() (for relay resumption).
pipe.HSet(ctx, watermarkKey, fmt.Sprintf("%d", msg.Partition), msg.Offset)

_, err := pipe.Exec(ctx)
return err
}

// Close signals Start() to return and waits for it and closes the Redis client.
func (r *RedisTarget) Close() error {
close(r.stopCh)
<-r.doneCh
return r.client.Close()
}
```

Connect it to the relay.

```go
target := NewRedisTarget("localhost:6379", log)

srcPool, _ := relay.NewSourcePool(poolCfg, consumerCfgs, topic, nil, metricsSet, log)

r, _ := relay.NewRelay(relayCfg, srcPool, target, topic, nil, metricsSet, log)
r.Start(ctx)
```

## Metrics

Replication metrics are exposed through a HTTP server.
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro=
github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo=
Expand All @@ -34,12 +32,8 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go v1.20.6 h1:TpQTt4QcixJ1cHEmQGPOERvTzo99s8jAutmS7rbSD6w=
github.com/twmb/franz-go v1.20.6/go.mod h1:u+FzH2sInp7b9HNVv2cZN8AxdXy6y/AQ1Bkptu4c0FM=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=
github.com/twmb/franz-go/pkg/kadm v1.17.2/go.mod h1:ST55zUB+sUS+0y+GcKY/Tf1XxgVilaFpB9I19UubLmU=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241202133023-293b7c4c56bb h1:cNa7PaAvkAhYIOootH/5gRO9ljbI3MIObf5qU/PKFKY=
Expand Down
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/knadh/koanf/v2"
flag "github.com/spf13/pflag"
"github.com/zerodha/kaf-relay/filter"
"github.com/zerodha/kaf-relay/internal/relay"
"github.com/zerodha/kaf-relay/pkg/relay"
)

func initFlags(ko *koanf.Koanf) {
Expand Down
Loading