Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 231 additions & 36 deletions pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package batch
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

Expand All @@ -29,6 +33,7 @@ type seqnumKey struct {
type Client struct {
client chipingress.Client
batchSize int
maxGRPCRequestSize int
cloneEvent bool
maxConcurrentSends chan struct{}
batchInterval time.Duration
Expand All @@ -42,6 +47,20 @@ type Client struct {
batcherDone chan struct{}
cancelBatcher context.CancelFunc
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()

metrics batchClientMetrics
}

// batchClientMetrics bundles the OpenTelemetry instruments and the reusable
// measurement options the batch client uses when reporting PublishBatch activity.
type batchClientMetrics struct {
	// Instruments, created once by newBatchClientMetrics.
	sendRequestsTotal   otelmetric.Int64Counter     // one increment per recorded send, tagged with status
	requestSizeMessages otelmetric.Int64Histogram   // events per request
	requestSizeBytes    otelmetric.Int64Histogram   // request size in bytes
	requestLatencyMS    otelmetric.Float64Histogram // request latency in milliseconds, tagged with status
	configInfo          otelmetric.Int64Gauge       // info-style gauge carrying the client configuration as attributes

	// Status options, built once by newBatchClientMetrics.
	successStatusAttr otelmetric.MeasurementOption // status="success"
	failureStatusAttr otelmetric.MeasurementOption // status="failure"

	// Configuration-derived options. These stay nil until recordConfig runs;
	// recordSend skips any that are still nil.
	batchSizeAttr      otelmetric.MeasurementOption // max_batch_size
	maxGRPCReqSizeAttr otelmetric.MeasurementOption // max_grpc_request_size_bytes
}

// Opt is a functional option for configuring the batch Client.
Expand All @@ -53,6 +72,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
client: client,
log: zap.NewNop().Sugar(),
batchSize: 10,
maxGRPCRequestSize: 10 * 1024 * 1024,
cloneEvent: true,
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
Expand All @@ -68,11 +88,19 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
opt(c)
}

var err error
c.metrics, err = newBatchClientMetrics()
if err != nil {
return nil, err
}

return c, nil
}

// Start begins processing messages from the queue and sending them in batches
func (b *Client) Start(ctx context.Context) {
b.metrics.recordConfig(ctx, b)

// Create a cancellable context for the batcher
batcherCtx, cancel := context.WithCancel(ctx)
b.cancelBatcher = cancel
Expand Down Expand Up @@ -110,34 +138,45 @@ func (b *Client) Start(ctx context.Context) {
// Forcibly shuts down after the timeout if shutdown has not completed.
func (b *Client) Stop() {
b.shutdownOnce.Do(func() {
ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
// Use a standalone timeout context so the shutdown wait isn't cancelled
// by close(b.stopCh) below.
ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout)
defer cancel()

if b.cancelBatcher != nil {
started := b.cancelBatcher != nil
if started {
b.cancelBatcher()
}
close(b.stopCh)

done := make(chan struct{})
go func() {
<-b.batcherDone
for range cap(b.maxConcurrentSends) {
b.maxConcurrentSends <- struct{}{}
}
// wait for all callbacks to complete
b.callbackWg.Wait()
close(done)
}()
// Only wait for the batcher goroutine when Start() was called;
// otherwise batcherDone is never closed and we'd block until timeout.
if started {
done := make(chan struct{})
go func() {
<-b.batcherDone
for range cap(b.maxConcurrentSends) {
b.maxConcurrentSends <- struct{}{}
}
// wait for all callbacks to complete
b.callbackWg.Wait()
close(done)
}()

select {
case <-done:
// All successfully shutdown
case <-ctx.Done(): // timeout or context cancelled
b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
select {
case <-done:
// All successfully shutdown
case <-ctx.Done(): // timeout or context cancelled
b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
}
}

// Release per-stream seqnum state to avoid unbounded growth from high-cardinality source/type values.
b.clearCounters()

if err := b.client.Close(); err != nil {
b.log.Warnw("failed to close chip ingress client", "error", err)
Comment on lines +177 to +178
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't need a nil guard

}
})
}

Expand Down Expand Up @@ -222,37 +261,94 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)

go func() {
defer func() { <-b.maxConcurrentSends }()
// this is specifically to prevent long running network calls
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
defer cancel()

events := make([]*chipingress.CloudEventPb, len(messages))
for i, msg := range messages {
events[i] = msg.event
}
_, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events})
if err != nil {
b.log.Errorw("failed to publish batch", "error", err)
}
// the callbacks are placed in their own goroutine to not block releasing the semaphore
// we use a wait group, to ensure all callbacks are completed if .Stop() is called.
b.callbackWg.Go(func() {
for _, msg := range messages {
if msg.callback != nil {
msg.callback(err)
}
for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
batchReq, batchBytes := newBatchRequest(batchMessages)
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false)
Comment thread
jmank88 marked this conversation as resolved.
Outdated
b.log.Errorw("failed to publish batch", "error", err)
b.completeBatchCallbacks(batchMessages, err)
continue
}

// this is specifically to prevent long running network calls
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
startedAt := time.Now()
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
cancel()

b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil)
if err != nil {
b.log.Errorw("failed to publish batch", "error", err)
}
})
b.completeBatchCallbacks(batchMessages, err)
}
}()
}

