Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions database/merkle/firewood/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
SimultaneousWorkLimit int
Log logging.Logger
StateSyncNodes []ids.NodeID
PeerTracker *p2p.PeerTracker
Registerer prometheus.Registerer
}

Expand Down Expand Up @@ -61,11 +62,12 @@ func newWithDB(config Config, db sync.DB[*RangeProof, struct{}], targetRoot ids.
sync.Config[*RangeProof, struct{}]{
RangeProofMarshaler: rangeProofMarshaler{},
ChangeProofMarshaler: changeProofMarshaler{},
EmptyRoot: ids.ID(types.EmptyRootHash),
ProofClient: proofClient,
TargetRoot: targetRoot,
EmptyRoot: ids.ID(types.EmptyRootHash),
PeerTracker: config.PeerTracker,
SimultaneousWorkLimit: config.SimultaneousWorkLimit,
Log: config.Log,
TargetRoot: targetRoot,
StateSyncNodes: config.StateSyncNodes,
},
config.Registerer,
Expand Down
1 change: 1 addition & 0 deletions database/merkle/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
"//snow/engine/common",
"//utils/logging",
"//utils/maybe",
"//utils/set",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
116 changes: 116 additions & 0 deletions database/merkle/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/maybe"
"github.com/ava-labs/avalanchego/utils/set"

pb "github.com/ava-labs/avalanchego/proto/pb/sync"
)
Expand Down Expand Up @@ -318,6 +319,121 @@
require.ErrorIs(t, err, context.Canceled)
}

// Test_Sync_WithPeerTracker ensures that the peer tracker is used for sampling if provided.
func Test_Sync_WithPeerTracker(t *testing.T) {
Comment thread
JonathanOppenheimer marked this conversation as resolved.
Comment thread
alarso16 marked this conversation as resolved.
targetRoot := ids.GenerateTestID()
clientDB := &db{id: ids.Empty}

var observedNodeID ids.NodeID
response := marshalRangeProofResponse(t, &proofDouble{newRoot: targetRoot})
handler := p2p.TestHandler{
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, _ []byte) ([]byte, *common.AppError) {
observedNodeID = nodeID
return response, nil
},
}

knownNodeID := ids.GenerateTestNodeID()
pt, err := p2p.NewPeerTracker(logging.NoLog{}, "", prometheus.NewRegistry(), set.Set[ids.NodeID]{}, nil)
require.NoError(t, err)
pt.Connected(knownNodeID, nil)

syncer, err := NewSyncer(
clientDB,
Config[*proofDouble, *proofDouble]{
TargetRoot: targetRoot,
RangeProofMarshaler: marshaler{},
ChangeProofMarshaler: marshaler{},
ProofClient: p2ptest.NewSelfClient(t, t.Context(), knownNodeID, handler),
PeerTracker: pt,
Log: logging.NoLog{},
SimultaneousWorkLimit: 1,
},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoErrorf(t, syncer.Sync(t.Context()), "%T.Sync()", syncer)
require.Equal(t, knownNodeID, observedNodeID, "peer tracker must drive node selection")
}

func Test_Sync_NoPeersAvailable(t *testing.T) {
targetRoot := ids.GenerateTestID()
clientDB := &db{id: ids.Empty}

pt, err := p2p.NewPeerTracker(logging.NoLog{}, "", prometheus.NewRegistry(), set.Set[ids.NodeID]{}, nil)
require.NoError(t, err)

syncer, err := NewSyncer(
clientDB,
Config[*proofDouble, *proofDouble]{
TargetRoot: targetRoot,
RangeProofMarshaler: marshaler{},
ChangeProofMarshaler: marshaler{},
ProofClient: p2ptest.NewSelfClient(t, t.Context(), ids.EmptyNodeID, p2p.TestHandler{}),
PeerTracker: pt,
Log: logging.NoLog{},
SimultaneousWorkLimit: 1,
},
prometheus.NewRegistry(),
)
require.NoError(t, err)
err = syncer.Sync(t.Context())
require.ErrorIs(t, err, errNoPeersAvailable)
}

// Test_Sync_NodeIDs ensures that specific nodeIDs are used for state syncing.
func Test_Sync_NodeIDs(t *testing.T) {
targetRoot := ids.GenerateTestID()
clientDB := &db{id: ids.Empty}

const numRequests = 10
response := marshalRangeProofResponse(t, &proofDouble{newRoot: ids.Empty})
finalResponse := marshalRangeProofResponse(t, &proofDouble{newRoot: targetRoot})
count := 0
expectedHandler := p2p.TestHandler{
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
count++
if count >= numRequests {
return finalResponse, nil
}
return response, nil
},
}
unexpectedHandler := p2p.TestHandler{
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
assert.Fail(t, "unexpected AppRequest to node")

Check warning on line 404 in database/merkle/sync/sync_test.go

View workflow job for this annotation

GitHub Actions / Lint

`assert.Fail`" continues on failure; consider require for fail-fast
return nil, nil
},
}

