Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions database/merkle/firewood/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
SimultaneousWorkLimit int
Log logging.Logger
StateSyncNodes []ids.NodeID
PeerTracker *p2p.PeerTracker
Registerer prometheus.Registerer
}

Expand Down Expand Up @@ -61,11 +62,12 @@ func newWithDB(config Config, db sync.DB[*RangeProof, struct{}], targetRoot ids.
sync.Config[*RangeProof, struct{}]{
RangeProofMarshaler: rangeProofMarshaler{},
ChangeProofMarshaler: changeProofMarshaler{},
EmptyRoot: ids.ID(types.EmptyRootHash),
ProofClient: proofClient,
TargetRoot: targetRoot,
EmptyRoot: ids.ID(types.EmptyRootHash),
PeerTracker: config.PeerTracker,
SimultaneousWorkLimit: config.SimultaneousWorkLimit,
Log: config.Log,
TargetRoot: targetRoot,
StateSyncNodes: config.StateSyncNodes,
},
config.Registerer,
Expand Down
1 change: 1 addition & 0 deletions database/merkle/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
"//snow/engine/common",
"//utils/logging",
"//utils/maybe",
"//utils/set",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
38 changes: 38 additions & 0 deletions database/merkle/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/maybe"
"github.com/ava-labs/avalanchego/utils/set"

pb "github.com/ava-labs/avalanchego/proto/pb/sync"
)
Expand Down Expand Up @@ -318,6 +319,43 @@ func Test_Sync_BusyContextCancellation(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
}

// Test_Sync_WithPeerTracker ensures that the peer tracker is used for sampling if provided.
func Test_Sync_WithPeerTracker(t *testing.T) {
Comment thread
JonathanOppenheimer marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two more test ideas -- right now this only tests which node was selected. You write in the description about testing difficulties but you could

  • Add two peers, return a malformed proof for one, force a retry, assert the second request goes to the other peer (SelectPeer SHOULD prefer the responsive one).
  • No-peers error path: don't call pt.Connected, and assert Sync() fails with the expected error (maybe make "no peers available" a sentinel error?)

targetRoot := ids.GenerateTestID()
clientDB := &db{id: ids.Empty}

var observedNodeID ids.NodeID
response := marshalRangeProofResponse(t, &proofDouble{newRoot: targetRoot})
handler := p2p.TestHandler{
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, _ []byte) ([]byte, *common.AppError) {
observedNodeID = nodeID
return response, nil
},
}

knownNodeID := ids.GenerateTestNodeID()
pt, err := p2p.NewPeerTracker(logging.NoLog{}, "", prometheus.NewRegistry(), set.Set[ids.NodeID]{}, nil)
require.NoError(t, err)
pt.Connected(knownNodeID, nil)

syncer, err := NewSyncer(
clientDB,
Config[*proofDouble, *proofDouble]{
TargetRoot: targetRoot,
RangeProofMarshaler: marshaler{},
ChangeProofMarshaler: marshaler{},
ProofClient: p2ptest.NewSelfClient(t, t.Context(), knownNodeID, handler),
PeerTracker: pt,
Log: logging.NoLog{},
SimultaneousWorkLimit: 1,
},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoErrorf(t, syncer.Sync(t.Context()), "%T.Sync()", syncer)
require.Equal(t, knownNodeID, observedNodeID, "peer tracker must drive node selection")
}

