Skip to content
72 changes: 56 additions & 16 deletions pkg/beholder/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

// DurableEmitterConfig configures the DurableEmitter behaviour.
Expand Down Expand Up @@ -103,6 +104,44 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig {
}
}

// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip
// sink is a DurableEmitter backed by a Postgres event store. CloudEvents are
// persisted to the DB before async delivery to Chip ingress, so they survive
// process restarts and chip ingress outages.
func SetupDurableEmitter(ctx context.Context, client *Client, ds sqlutil.DataSource, retransmit bool, lggr logger.Logger) error {
if client == nil {
return errors.New("beholder client not initialized")
}
chipClient := client.Chip
if chipClient == nil {
return errors.New("chip ingress client not available")
}
if _, noop := chipClient.(*chipingress.NoopClient); noop {
return errors.New("chip ingress client is a no-op; configure CL_CHIP_INGRESS_ENDPOINT")
Comment thread
DylanTinianov marked this conversation as resolved.
Outdated
}
if ds == nil {
return errors.New("durable emitter requires a database connection")
}

pgStore := NewPgDurableEventStore(ds)
durableEmitter, err := NewDurableEmitter(pgStore, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr)
if err != nil {
return fmt.Errorf("failed to create durable emitter: %w", err)
}

otlpEmitter := NewMessageEmitter(client.MessageLoggerProvider.Logger("durable-emitter"))
dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter)
if err != nil {
return fmt.Errorf("failed to create dual source emitter: %w", err)
}

durableEmitter.Start(ctx)
Comment thread
DylanTinianov marked this conversation as resolved.
Outdated
client.Emitter = dualEmitter

lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue")
return nil
}

