diff --git a/pkg/beholder/beholderstore/durable_event_store.go b/pkg/beholder/beholderstore/durable_event_store.go new file mode 100644 index 000000000..2bed8b8be --- /dev/null +++ b/pkg/beholder/beholderstore/durable_event_store.go @@ -0,0 +1,209 @@ +// Package beholderstore provides a Postgres-backed implementation of +// beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder +// so that consumers of the beholder API (including builds targeting wasip1) +// do not transitively import lib/pq. +package beholderstore + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" +) + +const chipDurableEventsTable = "cre.chip_durable_events" + +// Store is a Postgres-backed implementation of beholder.DurableEventStore. +type Store struct { + ds sqlutil.DataSource +} + +var ( + _ beholder.DurableEventStore = (*Store)(nil) + _ beholder.DurableQueueObserver = (*Store)(nil) + _ beholder.BatchInserter = (*Store)(nil) +) + +// New returns a Postgres-backed DurableEventStore bound to ds. +func New(ds sqlutil.DataSource) *Store { + return &Store{ds: ds} +} + +func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) { + const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` + var id int64 + if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { + return 0, fmt.Errorf("failed to insert chip durable event: %w", err) + } + return id, nil +} + +func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { + if len(payloads) == 0 { + return nil, nil + } + placeholders := make([]string, len(payloads)) + args := make([]interface{}, len(payloads)) + for i, p := range payloads { + placeholders[i] = fmt.Sprintf("($%d)", i+1) + args[i] = p + } + q := fmt.Sprintf( + "INSERT INTO %s (payload) VALUES %s RETURNING id", + chipDurableEventsTable, + strings.Join(placeholders, ","), + ) + + var ids []int64 + if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { + return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) + } + return ids, nil +} + +func (s *Store) Delete(ctx context.Context, id int64) error { + const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) + } + return nil +} + +func (s *Store) MarkDelivered(ctx context.Context, id int64) error { + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) + } + return nil +} + +func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` + res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) + if err != nil { + return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { + if batchLimit <= 0 { + return 0, nil + } + const q = ` +WITH picked AS ( + SELECT id FROM ` + chipDurableEventsTable + ` + WHERE delivered_at IS NOT NULL + ORDER BY delivered_at ASC + LIMIT $1 +) +DELETE FROM ` + chipDurableEventsTable + ` AS t +USING picked WHERE t.id = picked.id` + res, err := s.ds.ExecContext(ctx, q, batchLimit) + if err != nil { + return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("purge delivered rows affected: %w", err) + } + return n, nil +} + +func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) { + const q = ` +SELECT id, payload, created_at +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at < $1 +ORDER BY created_at ASC +LIMIT $2` + + type row struct { + ID int64 `db:"id"` + Payload []byte `db:"payload"` + CreatedAt time.Time `db:"created_at"` + } + + var rows []row + if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { + return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) + } + + out := make([]beholder.DurableEvent, 0, len(rows)) + for _, r := range rows { + out = append(out, beholder.DurableEvent{ + ID: r.ID, + Payload: r.Payload, + CreatedAt: r.CreatedAt, + }) + } + return out, nil +} + +func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { + const q = ` +WITH deleted AS ( + DELETE FROM ` + chipDurableEventsTable + ` + WHERE created_at <= now() - $1::interval + RETURNING id +) +SELECT count(*) FROM deleted` + + var count int64 + if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { + return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) + } + return count, nil +} + +type chipDurableQueueAgg struct { + Cnt int64 `db:"cnt"` + PayloadSum int64 `db:"payload_sum"` + MinCreated *time.Time `db:"min_created"` +} + +// ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges. +func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) { + const qAgg = ` +SELECT + count(*)::bigint AS cnt, + coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, + min(created_at) AS min_created +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL` + + var row chipDurableQueueAgg + if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + } + var st beholder.DurableQueueStats + st.Depth = row.Cnt + st.PayloadBytes = row.PayloadSum + if row.MinCreated != nil { + st.OldestPendingAge = time.Since(*row.MinCreated) + } + if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { + ttlSec := int64(eventTTL.Round(time.Second) / time.Second) + leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) + const qNear = ` +SELECT count(*)::bigint +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at >= now() - ($1::bigint * interval '1 second') + AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` + if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { + return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) + } + } + return st, nil +} diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index e8fce549b..250758b68 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -54,6 +54,9 @@ type Client struct { // Chip Chip chipingress.Client + // durableEmitter persists and resends events to Chip when configured + durableEmitter *DurableEmitter + // Providers LoggerProvider otellog.LoggerProvider TracerProvider oteltrace.TracerProvider @@ -356,6 +359,52 @@ func (c *Client) IsSignerSet() bool { return c.lazySigner != nil && c.lazySigner.IsSet() } +// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip +// sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted +// before async delivery to Chip ingress, so they survive process restarts and chip +// ingress outages. +// +// StartDurableEmitter must be called before emitting events. +func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool) error { + if c.Chip == nil { + return fmt.Errorf("chip ingress client is nil") + } + if store == nil { + return fmt.Errorf("durable emitter requires a non-nil DurableEventStore") + } + + lggr := c.Config.ChipIngressLogger + if lggr == nil { + return fmt.Errorf("chip ingress logger is required for durable emitter setup") + } + + durableEmitter, err := NewDurableEmitter(store, c.Chip, retransmit, DefaultDurableEmitterConfig(), lggr) + if err != nil { + return fmt.Errorf("failed to create durable emitter: %w", err) + } + + otlpEmitter := NewMessageEmitter(c.MessageLoggerProvider.Logger("durable-emitter")) + dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter) + if err != nil { + return fmt.Errorf("failed to create dual source emitter: %w", err) + } + + c.Emitter = dualEmitter + c.durableEmitter = durableEmitter + + lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") + return nil +} + +// StartDurableEmitter starts durable emitter. Close is handled transatively when Emitter is closed. +func (c *Client) StartDurableEmitter(ctx context.Context) error { + if c.durableEmitter == nil { + return fmt.Errorf("failed to start nil durable emitter; call SetupDurableEmitter first") + } + c.durableEmitter.Start(ctx) + return nil +} + func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) { extraResources, err := sdkresource.New( context.Background(), diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 222ea4c36..7b031d98f 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -135,11 +135,12 @@ type insertResult struct { type DurableEmitter struct { store DurableEventStore client chipingress.Client - // isHostProcess determines if the emitter runs retransmit and cleanup loops. - // Should be set to false when initialized inside LOOP plugins. - isHostProcess bool - cfg DurableEmitterConfig - log logger.Logger + // retransmit determines if the emitter runs the retransmit, expiry, and purge + // loops over the shared queue. Should be set to false when initialized inside + // LOOP plugins so a single host process owns reclamation and cleanup. + retransmit bool + cfg DurableEmitterConfig + log logger.Logger metrics *durableEmitterMetrics @@ -225,7 +226,7 @@ var _ Emitter = (*DurableEmitter)(nil) func NewDurableEmitter( store DurableEventStore, client chipingress.Client, - isHostProcess bool, + retransmit bool, cfg DurableEmitterConfig, log logger.Logger, ) (*DurableEmitter, error) { @@ -251,13 +252,13 @@ func NewDurableEmitter( store = newMetricsInstrumentedStore(store, m) } d := &DurableEmitter{ - store: store, - client: client, - isHostProcess: isHostProcess, - cfg: cfg, - log: log, - metrics: m, - stopCh: make(chan struct{}), + store: store, + client: client, + retransmit: retransmit, + cfg: cfg, + log: log, + metrics: m, + stopCh: make(chan struct{}), } if cp, ok := client.(grpcConnProvider); ok { d.rawConn = cp.Conn() @@ -302,7 +303,7 @@ func (d *DurableEmitter) Start(_ context.Context) { insertWorkers = 4 } - if d.isHostProcess { + if d.retransmit { d.wg.Go(d.retransmitLoop) if !d.cfg.DisablePruning { d.wg.Go(d.expiryLoop) @@ -819,13 +820,13 @@ func (d *DurableEmitter) retransmitPending() { return } - d.retransmit(pending) + d.retransmitBatch(pending) } -// retransmit enqueues pending DB rows to publishCh so the batch workers handle -// publishing. When rawConn is set, payloads are passed through without +// retransmitBatch enqueues pending DB rows to publishCh so the batch workers +// handle publishing. When rawConn is set, payloads are passed through without // proto.Unmarshal — the batch workers use buildBatchBytes for the wire format. -func (d *DurableEmitter) retransmit(pending []DurableEvent) { +func (d *DurableEmitter) retransmitBatch(pending []DurableEvent) { var enqueued int for _, pe := range pending { diff --git a/pkg/beholder/durable_emitter_test.go b/pkg/beholder/durable_emitter_test.go index ad96b5307..0b704d886 100644 --- a/pkg/beholder/durable_emitter_test.go +++ b/pkg/beholder/durable_emitter_test.go @@ -224,7 +224,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { assert.Equal(t, int32(0), markCalls.Load()) } -func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { +func TestDurableEmitter_NoRetransmitSkipsRetransmitAndExpiry(t *testing.T) { store := NewMemDurableEventStore() client := &testChipClient{} client.setPublishErr(errors.New("chip unavailable")) @@ -249,14 +249,14 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { return client.batchCount.Load() >= 1 && store.Len() == 1 }, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row") - // Several host-only ticks would have cleared or retried by now. + // Several retransmit-only ticks would have cleared or retried by now. time.Sleep(250 * time.Millisecond) - assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops") - assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit") + assert.Equal(t, 1, store.Len(), "retransmit=false must not run retransmit or expiry loops") + assert.Equal(t, int64(1), client.batchCount.Load(), "retransmit=false must not schedule extra PublishBatch via retransmit") } -func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) { +func TestDurableEmitter_NoRetransmitStillDeliversViaBatchWorkers(t *testing.T) { store := NewMemDurableEventStore() client := &testChipClient{} @@ -272,7 +272,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) require.Eventually(t, func() bool { return store.Len() == 0 && client.batchCount.Load() >= 1 - }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false") + }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when retransmit is false") } func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 966be68e6..88851b155 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -84,9 +84,10 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" - envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -97,9 +98,10 @@ const ( type EnvConfig struct { AppID string - ChipIngressEndpoint string - ChipIngressInsecureConnection bool - ChipIngressBatchEmitterEnabled bool + ChipIngressEndpoint string + ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool + ChipIngressDurableEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -139,15 +141,15 @@ type EnvConfig struct { PyroscopePPROFBlockProfileRate int PyroscopePPROFMutexProfileFraction int - TelemetryEnabled bool - TelemetryEndpoint string - TelemetryInsecureConnection bool - TelemetryCACertFile string - TelemetryAttributes OtelAttributes - TelemetryTraceSampleRatio float64 - TelemetryAuthHeaders map[string]string - TelemetryAuthPubKeyHex string - TelemetryAuthHeadersTTL time.Duration + TelemetryEnabled bool + TelemetryEndpoint string + TelemetryInsecureConnection bool + TelemetryCACertFile string + TelemetryAttributes OtelAttributes + TelemetryTraceSampleRatio float64 + TelemetryAuthHeaders map[string]string + TelemetryAuthPubKeyHex string + TelemetryAuthHeadersTTL time.Duration // TelemetryEmitterBatchProcessor maps to beholder Config.EmitterBatchProcessor // (batched async custom-message export vs immediate per-record export). TelemetryEmitterBatchProcessor bool @@ -260,6 +262,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) + add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -495,6 +498,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) } + e.ChipIngressDurableEmitterEnabled, err = getBool(envChipIngressDurableEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 307603d50..229bfa10f 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -19,6 +19,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/beholderstore" "github.com/smartcontractkit/chainlink-common/pkg/config/build" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -148,6 +149,28 @@ func (s *Server) start(opts ...ServerOpt) error { OnDialError: func(err error) { s.Logger.Errorw("Failed to dial", "err", err) }, } + if s.EnvConfig.DatabaseURL != nil { + pg.SetApplicationName(s.EnvConfig.DatabaseURL.URL(), build.Program) + dbURL := s.EnvConfig.DatabaseURL.URL().String() + var err error + s.db, err = pg.DBConfig{ + IdleInTxSessionTimeout: s.EnvConfig.DatabaseIdleInTxSessionTimeout, + LockTimeout: s.EnvConfig.DatabaseLockTimeout, + MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns, + MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns, + EnableTracing: s.EnvConfig.DatabaseTracingEnabled, + }.New(ctx, dbURL, pg.DriverPostgres) + if err != nil { + return fmt.Errorf("error connecting to DataBase: %w", err) + } + s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger, + sqlutil.TimeoutHook(func() time.Duration { return s.EnvConfig.DatabaseQueryTimeout }), + sqlutil.MonitorHook(func() bool { return s.EnvConfig.DatabaseLogSQL })) + + s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger) + s.dbStatsReporter.Start() + } + if s.EnvConfig.TelemetryEndpoint == "" { err := SetupTracing(tracingConfig) if err != nil { @@ -301,28 +324,6 @@ func (s *Server) start(opts ...ServerOpt) error { return fmt.Errorf("error starting health checker: %w", err) } - if s.EnvConfig.DatabaseURL != nil { - pg.SetApplicationName(s.EnvConfig.DatabaseURL.URL(), build.Program) - dbURL := s.EnvConfig.DatabaseURL.URL().String() - var err error - s.db, err = pg.DBConfig{ - IdleInTxSessionTimeout: s.EnvConfig.DatabaseIdleInTxSessionTimeout, - LockTimeout: s.EnvConfig.DatabaseLockTimeout, - MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns, - MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns, - EnableTracing: s.EnvConfig.DatabaseTracingEnabled, - }.New(ctx, dbURL, pg.DriverPostgres) - if err != nil { - return fmt.Errorf("error connecting to DataBase: %w", err) - } - s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger, - sqlutil.TimeoutHook(func() time.Duration { return s.EnvConfig.DatabaseQueryTimeout }), - sqlutil.MonitorHook(func() bool { return s.EnvConfig.DatabaseLogSQL })) - - s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger) - s.dbStatsReporter.Start() - } - s.LimitsFactory.Logger = s.Logger.Named("LimitsFactory") if bc := beholder.GetClient(); bc != nil { s.LimitsFactory.Meter = bc.Meter @@ -346,6 +347,21 @@ func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.C if err != nil { return fmt.Errorf("failed to create beholder client: %w", err) } + + if s.EnvConfig.ChipIngressDurableEmitterEnabled { + if s.DataSource == nil { + return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL") + } + err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false) + if err != nil { + return err + } + err = beholderClient.StartDurableEmitter(ctx) + if err != nil { + return err + } + } + if err := beholderClient.Start(ctx); err != nil { return fmt.Errorf("failed to start beholder client: %w", err) }