-
Notifications
You must be signed in to change notification settings - Fork 29
DurableEmitter: LOOP Plugin Support #2073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 8 commits
038fe22
1837553
64c703a
71a8313
f23047d
601238c
63cb609
972243c
c8bb5e3
a612830
eb1cd2c
5acdb28
dabe6d6
16779f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -103,6 +103,48 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // before async delivery to Chip ingress, so they survive process restarts and chip | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ingress outages. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // store is the persistence layer. Callers in a Postgres environment should pass | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // durable_events.New(ds); the indirection keeps the lib/pq driver out of | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // consumers that only need the beholder API (e.g. wasip1 builds of the workflow | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // runtime). | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func SetupDurableEmitter(ctx context.Context, client *Client, store DurableEventStore, retransmit bool, lggr logger.Logger) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
DylanTinianov marked this conversation as resolved.
Outdated
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if client == nil { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errors.New("beholder client not initialized") | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| chipClient := client.Chip | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if chipClient == nil { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errors.New("chip ingress client not available") | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if _, noop := chipClient.(*chipingress.NoopClient); noop { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errors.New("chip ingress client is a no-op; configure CL_CHIP_INGRESS_ENDPOINT") | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
DylanTinianov marked this conversation as resolved.
Outdated
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if store == nil { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errors.New("durable emitter requires a non-nil DurableEventStore") | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| durableEmitter, err := NewDurableEmitter(store, chipClient, retransmit, DefaultDurableEmitterConfig(), lggr) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to create durable emitter: %w", err) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| otlpEmitter := NewMessageEmitter(client.MessageLoggerProvider.Logger("durable-emitter")) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to create dual source emitter: %w", err) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| durableEmitter.Start(ctx) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
DylanTinianov marked this conversation as resolved.
Outdated
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client.Emitter = dualEmitter | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue") | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // DurableEmitter implements Emitter with persistence-backed delivery guarantees. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Emit writes to a DurableEventStore, returns nil after insert, and enqueues the | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -135,11 +177,12 @@ type insertResult struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type DurableEmitter struct { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| store DurableEventStore | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client chipingress.Client | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // isHostProcess determines if the emitter runs retransmit and cleanup loops. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Should be set to false when initialized inside LOOP plugins. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isHostProcess bool | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg DurableEmitterConfig | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log logger.Logger | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // retransmit determines if the emitter runs the retransmit, expiry, and purge | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // loops over the shared queue. Should be set to false when initialized inside | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // LOOP plugins so a single host process owns reclamation and cleanup. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retransmit bool | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg DurableEmitterConfig | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log logger.Logger | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics *durableEmitterMetrics | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Observability Comparison: DurableEmitter vs BatchEmitterService + batch.ClientMetrics Coverage
Key Gaps in DurableEmitter
Where DurableEmitter is Better
Logging
SummaryBatchEmitterService has better transport-layer observability (request sizes, latency histograms, per-domain attribution, config gauges). DurableEmitter has better persistence-layer observability (queue depth, DB operation metrics, emit latency). If DurableEmitter composed over BatchEmitterService, you'd get both layers covered without duplication. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -225,7 +268,7 @@ var _ Emitter = (*DurableEmitter)(nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func NewDurableEmitter( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| store DurableEventStore, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client chipingress.Client, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isHostProcess bool, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retransmit bool, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg DurableEmitterConfig, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log logger.Logger, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) (*DurableEmitter, error) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -251,13 +294,13 @@ func NewDurableEmitter( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| store = newMetricsInstrumentedStore(store, m) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d := &DurableEmitter{ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| store: store, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client: client, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isHostProcess: isHostProcess, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg: cfg, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log: log, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics: m, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stopCh: make(chan struct{}), | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| store: store, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client: client, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retransmit: retransmit, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg: cfg, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log: log, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics: m, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stopCh: make(chan struct{}), | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cp, ok := client.(grpcConnProvider); ok { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d.rawConn = cp.Conn() | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -302,7 +345,7 @@ func (d *DurableEmitter) Start(_ context.Context) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| insertWorkers = 4 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if d.isHostProcess { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if d.retransmit { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d.wg.Go(d.retransmitLoop) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !d.cfg.DisablePruning { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d.wg.Go(d.expiryLoop) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -819,13 +862,13 @@ func (d *DurableEmitter) retransmitPending() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d.retransmit(pending) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| d.retransmitBatch(pending) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // retransmit enqueues pending DB rows to publishCh so the batch workers handle | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // publishing. When rawConn is set, payloads are passed through without | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // retransmitBatch enqueues pending DB rows to publishCh so the batch workers | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // handle publishing. When rawConn is set, payloads are passed through without | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // proto.Unmarshal — the batch workers use buildBatchBytes for the wire format. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (d *DurableEmitter) retransmit(pending []DurableEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (d *DurableEmitter) retransmitBatch(pending []DurableEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var enqueued int | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for _, pe := range pending { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,209 @@ | ||
| // Package durable_events provides a Postgres-backed implementation of | ||
| // beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder | ||
| // so that consumers of the beholder API (including builds targeting wasip1) | ||
| // do not transitively import lib/pq. | ||
| package durable_events | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/lib/pq" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/beholder" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" | ||
| ) | ||
|
|
||
| const chipDurableEventsTable = "cre.chip_durable_events" | ||
|
|
||
| // Store is a Postgres-backed implementation of beholder.DurableEventStore. | ||
| type Store struct { | ||
| ds sqlutil.DataSource | ||
| } | ||
|
|
||
| var ( | ||
| _ beholder.DurableEventStore = (*Store)(nil) | ||
| _ beholder.DurableQueueObserver = (*Store)(nil) | ||
| _ beholder.BatchInserter = (*Store)(nil) | ||
| ) | ||
|
|
||
| // New returns a Postgres-backed DurableEventStore bound to ds. | ||
| func New(ds sqlutil.DataSource) *Store { | ||
| return &Store{ds: ds} | ||
| } | ||
|
|
||
| func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) { | ||
| const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` | ||
| var id int64 | ||
| if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { | ||
| return 0, fmt.Errorf("failed to insert chip durable event: %w", err) | ||
| } | ||
| return id, nil | ||
| } | ||
|
|
||
| func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { | ||
| if len(payloads) == 0 { | ||
| return nil, nil | ||
| } | ||
| placeholders := make([]string, len(payloads)) | ||
| args := make([]interface{}, len(payloads)) | ||
| for i, p := range payloads { | ||
| placeholders[i] = fmt.Sprintf("($%d)", i+1) | ||
| args[i] = p | ||
| } | ||
| q := fmt.Sprintf( | ||
| "INSERT INTO %s (payload) VALUES %s RETURNING id", | ||
| chipDurableEventsTable, | ||
| strings.Join(placeholders, ","), | ||
| ) | ||
|
|
||
| var ids []int64 | ||
| if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { | ||
| return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) | ||
| } | ||
| return ids, nil | ||
| } | ||
|
|
||
| func (s *Store) Delete(ctx context.Context, id int64) error { | ||
| const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` | ||
| if _, err := s.ds.ExecContext(ctx, q, id); err != nil { | ||
| return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (s *Store) MarkDelivered(ctx context.Context, id int64) error { | ||
| const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` | ||
| if _, err := s.ds.ExecContext(ctx, q, id); err != nil { | ||
| return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { | ||
| if len(ids) == 0 { | ||
| return 0, nil | ||
| } | ||
| const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` | ||
| res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) | ||
| if err != nil { | ||
| return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) | ||
| } | ||
| n, _ := res.RowsAffected() | ||
| return n, nil | ||
| } | ||
|
|
||
| func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { | ||
| if batchLimit <= 0 { | ||
| return 0, nil | ||
| } | ||
| const q = ` | ||
| WITH picked AS ( | ||
| SELECT id FROM ` + chipDurableEventsTable + ` | ||
| WHERE delivered_at IS NOT NULL | ||
| ORDER BY delivered_at ASC | ||
| LIMIT $1 | ||
| ) | ||
| DELETE FROM ` + chipDurableEventsTable + ` AS t | ||
| USING picked WHERE t.id = picked.id` | ||
| res, err := s.ds.ExecContext(ctx, q, batchLimit) | ||
| if err != nil { | ||
| return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) | ||
| } | ||
| n, err := res.RowsAffected() | ||
| if err != nil { | ||
| return 0, fmt.Errorf("purge delivered rows affected: %w", err) | ||
| } | ||
| return n, nil | ||
| } | ||
|
|
||
| func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) { | ||
| const q = ` | ||
| SELECT id, payload, created_at | ||
| FROM ` + chipDurableEventsTable + ` | ||
| WHERE delivered_at IS NULL | ||
| AND created_at < $1 | ||
| ORDER BY created_at ASC | ||
| LIMIT $2` | ||
|
|
||
| type row struct { | ||
| ID int64 `db:"id"` | ||
| Payload []byte `db:"payload"` | ||
| CreatedAt time.Time `db:"created_at"` | ||
| } | ||
|
|
||
| var rows []row | ||
| if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { | ||
| return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) | ||
| } | ||
|
|
||
| out := make([]beholder.DurableEvent, 0, len(rows)) | ||
| for _, r := range rows { | ||
| out = append(out, beholder.DurableEvent{ | ||
| ID: r.ID, | ||
| Payload: r.Payload, | ||
| CreatedAt: r.CreatedAt, | ||
| }) | ||
| } | ||
| return out, nil | ||
| } | ||
|
|
||
| func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { | ||
| const q = ` | ||
| WITH deleted AS ( | ||
| DELETE FROM ` + chipDurableEventsTable + ` | ||
| WHERE created_at <= now() - $1::interval | ||
| RETURNING id | ||
| ) | ||
| SELECT count(*) FROM deleted` | ||
|
|
||
| var count int64 | ||
| if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { | ||
| return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) | ||
| } | ||
| return count, nil | ||
| } | ||
|
|
||
| type chipDurableQueueAgg struct { | ||
| Cnt int64 `db:"cnt"` | ||
| PayloadSum int64 `db:"payload_sum"` | ||
| MinCreated *time.Time `db:"min_created"` | ||
| } | ||
|
|
||
| // ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges. | ||
| func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) { | ||
| const qAgg = ` | ||
| SELECT | ||
| count(*)::bigint AS cnt, | ||
| coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, | ||
| min(created_at) AS min_created | ||
| FROM ` + chipDurableEventsTable + ` | ||
| WHERE delivered_at IS NULL` | ||
|
|
||
| var row chipDurableQueueAgg | ||
| if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { | ||
| return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) | ||
| } | ||
| var st beholder.DurableQueueStats | ||
| st.Depth = row.Cnt | ||
| st.PayloadBytes = row.PayloadSum | ||
| if row.MinCreated != nil { | ||
| st.OldestPendingAge = time.Since(*row.MinCreated) | ||
| } | ||
| if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { | ||
| ttlSec := int64(eventTTL.Round(time.Second) / time.Second) | ||
| leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) | ||
| const qNear = ` | ||
| SELECT count(*)::bigint | ||
| FROM ` + chipDurableEventsTable + ` | ||
| WHERE delivered_at IS NULL | ||
| AND created_at >= now() - ($1::bigint * interval '1 second') | ||
| AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` | ||
| if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { | ||
| return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) | ||
| } | ||
| } | ||
| return st, nil | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.