expectedNodeID := ids.GenerateTestNodeID()
unexpectedNodeID := ids.GenerateTestNodeID()
syncer, err := NewSyncer(
clientDB,
Config[*proofDouble, *proofDouble]{
TargetRoot: targetRoot,
RangeProofMarshaler: marshaler{},
ChangeProofMarshaler: marshaler{},
ProofClient: p2ptest.NewClientWithPeers(
t,
t.Context(),
ids.EmptyNodeID,
p2p.TestHandler{},
map[ids.NodeID]p2p.Handler{
expectedNodeID: expectedHandler,
unexpectedNodeID: unexpectedHandler,
},
),
StateSyncNodes: []ids.NodeID{expectedNodeID},
Log: logging.NoLog{},
SimultaneousWorkLimit: 1,
},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoErrorf(t, syncer.Sync(t.Context()), "%T.Sync()", syncer)
}

func Test_Midpoint(t *testing.T) {
require := require.New(t)

Expand Down
81 changes: 62 additions & 19 deletions database/merkle/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
errInvalidChangeProof = errors.New("failed to verify change proof")
errTooManyBytes = errors.New("response contains more than requested bytes")
errUnexpectedResponseType = errors.New("unexpected response type")
errNoPeersAvailable = errors.New("no peers available")
)

