Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chain-selectors v1.0.89
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

207 changes: 207 additions & 0 deletions pkg/beholder/batch_emitter_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package beholder

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch.
// It implements the Emitter interface.
type ChipIngressBatchEmitterService struct {
services.Service
eng *services.Engine

batchClient *batch.Client

metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption
metrics batchEmitterMetrics
}

type batchEmitterMetrics struct {
eventsSent otelmetric.Int64Counter
eventsDropped otelmetric.Int64Counter
}

// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client.
func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) {
if client == nil {
return nil, fmt.Errorf("chip ingress client is nil")
}

defaults := DefaultConfig()
bufferSize := int(cfg.ChipIngressBufferSize)
if bufferSize == 0 {
bufferSize = int(defaults.ChipIngressBufferSize)
}
maxBatchSize := int(cfg.ChipIngressMaxBatchSize)
if maxBatchSize == 0 {
maxBatchSize = int(defaults.ChipIngressMaxBatchSize)
}
maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends
if maxConcurrentSends == 0 {
maxConcurrentSends = defaults.ChipIngressMaxConcurrentSends
}
sendInterval := cfg.ChipIngressSendInterval
if sendInterval == 0 {
sendInterval = defaults.ChipIngressSendInterval
}
sendTimeout := cfg.ChipIngressSendTimeout
if sendTimeout == 0 {
sendTimeout = defaults.ChipIngressSendTimeout
}
drainTimeout := cfg.ChipIngressDrainTimeout
if drainTimeout == 0 {
drainTimeout = defaults.ChipIngressDrainTimeout
}

meter := otel.Meter("beholder/chip_ingress_batch_emitter")
metrics, err := newBatchEmitterMetrics(meter)
if err != nil {
return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err)
}

batchClient, err := batch.NewBatchClient(client,
batch.WithBatchSize(maxBatchSize),
batch.WithMessageBuffer(bufferSize),
batch.WithBatchInterval(sendInterval),
batch.WithMaxPublishTimeout(sendTimeout),
batch.WithShutdownTimeout(drainTimeout),
batch.WithMaxConcurrentSends(maxConcurrentSends),
batch.WithEventClone(false),
)
if err != nil {
return nil, fmt.Errorf("failed to create batch client: %w", err)
}

e := &ChipIngressBatchEmitterService{
batchClient: batchClient,
metrics: metrics,
}

e.Service, e.eng = services.Config{
Name: "ChipIngressBatchEmitterService",
Start: e.start,
Close: e.stop,
}.NewServiceEngine(lggr)

return e, nil
}

func (e *ChipIngressBatchEmitterService) start(ctx context.Context) error {
e.batchClient.Start(ctx)
Comment thread
jmank88 marked this conversation as resolved.
return nil
}

func (e *ChipIngressBatchEmitterService) stop() error {
e.batchClient.Stop()
return nil
}

// Emit queues an event for batched delivery without blocking.
// Returns an error if the emitter is stopped or the context is cancelled.
// If the buffer is full, the event is silently dropped.
func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
return e.emitInternal(ctx, body, nil, attrKVs...)
}

// EmitWithCallback works like Emit but invokes callback once the event's fate
// is determined (nil on success, non-nil on failure or buffer-full drop).
//
// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked.
// If it returns nil, the callback is guaranteed to fire exactly once.
func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
return e.emitInternal(ctx, body, callback, attrKVs...)
}

func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
return e.eng.IfStarted(func() error {
domain, entity, err := ExtractSourceAndType(attrKVs...)
if err != nil {
return err
}

attributes := newAttributes(attrKVs...)

event, err := chipingress.NewEvent(domain, entity, body, attributes)
if err != nil {
return fmt.Errorf("failed to create CloudEvent: %w", err)
}
eventPb, err := chipingress.EventToProto(event)
if err != nil {
return fmt.Errorf("failed to convert to proto: %w", err)
}

if err := ctx.Err(); err != nil {
return err
}

metricAttrs := e.metricAttrsFor(domain, entity)

queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
// The callback fires asynchronously after the batch is sent,
// so the caller's ctx may already be cancelled. Use ctx directly
// for metric recording — OTel Add is non-blocking and tolerates
// cancelled contexts.
if sendErr != nil {
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
} else {
e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
}
if callback != nil {
callback(sendErr)
}
})
if queueErr != nil {
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
if callback != nil {
callback(queueErr)
}
}

return nil
})
}

func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption {
key := domain + "\x00" + entity
if v, ok := e.metricAttrsCache.Load(key); ok {
return v.(otelmetric.MeasurementOption)
}
attrs := otelmetric.WithAttributeSet(attribute.NewSet(
attribute.String("domain", domain),
attribute.String("entity", entity),
))
v, _ := e.metricAttrsCache.LoadOrStore(key, attrs)
return v.(otelmetric.MeasurementOption)
}

func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) {
eventsSent, err := meter.Int64Counter("chip_ingress.events_sent",
otelmetric.WithDescription("Total events successfully sent via PublishBatch"),
otelmetric.WithUnit("{event}"))
if err != nil {
return batchEmitterMetrics{}, err
}

eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped",
otelmetric.WithDescription("Total events dropped (buffer full or send failure)"),
otelmetric.WithUnit("{event}"))
if err != nil {
return batchEmitterMetrics{}, err
}

return batchEmitterMetrics{
eventsSent: eventsSent,
eventsDropped: eventsDropped,
}, nil
}
Loading
Loading