Skip to content

DurableEmitter: LOOP Plugin Support#2073

Draft
DylanTinianov wants to merge 14 commits into
mainfrom
CRE-3933-loop-plugin-support
Draft

DurableEmitter: LOOP Plugin Support#2073
DylanTinianov wants to merge 14 commits into
mainfrom
CRE-3933-loop-plugin-support

Conversation

@DylanTinianov
Copy link
Copy Markdown
Contributor

@DylanTinianov DylanTinianov commented May 19, 2026

Sets up DurableEmitter for LOOP Plugins in the Server and implements the following:

  • Extracts event store from core to common for reuse in LOOPs
  • Renames isHostProcess to retransmit
  • Extracts SetupDurableEmitter to common for reusability between core application and LOOPs
  • Adds env config for enabling

@DylanTinianov DylanTinianov self-assigned this May 19, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 19, 2026

✅ API Diff Results - github.com/smartcontractkit/chainlink-common

✅ Compatible Changes (4)

package github (1)
  • com/smartcontractkit/chainlink-common/pkg/beholder/beholderstore — ➕ Added
pkg/beholder.(*Client) (2)
  • SetupDurableEmitter — ➕ Added

  • StartDurableEmitter — ➕ Added

pkg/loop.EnvConfig (1)
  • ChipIngressDurableEmitterEnabled — ➕ Added

📄 View full apidiff report

@DylanTinianov DylanTinianov marked this pull request as ready for review May 19, 2026 19:23
@DylanTinianov DylanTinianov requested review from a team as code owners May 19, 2026 19:23
Comment thread pkg/beholder/durable_event_store_pg.go Outdated
@DylanTinianov DylanTinianov requested a review from jmank88 May 19, 2026 19:57
Comment thread pkg/beholder/pgstore/store.go Outdated
@DylanTinianov DylanTinianov requested a review from jmank88 May 20, 2026 13:28
Comment thread pkg/loop/server.go Outdated
Comment thread pkg/beholder/durable_emitter.go Outdated
Comment thread pkg/loop/server.go Outdated
Comment thread pkg/beholder/durable_emitter.go Outdated
Comment thread pkg/beholder/durable_emitter.go Outdated
Comment thread pkg/beholder/durable_emitter.go Outdated
@DylanTinianov DylanTinianov requested a review from jmank88 May 20, 2026 18:04
@DylanTinianov DylanTinianov enabled auto-merge May 20, 2026 18:15
@DylanTinianov DylanTinianov requested a review from tarcisiozf May 20, 2026 18:21
@DylanTinianov DylanTinianov requested a review from a team May 20, 2026 18:47
@patrickhuie19
Copy link
Copy Markdown
Contributor

@mathewdgardner FYI

@pkcll
Copy link
Copy Markdown
Contributor

pkcll commented May 20, 2026

Summary

This PR enables DurableEmitter for LOOP plugins by:

  • Extracting the event store from core to pkg/beholder/beholderstore
  • Renaming isHostProcessretransmit for clarity
  • Adding SetupDurableEmitter / StartDurableEmitter to Client for reuse
  • Adding CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED env config
  • Moving DB initialization earlier in Server.start() so it's available for durable emitter setup

Stats: 6 files, +343 / -61 | 14 commits | Mergeable state: blocked


Batching Logic Gaps (vs batch_emitter_service.go / batch/client.go)

🔴 Critical: No gRPC Request Size Splitting

batch/client.go has splitMessagesByRequestSize ensuring each PublishBatch RPC stays under maxGRPCRequestSize (default 10MB). DurableEmitter's flushBatch has no equivalent protection. A batch of large CloudEvents could exceed gRPC's maximum message size and fail the entire batch.

for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
    // each sub-batch is guaranteed <= maxGRPCRequestSize
}

vs DurableEmitter which sends the entire collected batch without size checks:

if d.rawConn != nil {
    err = d.flushBatchRaw(pubCtx, batch)
} else {
    err = d.flushBatchTyped(pubCtx, batch)
}

🔴 Critical: No Seqnum Stamping

batch/client.go stamps a per-(source, type) seqnum extension on every event before publishing. DurableEmitter persists and publishes events without seqnum, meaning downstream consumers lose ordering guarantees for events flowing through the durable path.

seq := b.seqnumFor(eventToQueue.Source, eventToQueue.Type)
eventToQueue.Attributes["seqnum"] = &cepb.CloudEventAttributeValue{
    Attr: &cepb.CloudEventAttributeValue_CeString{
        CeString: strconv.FormatUint(seq, 10),
    },
}

🟡 Medium: No Shutdown Timeout