func Test_Midpoint(t *testing.T) {
require := require.New(t)

Expand Down
75 changes: 56 additions & 19 deletions database/merkle/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,18 @@ type Syncer[R any, C any] struct {

// TODO remove non-config values out of this struct
type Config[R any, C any] struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish we would merge the struct ordering linter lol

RangeProofMarshaler Marshaler[R]
ChangeProofMarshaler Marshaler[C]
ProofClient *p2p.Client
// Required fields:
RangeProofMarshaler Marshaler[R]
ChangeProofMarshaler Marshaler[C]
ProofClient *p2p.Client
TargetRoot ids.ID
EmptyRoot ids.ID // defaults to [ids.Empty] if not set

// Optional
SimultaneousWorkLimit int
Log logging.Logger
TargetRoot ids.ID
EmptyRoot ids.ID
StateSyncNodes []ids.NodeID
PeerTracker *p2p.PeerTracker
}

func NewSyncer[R any, C any](
Expand Down Expand Up @@ -418,15 +422,16 @@ func (s *Syncer[_, _]) requestChangeProof(ctx context.Context, work *workItem) {
return
}

onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, err error) {
onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) bool {
defer s.finishWorkItem()

if err := s.handleChangeProofResponse(ctx, targetRootID, work, changeReq, responseBytes, err); err != nil {
if err := s.handleChangeProofResponse(ctx, targetRootID, work, changeReq, responseBytes, appErr); err != nil {
// TODO log responses
s.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
s.retryWork(work)
return
return false
}
return true
}

if err := s.sendRequest(ctx, s.config.ProofClient, requestBytes, onResponse); err != nil {
Expand Down Expand Up @@ -473,15 +478,16 @@ func (s *Syncer[_, _]) requestRangeProof(ctx context.Context, work *workItem) {
return
}

onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) {
onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) bool {
defer s.finishWorkItem()

if err := s.handleRangeProofResponse(ctx, targetRootID, work, rangeReq, responseBytes, appErr); err != nil {
// TODO log responses
s.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
s.retryWork(work)
return
return false
}
return true
}

if err := s.sendRequest(ctx, s.config.ProofClient, requestBytes, onResponse); err != nil {
Expand All @@ -493,22 +499,53 @@ func (s *Syncer[_, _]) requestRangeProof(ctx context.Context, work *workItem) {
s.metrics.RequestMade()
}

type appResponseCallbackWithCheck func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, err error) bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe withResult instead? It's not really a check.


// toAppResponseCallback wraps the callback, discarding the bool return value.
func (c appResponseCallbackWithCheck) toAppResponseCallback() p2p.AppResponseCallback {
return func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
_ = c(ctx, nodeID, responseBytes, err)
}
}

func (s *Syncer[_, _]) sendRequest(
ctx context.Context,
client *p2p.Client,
requestBytes []byte,
onResponse p2p.AppResponseCallback,
onResponse appResponseCallbackWithCheck,
) error {
if len(s.config.StateSyncNodes) == 0 {
return client.AppRequestAny(ctx, requestBytes, onResponse)
switch {
case len(s.config.StateSyncNodes) > 0:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't appear that you test the precedence or ordering for this new behavior in the switch here. We have TestStateSyncNodes and the new Test_Sync_WithPeerTracker, but would a small precedence test for this code here make sense?

nodeIdx := atomic.AddUint32(&s.stateSyncNodeIdx, 1)
nodeID := s.config.StateSyncNodes[nodeIdx%uint32(len(s.config.StateSyncNodes))]
return client.AppRequest(ctx, set.Of(nodeID), requestBytes, onResponse.toAppResponseCallback())
case s.config.PeerTracker != nil:
// TODO(alarso16): This logic should be pushed to the [p2p.Client] implementation.
return sendRequestWithPeerTracker(ctx, client, s.config.PeerTracker, requestBytes, onResponse)
default:
// Default to built-in selection from the [p2p.NodeSampler].
return client.AppRequestAny(ctx, requestBytes, onResponse.toAppResponseCallback())
}
}

func sendRequestWithPeerTracker(ctx context.Context, c *p2p.Client, pt *p2p.PeerTracker, requestBytes []byte, onResponse appResponseCallbackWithCheck) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also why isn't this on the syncer?

nodeID, ok := pt.SelectPeer()
if !ok {
return errors.New("no peers available")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a node starts state sync before any peer connects to the firewood handler, wouldn't the sync abort permanently? Shouldn't we retry in this case? or do something else?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the EVM client will sleep and then retry:

func (c *client) get(ctx context.Context, request message.Request, parseFn parseResponseFn) (interface{}, error) {
// marshal the request into requestBytes
requestBytes, err := message.RequestToBytes(c.codec, request)
if err != nil {
return nil, err
}
metric, err := c.stats.GetMetric(request)
if err != nil {
return nil, err
}
var (
responseIntf interface{}
numElements int
lastErr error
)
// Loop until the context is cancelled or we get a valid response.
for attempt := 0; ; attempt++ {
// If the context has finished, return the context error early.
if ctxErr := ctx.Err(); ctxErr != nil {
if lastErr != nil {
return nil, fmt.Errorf("request failed after %d attempts with last error %w and ctx error %w", attempt, lastErr, ctxErr)
} else {
return nil, ctxErr
}
}
metric.IncRequested()
var (
response []byte
nodeID ids.NodeID
start = time.Now()
)
if len(c.stateSyncNodes) == 0 {
response, nodeID, err = c.network.SendSyncedAppRequestAny(ctx, requestBytes)
} else {
// get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0
// we do this every attempt to ensure we get a different node each time if possible.
nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1)
nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))]
response, err = c.network.SendSyncedAppRequest(ctx, nodeID, requestBytes)
}
metric.UpdateRequestLatency(time.Since(start))
if err != nil {
logCtx := make([]any, 0, 8)
if nodeID != ids.EmptyNodeID {
logCtx = append(logCtx, "nodeID", nodeID)
}
logCtx = append(logCtx, "attempt", attempt, "request", request, "err", err)
log.Debug("request failed, retrying", logCtx...)
metric.IncFailed()
c.pt.RegisterFailure(nodeID)
time.Sleep(failedRequestSleepInterval)
continue
} else {
responseIntf, numElements, err = parseFn(c.codec, request, response)
if err != nil {
lastErr = err
log.Debug("could not validate response, retrying", "nodeID", nodeID, "attempt", attempt, "request", request, "err", err)
c.pt.RegisterFailure(nodeID)
metric.IncFailed()
metric.IncInvalidResponse()
continue
}
bandwidth := float64(len(response)) / (time.Since(start).Seconds() + epsilon)
c.pt.RegisterResponse(nodeID, bandwidth)
metric.IncSucceeded()
metric.IncReceived(int64(numElements))
return responseIntf, nil
}
}
}

}
Comment on lines +532 to +535
pt.RegisterRequest(nodeID)
t := time.Now()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start instead?

