From 038fe22078bcfc78fa459baef0021b0cbaa2ed32 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 14:56:33 -0400 Subject: [PATCH 01/14] LOOP support --- pkg/beholder/durable_event_store_pg.go | 203 +++++++++++++++++++++++++ pkg/loop/config.go | 31 ++-- pkg/loop/server.go | 86 ++++++++--- 3 files changed, 283 insertions(+), 37 deletions(-) create mode 100644 pkg/beholder/durable_event_store_pg.go diff --git a/pkg/beholder/durable_event_store_pg.go b/pkg/beholder/durable_event_store_pg.go new file mode 100644 index 0000000000..ad41337004 --- /dev/null +++ b/pkg/beholder/durable_event_store_pg.go @@ -0,0 +1,203 @@ +package beholder + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" +) + +const chipDurableEventsTable = "cre.chip_durable_events" + +// PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. +type PgDurableEventStore struct { + ds sqlutil.DataSource +} + +var ( + _ DurableEventStore = (*PgDurableEventStore)(nil) + _ DurableQueueObserver = (*PgDurableEventStore)(nil) + _ BatchInserter = (*PgDurableEventStore)(nil) +) + +func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore { + return &PgDurableEventStore{ds: ds} +} + +func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { + const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` + var id int64 + if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { + return 0, fmt.Errorf("failed to insert chip durable event: %w", err) + } + return id, nil +} + +func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { + if len(payloads) == 0 { + return nil, nil + } + placeholders := make([]string, len(payloads)) + args := make([]interface{}, len(payloads)) + for i, p := range payloads { + placeholders[i] = fmt.Sprintf("($%d)", i+1) + args[i] = p + } + q := fmt.Sprintf( + "INSERT INTO %s (payload) VALUES %s RETURNING id", + chipDurableEventsTable, + strings.Join(placeholders, ","), + ) + + var ids []int64 + if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { + return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) + } + return ids, nil +} + +func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error { + const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` + res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) + if err != nil { + return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { + if batchLimit <= 0 { + return 0, nil + } + const q = ` +WITH picked AS ( + SELECT id FROM ` + chipDurableEventsTable + ` + WHERE delivered_at IS NOT NULL + ORDER BY delivered_at ASC + LIMIT $1 +) +DELETE FROM ` + chipDurableEventsTable + ` AS t +USING picked WHERE t.id = picked.id` + res, err := s.ds.ExecContext(ctx, q, batchLimit) + if err != nil { + return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("purge delivered rows affected: %w", err) + } + return n, nil +} + +func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { + const q = ` +SELECT id, payload, created_at +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at < $1 +ORDER BY created_at ASC +LIMIT $2` + + type row struct { + ID int64 `db:"id"` + Payload []byte `db:"payload"` + CreatedAt time.Time `db:"created_at"` + } + + var rows []row + if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { + return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) + } + + out := make([]DurableEvent, 0, len(rows)) + for _, r := range rows { + out = append(out, DurableEvent{ + ID: r.ID, + Payload: r.Payload, + CreatedAt: r.CreatedAt, + }) + } + return out, nil +} + +func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { + const q = ` +WITH deleted AS ( + DELETE FROM ` + chipDurableEventsTable + ` + WHERE created_at <= now() - $1::interval + RETURNING id +) +SELECT count(*) FROM deleted` + + var count int64 + if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { + return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) + } + return count, nil +} + +type chipDurableQueueAgg struct { + Cnt int64 `db:"cnt"` + PayloadSum int64 `db:"payload_sum"` + MinCreated *time.Time `db:"min_created"` +} + +// ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. +func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { + const qAgg = ` +SELECT + count(*)::bigint AS cnt, + coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, + min(created_at) AS min_created +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL` + + var row chipDurableQueueAgg + if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + } + var st DurableQueueStats + st.Depth = row.Cnt + st.PayloadBytes = row.PayloadSum + if row.MinCreated != nil { + st.OldestPendingAge = time.Since(*row.MinCreated) + } + if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { + ttlSec := int64(eventTTL.Round(time.Second) / time.Second) + leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) + const qNear = ` +SELECT count(*)::bigint +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at >= now() - ($1::bigint * interval '1 second') + AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` + if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) + } + } + return st, nil +} diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 966be68e66..78e1c1308e 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -84,9 +84,9 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" - envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -97,9 +97,10 @@ const ( type EnvConfig struct { AppID string - ChipIngressEndpoint string - ChipIngressInsecureConnection bool - ChipIngressBatchEmitterEnabled bool + ChipIngressEndpoint string + ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool + ChipIngressDurableEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -139,15 +140,15 @@ type EnvConfig struct { PyroscopePPROFBlockProfileRate int PyroscopePPROFMutexProfileFraction int - TelemetryEnabled bool - TelemetryEndpoint string - TelemetryInsecureConnection bool - TelemetryCACertFile string - TelemetryAttributes OtelAttributes - TelemetryTraceSampleRatio float64 - TelemetryAuthHeaders map[string]string - TelemetryAuthPubKeyHex string - TelemetryAuthHeadersTTL time.Duration + TelemetryEnabled bool + TelemetryEndpoint string + TelemetryInsecureConnection bool + TelemetryCACertFile string + TelemetryAttributes OtelAttributes + TelemetryTraceSampleRatio float64 + TelemetryAuthHeaders map[string]string + TelemetryAuthPubKeyHex string + TelemetryAuthHeadersTTL time.Duration // TelemetryEmitterBatchProcessor maps to beholder Config.EmitterBatchProcessor // (batched async custom-message export vs immediate per-record export). TelemetryEmitterBatchProcessor bool diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 307603d502..7a8834a7b3 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -148,6 +148,28 @@ func (s *Server) start(opts ...ServerOpt) error { OnDialError: func(err error) { s.Logger.Errorw("Failed to dial", "err", err) }, } + if s.EnvConfig.DatabaseURL != nil { + pg.SetApplicationName(s.EnvConfig.DatabaseURL.URL(), build.Program) + dbURL := s.EnvConfig.DatabaseURL.URL().String() + var err error + s.db, err = pg.DBConfig{ + IdleInTxSessionTimeout: s.EnvConfig.DatabaseIdleInTxSessionTimeout, + LockTimeout: s.EnvConfig.DatabaseLockTimeout, + MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns, + MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns, + EnableTracing: s.EnvConfig.DatabaseTracingEnabled, + }.New(ctx, dbURL, pg.DriverPostgres) + if err != nil { + return fmt.Errorf("error connecting to DataBase: %w", err) + } + s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger, + sqlutil.TimeoutHook(func() time.Duration { return s.EnvConfig.DatabaseQueryTimeout }), + sqlutil.MonitorHook(func() bool { return s.EnvConfig.DatabaseLogSQL })) + + s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger) + s.dbStatsReporter.Start() + } + if s.EnvConfig.TelemetryEndpoint == "" { err := SetupTracing(tracingConfig) if err != nil { @@ -218,6 +240,12 @@ func (s *Server) start(opts ...ServerOpt) error { if err := s.startBeholderClient(ctx, beholderCfg); err != nil { return err } + + if s.EnvConfig.ChipIngressDurableEmitterEnabled { + if err := s.setupDurableEmitter(ctx); err != nil { + return err + } + } } if addr := s.EnvConfig.PyroscopeServerAddress; addr != "" { @@ -301,28 +329,6 @@ func (s *Server) start(opts ...ServerOpt) error { return fmt.Errorf("error starting health checker: %w", err) } - if s.EnvConfig.DatabaseURL != nil { - pg.SetApplicationName(s.EnvConfig.DatabaseURL.URL(), build.Program) - dbURL := s.EnvConfig.DatabaseURL.URL().String() - var err error - s.db, err = pg.DBConfig{ - IdleInTxSessionTimeout: s.EnvConfig.DatabaseIdleInTxSessionTimeout, - LockTimeout: s.EnvConfig.DatabaseLockTimeout, - MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns, - MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns, - EnableTracing: s.EnvConfig.DatabaseTracingEnabled, - }.New(ctx, dbURL, pg.DriverPostgres) - if err != nil { - return fmt.Errorf("error connecting to DataBase: %w", err) - } - s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger, - sqlutil.TimeoutHook(func() time.Duration { return s.EnvConfig.DatabaseQueryTimeout }), - sqlutil.MonitorHook(func() bool { return s.EnvConfig.DatabaseLogSQL })) - - s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger) - s.dbStatsReporter.Start() - } - s.LimitsFactory.Logger = s.Logger.Named("LimitsFactory") if bc := beholder.GetClient(); bc != nil { s.LimitsFactory.Meter = bc.Meter @@ -364,6 +370,42 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C return nil } +func (s *Server) setupDurableEmitter(ctx context.Context) error { + if s.beholderClient == nil { + return fmt.Errorf("beholder client not initialized") + } + + chipClient := s.beholderClient.Chip + if chipClient == nil { + return fmt.Errorf("chip ingress client not available") + } + + if s.DataSource == nil { + return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") + } + + pgStore := beholder.NewPgDurableEventStore(s.DataSource) + durableCfg := beholder.DefaultDurableEmitterConfig() + durableEmitter, err := beholder.NewDurableEmitter(pgStore, chipClient, false, durableCfg, s.Logger) + if err != nil { + return fmt.Errorf("failed to create durable emitter: %w", err) + } + + // Build a new DualSourceEmitter: durable chip + OTLP. + messageLogger := s.beholderClient.MessageLoggerProvider.Logger("durable-emitter") + otlpEmitter := beholder.NewMessageEmitter(messageLogger) + dualEmitter, err := beholder.NewDualSourceEmitter(durableEmitter, otlpEmitter) + if err != nil { + return fmt.Errorf("failed to create dual source emitter: %w", err) + } + + durableEmitter.Start(ctx) + s.beholderClient.Emitter = dualEmitter + + s.Logger.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") + return nil +} + // Stop closes resources and flushes logs. func (s *Server) Stop() { if s.beholderClient != nil { From 1837553ba7c3d0c11321eeb83ef65ab9d298620e Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 15:02:04 -0400 Subject: [PATCH 02/14] Extract setup --- pkg/beholder/durable_emitter.go | 39 +++++++++++++++++++++++++++++++++ pkg/loop/server.go | 38 +------------------------------- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 222ea4c368..15ecd02156 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) // DurableEmitterConfig configures the DurableEmitter behaviour. @@ -103,6 +104,44 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { } } +// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip +// sink is a DurableEmitter backed by a Postgres event store. CloudEvents are +// persisted to the DB before async delivery to Chip ingress, so they survive +// process restarts and chip ingress outages. +func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, isHostProcess bool, lggr logger.SugaredLogger) error { + if client == nil { + return errors.New("beholder client not initialized") + } + chipClient := client.Chip + if chipClient == nil { + return errors.New("chip ingress client not available") + } + if _, noop := chipClient.(*chipingress.NoopClient); noop { + return errors.New("chip ingress client is a no-op; configure CL_CHIP_INGRESS_ENDPOINT") + } + if ds == nil { + return errors.New("durable emitter requires a database connection") + } + + pgStore := NewPgDurableEventStore(ds) + durableEmitter, err := NewDurableEmitter(pgStore, chipClient, isHostProcess, DefaultDurableEmitterConfig(), lggr) + if err != nil { + return fmt.Errorf("failed to create durable emitter: %w", err) + } + + otlpEmitter := NewMessageEmitter(client.MessageLoggerProvider.Logger("durable-emitter")) + dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter) + if err != nil { + return fmt.Errorf("failed to create dual source emitter: %w", err) + } + + durableEmitter.Start(ctx) + client.Emitter = dualEmitter + + lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") + return nil +} + // DurableEmitter implements Emitter with persistence-backed delivery guarantees. // // Emit writes to a DurableEventStore, returns nil after insert, and enqueues the diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 7a8834a7b3..a3bcd393e3 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -242,7 +242,7 @@ func (s *Server) start(opts ...ServerOpt) error { } if s.EnvConfig.ChipIngressDurableEmitterEnabled { - if err := s.setupDurableEmitter(ctx); err != nil { + if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, s.DataSource, false, s.Logger); err != nil { return err } } @@ -370,42 +370,6 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C return nil } -func (s *Server) setupDurableEmitter(ctx context.Context) error { - if s.beholderClient == nil { - return fmt.Errorf("beholder client not initialized") - } - - chipClient := s.beholderClient.Chip - if chipClient == nil { - return fmt.Errorf("chip ingress client not available") - } - - if s.DataSource == nil { - return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") - } - - pgStore := beholder.NewPgDurableEventStore(s.DataSource) - durableCfg := beholder.DefaultDurableEmitterConfig() - durableEmitter, err := beholder.NewDurableEmitter(pgStore, chipClient, false, durableCfg, s.Logger) - if err != nil { - return fmt.Errorf("failed to create durable emitter: %w", err) - } - - // Build a new DualSourceEmitter: durable chip + OTLP. - messageLogger := s.beholderClient.MessageLoggerProvider.Logger("durable-emitter") - otlpEmitter := beholder.NewMessageEmitter(messageLogger) - dualEmitter, err := beholder.NewDualSourceEmitter(durableEmitter, otlpEmitter) - if err != nil { - return fmt.Errorf("failed to create dual source emitter: %w", err) - } - - durableEmitter.Start(ctx) - s.beholderClient.Emitter = dualEmitter - - s.Logger.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") - return nil -} - // Stop closes resources and flushes logs. func (s *Server) Stop() { if s.beholderClient != nil { From 64c703a7403aecc4162d4d83b33ec3931724cc7a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 15:06:57 -0400 Subject: [PATCH 03/14] Update durable_emitter.go --- pkg/beholder/durable_emitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 15ecd02156..9d6e6bf228 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -108,7 +108,7 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // sink is a DurableEmitter backed by a Postgres event store. CloudEvents are // persisted to the DB before async delivery to Chip ingress, so they survive // process restarts and chip ingress outages. -func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, isHostProcess bool, lggr logger.SugaredLogger) error { +func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, isHostProcess bool, lggr logger.Logger) error { if client == nil { return errors.New("beholder client not initialized") } From 71a8313dfd8be56e869c9747b4c1b5ca18f3e78a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 15:13:45 -0400 Subject: [PATCH 04/14] Rename retransmit --- pkg/beholder/durable_emitter.go | 37 ++++++++++++++-------------- pkg/beholder/durable_emitter_test.go | 12 ++++----- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 9d6e6bf228..408e654ab6 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -108,7 +108,7 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // sink is a DurableEmitter backed by a Postgres event store. CloudEvents are // persisted to the DB before async delivery to Chip ingress, so they survive // process restarts and chip ingress outages. -func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, isHostProcess bool, lggr logger.Logger) error { +func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, retransmit bool, lggr logger.Logger) error { if client == nil { return errors.New("beholder client not initialized") } @@ -124,7 +124,7 @@ func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSou } pgStore := NewPgDurableEventStore(ds) - durableEmitter, err := NewDurableEmitter(pgStore, chipClient, isHostProcess, DefaultDurableEmitterConfig(), lggr) + durableEmitter, err := NewDurableEmitter(pgStore, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) if err != nil { return fmt.Errorf("failed to create durable emitter: %w", err) } @@ -174,9 +174,10 @@ type insertResult struct { type DurableEmitter struct { store DurableEventStore client chipingress.Client - // isHostProcess determines if the emitter runs retransmit and cleanup loops. - // Should be set to false when initialized inside LOOP plugins. - isHostProcess bool + // retransmit determines if the emitter runs the retransmit, expiry, and purge + // loops over the shared queue. Should be set to false when initialized inside + // LOOP plugins so a single host process owns reclamation and cleanup. + retransmit bool cfg DurableEmitterConfig log logger.Logger @@ -264,7 +265,7 @@ var _ Emitter = (*DurableEmitter)(nil) func NewDurableEmitter( store DurableEventStore, client chipingress.Client, - isHostProcess bool, + retransmit bool, cfg DurableEmitterConfig, log logger.Logger, ) (*DurableEmitter, error) { @@ -290,13 +291,13 @@ func NewDurableEmitter( store = newMetricsInstrumentedStore(store, m) } d := &DurableEmitter{ - store: store, - client: client, - isHostProcess: isHostProcess, - cfg: cfg, - log: log, - metrics: m, - stopCh: make(chan struct{}), + store: store, + client: client, + retransmit: retransmit, + cfg: cfg, + log: log, + metrics: m, + stopCh: make(chan struct{}), } if cp, ok := client.(grpcConnProvider); ok { d.rawConn = cp.Conn() @@ -341,7 +342,7 @@ func (d *DurableEmitter) Start(_ context.Context) { insertWorkers = 4 } - if d.isHostProcess { + if d.retransmit { d.wg.Go(d.retransmitLoop) if !d.cfg.DisablePruning { d.wg.Go(d.expiryLoop) @@ -858,13 +859,13 @@ func (d *DurableEmitter) retransmitPending() { return } - d.retransmit(pending) + d.retransmitBatch(pending) } -// retransmit enqueues pending DB rows to publishCh so the batch workers handle -// publishing. When rawConn is set, payloads are passed through without +// retransmitBatch enqueues pending DB rows to publishCh so the batch workers +// handle publishing. When rawConn is set, payloads are passed through without // proto.Unmarshal — the batch workers use buildBatchBytes for the wire format. -func (d *DurableEmitter) retransmit(pending []DurableEvent) { +func (d *DurableEmitter) retransmitBatch(pending []DurableEvent) { var enqueued int for _, pe := range pending { diff --git a/pkg/beholder/durable_emitter_test.go b/pkg/beholder/durable_emitter_test.go index ad96b53071..0b704d8860 100644 --- a/pkg/beholder/durable_emitter_test.go +++ b/pkg/beholder/durable_emitter_test.go @@ -224,7 +224,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { assert.Equal(t, int32(0), markCalls.Load()) } -func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { +func TestDurableEmitter_NoRetransmitSkipsRetransmitAndExpiry(t *testing.T) { store := NewMemDurableEventStore() client := &testChipClient{} client.setPublishErr(errors.New("chip unavailable")) @@ -249,14 +249,14 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { return client.batchCount.Load() >= 1 && store.Len() == 1 }, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row") - // Several host-only ticks would have cleared or retried by now. + // Several retransmit-only ticks would have cleared or retried by now. time.Sleep(250 * time.Millisecond) - assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops") - assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit") + assert.Equal(t, 1, store.Len(), "retransmit=false must not run retransmit or expiry loops") + assert.Equal(t, int64(1), client.batchCount.Load(), "retransmit=false must not schedule extra PublishBatch via retransmit") } -func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) { +func TestDurableEmitter_NoRetransmitStillDeliversViaBatchWorkers(t *testing.T) { store := NewMemDurableEventStore() client := &testChipClient{} @@ -272,7 +272,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) require.Eventually(t, func() bool { return store.Len() == 0 && client.batchCount.Load() >= 1 - }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false") + }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when retransmit is false") } func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { From f23047d6fccf73d9051e43d11b21b5e2a80d5715 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 15:23:06 -0400 Subject: [PATCH 05/14] Update config.go --- pkg/loop/config.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 78e1c1308e..88851b155d 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -84,9 +84,10 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" - envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -261,6 +262,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) + add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -496,6 +498,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) } + e.ChipIngressDurableEmitterEnabled, err = getBool(envChipIngressDurableEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) From 601238c52ef8460e7d3046bfdb620fff1dfee418 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 15:56:27 -0400 Subject: [PATCH 06/14] Rename --- pkg/beholder/durable_emitter.go | 6 ++--- pkg/beholder/durable_event_store_pg.go | 32 +++++++++++++------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 408e654ab6..0b5455c13b 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -123,7 +123,7 @@ func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSou return errors.New("durable emitter requires a database connection") } - pgStore := NewPgDurableEventStore(ds) + pgStore := NewPGDurableEventStore(ds) durableEmitter, err := NewDurableEmitter(pgStore, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) if err != nil { return fmt.Errorf("failed to create durable emitter: %w", err) @@ -178,8 +178,8 @@ type DurableEmitter struct { // loops over the shared queue. Should be set to false when initialized inside // LOOP plugins so a single host process owns reclamation and cleanup. retransmit bool - cfg DurableEmitterConfig - log logger.Logger + cfg DurableEmitterConfig + log logger.Logger metrics *durableEmitterMetrics diff --git a/pkg/beholder/durable_event_store_pg.go b/pkg/beholder/durable_event_store_pg.go index ad41337004..3e3711b136 100644 --- a/pkg/beholder/durable_event_store_pg.go +++ b/pkg/beholder/durable_event_store_pg.go @@ -13,22 +13,22 @@ import ( const chipDurableEventsTable = "cre.chip_durable_events" -// PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. -type PgDurableEventStore struct { +// PGDurableEventStore is a Postgres-backed implementation of DurableEventStore. +type PGDurableEventStore struct { ds sqlutil.DataSource } var ( - _ DurableEventStore = (*PgDurableEventStore)(nil) - _ DurableQueueObserver = (*PgDurableEventStore)(nil) - _ BatchInserter = (*PgDurableEventStore)(nil) + _ DurableEventStore = (*PGDurableEventStore)(nil) + _ DurableQueueObserver = (*PGDurableEventStore)(nil) + _ BatchInserter = (*PGDurableEventStore)(nil) ) -func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore { - return &PgDurableEventStore{ds: ds} +func NewPGDurableEventStore(ds sqlutil.DataSource) *PGDurableEventStore { + return &PGDurableEventStore{ds: ds} } -func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { +func (s *PGDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` var id int64 if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { @@ -37,7 +37,7 @@ func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64 return id, nil } -func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { +func (s *PGDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { if len(payloads) == 0 { return nil, nil } @@ -60,7 +60,7 @@ func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte return ids, nil } -func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error { +func (s *PGDurableEventStore) Delete(ctx context.Context, id int64) error { const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` if _, err := s.ds.ExecContext(ctx, q, id); err != nil { return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) @@ -68,7 +68,7 @@ func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error { return nil } -func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { +func (s *PGDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` if _, err := s.ds.ExecContext(ctx, q, id); err != nil { return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) @@ -76,7 +76,7 @@ func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error return nil } -func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { +func (s *PGDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { if len(ids) == 0 { return 0, nil } @@ -89,7 +89,7 @@ func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int6 return n, nil } -func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { +func (s *PGDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { if batchLimit <= 0 { return 0, nil } @@ -113,7 +113,7 @@ USING picked WHERE t.id = picked.id` return n, nil } -func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { +func (s *PGDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { const q = ` SELECT id, payload, created_at FROM ` + chipDurableEventsTable + ` @@ -144,7 +144,7 @@ LIMIT $2` return out, nil } -func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { +func (s *PGDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { const q = ` WITH deleted AS ( DELETE FROM ` + chipDurableEventsTable + ` @@ -167,7 +167,7 @@ type chipDurableQueueAgg struct { } // ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. -func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { +func (s *PGDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { const qAgg = ` SELECT count(*)::bigint AS cnt, From 63cb60964af1ace99b85b296b58189297360b37d Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 19 May 2026 16:27:07 -0400 Subject: [PATCH 07/14] Move event store --- pkg/beholder/durable_emitter.go | 20 +++---- .../store.go} | 52 +++++++++++-------- pkg/loop/server.go | 6 ++- 3 files changed, 45 insertions(+), 33 deletions(-) rename pkg/beholder/{durable_event_store_pg.go => pgstore/store.go} (70%) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 0b5455c13b..7859ed04a7 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) // DurableEmitterConfig configures the DurableEmitter behaviour. @@ -105,10 +104,14 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { } // SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip -// sink is a DurableEmitter backed by a Postgres event store. CloudEvents are -// persisted to the DB before async delivery to Chip ingress, so they survive -// process restarts and chip ingress outages. -func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, retransmit bool, lggr logger.Logger) error { +// sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted +// before async delivery to Chip ingress, so they survive process restarts and chip +// ingress outages. +// +// store is the persistence layer. Callers in a Postgres environment should pass +// pgstore.New(ds); the indirection keeps the lib/pq driver out of consumers that +// only need the beholder API (e.g. wasip1 builds of the workflow runtime). +func SetupDurableEmitter(ctx context.Context, client *Client, store DurableEventStore, retransmit bool, lggr logger.Logger) error { if client == nil { return errors.New("beholder client not initialized") } @@ -119,12 +122,11 @@ func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSou if _, noop := chipClient.(*chipingress.NoopClient); noop { return errors.New("chip ingress client is a no-op; configure CL_CHIP_INGRESS_ENDPOINT") } - if ds == nil { - return errors.New("durable emitter requires a database connection") + if store == nil { + return errors.New("durable emitter requires a non-nil DurableEventStore") } - pgStore := NewPGDurableEventStore(ds) - durableEmitter, err := NewDurableEmitter(pgStore, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) + durableEmitter, err := NewDurableEmitter(store, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) if err != nil { return fmt.Errorf("failed to create durable emitter: %w", err) } diff --git a/pkg/beholder/durable_event_store_pg.go b/pkg/beholder/pgstore/store.go similarity index 70% rename from pkg/beholder/durable_event_store_pg.go rename to pkg/beholder/pgstore/store.go index 3e3711b136..9a39e8c184 100644 --- a/pkg/beholder/durable_event_store_pg.go +++ b/pkg/beholder/pgstore/store.go @@ -1,4 +1,8 @@ -package beholder +// Package pgstore provides a Postgres-backed implementation of +// beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder +// so that consumers of the beholder API (including builds targeting wasip1) +// do not transitively import lib/pq. +package pgstore import ( "context" @@ -8,27 +12,29 @@ import ( "github.com/lib/pq" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) const chipDurableEventsTable = "cre.chip_durable_events" -// PGDurableEventStore is a Postgres-backed implementation of DurableEventStore. -type PGDurableEventStore struct { +// Store is a Postgres-backed implementation of beholder.DurableEventStore. +type Store struct { ds sqlutil.DataSource } var ( - _ DurableEventStore = (*PGDurableEventStore)(nil) - _ DurableQueueObserver = (*PGDurableEventStore)(nil) - _ BatchInserter = (*PGDurableEventStore)(nil) + _ beholder.DurableEventStore = (*Store)(nil) + _ beholder.DurableQueueObserver = (*Store)(nil) + _ beholder.BatchInserter = (*Store)(nil) ) -func NewPGDurableEventStore(ds sqlutil.DataSource) *PGDurableEventStore { - return &PGDurableEventStore{ds: ds} +// New returns a Postgres-backed DurableEventStore bound to ds. +func New(ds sqlutil.DataSource) *Store { + return &Store{ds: ds} } -func (s *PGDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { +func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) { const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` var id int64 if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { @@ -37,7 +43,7 @@ func (s *PGDurableEventStore) Insert(ctx context.Context, payload []byte) (int64 return id, nil } -func (s *PGDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { +func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { if len(payloads) == 0 { return nil, nil } @@ -60,7 +66,7 @@ func (s *PGDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte return ids, nil } -func (s *PGDurableEventStore) Delete(ctx context.Context, id int64) error { +func (s *Store) Delete(ctx context.Context, id int64) error { const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` if _, err := s.ds.ExecContext(ctx, q, id); err != nil { return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) @@ -68,7 +74,7 @@ func (s *PGDurableEventStore) Delete(ctx context.Context, id int64) error { return nil } -func (s *PGDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { +func (s *Store) MarkDelivered(ctx context.Context, id int64) error { const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` if _, err := s.ds.ExecContext(ctx, q, id); err != nil { return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) @@ -76,7 +82,7 @@ func (s *PGDurableEventStore) MarkDelivered(ctx context.Context, id int64) error return nil } -func (s *PGDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { +func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { if len(ids) == 0 { return 0, nil } @@ -89,7 +95,7 @@ func (s *PGDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int6 return n, nil } -func (s *PGDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { +func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { if batchLimit <= 0 { return 0, nil } @@ -113,7 +119,7 @@ USING picked WHERE t.id = picked.id` return n, nil } -func (s *PGDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { +func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) { const q = ` SELECT id, payload, created_at FROM ` + chipDurableEventsTable + ` @@ -133,9 +139,9 @@ LIMIT $2` return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) } - out := make([]DurableEvent, 0, len(rows)) + out := make([]beholder.DurableEvent, 0, len(rows)) for _, r := range rows { - out = append(out, DurableEvent{ + out = append(out, beholder.DurableEvent{ ID: r.ID, Payload: r.Payload, CreatedAt: r.CreatedAt, @@ -144,7 +150,7 @@ LIMIT $2` return out, nil } -func (s *PGDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { +func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { const q = ` WITH deleted AS ( DELETE FROM ` + chipDurableEventsTable + ` @@ -166,8 +172,8 @@ type chipDurableQueueAgg struct { MinCreated *time.Time `db:"min_created"` } -// ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. -func (s *PGDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { +// ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges. +func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) { const qAgg = ` SELECT count(*)::bigint AS cnt, @@ -178,9 +184,9 @@ WHERE delivered_at IS NULL` var row chipDurableQueueAgg if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { - return DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) } - var st DurableQueueStats + var st beholder.DurableQueueStats st.Depth = row.Cnt st.PayloadBytes = row.PayloadSum if row.MinCreated != nil { @@ -196,7 +202,7 @@ WHERE delivered_at IS NULL AND created_at >= now() - ($1::bigint * interval '1 second') AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { - return DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) } } return st, nil diff --git a/pkg/loop/server.go b/pkg/loop/server.go index a3bcd393e3..a4c6d61cf1 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -19,6 +19,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/pgstore" "github.com/smartcontractkit/chainlink-common/pkg/config/build" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -242,7 +243,10 @@ func (s *Server) start(opts ...ServerOpt) error { } if s.EnvConfig.ChipIngressDurableEmitterEnabled { - if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, s.DataSource, false, s.Logger); err != nil { + if s.DataSource == nil { + return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") + } + if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, pgstore.New(s.DataSource), false, s.Logger); err != nil { return err } } From 972243c949bb61f1e707a9eff863be42042a17be Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 09:25:44 -0400 Subject: [PATCH 08/14] Rename store --- pkg/beholder/durable_emitter.go | 5 +++-- .../store.go => durable_events/durable_event_store.go} | 4 ++-- pkg/loop/server.go | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) rename pkg/beholder/{pgstore/store.go => durable_events/durable_event_store.go} (98%) diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 7859ed04a7..e9880af431 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -109,8 +109,9 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // ingress outages. // // store is the persistence layer. Callers in a Postgres environment should pass -// pgstore.New(ds); the indirection keeps the lib/pq driver out of consumers that -// only need the beholder API (e.g. wasip1 builds of the workflow runtime). +// durable_events.New(ds); the indirection keeps the lib/pq driver out of +// consumers that only need the beholder API (e.g. wasip1 builds of the workflow +// runtime). func SetupDurableEmitter(ctx context.Context, client *Client, store DurableEventStore, retransmit bool, lggr logger.Logger) error { if client == nil { return errors.New("beholder client not initialized") diff --git a/pkg/beholder/pgstore/store.go b/pkg/beholder/durable_events/durable_event_store.go similarity index 98% rename from pkg/beholder/pgstore/store.go rename to pkg/beholder/durable_events/durable_event_store.go index 9a39e8c184..3bb0a69bfd 100644 --- a/pkg/beholder/pgstore/store.go +++ b/pkg/beholder/durable_events/durable_event_store.go @@ -1,8 +1,8 @@ -// Package pgstore provides a Postgres-backed implementation of +// Package durable_events provides a Postgres-backed implementation of // beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder // so that consumers of the beholder API (including builds targeting wasip1) // do not transitively import lib/pq. -package pgstore +package durable_events import ( "context" diff --git a/pkg/loop/server.go b/pkg/loop/server.go index a4c6d61cf1..abcc6811f8 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -19,7 +19,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "github.com/smartcontractkit/chainlink-common/pkg/beholder" - "github.com/smartcontractkit/chainlink-common/pkg/beholder/pgstore" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/durable_events" "github.com/smartcontractkit/chainlink-common/pkg/config/build" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -246,7 +246,7 @@ func (s *Server) start(opts ...ServerOpt) error { if s.DataSource == nil { return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") } - if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, pgstore.New(s.DataSource), false, s.Logger); err != nil { + if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, durable_events.New(s.DataSource), false, s.Logger); err != nil { return err } } From c8bb5e38686552cca690f66a27f689c8b1cd52e0 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 10:09:44 -0400 Subject: [PATCH 09/14] Move durable emitter to beholder start --- .../beholderstore/durable_event_store.go | 209 ++++++++++++++++++ pkg/beholder/durable_emitter.go | 5 - pkg/loop/server.go | 20 +- 3 files changed, 220 insertions(+), 14 deletions(-) create mode 100644 pkg/beholder/beholderstore/durable_event_store.go diff --git a/pkg/beholder/beholderstore/durable_event_store.go b/pkg/beholder/beholderstore/durable_event_store.go new file mode 100644 index 0000000000..2bed8b8beb --- /dev/null +++ b/pkg/beholder/beholderstore/durable_event_store.go @@ -0,0 +1,209 @@ +// Package beholderstore provides a Postgres-backed implementation of +// beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder +// so that consumers of the beholder API (including builds targeting wasip1) +// do not transitively import lib/pq. +package beholderstore + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" +) + +const chipDurableEventsTable = "cre.chip_durable_events" + +// Store is a Postgres-backed implementation of beholder.DurableEventStore. +type Store struct { + ds sqlutil.DataSource +} + +var ( + _ beholder.DurableEventStore = (*Store)(nil) + _ beholder.DurableQueueObserver = (*Store)(nil) + _ beholder.BatchInserter = (*Store)(nil) +) + +// New returns a Postgres-backed DurableEventStore bound to ds. +func New(ds sqlutil.DataSource) *Store { + return &Store{ds: ds} +} + +func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) { + const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` + var id int64 + if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { + return 0, fmt.Errorf("failed to insert chip durable event: %w", err) + } + return id, nil +} + +func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { + if len(payloads) == 0 { + return nil, nil + } + placeholders := make([]string, len(payloads)) + args := make([]interface{}, len(payloads)) + for i, p := range payloads { + placeholders[i] = fmt.Sprintf("($%d)", i+1) + args[i] = p + } + q := fmt.Sprintf( + "INSERT INTO %s (payload) VALUES %s RETURNING id", + chipDurableEventsTable, + strings.Join(placeholders, ","), + ) + + var ids []int64 + if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { + return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) + } + return ids, nil +} + +func (s *Store) Delete(ctx context.Context, id int64) error { + const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) + } + return nil +} + +func (s *Store) MarkDelivered(ctx context.Context, id int64) error { + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) + } + return nil +} + +func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` + res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) + if err != nil { + return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { + if batchLimit <= 0 { + return 0, nil + } + const q = ` +WITH picked AS ( + SELECT id FROM ` + chipDurableEventsTable + ` + WHERE delivered_at IS NOT NULL + ORDER BY delivered_at ASC + LIMIT $1 +) +DELETE FROM ` + chipDurableEventsTable + ` AS t +USING picked WHERE t.id = picked.id` + res, err := s.ds.ExecContext(ctx, q, batchLimit) + if err != nil { + return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("purge delivered rows affected: %w", err) + } + return n, nil +} + +func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) { + const q = ` +SELECT id, payload, created_at +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at < $1 +ORDER BY created_at ASC +LIMIT $2` + + type row struct { + ID int64 `db:"id"` + Payload []byte `db:"payload"` + CreatedAt time.Time `db:"created_at"` + } + + var rows []row + if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { + return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) + } + + out := make([]beholder.DurableEvent, 0, len(rows)) + for _, r := range rows { + out = append(out, beholder.DurableEvent{ + ID: r.ID, + Payload: r.Payload, + CreatedAt: r.CreatedAt, + }) + } + return out, nil +} + +func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { + const q = ` +WITH deleted AS ( + DELETE FROM ` + chipDurableEventsTable + ` + WHERE created_at <= now() - $1::interval + RETURNING id +) +SELECT count(*) FROM deleted` + + var count int64 + if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { + return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) + } + return count, nil +} + +type chipDurableQueueAgg struct { + Cnt int64 `db:"cnt"` + PayloadSum int64 `db:"payload_sum"` + MinCreated *time.Time `db:"min_created"` +} + +// ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges. +func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) { + const qAgg = ` +SELECT + count(*)::bigint AS cnt, + coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, + min(created_at) AS min_created +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL` + + var row chipDurableQueueAgg + if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + } + var st beholder.DurableQueueStats + st.Depth = row.Cnt + st.PayloadBytes = row.PayloadSum + if row.MinCreated != nil { + st.OldestPendingAge = time.Since(*row.MinCreated) + } + if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { + ttlSec := int64(eventTTL.Round(time.Second) / time.Second) + leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) + const qNear = ` +SELECT count(*)::bigint +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at >= now() - ($1::bigint * interval '1 second') + AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` + if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) + } + } + return st, nil +} diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index e9880af431..2f6d9e62b2 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -107,11 +107,6 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted // before async delivery to Chip ingress, so they survive process restarts and chip // ingress outages. -// -// store is the persistence layer. Callers in a Postgres environment should pass -// durable_events.New(ds); the indirection keeps the lib/pq driver out of -// consumers that only need the beholder API (e.g. wasip1 builds of the workflow -// runtime). func SetupDurableEmitter(ctx context.Context, client *Client, store DurableEventStore, retransmit bool, lggr logger.Logger) error { if client == nil { return errors.New("beholder client not initialized") diff --git a/pkg/loop/server.go b/pkg/loop/server.go index abcc6811f8..dec0650494 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -241,15 +241,6 @@ func (s *Server) start(opts ...ServerOpt) error { if err := s.startBeholderClient(ctx, beholderCfg); err != nil { return err } - - if s.EnvConfig.ChipIngressDurableEmitterEnabled { - if s.DataSource == nil { - return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") - } - if err := beholder.SetupDurableEmitter(ctx, s.beholderClient, durable_events.New(s.DataSource), false, s.Logger); err != nil { - return err - } - } } if addr := s.EnvConfig.PyroscopeServerAddress; addr != "" { @@ -356,6 +347,17 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C if err != nil { return fmt.Errorf("failed to create beholder client: %w", err) } + + if s.EnvConfig.ChipIngressDurableEmitterEnabled { + if s.DataSource == nil { + return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") + } + err = beholder.SetupDurableEmitter(ctx, beholderClient, durable_events.New(s.DataSource), false, s.Logger) + if err != nil { + return err + } + } + if err := beholderClient.Start(ctx); err != nil { return fmt.Errorf("failed to start beholder client: %w", err) } From a612830b901604da3b91a79163108a3d1ba4f4cb Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 10:11:26 -0400 Subject: [PATCH 10/14] Delete durable_event_store.go --- .../durable_events/durable_event_store.go | 209 ------------------ 1 file changed, 209 deletions(-) delete mode 100644 pkg/beholder/durable_events/durable_event_store.go diff --git a/pkg/beholder/durable_events/durable_event_store.go b/pkg/beholder/durable_events/durable_event_store.go deleted file mode 100644 index 3bb0a69bfd..0000000000 --- a/pkg/beholder/durable_events/durable_event_store.go +++ /dev/null @@ -1,209 +0,0 @@ -// Package durable_events provides a Postgres-backed implementation of -// beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder -// so that consumers of the beholder API (including builds targeting wasip1) -// do not transitively import lib/pq. -package durable_events - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/lib/pq" - - "github.com/smartcontractkit/chainlink-common/pkg/beholder" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" -) - -const chipDurableEventsTable = "cre.chip_durable_events" - -// Store is a Postgres-backed implementation of beholder.DurableEventStore. -type Store struct { - ds sqlutil.DataSource -} - -var ( - _ beholder.DurableEventStore = (*Store)(nil) - _ beholder.DurableQueueObserver = (*Store)(nil) - _ beholder.BatchInserter = (*Store)(nil) -) - -// New returns a Postgres-backed DurableEventStore bound to ds. -func New(ds sqlutil.DataSource) *Store { - return &Store{ds: ds} -} - -func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) { - const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` - var id int64 - if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { - return 0, fmt.Errorf("failed to insert chip durable event: %w", err) - } - return id, nil -} - -func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { - if len(payloads) == 0 { - return nil, nil - } - placeholders := make([]string, len(payloads)) - args := make([]interface{}, len(payloads)) - for i, p := range payloads { - placeholders[i] = fmt.Sprintf("($%d)", i+1) - args[i] = p - } - q := fmt.Sprintf( - "INSERT INTO %s (payload) VALUES %s RETURNING id", - chipDurableEventsTable, - strings.Join(placeholders, ","), - ) - - var ids []int64 - if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { - return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) - } - return ids, nil -} - -func (s *Store) Delete(ctx context.Context, id int64) error { - const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` - if _, err := s.ds.ExecContext(ctx, q, id); err != nil { - return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) - } - return nil -} - -func (s *Store) MarkDelivered(ctx context.Context, id int64) error { - const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` - if _, err := s.ds.ExecContext(ctx, q, id); err != nil { - return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) - } - return nil -} - -func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { - if len(ids) == 0 { - return 0, nil - } - const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` - res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) - if err != nil { - return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) - } - n, _ := res.RowsAffected() - return n, nil -} - -func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { - if batchLimit <= 0 { - return 0, nil - } - const q = ` -WITH picked AS ( - SELECT id FROM ` + chipDurableEventsTable + ` - WHERE delivered_at IS NOT NULL - ORDER BY delivered_at ASC - LIMIT $1 -) -DELETE FROM ` + chipDurableEventsTable + ` AS t -USING picked WHERE t.id = picked.id` - res, err := s.ds.ExecContext(ctx, q, batchLimit) - if err != nil { - return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) - } - n, err := res.RowsAffected() - if err != nil { - return 0, fmt.Errorf("purge delivered rows affected: %w", err) - } - return n, nil -} - -func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) { - const q = ` -SELECT id, payload, created_at -FROM ` + chipDurableEventsTable + ` -WHERE delivered_at IS NULL - AND created_at < $1 -ORDER BY created_at ASC -LIMIT $2` - - type row struct { - ID int64 `db:"id"` - Payload []byte `db:"payload"` - CreatedAt time.Time `db:"created_at"` - } - - var rows []row - if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { - return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) - } - - out := make([]beholder.DurableEvent, 0, len(rows)) - for _, r := range rows { - out = append(out, beholder.DurableEvent{ - ID: r.ID, - Payload: r.Payload, - CreatedAt: r.CreatedAt, - }) - } - return out, nil -} - -func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { - const q = ` -WITH deleted AS ( - DELETE FROM ` + chipDurableEventsTable + ` - WHERE created_at <= now() - $1::interval - RETURNING id -) -SELECT count(*) FROM deleted` - - var count int64 - if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { - return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) - } - return count, nil -} - -type chipDurableQueueAgg struct { - Cnt int64 `db:"cnt"` - PayloadSum int64 `db:"payload_sum"` - MinCreated *time.Time `db:"min_created"` -} - -// ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges. -func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) { - const qAgg = ` -SELECT - count(*)::bigint AS cnt, - coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, - min(created_at) AS min_created -FROM ` + chipDurableEventsTable + ` -WHERE delivered_at IS NULL` - - var row chipDurableQueueAgg - if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { - return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) - } - var st beholder.DurableQueueStats - st.Depth = row.Cnt - st.PayloadBytes = row.PayloadSum - if row.MinCreated != nil { - st.OldestPendingAge = time.Since(*row.MinCreated) - } - if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { - ttlSec := int64(eventTTL.Round(time.Second) / time.Second) - leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) - const qNear = ` -SELECT count(*)::bigint -FROM ` + chipDurableEventsTable + ` -WHERE delivered_at IS NULL - AND created_at >= now() - ($1::bigint * interval '1 second') - AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` - if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { - return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) - } - } - return st, nil -} From eb1cd2ca1e8b00d36d138f364f5a8562a3823365 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 10:21:30 -0400 Subject: [PATCH 11/14] Update server.go --- pkg/loop/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index dec0650494..b1658085a5 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -19,7 +19,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "github.com/smartcontractkit/chainlink-common/pkg/beholder" - "github.com/smartcontractkit/chainlink-common/pkg/beholder/durable_events" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/beholderstore" "github.com/smartcontractkit/chainlink-common/pkg/config/build" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -352,7 +352,7 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C if s.DataSource == nil { return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") } - err = beholder.SetupDurableEmitter(ctx, beholderClient, durable_events.New(s.DataSource), false, s.Logger) + err = beholder.SetupDurableEmitter(ctx, beholderClient, beholderstore.New(s.DataSource), false, s.Logger) if err != nil { return err } From 5acdb28b34e3adb7f04183243d9535ed3a950727 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 11:58:41 -0400 Subject: [PATCH 12/14] Move setup to client method --- pkg/beholder/client.go | 44 +++++++++++++++++++++++++++++++++ pkg/beholder/durable_emitter.go | 37 --------------------------- pkg/loop/server.go | 6 ++++- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index e8fce549bc..b06617f699 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -54,6 +54,9 @@ type Client struct { // Chip Chip chipingress.Client + // durableEmitter persists and resends events to Chip when configured + durableEmitter *DurableEmitter + // Providers LoggerProvider otellog.LoggerProvider TracerProvider oteltrace.TracerProvider @@ -356,6 +359,47 @@ func (c *Client) IsSignerSet() bool { return c.lazySigner != nil && c.lazySigner.IsSet() } +// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip +// sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted +// before async delivery to Chip ingress, so they survive process restarts and chip +// ingress outages. +// +// StartDurableEmitter must be called before emitting events. +func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool, lggr pkglogger.Logger) error { + if c.Chip == nil { + return fmt.Errorf("chip ingress client is nil") + } + if store == nil { + return fmt.Errorf("durable emitter requires a non-nil DurableEventStore") + } + + durableEmitter, err := NewDurableEmitter(store, c.Chip, retransmit, DefaultDurableEmitterConfig(), lggr) + if err != nil { + return fmt.Errorf("failed to create durable emitter: %w", err) + } + + otlpEmitter := NewMessageEmitter(c.MessageLoggerProvider.Logger("durable-emitter")) + dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter) + if err != nil { + return fmt.Errorf("failed to create dual source emitter: %w", err) + } + + c.Emitter = dualEmitter + c.durableEmitter = durableEmitter + + lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") + return nil +} + +// StartDurableEmitter starts durable emitter. Close is handled transatively when Emitter is closed. +func (c *Client) StartDurableEmitter(ctx context.Context) error { + if c.durableEmitter == nil { + return fmt.Errorf("failed to start nil durable emitter; call SetupDurableEmitter first") + } + c.durableEmitter.Start(ctx) + return nil +} + func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) { extraResources, err := sdkresource.New( context.Background(), diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 2f6d9e62b2..7b031d98fa 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -103,43 +103,6 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { } } -// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip -// sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted -// before async delivery to Chip ingress, so they survive process restarts and chip -// ingress outages. -func SetupDurableEmitter(ctx context.Context, client *Client, store DurableEventStore, retransmit bool, lggr logger.Logger) error { - if client == nil { - return errors.New("beholder client not initialized") - } - chipClient := client.Chip - if chipClient == nil { - return errors.New("chip ingress client not available") - } - if _, noop := chipClient.(*chipingress.NoopClient); noop { - return errors.New("chip ingress client is a no-op; configure CL_CHIP_INGRESS_ENDPOINT") - } - if store == nil { - return errors.New("durable emitter requires a non-nil DurableEventStore") - } - - durableEmitter, err := NewDurableEmitter(store, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) - if err != nil { - return fmt.Errorf("failed to create durable emitter: %w", err) - } - - otlpEmitter := NewMessageEmitter(client.MessageLoggerProvider.Logger("durable-emitter")) - dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter) - if err != nil { - return fmt.Errorf("failed to create dual source emitter: %w", err) - } - - durableEmitter.Start(ctx) - client.Emitter = dualEmitter - - lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") - return nil -} - // DurableEmitter implements Emitter with persistence-backed delivery guarantees. // // Emit writes to a DurableEventStore, returns nil after insert, and enqueues the diff --git a/pkg/loop/server.go b/pkg/loop/server.go index b1658085a5..3e2fdf4f88 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -352,7 +352,11 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C if s.DataSource == nil { return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") } - err = beholder.SetupDurableEmitter(ctx, beholderClient, beholderstore.New(s.DataSource), false, s.Logger) + err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false, s.Logger) + if err != nil { + return err + } + err = beholderClient.StartDurableEmitter(ctx) if err != nil { return err } From dabe6d6f5ae23dbb64358b64ba1a8b062bb27891 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 13:46:34 -0400 Subject: [PATCH 13/14] Use chip logger --- pkg/beholder/client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index b06617f699..250758b68f 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -365,7 +365,7 @@ func (c *Client) IsSignerSet() bool { // ingress outages. // // StartDurableEmitter must be called before emitting events. -func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool, lggr pkglogger.Logger) error { +func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool) error { if c.Chip == nil { return fmt.Errorf("chip ingress client is nil") } @@ -373,6 +373,11 @@ func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool, l return fmt.Errorf("durable emitter requires a non-nil DurableEventStore") } + lggr := c.Config.ChipIngressLogger + if lggr == nil { + return fmt.Errorf("chip ingress logger is required for durable emitter setup") + } + durableEmitter, err := NewDurableEmitter(store, c.Chip, retransmit, DefaultDurableEmitterConfig(), lggr) if err != nil { return fmt.Errorf("failed to create durable emitter: %w", err) From 16779f4207389abca8200d62024b9593c3e64a11 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 20 May 2026 13:48:52 -0400 Subject: [PATCH 14/14] Update server.go --- pkg/loop/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 3e2fdf4f88..229bfa10f2 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -352,7 +352,7 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C if s.DataSource == nil { return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") } - err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false, s.Logger) + err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false) if err != nil { return err }