// completeBatchCallbacks invokes the callback of every message in messages,
// passing err as the publish result. Messages without a callback are skipped.
//
// The callbacks run on their own goroutine so the caller is not blocked by
// them, and that goroutine is tracked by callbackWg so that Stop() can wait
// for all callbacks to complete.
func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err error) {
	notifyAll := func() {
		for i := range messages {
			cb := messages[i].callback
			if cb == nil {
				continue
			}
			cb(err)
		}
	}
	b.callbackWg.Go(notifyAll)
}

func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
if len(messages) == 0 {
return nil
}
if maxRequestSize <= 0 {
return [][]*messageWithCallback{messages}
}

var batches [][]*messageWithCallback
current := make([]*messageWithCallback, 0, len(messages))
for _, msg := range messages {
candidate := append(current, msg)
_, candidateBytes := newBatchRequest(candidate)
if len(current) > 0 && candidateBytes > maxRequestSize {
batches = append(batches, current)
current = []*messageWithCallback{msg}
continue
}
current = candidate
}
Comment on lines +311 to +322
if len(current) > 0 {
batches = append(batches, current)
}
return batches
}

// newBatchRequest wraps the events of the given messages, in order, in a
// CloudEventBatch and returns it together with its size in bytes as computed
// by proto.Size.
func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) {
	req := &chipingress.CloudEventBatch{
		Events: make([]*chipingress.CloudEventPb, 0, len(messages)),
	}
	for _, m := range messages {
		req.Events = append(req.Events, m.event)
	}
	return req, proto.Size(req)
}

// WithBatchSize returns an Opt that sets how many messages the client
// accumulates before sending a batch.
func WithBatchSize(batchSize int) Opt {
	return func(client *Client) { client.batchSize = batchSize }
}

// WithMaxGRPCRequestSize returns an Opt that sets the maximum serialized size,
// in bytes, of a single PublishBatch request. The client uses the value to
// split outgoing batches and to fail batches that still exceed it; it is also
// reported as the max_grpc_request_size_bytes metric attribute. A value <= 0
// disables both the splitting and the size check.
func WithMaxGRPCRequestSize(maxReqSize int) Opt {
	return func(client *Client) { client.maxGRPCRequestSize = maxReqSize }
}

// WithEventClone controls whether QueueMessage clones events before stamping seqnum and buffering.
// Defaults to true for safety when caller reuses event pointers.
func WithEventClone(clone bool) Opt {
Expand Down Expand Up @@ -302,3 +398,102 @@ func WithLogger(log *zap.SugaredLogger) Opt {
c.log = log
}
}

