Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions auctioneer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"math/rand/v2"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -46,12 +47,9 @@ const (
)

var (
// ErrServerShutdown is the error that is returned if the auction server
// signals it's going to shut down.
ErrServerShutdown = errors.New("server shutting down")

// ErrServerErrored is the error that is returned if the auction server
// sends back an error instead of a proper message.
// sends back an error instead of a proper message, or if the server
// stream is closed in a way that requires a reconnect.
ErrServerErrored = errors.New("server sent unexpected error")

// ErrClientShutdown is the error that is returned if the trader client
Expand Down Expand Up @@ -932,6 +930,16 @@ func (c *Client) IsSubscribed() bool {
return c.serverStream != nil
}

// jitterBackoff returns backoff with up to 25% additive jitter so reconnect
// attempts from a population of traders observing the same disconnect event
// fan out over a window rather than spike at one instant. The jitter is
// one-sided so we never wait less than the operator's configured floor.
//
//nolint:gosec // Backoff jitter doesn't need cryptographic randomness.
func jitterBackoff(backoff time.Duration) time.Duration {
return backoff + time.Duration(rand.Int64N(int64(backoff)/4+1))
}

// connectServerStream opens the initial connection to the server for the stream
// of account updates and handles reconnect trials with incremental backoff.
func (c *Client) connectServerStream(initialBackoff time.Duration,
Expand All @@ -948,8 +956,11 @@ func (c *Client) connectServerStream(initialBackoff time.Duration,
)
for i := 0; i < numRetries; i++ {
// Wait before connecting in case this is a reconnect trial.
// Apply additive jitter so a population of traders that all
// hit the same disconnect event don't fan in on the server at
// exactly the same instant.
if backoff != 0 {
err = c.wait(backoff)
err = c.wait(jitterBackoff(backoff))
Comment thread
djkazic marked this conversation as resolved.
if err != nil {
return err
}
Expand Down Expand Up @@ -1043,14 +1054,16 @@ func (c *Client) readIncomingStream() { // nolint:gocyclo
poolrpc.PrintMsg(msg), err)

switch {
// EOF is the "normal" close signal, meaning the server has
// cut its side of the connection. We will only get this during
// the proper shutdown of the server where we already have a
// reconnect scheduled. On an improper shutdown, we'll get an
// error, usually "transport is closing".
// EOF means the server has cut its side of the stream cleanly.
// This happens on planned server shutdowns, but also on
// proxy/load-balancer idle timeouts or any other clean-close
// scenario where the underlying TCP connection may still be
// alive. In all cases the long-lived subscription is gone and
// we need to trigger a reconnect, so route this through the
// same error path as any other stream failure.
case err == io.EOF:
select {
case c.errChanSwitch.ErrChan() <- ErrServerShutdown:
case c.errChanSwitch.ErrChan() <- ErrServerErrored:
case <-c.quit:
}
return
Expand Down
205 changes: 205 additions & 0 deletions auctioneer/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package auctioneer

import (
"errors"
"io"
"testing"
"time"

"github.com/lightninglabs/pool/auctioneerrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TestJitterBackoffBounds samples the jitter helper for a typical configured
// backoff and asserts results fall in the expected [backoff, backoff +
// backoff/4] range and aren't pinned to a single value.
func TestJitterBackoffBounds(t *testing.T) {
t.Parallel()

const (
base = 5 * time.Second
samples = 200
)
seen := make(map[time.Duration]struct{}, samples)
for i := 0; i < samples; i++ {
got := jitterBackoff(base)
if got < base || got > base+base/4 {
t.Fatalf("jitterBackoff(%v) = %v, out of [%v, %v]",
base, got, base, base+base/4)
}
seen[got] = struct{}{}
}

// In 200 samples over a 1.25s window of nanosecond resolution we
// expect many distinct values. If we get only a handful, jitter is
// broken.
if len(seen) < 10 {
t.Fatalf("expected diverse jitter samples, "+
"only got %d unique values", len(seen))
}
}

// fakeServerStream is a minimal implementation of
// ChannelAuctioneer_SubscribeBatchAuctionClient that returns predetermined
// results from Recv. It is only sufficient for driving the client's read loop.
type fakeServerStream struct {
grpc.ClientStream

recv chan recvResult
}

type recvResult struct {
msg *auctioneerrpc.ServerAuctionMessage
err error
}

func (s *fakeServerStream) Send(*auctioneerrpc.ClientAuctionMessage) error {
return nil
}

func (s *fakeServerStream) Recv() (*auctioneerrpc.ServerAuctionMessage, error) {
r := <-s.recv
return r.msg, r.err
}

// newTestClient returns a Client wired up just enough to drive
// readIncomingStream against a fake server stream.
func newTestClient(stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient,
) (*Client, chan error) {

mainErrChan := make(chan error, 1)
c := &Client{
serverStream: stream,
FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage),
StreamErrChan: mainErrChan,
errChanSwitch: NewErrChanSwitch(mainErrChan),
quit: make(chan struct{}),
subscribedAccts: make(map[[33]byte]*acctSubscription),
}
c.errChanSwitch.Start()
return c, mainErrChan
}

// runReadLoop runs readIncomingStream in a goroutine and returns a channel
// that closes when the loop exits.
func runReadLoop(c *Client) <-chan struct{} {
done := make(chan struct{})
go func() {
c.readIncomingStream()
close(done)
}()
return done
}

// TestReadIncomingStreamEOFTriggersReconnect ensures that an io.EOF received
// on the server stream is surfaced as ErrServerErrored on the error channel,
// which is the signal the rpcserver consumer uses to trigger reconnect logic.
//
// This is a regression test: EOF was previously reported as a separate
// "ErrServerShutdown" sentinel that the consumer silently ignored under the
// (incorrect) assumption that the client had already scheduled its own
// reconnect. The result was a permanently dead subscription stream after any
// clean close (proxy/LB timeout, planned server shutdown, etc.), with the
// trader being filtered as offline until the process restarted.
func TestReadIncomingStreamEOFTriggersReconnect(t *testing.T) {
t.Parallel()

stream := &fakeServerStream{recv: make(chan recvResult, 1)}
c, mainErrChan := newTestClient(stream)
defer c.errChanSwitch.Stop()
defer close(c.quit)

// Tell the fake stream to return io.EOF, simulating the server (or an
// intermediate proxy) cleanly closing its side of the bidi stream.
stream.recv <- recvResult{err: io.EOF}

done := runReadLoop(c)

select {
case err := <-mainErrChan:
if !errors.Is(err, ErrServerErrored) {
t.Fatalf("expected ErrServerErrored on EOF, got: %v",
err)
}
case <-time.After(defaultTimeout):
t.Fatal("timed out waiting for error after EOF")
}

select {
case <-done:
case <-time.After(defaultTimeout):
t.Fatal("readIncomingStream did not return after EOF")
}
}

// TestReadIncomingStreamTransportErrorTriggersReconnect ensures non-EOF
// transport errors continue to be surfaced as ErrServerErrored. This is the
// pre-existing behaviour we want to preserve after unifying it with the EOF
// path.
func TestReadIncomingStreamTransportErrorTriggersReconnect(t *testing.T) {
t.Parallel()

stream := &fakeServerStream{recv: make(chan recvResult, 1)}
c, mainErrChan := newTestClient(stream)
defer c.errChanSwitch.Stop()
defer close(c.quit)

// A "transport is closing" style error, which is what gRPC surfaces
// when the underlying TCP connection breaks abruptly.
stream.recv <- recvResult{
err: status.Error(codes.Unavailable, "transport is closing"),
}

done := runReadLoop(c)

select {
case err := <-mainErrChan:
if !errors.Is(err, ErrServerErrored) {
t.Fatalf("expected ErrServerErrored on transport "+
"error, got: %v", err)
}
case <-time.After(defaultTimeout):
t.Fatal("timed out waiting for error after transport failure")
}

select {
case <-done:
case <-time.After(defaultTimeout):
t.Fatal("readIncomingStream did not return after transport " +
"failure")
}
}

// TestReadIncomingStreamContextCanceledDoesNotReconnect ensures that a
// codes.Canceled error (which happens when *we* cancel the stream context
// during shutdown or a planned reconnect) does NOT surface an error to the
// consumer, so we don't accidentally schedule a second reconnect.
func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) {
t.Parallel()

stream := &fakeServerStream{recv: make(chan recvResult, 1)}
c, mainErrChan := newTestClient(stream)
defer c.errChanSwitch.Stop()
defer close(c.quit)

stream.recv <- recvResult{
err: status.Error(codes.Canceled, "context canceled"),
}

done := runReadLoop(c)

select {
case <-done:
case <-time.After(defaultTimeout):
t.Fatal("readIncomingStream did not return after cancel")
}

select {
case err := <-mainErrChan:
t.Fatalf("unexpected error surfaced on cancel: %v", err)
case <-time.After(defaultTimeout):
// Expected: no error surfaced.
}
}
13 changes: 7 additions & 6 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,13 @@ func (s *rpcServer) serverHandler(blockChan chan int32,
}

case err := <-s.auctioneer.StreamErrChan:
// If the server is shutting down, then the client has
// already scheduled a restart. We only need to handle
// other errors here.
if err != nil && err != auctioneer.ErrServerShutdown {
// Any error received on the stream means the
// long-lived subscription is gone and needs to be
// re-established. The only other "shutdown" signal
// the client raises (an explicit SERVER_SHUTDOWN
// message from the auctioneer) is handled inline in
// the read loop and never reaches this channel.
if err != nil {
rpcLog.Errorf("Error in server stream: %v", err)
err := s.auctioneer.HandleServerShutdown(err)
if err != nil {
Expand All @@ -319,8 +322,6 @@ func (s *rpcServer) serverHandler(blockChan chan int32,
}
}

rpcLog.Errorf("Unknown server error: %v", err)

case height := <-blockChan:
rpcLog.Infof("Received new block notification: "+
"height=%v", height)
Expand Down
Loading