diff --git a/actor/backpressure_mailbox.go b/actor/backpressure_mailbox.go index 016b4576dd9..8f5a4ce76e1 100644 --- a/actor/backpressure_mailbox.go +++ b/actor/backpressure_mailbox.go @@ -9,10 +9,31 @@ import ( "github.com/lightningnetwork/lnd/queue" ) +// BackpressureMailboxCfg holds optional configuration for a +// BackpressureMailbox. A zero-value config is valid and disables +// automatic first-drop logging. +type BackpressureMailboxCfg struct { + // Name is a human-readable label included in the first-drop log + // line. When empty, no automatic logging is performed and the + // caller is expected to use FirstDropClaim() to drive its own + // logging. + Name string +} + // BackpressureMailbox implements the Mailbox interface using a // queue.BackpressureQueue as its core buffer. The BackpressureQueue's drop // predicate is consulted on every Send/TrySend, allowing RED-style load // shedding before the mailbox is full. +// +// Every predicate rejection is counted and exposed via Dropped(). Two +// independent one-shot flags exist for first-drop signaling: +// +// - When a Name is configured, the mailbox emits a single info-level +// log the first time the predicate fires (gated by firstLog). +// - FirstDropClaim() exposes a separate one-shot flag (firstDrop) +// for callers that want to emit their own log at the call site. +// +// The two flags are independent: using one does not consume the other. type BackpressureMailbox[M Message, R any] struct { // queue is the underlying backpressure-aware buffer. queue *queue.BackpressureQueue[envelope[M, R]] @@ -20,8 +41,27 @@ type BackpressureMailbox[M Message, R any] struct { // closed tracks whether the mailbox has been closed. closed atomic.Bool - // mu protects Send/TrySend operations to prevent send-on-closed-channel - // panics. Close() acquires write lock, Send/TrySend acquire read lock. + // dropped counts the total number of messages rejected by the + // drop predicate since the mailbox was created. 
+ dropped atomic.Uint64 + + // firstLog is consumed internally by the counting predicate + // wrapper. When Name is set, the wrapper CAS-flips this flag + // on the first rejection and emits an info-level log line. + firstLog atomic.Bool + + // firstDrop is exposed to callers via FirstDropClaim(). It is + // independent of firstLog so that the internal auto-log and an + // external caller-driven log can coexist without racing for + // the same flag. + firstDrop atomic.Bool + + // name is the optional label from BackpressureMailboxCfg.Name. + name string + + // mu protects Send/TrySend operations to prevent + // send-on-closed-channel panics. Close() acquires write lock, + // Send/TrySend acquire read lock. mu sync.RWMutex // closeOnce ensures Close() executes exactly once. @@ -31,25 +71,71 @@ type BackpressureMailbox[M Message, R any] struct { actorCtx context.Context } -// NewBackpressureMailbox creates a new mailbox backed by a BackpressureQueue. -// The shouldDrop function is called with the current queue depth on every send -// attempt; if it returns true the message is silently dropped. +// NewBackpressureMailbox creates a new mailbox backed by a +// BackpressureQueue. The shouldDrop function is called with the current +// queue depth on every send attempt; if it returns true the message is +// dropped and the internal drop counter is incremented. func NewBackpressureMailbox[M Message, R any]( actorCtx context.Context, capacity int, shouldDrop queue.DropCheckFunc, + cfg BackpressureMailboxCfg, ) *BackpressureMailbox[M, R] { if capacity <= 0 { capacity = 1 } - pred := queue.AsDropPredicate[envelope[M, R]](shouldDrop) - - return &BackpressureMailbox[M, R]{ - queue: queue.NewBackpressureQueue(capacity, pred), + mb := &BackpressureMailbox[M, R]{ actorCtx: actorCtx, + name: cfg.Name, + } + + // Wrap the caller's predicate so every rejection increments + // the drop counter and, when a name is configured, emits a + // one-shot info log on the first drop. 
+ inner := queue.AsDropPredicate[envelope[M, R]](shouldDrop) + counting := func(queueLen int, item envelope[M, R]) bool { + if !inner(queueLen, item) { + return false + } + + mb.dropped.Add(1) + + if mb.name != "" && + mb.firstLog.CompareAndSwap(false, true) { + + log.Infof("Mailbox(%s): first message "+ + "dropped (queue_depth=%d)", + mb.name, queueLen) + } + + return true + } + + mb.queue = queue.NewBackpressureQueue(capacity, counting) + + return mb +} + +// Dropped returns the total number of messages rejected by the drop +// predicate since the mailbox was created. +func (m *BackpressureMailbox[M, R]) Dropped() uint64 { + return m.dropped.Load() +} + +// FirstDropClaim atomically returns true exactly once, and only after +// at least one message has actually been dropped by the predicate. It +// is intended for call sites that want to emit a one-shot log or +// metric when the mailbox first starts shedding load. This flag is +// independent of the built-in first-drop log gated by +// BackpressureMailboxCfg.Name; using one does not consume the other. +func (m *BackpressureMailbox[M, R]) FirstDropClaim() bool { + if m.dropped.Load() == 0 { + return false } + + return m.firstDrop.CompareAndSwap(false, true) } // Send attempts to send an envelope to the mailbox. The BackpressureQueue's diff --git a/actor/backpressure_mailbox_test.go b/actor/backpressure_mailbox_test.go index e8ff56007fc..41c3bea7b56 100644 --- a/actor/backpressure_mailbox_test.go +++ b/actor/backpressure_mailbox_test.go @@ -27,7 +27,7 @@ func TestBackpressureMailboxDropsWhenThresholdReached(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, shouldDrop, + ctx, capacity, shouldDrop, BackpressureMailboxCfg{}, ) // Fill up to the drop threshold — these should all succeed. 
@@ -61,7 +61,7 @@ func TestBackpressureMailboxTrySendDrops(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, shouldDrop, + ctx, capacity, shouldDrop, BackpressureMailboxCfg{}, ) // Fill to threshold. @@ -94,7 +94,7 @@ func TestBackpressureMailboxNeverDropPassesThrough(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) // Fill the entire capacity. @@ -120,7 +120,7 @@ func TestBackpressureMailboxDelegatesReceive(t *testing.T) { return false }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) // Send two messages. @@ -153,7 +153,7 @@ func TestBackpressureMailboxDelegatesDrain(t *testing.T) { return false }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) // Send messages and close. @@ -188,6 +188,7 @@ func TestBackpressureMailboxSendRespectsActorCtx(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( actorCtx, capacity, neverDrop, + BackpressureMailboxCfg{}, ) // Fill the mailbox to capacity. @@ -218,7 +219,7 @@ func TestBackpressureMailboxReceiveAfterClose(t *testing.T) { return false }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) mbox.Close() @@ -248,7 +249,7 @@ func TestBackpressureMailboxDrainAfterDrain(t *testing.T) { return false }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) // Send one message and close. 
@@ -289,7 +290,7 @@ func TestBackpressureMailboxConcurrentSendClose(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, capacity, neverDrop, + ctx, capacity, neverDrop, BackpressureMailboxCfg{}, ) var wg sync.WaitGroup @@ -356,6 +357,236 @@ func TestBackpressureMailboxConcurrentSendClose(t *testing.T) { require.False(t, mbox.TrySend(env)) } +// TestBackpressureMailboxDroppedCounter verifies that the Dropped counter +// increments for each predicate rejection across Send and TrySend. +func TestBackpressureMailboxDroppedCounter(t *testing.T) { + t.Parallel() + + ctx := context.Background() + const capacity = 10 + const dropThreshold = 3 + + shouldDrop := queue.DropCheckFunc(func(queueLen int) bool { + return queueLen >= dropThreshold + }) + + mbox := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, shouldDrop, BackpressureMailboxCfg{}, + ) + + require.Zero(t, mbox.Dropped(), "initial drop count must be 0") + + // Fill up to the drop threshold — these should all succeed. + for i := range dropThreshold { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + } + require.True(t, mbox.Send(ctx, env)) + } + + require.Zero(t, mbox.Dropped(), + "no drops should have occurred yet") + + // Send two more; both should be dropped by the predicate. + for i := range 2 { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 100 + i}, + } + require.False(t, mbox.Send(ctx, env)) + } + + require.Equal(t, uint64(2), mbox.Dropped()) + + // TrySend another — also dropped. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 200}, + } + require.False(t, mbox.TrySend(env)) + + require.Equal(t, uint64(3), mbox.Dropped()) +} + +// TestBackpressureMailboxFirstDropClaim verifies that FirstDropClaim +// returns true exactly once, only after at least one message has been +// dropped, and that the flag is independent of the internal first-log +// flag used by the auto-log path. 
+func TestBackpressureMailboxFirstDropClaim(t *testing.T) { + t.Parallel() + + ctx := context.Background() + const capacity = 10 + const dropThreshold = 2 + + shouldDrop := queue.DropCheckFunc(func(queueLen int) bool { + return queueLen >= dropThreshold + }) + + // Unnamed mailbox: FirstDropClaim is a standalone one-shot + // that only succeeds after a real drop. + mbox := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, shouldDrop, BackpressureMailboxCfg{}, + ) + + require.False(t, mbox.FirstDropClaim(), + "must not claim before any drop has occurred") + + // Fill to threshold, then trigger one drop. + for i := range dropThreshold { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + } + require.True(t, mbox.Send(ctx, env)) + } + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 99}, + } + require.False(t, mbox.Send(ctx, env)) + + require.True(t, mbox.FirstDropClaim(), + "first call after a drop should claim the flag") + require.False(t, mbox.FirstDropClaim(), + "second call must return false") + + // Named mailbox: the internal auto-log consumes firstLog, + // but FirstDropClaim uses a separate firstDrop flag, so the + // caller can still claim it independently. + mbox2 := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, shouldDrop, + BackpressureMailboxCfg{Name: "test-mailbox"}, + ) + + // Fill to threshold so the next send is dropped. + for i := range dropThreshold { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + } + require.True(t, mbox2.Send(ctx, env)) + } + + // This send triggers the predicate, which internally + // CAS-flips firstLog (because Name is set). + env = envelope[TestMessage, int]{ + message: TestMessage{Value: 99}, + } + require.False(t, mbox2.Send(ctx, env)) + + // FirstDropClaim must still succeed because it uses the + // separate firstDrop flag. 
+ require.True(t, mbox2.FirstDropClaim(), + "firstDrop should be independent of firstLog") + require.False(t, mbox2.FirstDropClaim(), + "second call must return false") + require.Equal(t, uint64(1), mbox2.Dropped()) +} + +// TestBackpressureMailboxNamedVsUnnamed verifies that named and unnamed +// mailboxes count drops identically and that FirstDropClaim remains +// claimable on both, regardless of the internal firstLog auto-log flag. +func TestBackpressureMailboxNamedVsUnnamed(t *testing.T) { + t.Parallel() + + ctx := context.Background() + const capacity = 5 + const dropThreshold = 2 + + shouldDrop := queue.DropCheckFunc(func(queueLen int) bool { + return queueLen >= dropThreshold + }) + + // Unnamed mailbox: zero-value config. + unnamed := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, shouldDrop, BackpressureMailboxCfg{}, + ) + + // Named mailbox. + named := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, shouldDrop, + BackpressureMailboxCfg{Name: "test"}, + ) + + // Fill both to threshold, then send one more to trigger a + // drop on each. + for _, mb := range []*BackpressureMailbox[TestMessage, int]{ + unnamed, named, + } { + for i := range dropThreshold { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + } + require.True(t, mb.Send(ctx, env)) + } + + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 99}, + } + require.False(t, mb.Send(ctx, env)) + } + + // Both should have exactly 1 drop counted. + require.Equal(t, uint64(1), unnamed.Dropped()) + require.Equal(t, uint64(1), named.Dropped()) + + // FirstDropClaim should be available on both (independent + // of the internal firstLog flag). + require.True(t, unnamed.FirstDropClaim()) + require.True(t, named.FirstDropClaim()) +} + +// TestBackpressureMailboxConcurrentDropCounter verifies that the drop +// counter is accurate under concurrent send contention. 
+func TestBackpressureMailboxConcurrentDropCounter(t *testing.T) { + t.Parallel() + + ctx := context.Background() + const capacity = 10 + + // Always-drop predicate: every send is a predicate rejection. + alwaysDrop := queue.DropCheckFunc(func(int) bool { + return true + }) + + mbox := NewBackpressureMailbox[TestMessage, int]( + ctx, capacity, alwaysDrop, + BackpressureMailboxCfg{Name: "concurrent-test"}, + ) + + const numGoroutines = 20 + const sendsPerGoroutine = 500 + + var wg sync.WaitGroup + for i := range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + + for j := range sendsPerGoroutine { + env := envelope[TestMessage, int]{ + message: TestMessage{ + Value: i*sendsPerGoroutine + j, + }, + } + + // Alternate between Send and TrySend. + if j%2 == 0 { + mbox.Send(ctx, env) + } else { + mbox.TrySend(env) + } + } + }() + } + wg.Wait() + + expected := uint64(numGoroutines * sendsPerGoroutine) + require.Equal(t, expected, mbox.Dropped(), + "every send should have been counted as a drop") + + // FirstDropClaim should have been left unclaimed (it's + // independent of the internal firstLog that the named + // mailbox consumed). + require.True(t, mbox.FirstDropClaim()) +} + // TestBackpressureMailboxConcurrentMultiClose verifies that calling Close // from multiple goroutines simultaneously does not panic. func TestBackpressureMailboxConcurrentMultiClose(t *testing.T) { @@ -367,7 +598,7 @@ func TestBackpressureMailboxConcurrentMultiClose(t *testing.T) { }) mbox := NewBackpressureMailbox[TestMessage, int]( - ctx, 10, neverDrop, + ctx, 10, neverDrop, BackpressureMailboxCfg{}, ) // Send a few messages first. 
diff --git a/docs/release-notes/release-notes-0.21.1.md b/docs/release-notes/release-notes-0.21.1.md new file mode 100644 index 00000000000..bbf1eda10af --- /dev/null +++ b/docs/release-notes/release-notes-0.21.1.md @@ -0,0 +1,71 @@ +# Release Notes +- [Bug Fixes](#bug-fixes) +- [New Features](#new-features) + - [Functional Enhancements](#functional-enhancements) + - [RPC Additions](#rpc-additions) + - [lncli Additions](#lncli-additions) +- [Improvements](#improvements) + - [Functional Updates](#functional-updates) + - [RPC Updates](#rpc-updates) + - [lncli Updates](#lncli-updates) + - [Breaking Changes](#breaking-changes) + - [Performance Improvements](#performance-improvements) + - [Deprecations](#deprecations) +- [Technical and Architectural Updates](#technical-and-architectural-updates) + - [BOLT Spec Updates](#bolt-spec-updates) + - [Testing](#testing) + - [Database](#database) + - [Code Health](#code-health) + - [Tooling and Documentation](#tooling-and-documentation) +- [Contributors (Alphabetical Order)](#contributors-alphabetical-order) + +# Bug Fixes + +# New Features + +## Functional Enhancements + +## RPC Additions + +## lncli Additions + +# Improvements + +## Functional Updates + +## RPC Updates + +## lncli Updates + +## Code Health + +## Breaking Changes + +## Performance Improvements + +## Deprecations + +# Technical and Architectural Updates + +## BOLT Spec Updates + +## Testing + +## Database + +## Code Health + +* Added drop-counter and one-shot first-drop observability to the + `actor.BackpressureMailbox`. Operators can now see when an actor's + mailbox starts shedding load via a single info-level log line on the + first predicate drop and a running `Dropped()` counter for total + rejections since start. The onion message actor's mailbox is wired + up as the first consumer, complementing the per-peer and global + onion-message rate-limiter first-drop logs introduced in + [#10713](https://github.com/lightningnetwork/lnd/pull/10713). 
+ +## Tooling and Documentation + +# Contributors (Alphabetical Order) + +* Gijs van Dam diff --git a/go.mod b/go.mod index 8457d8a8d2a..0799f5e4978 100644 --- a/go.mod +++ b/go.mod @@ -207,6 +207,10 @@ require ( // TODO(gijs): remove once new queue package is released. replace github.com/lightningnetwork/lnd/queue => ./queue +// TODO(gijs): remove once new actor package is released with +// BackpressureMailboxCfg. +replace github.com/lightningnetwork/lnd/actor => ./actor + // TODO(elle): remove once the gossip V2 sqldb changes have been made. replace github.com/lightningnetwork/lnd/sqldb => ./sqldb diff --git a/go.sum b/go.sum index 99ae508a284..e99d976a411 100644 --- a/go.sum +++ b/go.sum @@ -374,8 +374,6 @@ github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display h1:Y2WiPkBS github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= github.com/lightningnetwork/lightning-onion v1.3.0 h1:FqILgHjD6euc/Muo1VOzZ4+XDPuFnw6EYROBq0rR/5c= github.com/lightningnetwork/lightning-onion v1.3.0/go.mod h1:nP85zMHG7c0si/eHBbSQpuDCtnIXfSvFrK3tW6YWzmU= -github.com/lightningnetwork/lnd/actor v0.0.6 h1:Ge8N2wivARG+27qJBwTlB0vwsypStZYZy8vk4Zl38sU= -github.com/lightningnetwork/lnd/actor v0.0.6/go.mod h1:YAsoniSbY/cAM9HTVNfZLvt7RI6swDxy6wzPspTcMZg= github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf0d0Uy4qBjI= github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= diff --git a/onionmessage/actor.go b/onionmessage/actor.go index 355bee033ca..7b046ee60d8 100644 --- a/onionmessage/actor.go +++ b/onionmessage/actor.go @@ -336,6 +336,9 @@ func DefaultOnionActorOpts() []actor.ActorOption[*Request, *Response] { return actor.NewBackpressureMailbox[*Request, *Response]( ctx, capacity, shouldDrop, + actor.BackpressureMailboxCfg{ + Name: "onion-message", + 
}, ) }