return c.AppRequest(ctx, set.Of(nodeID), requestBytes, func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, appErr error) {
timeToRespond := time.Since(t)
if handled := onResponse(ctx, nodeID, responseBytes, appErr); !handled {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also a nit on naming, I think success or ok is better than handled -- it's handled regardless of success or failure!

pt.RegisterFailure(nodeID)
return
}

// Get the next nodeID to query using the [nodeIdx] offset.
// If we're out of nodes, loop back to 0.
// We do this try to query a different node each time if possible.
nodeIdx := atomic.AddUint32(&s.stateSyncNodeIdx, 1)
nodeID := s.config.StateSyncNodes[nodeIdx%uint32(len(s.config.StateSyncNodes))]
return client.AppRequest(ctx, set.Of(nodeID), requestBytes, onResponse)
const epsilon = 1e-9 // avoid division by zero
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we use 1e-6 before and now we use 1e-9 now?

epsilon = 1e-6 // small amount to add to time to avoid division by 0

Doesn't matter at all, I'm just curious.

bandwidth := float64(len(responseBytes)) / float64(timeToRespond.Seconds()+epsilon)
pt.RegisterResponse(nodeID, bandwidth)
})
}

func (s *Syncer[_, _]) retryWork(work *workItem) {
Expand Down
12 changes: 6 additions & 6 deletions graft/coreth/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ type SyncedNetworkClient interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendSyncedAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// RegisterResponse records a successful response from nodeID with the
// observed bandwidth (response bytes divided by request time).
RegisterResponse(nodeID ids.NodeID, bandwidth float64)

// RegisterFailure records a failed response from nodeID.
RegisterFailure(nodeID ids.NodeID)
Comment on lines -61 to -66
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You deleted these from the interface, but you didn't actually delete the implementations. I think this is now dead code. You should do the same in subnet-evm

Image

PeerTracker() *p2p.PeerTracker
}

type Network interface {
Expand Down Expand Up @@ -168,6 +163,11 @@ func NewNetwork(
}, nil
}

// PeerTracker returns the bandwidth-tracked peer list for use during state sync.
func (n *network) PeerTracker() *p2p.PeerTracker {
return n.peers
}

