-
Notifications
You must be signed in to change notification settings - Fork 29
pkg/beholder: add batch emitter service with service-engine lifecycle #2059
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
d3e2a0e
pkg/beholder: add batch emitter service and bump chipingress batching…
pkcll 0f95004
chore: bump chipingress dependency to publishBatch
pkcll 4601e78
refactor: remove context.Background() in batch emitter service and tests
pkcll b5776a5
chore: bump chipingress dependency to latest main
pkcll 7ec6e1a
Merge branch 'main' of github.com:smartcontractkit/chainlink-common i…
pkcll 3aa7eec
fix: use engine lifecycle context in batch emitter Start instead of s…
pkcll a2567d3
fix: use defaulted logger for legacy chip-ingress emitter to prevent …
pkcll 76bf8ce
Merge branch 'main' of github.com:smartcontractkit/chainlink-common i…
pkcll 65d0ba3
bump chipingress to main (bacfb6ba4146), gomodtidy
pkcll 38c1d35
refactor: use config structs for emitter/noop constructors preserving…
pkcll File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,211 @@ | ||
| package beholder | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
|
|
||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/attribute" | ||
| otelmetric "go.opentelemetry.io/otel/metric" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/chipingress" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/services" | ||
| ) | ||
|
|
||
| // ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch. | ||
| // It implements the Emitter interface. | ||
| type ChipIngressBatchEmitterService struct { | ||
| services.Service | ||
| eng *services.Engine | ||
|
|
||
| batchClient *batch.Client | ||
|
|
||
| metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption | ||
| metrics batchEmitterMetrics | ||
| } | ||
|
|
||
| type batchEmitterMetrics struct { | ||
| eventsSent otelmetric.Int64Counter | ||
| eventsDropped otelmetric.Int64Counter | ||
| } | ||
|
|
||
| // NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client. | ||
| func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) { | ||
| if client == nil { | ||
| return nil, fmt.Errorf("chip ingress client is nil") | ||
| } | ||
|
|
||
| defaults := DefaultConfig() | ||
| bufferSize := int(cfg.ChipIngressBufferSize) | ||
| if bufferSize == 0 { | ||
| bufferSize = int(defaults.ChipIngressBufferSize) | ||
| } | ||
| maxBatchSize := int(cfg.ChipIngressMaxBatchSize) | ||
| if maxBatchSize == 0 { | ||
| maxBatchSize = int(defaults.ChipIngressMaxBatchSize) | ||
| } | ||
| maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends | ||
| if maxConcurrentSends == 0 { | ||
| maxConcurrentSends = defaults.ChipIngressMaxConcurrentSends | ||
| } | ||
| sendInterval := cfg.ChipIngressSendInterval | ||
| if sendInterval == 0 { | ||
| sendInterval = defaults.ChipIngressSendInterval | ||
| } | ||
| sendTimeout := cfg.ChipIngressSendTimeout | ||
| if sendTimeout == 0 { | ||
| sendTimeout = defaults.ChipIngressSendTimeout | ||
| } | ||
| drainTimeout := cfg.ChipIngressDrainTimeout | ||
| if drainTimeout == 0 { | ||
| drainTimeout = defaults.ChipIngressDrainTimeout | ||
| } | ||
|
|
||
| meter := otel.Meter("beholder/chip_ingress_batch_emitter") | ||
| metrics, err := newBatchEmitterMetrics(meter) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err) | ||
| } | ||
|
|
||
| batchClient, err := batch.NewBatchClient(client, | ||
| batch.WithBatchSize(maxBatchSize), | ||
| batch.WithMessageBuffer(bufferSize), | ||
| batch.WithBatchInterval(sendInterval), | ||
| batch.WithMaxPublishTimeout(sendTimeout), | ||
| batch.WithShutdownTimeout(drainTimeout), | ||
| batch.WithMaxConcurrentSends(maxConcurrentSends), | ||
| batch.WithEventClone(false), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create batch client: %w", err) | ||
| } | ||
|
|
||
| e := &ChipIngressBatchEmitterService{ | ||
| batchClient: batchClient, | ||
| metrics: metrics, | ||
| } | ||
|
|
||
| e.Service, e.eng = services.Config{ | ||
| Name: "ChipIngressBatchEmitterService", | ||
| Start: e.start, | ||
| Close: e.stop, | ||
| }.NewServiceEngine(lggr) | ||
|
|
||
| return e, nil | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitterService) start(_ context.Context) error { | ||
| // Do not pass the startup ctx — the services contract forbids retaining it | ||
| // after Start returns. Use the engine's lifecycle context so the batcher | ||
| // is cancelled when the service shuts down (StopChan closes before stop() runs). | ||
| ctx, _ := e.eng.NewCtx() | ||
| e.batchClient.Start(ctx) | ||
| return nil | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitterService) stop() error { | ||
| e.batchClient.Stop() | ||
| return nil | ||
| } | ||
|
|
||
| // Emit queues an event for batched delivery without blocking. | ||
| // Returns an error if the emitter is stopped or the context is cancelled. | ||
| // If the buffer is full, the event is silently dropped. | ||
| func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error { | ||
| return e.emitInternal(ctx, body, nil, attrKVs...) | ||
| } | ||
|
|
||
| // EmitWithCallback works like Emit but invokes callback once the event's fate | ||
| // is determined (nil on success, non-nil on failure or buffer-full drop). | ||
| // | ||
| // If EmitWithCallback returns a non-nil error, the callback will NOT be invoked. | ||
| // If it returns nil, the callback is guaranteed to fire exactly once. | ||
| func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { | ||
| return e.emitInternal(ctx, body, callback, attrKVs...) | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { | ||
| return e.eng.IfStarted(func() error { | ||
| domain, entity, err := ExtractSourceAndType(attrKVs...) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| attributes := newAttributes(attrKVs...) | ||
|
|
||
| event, err := chipingress.NewEvent(domain, entity, body, attributes) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create CloudEvent: %w", err) | ||
| } | ||
| eventPb, err := chipingress.EventToProto(event) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to convert to proto: %w", err) | ||
| } | ||
|
|
||
| if err := ctx.Err(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| metricAttrs := e.metricAttrsFor(domain, entity) | ||
|
|
||
| queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { | ||
| // The callback fires asynchronously after the batch is sent, | ||
| // so the caller's ctx may already be cancelled. Use ctx directly | ||
| // for metric recording — OTel Add is non-blocking and tolerates | ||
| // cancelled contexts. | ||
| if sendErr != nil { | ||
| e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) | ||
| } else { | ||
| e.metrics.eventsSent.Add(ctx, 1, metricAttrs) | ||
| } | ||
| if callback != nil { | ||
| callback(sendErr) | ||
| } | ||
| }) | ||
| if queueErr != nil { | ||
| e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) | ||
| if callback != nil { | ||
| callback(queueErr) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption { | ||
| key := domain + "\x00" + entity | ||
| if v, ok := e.metricAttrsCache.Load(key); ok { | ||
| return v.(otelmetric.MeasurementOption) | ||
| } | ||
| attrs := otelmetric.WithAttributeSet(attribute.NewSet( | ||
| attribute.String("domain", domain), | ||
| attribute.String("entity", entity), | ||
| )) | ||
| v, _ := e.metricAttrsCache.LoadOrStore(key, attrs) | ||
| return v.(otelmetric.MeasurementOption) | ||
| } | ||
|
|
||
| func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) { | ||
| eventsSent, err := meter.Int64Counter("chip_ingress.events_sent", | ||
| otelmetric.WithDescription("Total events successfully sent via PublishBatch"), | ||
| otelmetric.WithUnit("{event}")) | ||
| if err != nil { | ||
| return batchEmitterMetrics{}, err | ||
| } | ||
|
|
||
| eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped", | ||
| otelmetric.WithDescription("Total events dropped (buffer full or send failure)"), | ||
| otelmetric.WithUnit("{event}")) | ||
| if err != nil { | ||
| return batchEmitterMetrics{}, err | ||
| } | ||
|
|
||
| return batchEmitterMetrics{ | ||
| eventsSent: eventsSent, | ||
| eventsDropped: eventsDropped, | ||
| }, nil | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.