// newBatchClientMetrics creates the batch client's instruments on the
// "chipingress/batch_client" meter and pre-builds the success/failure status
// options. The configuration-derived options are left unset; recordConfig
// fills them in later.
//
// If any instrument cannot be created, the zero batchClientMetrics and the
// error from the meter are returned.
func newBatchClientMetrics() (batchClientMetrics, error) {
	var (
		meter = otel.Meter("chipingress/batch_client")
		m     batchClientMetrics
		err   error
	)

	if m.sendRequestsTotal, err = meter.Int64Counter(
		"chip_ingress.batch.send_requests_total",
		otelmetric.WithDescription("Total PublishBatch requests sent by batch client"),
		otelmetric.WithUnit("{request}"),
	); err != nil {
		return batchClientMetrics{}, err
	}

	if m.requestSizeMessages, err = meter.Int64Histogram(
		"chip_ingress.batch.request_size_messages",
		otelmetric.WithDescription("PublishBatch request size measured in number of events"),
		otelmetric.WithUnit("{event}"),
	); err != nil {
		return batchClientMetrics{}, err
	}

	if m.requestSizeBytes, err = meter.Int64Histogram(
		"chip_ingress.batch.request_size_bytes",
		otelmetric.WithDescription("PublishBatch request size measured in bytes"),
		otelmetric.WithUnit("By"),
	); err != nil {
		return batchClientMetrics{}, err
	}

	if m.requestLatencyMS, err = meter.Float64Histogram(
		"chip_ingress.batch.request_latency_ms",
		otelmetric.WithDescription("PublishBatch end-to-end latency in milliseconds"),
		otelmetric.WithUnit("ms"),
	); err != nil {
		return batchClientMetrics{}, err
	}

	if m.configInfo, err = meter.Int64Gauge(
		"chip_ingress.batch.config.info",
		otelmetric.WithDescription("Batch client configuration info metric"),
		otelmetric.WithUnit("{info}"),
	); err != nil {
		return batchClientMetrics{}, err
	}

	// Build the status options once so recordSend can reuse them on every call.
	statusOption := func(status string) otelmetric.MeasurementOption {
		return otelmetric.WithAttributeSet(attribute.NewSet(
			attribute.String("status", status),
		))
	}
	m.successStatusAttr = statusOption("success")
	m.failureStatusAttr = statusOption("failure")

	return m, nil
}

// recordConfig captures the client's configuration in the metrics state.
//
// It stores the max_batch_size and max_grpc_request_size_bytes options that
// recordSend later attaches to the request-size histograms, and records a
// value of 1 on the configInfo gauge with the client's settings as attributes.
func (m *batchClientMetrics) recordConfig(ctx context.Context, c *Client) {
	maxBatchSize := attribute.Int("max_batch_size", c.batchSize)
	maxRequestBytes := attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize)

	m.batchSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet(maxBatchSize))
	m.maxGRPCReqSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet(maxRequestBytes))

	configAttrs := []attribute.KeyValue{
		maxBatchSize,
		attribute.Int("message_buffer_size", cap(c.messageBuffer)),
		attribute.Int("max_concurrent_sends", cap(c.maxConcurrentSends)),
		attribute.Int64("batch_interval_ms", c.batchInterval.Milliseconds()),
		attribute.Int64("max_publish_timeout_ms", c.maxPublishTimeout.Milliseconds()),
		attribute.Int64("shutdown_timeout_ms", c.shutdownTimeout.Milliseconds()),
		attribute.Bool("clone_event", c.cloneEvent),
		maxRequestBytes,
	}
	m.configInfo.Record(ctx, 1, otelmetric.WithAttributes(configAttrs...))
}

// recordSend reports a single PublishBatch attempt.
//
// It increments the request counter and records the latency histogram (in
// milliseconds), both tagged with the success or failure status. It also
// records the number of messages and the request size in bytes; those two
// histograms carry the max_batch_size and max_grpc_request_size_bytes options
// respectively when recordConfig has already set them, and no options otherwise.
func (m *batchClientMetrics) recordSend(ctx context.Context, messageCount int, requestBytes int, latency time.Duration, success bool) {
	status := m.failureStatusAttr
	if success {
		status = m.successStatusAttr
	}
	m.sendRequestsTotal.Add(ctx, 1, status)

	// The config-derived options are only available once recordConfig has run.
	countOpts := make([]otelmetric.RecordOption, 0, 1)
	if m.batchSizeAttr != nil {
		countOpts = append(countOpts, m.batchSizeAttr)
	}
	bytesOpts := make([]otelmetric.RecordOption, 0, 1)
	if m.maxGRPCReqSizeAttr != nil {
		bytesOpts = append(bytesOpts, m.maxGRPCReqSizeAttr)
	}

	latencyMS := float64(latency) / float64(time.Millisecond)

	m.requestSizeMessages.Record(ctx, int64(messageCount), countOpts...)
	m.requestSizeBytes.Record(ctx, int64(requestBytes), bytesOpts...)
	m.requestLatencyMS.Record(ctx, latencyMS, status)
}
Loading
Loading