type priority byte
Expand Down Expand Up @@ -144,14 +145,18 @@ type Syncer[R any, C any] struct {

// TODO remove non-config values out of this struct
type Config[R any, C any] struct {
Comment thread
alarso16 marked this conversation as resolved.
RangeProofMarshaler Marshaler[R]
ChangeProofMarshaler Marshaler[C]
ProofClient *p2p.Client
// Required fields:
RangeProofMarshaler Marshaler[R]
ChangeProofMarshaler Marshaler[C]
ProofClient *p2p.Client
TargetRoot ids.ID
EmptyRoot ids.ID // defaults to [ids.Empty] if not set

// Optional
SimultaneousWorkLimit int
Log logging.Logger
TargetRoot ids.ID
EmptyRoot ids.ID
StateSyncNodes []ids.NodeID
PeerTracker *p2p.PeerTracker
}

func NewSyncer[R any, C any](
Expand Down Expand Up @@ -418,15 +423,16 @@ func (s *Syncer[_, _]) requestChangeProof(ctx context.Context, work *workItem) {
return
}

onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, err error) {
onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) bool {
defer s.finishWorkItem()

if err := s.handleChangeProofResponse(ctx, targetRootID, work, changeReq, responseBytes, err); err != nil {
if err := s.handleChangeProofResponse(ctx, targetRootID, work, changeReq, responseBytes, appErr); err != nil {
// TODO log responses
s.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
s.retryWork(work)
return
return false
}
return true
}

if err := s.sendRequest(ctx, s.config.ProofClient, requestBytes, onResponse); err != nil {
Expand Down Expand Up @@ -473,15 +479,16 @@ func (s *Syncer[_, _]) requestRangeProof(ctx context.Context, work *workItem) {
return
}

onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) {
onResponse := func(ctx context.Context, _ ids.NodeID, responseBytes []byte, appErr error) bool {
defer s.finishWorkItem()

if err := s.handleRangeProofResponse(ctx, targetRootID, work, rangeReq, responseBytes, appErr); err != nil {
// TODO log responses
s.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
s.retryWork(work)
return
return false
}
return true
}

if err := s.sendRequest(ctx, s.config.ProofClient, requestBytes, onResponse); err != nil {
Expand All @@ -493,22 +500,53 @@ func (s *Syncer[_, _]) requestRangeProof(ctx context.Context, work *workItem) {
s.metrics.RequestMade()
}

// appResponseCallbackWithResult has the same parameters as a
// [p2p.AppResponseCallback] but additionally reports, through its bool result,
// whether the response was handled successfully.
type appResponseCallbackWithResult func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, err error) bool

// toAppResponseCallback adapts cb to a plain [p2p.AppResponseCallback] for
// callers that have no use for the success result; the result is dropped.
func (cb appResponseCallbackWithResult) toAppResponseCallback() p2p.AppResponseCallback {
	return func(ctx context.Context, from ids.NodeID, response []byte, appErr error) {
		cb(ctx, from, response, appErr)
	}
}

func (s *Syncer[_, _]) sendRequest(
ctx context.Context,
client *p2p.Client,
requestBytes []byte,
onResponse p2p.AppResponseCallback,
onResponse appResponseCallbackWithResult,
) error {
if len(s.config.StateSyncNodes) == 0 {
return client.AppRequestAny(ctx, requestBytes, onResponse)
switch {
case len(s.config.StateSyncNodes) > 0:
Comment thread
alarso16 marked this conversation as resolved.
nodeIdx := atomic.AddUint32(&s.stateSyncNodeIdx, 1)
nodeID := s.config.StateSyncNodes[nodeIdx%uint32(len(s.config.StateSyncNodes))]
return client.AppRequest(ctx, set.Of(nodeID), requestBytes, onResponse.toAppResponseCallback())
case s.config.PeerTracker != nil:
// TODO(alarso16): This logic should be pushed to the [p2p.Client] implementation.
return sendRequestWithPeerTracker(ctx, client, s.config.PeerTracker, requestBytes, onResponse)
default:
// Default to built-in selection from the [p2p.NodeSampler].
return client.AppRequestAny(ctx, requestBytes, onResponse.toAppResponseCallback())
}
}

// Get the next nodeID to query using the [nodeIdx] offset.
// If we're out of nodes, loop back to 0.
// We do this try to query a different node each time if possible.
nodeIdx := atomic.AddUint32(&s.stateSyncNodeIdx, 1)
nodeID := s.config.StateSyncNodes[nodeIdx%uint32(len(s.config.StateSyncNodes))]
return client.AppRequest(ctx, set.Of(nodeID), requestBytes, onResponse)
func sendRequestWithPeerTracker(ctx context.Context, c *p2p.Client, pt *p2p.PeerTracker, requestBytes []byte, onResponse appResponseCallbackWithResult) error {
nodeID, ok := pt.SelectPeer()
if !ok {
return errNoPeersAvailable
}
Comment thread
alarso16 marked this conversation as resolved.
pt.RegisterRequest(nodeID)
start := time.Now()
return c.AppRequest(ctx, set.Of(nodeID), requestBytes, func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, appErr error) {
timeToRespond := time.Since(start)
if success := onResponse(ctx, nodeID, responseBytes, appErr); !success {
pt.RegisterFailure(nodeID)
return
}

const epsilon = 1e-9 // avoid division by zero
Comment thread
JonathanOppenheimer marked this conversation as resolved.
bandwidth := float64(len(responseBytes)) / float64(timeToRespond.Seconds()+epsilon)
pt.RegisterResponse(nodeID, bandwidth)
})
}

func (s *Syncer[_, _]) retryWork(work *workItem) {
Expand Down Expand Up @@ -745,8 +783,13 @@ func (s *Syncer[_, _]) setError(err error) {
s.errLock.Lock()
defer s.errLock.Unlock()

if s.fatalError != nil {
return
}

s.config.Log.Error("sync errored", zap.Error(err))
s.fatalError = err

// Call in goroutine because we might be holding [s.workLock]
go func() {
s.workLock.Lock()
Expand Down
20 changes: 6 additions & 14 deletions graft/coreth/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ type SyncedNetworkClient interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendSyncedAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// RegisterResponse records a successful response from nodeID with the
// observed bandwidth (response bytes divided by request time).
RegisterResponse(nodeID ids.NodeID, bandwidth float64)

// RegisterFailure records a failed response from nodeID.
RegisterFailure(nodeID ids.NodeID)
Comment thread
alarso16 marked this conversation as resolved.
PeerTracker() *p2p.PeerTracker
}

type Network interface {
Expand Down Expand Up @@ -168,6 +163,11 @@ func NewNetwork(
}, nil
}

// PeerTracker returns the bandwidth-tracked peer list for use during state sync.
// The tracker held by the network is returned directly rather than a copy, so
// anything the caller registers on it is visible to the network as well.
func (n *network) PeerTracker() *p2p.PeerTracker {
return n.peers
}

// Sample returns a random sample of connected peers.
// `limit` is ignored, and one peer will be returned.
// The peer returned may not be a validator - to sample validators,
Expand Down Expand Up @@ -484,14 +484,6 @@ func (n *network) Size() uint32 {
return uint32(n.peers.Size())
}

// RegisterResponse records a successful response from nodeID with the observed
// bandwidth by forwarding both values to the network's peer tracker.
func (n *network) RegisterResponse(nodeID ids.NodeID, bandwidth float64) {
n.peers.RegisterResponse(nodeID, bandwidth)
}

// RegisterFailure records a failed response from nodeID by forwarding it to
// the network's peer tracker.
func (n *network) RegisterFailure(nodeID ids.NodeID) {
n.peers.RegisterFailure(nodeID)
}

// SendSyncedAppRequestAny synchronously sends request to an arbitrary peer.
// Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if
// the request should be retried.
Expand Down
2 changes: 1 addition & 1 deletion graft/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.4
github.com/prometheus/client_golang v1.23.0
github.com/stretchr/testify v1.11.1
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.49.0
Expand Down Expand Up @@ -65,7 +66,6 @@ require (
github.com/onsi/gomega v1.36.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions graft/evm/sync/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//graft/evm/sync/handlers",
"//ids",
"//network/p2p",
"//utils/logging",
"//utils/set",
"@com_github_ava_labs_libevm//common",
"@com_github_ava_labs_libevm//core/rawdb",
"@com_github_ava_labs_libevm//core/types",
Expand All @@ -26,6 +28,7 @@ go_library(
"@com_github_ava_labs_libevm//params",
"@com_github_ava_labs_libevm//rlp",
"@com_github_ava_labs_libevm//trie",
"@com_github_prometheus_client_golang//prometheus",
],
)

Expand Down
Loading
Loading