// Sample returns a random sample of connected peers.
// `limit` is ignored, and one peer will be returned.
// The peer returned may not be a validator - to sample validators,
Expand Down
2 changes: 1 addition & 1 deletion graft/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.4
github.com/prometheus/client_golang v1.23.0
github.com/stretchr/testify v1.11.1
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.49.0
Expand Down Expand Up @@ -65,7 +66,6 @@ require (
github.com/onsi/gomega v1.36.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions graft/evm/sync/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//graft/evm/sync/handlers",
"//ids",
"//network/p2p",
"//utils/logging",
"//utils/set",
"@com_github_ava_labs_libevm//common",
"@com_github_ava_labs_libevm//core/rawdb",
"@com_github_ava_labs_libevm//core/types",
Expand All @@ -26,6 +28,7 @@ go_library(
"@com_github_ava_labs_libevm//params",
"@com_github_ava_labs_libevm//rlp",
"@com_github_ava_labs_libevm//trie",
"@com_github_prometheus_client_golang//prometheus",
],
)

Expand Down
22 changes: 9 additions & 13 deletions graft/evm/sync/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,7 @@ type Network interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendSyncedAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// RegisterResponse records a successful response from nodeID with the
// observed bandwidth. Used to prioritize more responsive peers.
RegisterResponse(nodeID ids.NodeID, bandwidth float64)

// RegisterFailure records a failed response from nodeID.
RegisterFailure(nodeID ids.NodeID)
PeerTracker() *p2p.PeerTracker

// P2PNetwork returns the unabstracted [p2p.Network].
P2PNetwork() *p2p.Network
Expand All @@ -87,8 +82,7 @@ type Client interface {
// GetCode synchronously retrieves code associated with the given hashes
GetCode(ctx context.Context, hashes []common.Hash) ([][]byte, error)

// AddClient creates a separate client on the underlying [p2p.Network].
AddClient(handlerID uint64) *p2p.Client
Network() Network

// StateSyncNodes returns the list of nodes provided via config.
StateSyncNodes() []ids.NodeID
Expand All @@ -107,6 +101,7 @@ type client struct {
stateSyncNodeIdx uint32
stats stats.ClientSyncerStats
blockParser EthBlockParser
pt *p2p.PeerTracker
}

type Config struct {
Expand All @@ -128,11 +123,12 @@ func New(config *Config) *client {
stats: config.Stats,
stateSyncNodes: config.StateSyncNodeIDs,
blockParser: config.BlockParser,
pt: config.Network.PeerTracker(),
}
}

func (c *client) AddClient(handlerID uint64) *p2p.Client {
return c.network.P2PNetwork().NewClient(handlerID, c.network)
func (c *client) Network() Network {
return c.network
}

func (c *client) StateSyncNodes() []ids.NodeID {
Expand Down Expand Up @@ -374,22 +370,22 @@ func (c *client) get(ctx context.Context, request message.Request, parseFn parse
logCtx = append(logCtx, "attempt", attempt, "request", request, "err", err)
log.Debug("request failed, retrying", logCtx...)
metric.IncFailed()
c.network.RegisterFailure(nodeID)
c.pt.RegisterFailure(nodeID)
time.Sleep(failedRequestSleepInterval)
continue
} else {
responseIntf, numElements, err = parseFn(c.codec, request, response)
if err != nil {
lastErr = err
log.Debug("could not validate response, retrying", "nodeID", nodeID, "attempt", attempt, "request", request, "err", err)
c.network.RegisterFailure(nodeID)
c.pt.RegisterFailure(nodeID)
metric.IncFailed()
metric.IncInvalidResponse()
continue
}

bandwidth := float64(len(response)) / (time.Since(start).Seconds() + epsilon)
c.network.RegisterResponse(nodeID, bandwidth)
c.pt.RegisterResponse(nodeID, bandwidth)
metric.IncSucceeded()
metric.IncReceived(int64(numElements))
return responseIntf, nil
Expand Down
7 changes: 3 additions & 4 deletions graft/evm/sync/client/test_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ava-labs/avalanchego/graft/evm/message"
"github.com/ava-labs/avalanchego/graft/evm/sync/handlers"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
)

var (
Expand Down Expand Up @@ -57,11 +56,11 @@ func NewTestClient(
}
}

func (*TestClient) AddClient(uint64) *p2p.Client {
panic("AddClient is not supported in TestClient")
func (*TestClient) StateSyncNodes() []ids.NodeID {
return nil
}

func (*TestClient) StateSyncNodes() []ids.NodeID {
func (*TestClient) Network() Network {
return nil
}
Comment on lines +63 to 65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct -- compare to AddClient right?


Expand Down
Loading
Loading