batch/client.go enforces a shutdownTimeout with forced close. DurableEmitter's Close() calls d.wg.Wait() with no deadline — if a MarkDeliveredBatch DB call hangs, shutdown blocks indefinitely:

func (d *DurableEmitter) Close() error {
    // ...
    close(d.stopCh)
    d.wg.Wait()      // ← no timeout
    close(d.publishCh)
    return nil
}

🟡 Medium: Unbounded MarkDelivered Goroutines

Each successful flushBatch spawns a new d.wg.Go(...) for MarkDeliveredBatch. Under high throughput with slow DB, goroutine count grows unbounded. batch/client.go limits concurrency with maxConcurrentSends semaphore.

🟡 Medium: LOOP Plugin Orphan Risk with retransmit=false

In server.go, the LOOP plugin sets retransmit=false:

err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false)

If publishCh is full, events are left in DB relying on the host process to retransmit them. This is architecturally correct, but there's no validation that the host process is actually running with retransmit=true over the same database. If it isn't, events silently accumulate until EventTTL expires them.


Other Observations

StartDurableEmitter ordering in server.go

StartDurableEmitter is called before beholderClient.Start(ctx). Since they use separate gRPC clients this is likely fine, but worth a comment explaining the ordering contract.

durable_event_store.goDeleteExpired unbounded DELETE

The CTE DELETE FROM ... WHERE created_at <= now() - $1::interval has no LIMIT, unlike PurgeDelivered. For tables with millions of expired rows, this could cause long-running transactions and bloat.


Merge Readiness

Factor Status
Tests updated ✅ Renames covered
New store code tested ❓ No test file for beholderstore in this PR
gRPC size safety ❌ Missing
Seqnum parity ❌ Missing

Recommendation: Address the gRPC size splitting and seqnum gaps before merge — both are correctness issues that could cause silent data loss or ordering violations in production.


@pkcll
Copy link
Copy Markdown
Contributor

pkcll commented May 20, 2026

Suggestion to consolidate batching logic

DurableEmitter as DB-persistence layer + BatchEmitterService as transport layer

Current design — DurableEmitter reimplements batching from scratch:

Emit() → DB Insert → publishCh → batchPublishLoop (linger) → flushBatch → gRPC PublishBatch
                                                                         → MarkDeliveredBatch

Proposed design — DurableEmitter delegates transport to BatchEmitterService:

Emit() → DB Insert → BatchEmitterService.EmitWithCallback(payload, onDelivered)
                                            ↓
                              batch.Client handles: seqnum, size splitting,
                              concurrency, shutdown timeout
                                            ↓
                              callback(nil) → MarkDeliveredBatch

Why it fits naturally

  1. EmitWithCallback already existsChipIngressBatchEmitterService exposes exactly the contract DurableEmitter needs: "send this event, tell me when it lands or fails." That callback is the perfect hook for MarkDeliveredBatch.

  2. Eliminates all the gaps I flagged — seqnum stamping, gRPC size splitting, concurrency limiting, and shutdown timeouts come free from batch.Client.

  3. DurableEmitter becomes purely a durability concern — its responsibilities shrink to:

    • Persist to DB on Emit (at-least-once guarantee)
    • Forward to batch emitter (happy path)
    • Retransmit loop re-emits stale rows through the same batch emitter
    • Purge/expiry lifecycle
  4. Retransmit simplifies — instead of a custom publishCh + drain logic, retransmitBatch just calls batchEmitter.EmitWithCallback(...) for each pending row. The batch emitter handles the rest.

Tradeoffs to consider

Concern Assessment
Zero-copy raw gRPC path Lost — but could be added as a batch.Client option for everyone's benefit
Extra channel hop Minimal latency; one buffer instead of two is actually simpler
Buffer-full drops batch.Client drops silently when buffer is full — but for DurableEmitter this is fine since retransmit loop will retry from DB
Coupling DurableEmitter would depend on the batch emitter interface rather than the raw chipingress.Client — tighter coupling but to an internal package

Sketch

type DurableEmitter struct {
    store       DurableEventStore
    batchEmitter interface {
        EmitWithCallback(ctx context.Context, body []byte, cb func(error), attrKVs ...any) error
    }
    // retransmit, cfg, stopCh, wg, metrics — same as today
}

func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
    payload := serialize(body, attrKVs...)
    id, err := d.store.Insert(ctx, payload)
    if err != nil {
        return err
    }

    // Fire-and-forget through batch emitter; callback marks delivered
    _ = d.batchEmitter.EmitWithCallback(ctx, body, func(sendErr error) {
        if sendErr == nil {
            d.store.MarkDelivered(context.Background(), id)
        }
    }, attrKVs...)
    return nil
}

