Skip to content

Add peer tracking to merkle/sync#5415

Open
alarso16 wants to merge 1 commit into
masterfrom
alarso16/msync-peer-tracker
Open

Add peer tracking to merkle/sync#5415
alarso16 wants to merge 1 commit into
masterfrom
alarso16/msync-peer-tracker

Conversation

@alarso16
Copy link
Copy Markdown
Contributor

@alarso16 alarso16 commented May 21, 2026

Why this should be merged

We need some method of peer discovery for merkle sync. Many nodes will not implement the p2p.Client for Firewood, or drop the request, or not store enough state, or even generate malicious proofs. In all of these cases, the syncer should learn from this mistake and prioritize other peers.

How this works

coreth and subnet-evm already use a p2p.PeerTracker to track bandwidth, so we can use it with this state syncer as well.

How this was tested

I added a new unit test to check if the peer tracker is being used, but I don't know what else can be done to ensure responses are logged correctly, since the peer tracker exports very little data.

Need to be documented in RELEASES.md?

No

@alarso16 alarso16 force-pushed the alarso16/msync-peer-tracker branch from 8fdc6dd to c0cb54d Compare May 21, 2026 18:12
@alarso16 alarso16 changed the title Add tracking to merkle/sync Add peer tracking to merkle/sync May 21, 2026
@alarso16 alarso16 moved this to In Progress 🏗️ in avalanchego May 21, 2026
@alarso16 alarso16 force-pushed the alarso16/msync-peer-tracker branch from c0cb54d to f477c5e Compare May 21, 2026 18:45
@alarso16 alarso16 force-pushed the alarso16/msync-peer-tracker branch from f477c5e to b2381be Compare May 21, 2026 18:54
@alarso16 alarso16 marked this pull request as ready for review May 21, 2026 20:05
@alarso16 alarso16 requested a review from a team as a code owner May 21, 2026 20:05
Copilot AI review requested due to automatic review settings May 21, 2026 20:05
@alarso16 alarso16 requested a review from a team as a code owner May 21, 2026 20:05
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds peer performance tracking to database/merkle/sync by integrating p2p.PeerTracker, allowing the syncer to learn from slow/failed peers and prioritize better ones. It also updates the EVM/Firewood sync wiring to pass through the peer tracker and construct the appropriate p2p client.

Changes:

  • Extend merkle sync configuration to accept an optional *p2p.PeerTracker and use it for peer selection and response/failure registration.
  • Update graft coreth/subnet-evm network facades to expose PeerTracker() rather than bespoke response/failure registration methods.
  • Add a unit test verifying that PeerTracker drives node selection when provided.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
