diff --git a/auctioneer/client.go b/auctioneer/client.go
index fcdf5c77e..1f2fbfaf6 100644
--- a/auctioneer/client.go
+++ b/auctioneer/client.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"math/rand/v2"
 	"net"
 	"sync"
 	"sync/atomic"
@@ -46,12 +47,9 @@ const (
 )
 
 var (
-	// ErrServerShutdown is the error that is returned if the auction server
-	// signals it's going to shut down.
-	ErrServerShutdown = errors.New("server shutting down")
-
 	// ErrServerErrored is the error that is returned if the auction server
-	// sends back an error instead of a proper message.
+	// sends back an error instead of a proper message, or if the server
+	// stream is closed in a way that requires a reconnect.
 	ErrServerErrored = errors.New("server sent unexpected error")
 
 	// ErrClientShutdown is the error that is returned if the trader client
@@ -932,6 +930,24 @@ func (c *Client) IsSubscribed() bool {
 	return c.serverStream != nil
 }
 
+// jitterBackoff returns backoff with up to 25% additive jitter so reconnect
+// attempts from a population of traders observing the same disconnect event
+// fan out over a window rather than spike at one instant. The jitter is
+// one-sided so we never wait less than the operator's configured floor.
+//
+// A non-positive backoff is returned unchanged: there is nothing to spread
+// out, and rand.Int64N panics when its bound is not positive, which any
+// backoff of -4ns or lower would otherwise produce.
+//
+//nolint:gosec // Backoff jitter doesn't need cryptographic randomness.
+func jitterBackoff(backoff time.Duration) time.Duration {
+	if backoff <= 0 {
+		return backoff
+	}
+
+	return backoff + time.Duration(rand.Int64N(int64(backoff)/4+1))
+}
+
 // connectServerStream opens the initial connection to the server for the stream
 // of account updates and handles reconnect trials with incremental backoff.
 func (c *Client) connectServerStream(initialBackoff time.Duration,
@@ -948,8 +964,11 @@ func (c *Client) connectServerStream(initialBackoff time.Duration,
 	)
 	for i := 0; i < numRetries; i++ {
 		// Wait before connecting in case this is a reconnect trial.
+		// Apply additive jitter so a population of traders that all
+		// hit the same disconnect event don't fan in on the server at
+		// exactly the same instant.
 		if backoff != 0 {
-			err = c.wait(backoff)
+			err = c.wait(jitterBackoff(backoff))
 			if err != nil {
 				return err
 			}
@@ -1043,14 +1062,16 @@ func (c *Client) readIncomingStream() { // nolint:gocyclo
 			poolrpc.PrintMsg(msg), err)
 
 		switch {
-		// EOF is the "normal" close signal, meaning the server has
-		// cut its side of the connection. We will only get this during
-		// the proper shutdown of the server where we already have a
-		// reconnect scheduled. On an improper shutdown, we'll get an
-		// error, usually "transport is closing".
+		// EOF means the server has cut its side of the stream cleanly.
+		// This happens on planned server shutdowns, but also on
+		// proxy/load-balancer idle timeouts or any other clean-close
+		// scenario where the underlying TCP connection may still be
+		// alive. In all cases the long-lived subscription is gone and
+		// we need to trigger a reconnect, so route this through the
+		// same error path as any other stream failure.
 		case err == io.EOF:
 			select {
-			case c.errChanSwitch.ErrChan() <- ErrServerShutdown:
+			case c.errChanSwitch.ErrChan() <- ErrServerErrored:
 			case <-c.quit:
 			}
 			return
diff --git a/auctioneer/client_test.go b/auctioneer/client_test.go
new file mode 100644
index 000000000..b6bc74d67
--- /dev/null
+++ b/auctioneer/client_test.go
@@ -0,0 +1,205 @@
+package auctioneer
+
+import (
+	"errors"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/lightninglabs/pool/auctioneerrpc"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// TestJitterBackoffBounds samples the jitter helper for a typical configured
+// backoff and asserts results fall in the expected [backoff, backoff +
+// backoff/4] range and aren't pinned to a single value.
+func TestJitterBackoffBounds(t *testing.T) {
+	t.Parallel()
+
+	const (
+		base    = 5 * time.Second
+		samples = 200
+	)
+	upper := base + base/4
+	distinct := make(map[time.Duration]struct{}, samples)
+	for n := 0; n < samples; n++ {
+		d := jitterBackoff(base)
+		if d < base || d > upper {
+			t.Fatalf("jitterBackoff(%v) = %v, out of [%v, %v]",
+				base, d, base, upper)
+		}
+		distinct[d] = struct{}{}
+	}
+
+	// A 1.25s window at nanosecond resolution should yield mostly
+	// distinct values, so only a handful means the jitter is broken.
+	if unique := len(distinct); unique < 10 {
+		t.Fatalf("expected diverse jitter samples, "+
+			"only got %d unique values", unique)
+	}
+}
+
+// fakeServerStream is a stand-in for
+// ChannelAuctioneer_SubscribeBatchAuctionClient whose Recv hands back
+// whatever results were queued on recv, enough to drive the read loop.
+type fakeServerStream struct {
+	grpc.ClientStream
+
+	recv chan recvResult
+}
+
+type recvResult struct {
+	msg *auctioneerrpc.ServerAuctionMessage
+	err error
+}
+
+func (s *fakeServerStream) Send(*auctioneerrpc.ClientAuctionMessage) error {
+	return nil
+}
+
+func (s *fakeServerStream) Recv() (*auctioneerrpc.ServerAuctionMessage, error) {
+	next := <-s.recv
+	return next.msg, next.err
+}
+
+// newTestClient builds a Client with just enough state to drive
+// readIncomingStream against the given stream and starts its error switch.
+func newTestClient(stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient,
+) (*Client, chan error) {
+
+	errChan := make(chan error, 1)
+	client := &Client{
+		serverStream:    stream,
+		FromServerChan:  make(chan *auctioneerrpc.ServerAuctionMessage),
+		StreamErrChan:   errChan,
+		errChanSwitch:   NewErrChanSwitch(errChan),
+		quit:            make(chan struct{}),
+		subscribedAccts: make(map[[33]byte]*acctSubscription),
+	}
+	client.errChanSwitch.Start()
+	return client, errChan
+}
+
+// runReadLoop runs readIncomingStream in a goroutine and returns a channel
+// that closes when the loop exits.
+func runReadLoop(c *Client) <-chan struct{} {
+	exited := make(chan struct{})
+	go func() {
+		c.readIncomingStream()
+		close(exited)
+	}()
+	return exited
+}
+
+// TestReadIncomingStreamEOFTriggersReconnect checks that an io.EOF from the
+// server stream comes out of the error channel as ErrServerErrored, the
+// signal the rpcserver consumer acts on to re-establish the subscription.
+//
+// Regression test: EOF used to be mapped to a dedicated "ErrServerShutdown"
+// sentinel, which the consumer skipped on the (wrong) assumption that the
+// client had already scheduled a reconnect itself. Any clean close
+// (proxy/LB timeout, planned server shutdown, etc.) therefore left the
+// subscription stream dead, and the trader was filtered as offline until
+// the process was restarted.
+func TestReadIncomingStreamEOFTriggersReconnect(t *testing.T) {
+	t.Parallel()
+
+	fake := &fakeServerStream{recv: make(chan recvResult, 1)}
+	client, errChan := newTestClient(fake)
+	defer client.errChanSwitch.Stop()
+	defer close(client.quit)
+
+	// Queue an io.EOF on the fake stream, as if the server (or a proxy
+	// in between) had cleanly closed its side of the bidi stream.
+	fake.recv <- recvResult{err: io.EOF}
+
+	loopDone := runReadLoop(client)
+
+	select {
+	case got := <-errChan:
+		if !errors.Is(got, ErrServerErrored) {
+			t.Fatalf("expected ErrServerErrored on EOF, got: %v",
+				got)
+		}
+	case <-time.After(defaultTimeout):
+		t.Fatal("timed out waiting for error after EOF")
+	}
+
+	select {
+	case <-loopDone:
+	case <-time.After(defaultTimeout):
+		t.Fatal("readIncomingStream did not return after EOF")
+	}
+}
+
+// TestReadIncomingStreamTransportErrorTriggersReconnect ensures non-EOF
+// transport errors continue to be surfaced as ErrServerErrored. This is the
+// pre-existing behaviour we want to preserve after unifying it with the EOF
+// path.
+func TestReadIncomingStreamTransportErrorTriggersReconnect(t *testing.T) {
+	t.Parallel()
+
+	fake := &fakeServerStream{recv: make(chan recvResult, 1)}
+	client, errChan := newTestClient(fake)
+	defer client.errChanSwitch.Stop()
+	defer close(client.quit)
+
+	// Queue a "transport is closing" style error, the kind gRPC reports
+	// when the underlying TCP connection breaks abruptly.
+	fake.recv <- recvResult{
+		err: status.Error(codes.Unavailable, "transport is closing"),
+	}
+
+	loopDone := runReadLoop(client)
+
+	select {
+	case got := <-errChan:
+		if !errors.Is(got, ErrServerErrored) {
+			t.Fatalf("expected ErrServerErrored on transport "+
+				"error, got: %v", got)
+		}
+	case <-time.After(defaultTimeout):
+		t.Fatal("timed out waiting for error after transport failure")
+	}
+
+	select {
+	case <-loopDone:
+	case <-time.After(defaultTimeout):
+		t.Fatal("readIncomingStream did not return after transport " +
+			"failure")
+	}
+}
+
+// TestReadIncomingStreamContextCanceledDoesNotReconnect checks that a
+// codes.Canceled error (what we see when *we* cancel the stream context
+// during shutdown or a planned reconnect) is NOT passed on to the consumer,
+// so that no second reconnect gets scheduled by accident.
+func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) {
+	t.Parallel()
+
+	fake := &fakeServerStream{recv: make(chan recvResult, 1)}
+	client, errChan := newTestClient(fake)
+	defer client.errChanSwitch.Stop()
+	defer close(client.quit)
+
+	fake.recv <- recvResult{
+		err: status.Error(codes.Canceled, "context canceled"),
+	}
+
+	loopDone := runReadLoop(client)
+
+	select {
+	case <-loopDone:
+	case <-time.After(defaultTimeout):
+		t.Fatal("readIncomingStream did not return after cancel")
+	}
+
+	select {
+	case got := <-errChan:
+		t.Fatalf("unexpected error surfaced on cancel: %v", got)
+	case <-time.After(defaultTimeout):
+		// Nothing arrived within the timeout, which is what we want.
+	}
+}
diff --git a/rpcserver.go b/rpcserver.go
index 6d1144084..a0836de0d 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -307,10 +307,13 @@ func (s *rpcServer) serverHandler(blockChan chan int32,
 			}
 
 		case err := <-s.auctioneer.StreamErrChan:
-			// If the server is shutting down, then the client has
-			// already scheduled a restart. We only need to handle
-			// other errors here.
-			if err != nil && err != auctioneer.ErrServerShutdown {
+			// Any error received on the stream means the
+			// long-lived subscription is gone and needs to be
+			// re-established. The only other "shutdown" signal
+			// the client raises (an explicit SERVER_SHUTDOWN
+			// message from the auctioneer) is handled inline in
+			// the read loop and never reaches this channel.
+			if err != nil {
 				rpcLog.Errorf("Error in server stream: %v", err)
 				err := s.auctioneer.HandleServerShutdown(err)
 				if err != nil {
@@ -319,8 +322,6 @@ func (s *rpcServer) serverHandler(blockChan chan int32,
 				}
 			}
 
-			rpcLog.Errorf("Unknown server error: %v", err)
-
 		case height := <-blockChan:
 			rpcLog.Infof("Received new block notification: "+
 				"height=%v", height)