Bottom line: DurableEmitter currently duplicates ~300 lines of batching logic that batch.Client already handles better. Wrapping the batch emitter would make DurableEmitter a thin persistence/retry layer, which is what it conceptually is. The main cost is losing the zero-copy optimization, which could be upstreamed into batch.Client as a separate concern.

@DylanTinianov DylanTinianov disabled auto-merge May 20, 2026 19:54
Copy link
Copy Markdown
Contributor

@pkcll pkcll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov, Batching logic gaps DurableEmitter is missing gRPC request size splitting (risks exceeding max message size) and seqnum stamping (breaks ordering guarantees for downstream consumers). Also flagged: no shutdown timeout in Close() and unbounded MarkDelivered goroutines. Please address before merge.

@pkcll
Copy link
Copy Markdown
Contributor

pkcll commented May 20, 2026

Proposal: Compose with BatchEmitterService
Rather than reimplementing batching, DurableEmitter could delegate transport to ChipIngressBatchEmitterService via EmitWithCallback. This would close the gaps above by construction and reduce ~300 lines of duplicated linger/flush/drain logic.

@DylanTinianov, @patrickhuie19, @jmank88, would appreciate your thoughts on feasibility.

@DylanTinianov
Copy link
Copy Markdown
Contributor Author

DylanTinianov commented May 20, 2026

@pkcll Thanks for the comments! Added tickets for these to our EPIC. Will discuss with @patrickhuie19 and @tarcisiozf to scope this out.

This PR is still needed for LOOP support, regardless of the implementation changes.

Also beholderstore is tested in chainlink repo since it relies on the core DB migrations.

@pkcll
Copy link
Copy Markdown
Contributor

pkcll commented May 20, 2026

@DylanTinianov, this one needs to be addressed for sure before merging Critical: No gRPC Request Size Splitting

@DylanTinianov
Copy link
Copy Markdown
Contributor Author

DylanTinianov commented May 20, 2026

Also @jmank88 let's discuss where the setup code should live.

Some concerns from @mathewdgardner :

I don't think this should be touching the beholder client
My concern is about code ownership really
Also, the durable emitter is about durable events and not telemetry. Beholder is about telemetry and I'd like to keep those things separate

We could revert back to the helper function I had initially? Although I agree the implementation is cleaner on the client.