graft/subnet-evm/network/network.go Exposes the network’s PeerTracker() to consumers.
graft/coreth/network/network.go Exposes the network’s PeerTracker() to consumers.
graft/evm/sync/engine/client.go Passes PeerTracker into Firewood syncer config and constructs a dedicated p2p client via P2PNetwork().NewClient.
graft/evm/sync/client/test_network.go Updates test network to satisfy the new PeerTracker() requirement.
graft/evm/sync/client/test_client.go Updates Client interface conformance (now includes Network()), but currently returns nil.
graft/evm/sync/client/client.go Switches request outcome tracking to use a held *p2p.PeerTracker instead of network callbacks; exposes Network().
graft/evm/sync/client/BUILD.bazel Adds new deps needed by updated test network code (prometheus/logging/set).
graft/evm/go.mod Promotes prometheus/client_golang to a direct dependency (now imported).
database/merkle/sync/syncer.go Adds PeerTracker to config and routes requests through it when provided (including bandwidth/failure recording).
database/merkle/sync/sync_test.go Adds unit test ensuring PeerTracker drives peer selection.
database/merkle/sync/BUILD.bazel Adds //utils/set dep for the new unit test.
database/merkle/firewood/syncer/syncer.go Threads PeerTracker through to the underlying merkle sync config.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +532 to +535
nodeID, ok := pt.SelectPeer()
if !ok {
return errors.New("no peers available")
}
Comment on lines +63 to 65
func (*TestClient) Network() Network {
return nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct -- compare to AddClient right?

Comment on lines +414 to 415
network := c.config.Client.Network()
stateSyncer, err = evmstate.NewFirewoodSyncer(
Copy link
Copy Markdown
Contributor

@JonathanOppenheimer JonathanOppenheimer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an important PR! you should also probably determine if the copilot comments are BS or not.

Comment on lines +322 to +323
// Test_Sync_WithPeerTracker ensures that the peer tracker is used for sampling if provided.
func Test_Sync_WithPeerTracker(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been reading through the go style guide and recall that this is allowed

Test, Benchmark and Example function names within *_test.go files may include underscores.

but we never seemingly do this for other tests. Why did you do it here?

if len(s.config.StateSyncNodes) == 0 {
return client.AppRequestAny(ctx, requestBytes, onResponse)
switch {
case len(s.config.StateSyncNodes) > 0:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't appear that you test the precedence or ordering for this new behavior in the switch here. We have TestStateSyncNodes and the new Test_Sync_WithPeerTracker, but would a small precedence test for this code here make sense?

@@ -144,14 +144,18 @@ type Syncer[R any, C any] struct {

// TODO remove non-config values out of this struct
type Config[R any, C any] struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish we would merge the struct ordering linter lol

Comment on lines -61 to -66
// RegisterResponse records a successful response from nodeID with the
// observed bandwidth (response bytes divided by request time).
RegisterResponse(nodeID ids.NodeID, bandwidth float64)

// RegisterFailure records a failed response from nodeID.
RegisterFailure(nodeID ids.NodeID)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You deleted these from the interface, but you didn't actually delete the implementations. I think this is now dead code. You should do the same in subnet-evm

Image

s.metrics.RequestMade()
}

type appResponseCallbackWithCheck func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, err error) bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe withResult instead? It's not really a check.

}
}

func sendRequestWithPeerTracker(ctx context.Context, c *p2p.Client, pt *p2p.PeerTracker, requestBytes []byte, onResponse appResponseCallbackWithCheck) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also why isn't this on the syncer?

Comment on lines +63 to 65
func (*TestClient) Network() Network {
return nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct -- compare to AddClient right?

}

// PeerTracker returns a new empty peer tracker.
func (*testNetwork) PeerTracker() *p2p.PeerTracker {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to cache the tracker like in production? If a test calls PeerTracker() twice, they'd see different things.

}

// Test_Sync_WithPeerTracker ensures that the peer tracker is used for sampling if provided.
func Test_Sync_WithPeerTracker(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two more test ideas -- right now this only tests which node was selected. You write in the description about testing difficulties but you could

  • Add two peers, return a malformed proof for one, force a retry, assert the second request goes to the other peer (SelectPeer SHOULD prefer the responsive one).
  • No-peers error path: don't call pt.Connected, and assert Sync() fails with the expected error (maybe make "no peers available" a sentinel error?)

func sendRequestWithPeerTracker(ctx context.Context, c *p2p.Client, pt *p2p.PeerTracker, requestBytes []byte, onResponse appResponseCallbackWithCheck) error {
nodeID, ok := pt.SelectPeer()
if !ok {
return errors.New("no peers available")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a node starts state sync before any peer connects to the firewood handler, wouldn't the sync abort permanently? Shouldn't we retry in this case? or do something else?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the EVM client will sleep and then retry:

func (c *client) get(ctx context.Context, request message.Request, parseFn parseResponseFn) (interface{}, error) {
// marshal the request into requestBytes
requestBytes, err := message.RequestToBytes(c.codec, request)
if err != nil {
return nil, err
}
metric, err := c.stats.GetMetric(request)
if err != nil {
return nil, err
}
var (
responseIntf interface{}
numElements int
lastErr error
)
// Loop until the context is cancelled or we get a valid response.
for attempt := 0; ; attempt++ {
// If the context has finished, return the context error early.
if ctxErr := ctx.Err(); ctxErr != nil {
if lastErr != nil {
return nil, fmt.Errorf("request failed after %d attempts with last error %w and ctx error %w", attempt, lastErr, ctxErr)
} else {
return nil, ctxErr
}
}
metric.IncRequested()
var (
response []byte
nodeID ids.NodeID
start = time.Now()
)
if len(c.stateSyncNodes) == 0 {
response, nodeID, err = c.network.SendSyncedAppRequestAny(ctx, requestBytes)
} else {
// get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0
// we do this every attempt to ensure we get a different node each time if possible.
nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1)
nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))]
response, err = c.network.SendSyncedAppRequest(ctx, nodeID, requestBytes)
}
metric.UpdateRequestLatency(time.Since(start))
if err != nil {
logCtx := make([]any, 0, 8)
if nodeID != ids.EmptyNodeID {
logCtx = append(logCtx, "nodeID", nodeID)
}
logCtx = append(logCtx, "attempt", attempt, "request", request, "err", err)
log.Debug("request failed, retrying", logCtx...)
metric.IncFailed()
c.pt.RegisterFailure(nodeID)
time.Sleep(failedRequestSleepInterval)
continue
} else {
responseIntf, numElements, err = parseFn(c.codec, request, response)
if err != nil {
lastErr = err
log.Debug("could not validate response, retrying", "nodeID", nodeID, "attempt", attempt, "request", request, "err", err)
c.pt.RegisterFailure(nodeID)
metric.IncFailed()
metric.IncInvalidResponse()
continue
}
bandwidth := float64(len(response)) / (time.Since(start).Seconds() + epsilon)
c.pt.RegisterResponse(nodeID, bandwidth)
metric.IncSucceeded()
metric.IncReceived(int64(numElements))
return responseIntf, nil
}
}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: In Progress 🏗️

Development

Successfully merging this pull request may close these issues.

3 participants