// DurableEmitter implements Emitter with persistence-backed delivery guarantees.
//
// Emit writes to a DurableEventStore, returns nil after insert, and enqueues the
Expand Down Expand Up @@ -135,9 +174,10 @@ type insertResult struct {
type DurableEmitter struct {
store DurableEventStore
client chipingress.Client
// isHostProcess determines if the emitter runs retransmit and cleanup loops.
// Should be set to false when initialized inside LOOP plugins.
isHostProcess bool
// retransmit determines if the emitter runs the retransmit, expiry, and purge
// loops over the shared queue. Should be set to false when initialized inside
// LOOP plugins so a single host process owns reclamation and cleanup.
retransmit bool
cfg DurableEmitterConfig
log logger.Logger

Expand Down Expand Up @@ -225,7 +265,7 @@ var _ Emitter = (*DurableEmitter)(nil)
func NewDurableEmitter(
store DurableEventStore,
client chipingress.Client,
isHostProcess bool,
retransmit bool,
cfg DurableEmitterConfig,
log logger.Logger,
) (*DurableEmitter, error) {
Expand All @@ -251,13 +291,13 @@ func NewDurableEmitter(
store = newMetricsInstrumentedStore(store, m)
}
d := &DurableEmitter{
store: store,
client: client,
isHostProcess: isHostProcess,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
store: store,
client: client,
retransmit: retransmit,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
}
if cp, ok := client.(grpcConnProvider); ok {
d.rawConn = cp.Conn()
Expand Down Expand Up @@ -302,7 +342,7 @@ func (d *DurableEmitter) Start(_ context.Context) {
insertWorkers = 4
}

if d.isHostProcess {
if d.retransmit {
d.wg.Go(d.retransmitLoop)
if !d.cfg.DisablePruning {
d.wg.Go(d.expiryLoop)
Expand Down Expand Up @@ -819,13 +859,13 @@ func (d *DurableEmitter) retransmitPending() {
return
}

d.retransmit(pending)
d.retransmitBatch(pending)
}

// retransmit enqueues pending DB rows to publishCh so the batch workers handle
// publishing. When rawConn is set, payloads are passed through without
// retransmitBatch enqueues pending DB rows to publishCh so the batch workers
// handle publishing. When rawConn is set, payloads are passed through without
// proto.Unmarshal — the batch workers use buildBatchBytes for the wire format.
func (d *DurableEmitter) retransmit(pending []DurableEvent) {
func (d *DurableEmitter) retransmitBatch(pending []DurableEvent) {
var enqueued int

for _, pe := range pending {
Expand Down
12 changes: 6 additions & 6 deletions pkg/beholder/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
assert.Equal(t, int32(0), markCalls.Load())
}

func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
func TestDurableEmitter_NoRetransmitSkipsRetransmitAndExpiry(t *testing.T) {
store := NewMemDurableEventStore()
client := &testChipClient{}
client.setPublishErr(errors.New("chip unavailable"))
Expand All @@ -249,14 +249,14 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
return client.batchCount.Load() >= 1 && store.Len() == 1
}, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row")

// Several host-only ticks would have cleared or retried by now.
// Several retransmit-only ticks would have cleared or retried by now.
time.Sleep(250 * time.Millisecond)

assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops")
assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit")
assert.Equal(t, 1, store.Len(), "retransmit=false must not run retransmit or expiry loops")
assert.Equal(t, int64(1), client.batchCount.Load(), "retransmit=false must not schedule extra PublishBatch via retransmit")
}

func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) {
func TestDurableEmitter_NoRetransmitStillDeliversViaBatchWorkers(t *testing.T) {
store := NewMemDurableEventStore()
client := &testChipClient{}

Expand All @@ -272,7 +272,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T)

require.Eventually(t, func() bool {
return store.Len() == 0 && client.batchCount.Load() >= 1
}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false")
}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when retransmit is false")
}

func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) {
Expand Down
203 changes: 203 additions & 0 deletions pkg/beholder/durable_event_store_pg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package beholder

import (
"context"
"fmt"
"strings"
"time"

"github.com/lib/pq"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

const chipDurableEventsTable = "cre.chip_durable_events"

// PgDurableEventStore is a Postgres-backed implementation of DurableEventStore.
type PgDurableEventStore struct {
Comment thread
DylanTinianov marked this conversation as resolved.
Outdated
ds sqlutil.DataSource
}

var (
_ DurableEventStore = (*PgDurableEventStore)(nil)
_ DurableQueueObserver = (*PgDurableEventStore)(nil)
_ BatchInserter = (*PgDurableEventStore)(nil)
)

func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore {
return &PgDurableEventStore{ds: ds}
}

func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) {
const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id`
var id int64
if err := s.ds.GetContext(ctx, &id, q, payload); err != nil {
return 0, fmt.Errorf("failed to insert chip durable event: %w", err)
}
return id, nil
}

func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) {
if len(payloads) == 0 {
return nil, nil
}
placeholders := make([]string, len(payloads))
args := make([]interface{}, len(payloads))
for i, p := range payloads {
placeholders[i] = fmt.Sprintf("($%d)", i+1)
args[i] = p
}
q := fmt.Sprintf(
"INSERT INTO %s (payload) VALUES %s RETURNING id",
chipDurableEventsTable,
strings.Join(placeholders, ","),
)

var ids []int64
if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil {
return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err)
}
return ids, nil
}

func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error {
const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1`
if _, err := s.ds.ExecContext(ctx, q, id); err != nil {
return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err)
}
return nil
}

func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error {
const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL`
if _, err := s.ds.ExecContext(ctx, q, id); err != nil {
return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err)
}
return nil
}

func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) {
if len(ids) == 0 {
return 0, nil
}
const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL`
res, err := s.ds.ExecContext(ctx, q, pq.Array(ids))
if err != nil {
return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err)
}
n, _ := res.RowsAffected()
return n, nil
}

func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) {
if batchLimit <= 0 {
return 0, nil
}
const q = `
WITH picked AS (
SELECT id FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NOT NULL
ORDER BY delivered_at ASC
LIMIT $1
)
DELETE FROM ` + chipDurableEventsTable + ` AS t
USING picked WHERE t.id = picked.id`
res, err := s.ds.ExecContext(ctx, q, batchLimit)
if err != nil {
return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err)
}
n, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("purge delivered rows affected: %w", err)
}
return n, nil
}

func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) {
const q = `
SELECT id, payload, created_at
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL
AND created_at < $1
ORDER BY created_at ASC
LIMIT $2`

type row struct {
ID int64 `db:"id"`
Payload []byte `db:"payload"`
CreatedAt time.Time `db:"created_at"`
}

var rows []row
if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil {
return nil, fmt.Errorf("failed to list pending chip durable events: %w", err)
}

out := make([]DurableEvent, 0, len(rows))
for _, r := range rows {
out = append(out, DurableEvent{
ID: r.ID,
Payload: r.Payload,
CreatedAt: r.CreatedAt,
})
}
return out, nil
}

func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) {
const q = `
WITH deleted AS (
DELETE FROM ` + chipDurableEventsTable + `
WHERE created_at <= now() - $1::interval
RETURNING id
)
SELECT count(*) FROM deleted`

var count int64
if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil {
return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err)
}
return count, nil
}

type chipDurableQueueAgg struct {
Cnt int64 `db:"cnt"`
PayloadSum int64 `db:"payload_sum"`
MinCreated *time.Time `db:"min_created"`
}

// ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges.
func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) {
const qAgg = `
SELECT
count(*)::bigint AS cnt,
coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum,
min(created_at) AS min_created
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL`

var row chipDurableQueueAgg
if err := s.ds.GetContext(ctx, &row, qAgg); err != nil {
return DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err)
}
var st DurableQueueStats
st.Depth = row.Cnt
st.PayloadBytes = row.PayloadSum
if row.MinCreated != nil {
st.OldestPendingAge = time.Since(*row.MinCreated)
}
if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL {
ttlSec := int64(eventTTL.Round(time.Second) / time.Second)
leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second)
const qNear = `
SELECT count(*)::bigint
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL
AND created_at >= now() - ($1::bigint * interval '1 second')
AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')`
if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil {
return DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err)
}
}
return st, nil
}
Loading
Loading