Comment thread pkg/loop/server.go
Comment on lines +351 to +363
if s.EnvConfig.ChipIngressDurableEmitterEnabled {
if s.DataSource == nil {
return fmt.Errorf("durable emitter requires a database connection: set CL_DATABASE_URL")
}
err = beholderClient.SetupDurableEmitter(beholderstore.New(s.DataSource), false)
if err != nil {
return err
}
err = beholderClient.StartDurableEmitter(ctx)
if err != nil {
return err
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov

pkg/loop/server.go line 359 beholderClient.StartDurableEmitter(ctx)

DurableEmitter has a manual Start(ctx) / Close() lifecycle that doesn't implement services.Service. This means:

  1. No health reporting it can't be registered with s.checker, so a stuck retransmit or publish loop is invisible to health checks (have not done for ChipIngressBatchEmitterService yet)
  2. Fragile ordering StartDurableEmitter is called before beholderClient.Start(ctx), but shutdown relies on beholderClient.Close() transitively closing it. This implicit coupling is easy to break.
  3. No idempotency / state guards services.Service (via services.Engine) gives you IfStarted, IfNotStopped, ready/healthy state for free.

Consider having DurableEmitter implement services.Service (like ChipIngressBatchEmitterService already does)

This would also let you remove SetupDurableEmitter + StartDurableEmitter as separate methods on Client just expose the service and let the caller manage its lifecycle uniformly.

Comment thread pkg/loop/config.go
ChipIngressBatchEmitterEnabled bool
ChipIngressEndpoint string
ChipIngressInsecureConnection bool
ChipIngressBatchEmitterEnabled bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov

Need to add guard or mutual exclusion for these two ChipIngressBatchEmitterEnabled, ChipIngressDurableEmitterEnabled

If both are enabled it should either error out or document precedence.

They conflict silently. Here's what happens:

  1. NewClient sees ChipIngressBatchEmitterEnabled=true → creates ChipIngressBatchEmitterService as the chip emitter, wraps it in DualSourceEmitter, assigns to client.Emitter.

  2. Then SetupDurableEmitter (from this PR) overwrites client.Emitter entirely:

c.Emitter = dualEmitter   // replaces the previously-assigned DualSourceEmitter
c.durableEmitter = durableEmitter

So with both enabled:

  • The BatchEmitterService is still created and started as a sub-service (lifecycle managed by the engine) but nothing emits to it — it's orphaned.
  • DurableEmitter uses its own internal batching against the raw chipingress.Client, bypassing the batch emitter entirely.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, created a ticket for this as well.

Copy link
Copy Markdown

@mathewdgardner mathewdgardner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov the beholder client is for telemetry which is best effort and isn't durable by design. For the use cases where we want message delivery guarantees like some valuable events, we should use the chip-ingress gRPC client directly and outside of the beholder pkg. This also means that the chip-ingress client is a dependency for your new event emitter.

We should keep telemetry use cases separate from the durable event use cases.

@DylanTinianov DylanTinianov marked this pull request as draft May 21, 2026 14:00
cfg DurableEmitterConfig
log logger.Logger

metrics *durableEmitterMetrics
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov

Observability Comparison: DurableEmitter vs BatchEmitterService + batch.Client

Metrics Coverage

Concern BatchEmitterService + batch.Client DurableEmitter
Events sent (success) chip_ingress.events_sent per domain/entity beholder.durable_emitter.emit.success (count only, no domain/entity breakdown)
Events dropped/failed chip_ingress.events_dropped per domain/entity beholder.durable_emitter.emit.failure (no domain/entity)
Batch send requests chip_ingress.batch.send_requests_total with status=success/failure ❌ No per-RPC request counter
Request size (messages) chip_ingress.batch.request_size_messages histogram with max_batch_size attr ❌ Not tracked
Request size (bytes) chip_ingress.batch.request_size_bytes histogram with max_grpc_request_size attr ❌ Not tracked
Request latency chip_ingress.batch.request_latency_ms histogram with status ⚠️ Only via hooks (OnBatchPublish) — not exported as a metric instrument
Config info gauge chip_ingress.batch.config.info with all config attrs ❌ Not tracked
Queue depth ❌ Not tracked (fire-and-forget buffer) queueDepth / queueDepthMax gauges
Emit latency (caller-facing) ❌ Not tracked emit.duration + emit.total_duration histograms
DB store operations N/A (no DB) ✅ Instrumented store wrapper (insert/mark/purge durations)
Near-TTL / expiry N/A expiredPurged, NearTTLCount via ObserveDurableQueue
Publish batch events OK/err Implicit via sent/dropped counters publishBatchEvOK / publishBatchEvErr

Key Gaps in DurableEmitter

  1. No per-domain/entity attribution — BatchEmitterService tags every metric with domain + entity via metricAttrsFor(). DurableEmitter metrics are flat counters with no cardinality, making it impossible to identify which event source is failing.

  2. No request size observability — batch.Client records both message count and byte size histograms per send. DurableEmitter has no visibility into whether batches are approaching gRPC limits (which ties back to the missing size-splitting gap).

  3. No send request counter with status — batch.Client's send_requests_total with success/failure lets you compute error rate. DurableEmitter only logs failures; no metric-based alerting possible on publish RPC error rate.

  4. No config info metric — batch.Client emits a gauge with all configuration parameters (batch size, buffer size, timeouts, etc.) for runtime introspection. DurableEmitter has no equivalent — you can't verify running config from metrics alone.

  5. Latency only via hooks, not instrumentsOnBatchPublish callback provides latency to test code but doesn't register an OTel histogram. You can't dashboard or alert on publish latency without custom wiring.

Where DurableEmitter is Better

  • Queue depth visibility — critical for a persistence-backed system. You can alert on backlog growth.
  • Emit-path latency — tracks how long the caller blocks on DB insert, useful for detecting DB pressure.
  • Store-layer instrumentation — the metrics-instrumented store wrapper gives per-operation DB latency.

Logging

Concern batch.Client DurableEmitter
Publish failure Errorw("failed to publish batch") Warnw("PublishBatch failed, events will be retransmitted")
Buffer full Silent (returns error to caller) Warnw("batch publish channel full, relying on retransmit")
Retransmit activity N/A ✅ Logs enqueue counts, skipped, pending depth
Shutdown timeout Warnw("timed out waiting for shutdown") ❌ No timeout exists
Config at startup Via gauge metric only Via Infow log lines (coalescing, raw-codec)

Summary

BatchEmitterService has better transport-layer observability (request sizes, latency histograms, per-domain attribution, config gauges). DurableEmitter has better persistence-layer observability (queue depth, DB operation metrics, emit latency). If DurableEmitter composed over BatchEmitterService, you'd get both layers covered without duplication.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants