Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions graft/evm/sync/code/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Syncer struct {
numWorkers int
codeHashesPerReq int // best-effort target size - final batch may be smaller

// inFlight tracks code hashes currently being processed to dedupe work
// across workers and across repeated queue submissions.
// inFlight ensures only one worker fetches each hash. The slow path
// skips it so it can always clean up markers that AddCode just rewrote.
inFlight sync.Map // key: common.Hash, value: struct{}
}

Expand Down Expand Up @@ -133,26 +133,21 @@ func (c *Syncer) work(ctx context.Context) error {
return nil
}

// Deduplicate in-flight code hashes across workers first to avoid
// racing repeated HasCode() checks for the same hash.
if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded {
continue
}

// After acquiring responsibility for this hash, re-check whether the code
// is already present locally. If so, clean up and release responsibility.
// Slow path: code already on disk. Idempotent and ungated, so any
// concurrent AddCode rewrite is re-cleaned on its next dequeue.
if rawdb.HasCode(c.db, codeHash) {
// Best-effort cleanup of stale marker.
batch := c.db.NewBatch()
if err := customrawdb.DeleteCodeToFetch(batch, codeHash); err != nil {
return fmt.Errorf("failed to delete stale code marker: %w", err)
}

if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write batch for stale code marker: %w", err)
}
// Release in-flight ownership since no network fetch is needed.
c.inFlight.Delete(codeHash)
continue
}

// Fast path: dedupe concurrent network fetches for the same hash.
if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded {
continue
}

Expand Down
115 changes: 115 additions & 0 deletions graft/evm/sync/code/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package code
import (
"context"
"errors"
"sync"
"testing"

"github.com/ava-labs/libevm/common"
Expand Down Expand Up @@ -161,6 +162,120 @@ func TestCodeSyncerAddsInProgressCodeHashes(t *testing.T) {
})
}

// TestCodeSyncerCleansMarkerWhenCodeOnDiskAndInFlightHeld is a deterministic
// regression test for an orphan code-to-fetch marker. When the code is
// already on disk, the worker must delete the marker even if inFlight is
// already held by another worker.
func TestCodeSyncerCleansMarkerWhenCodeOnDiskAndInFlightHeld(t *testing.T) {
t.Parallel()

messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) {
codeBytes := utils.RandomBytes(100)
codeHash := crypto.Keccak256Hash(codeBytes)

clientDB := rawdb.NewMemoryDatabase()
rawdb.WriteCode(clientDB, codeHash, codeBytes)
require.NoError(t, customrawdb.WriteCodeToFetch(clientDB, codeHash))

serverDB := memorydb.New()
rawdb.WriteCode(serverDB, codeHash, codeBytes)
handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats())
mockClient := client.NewTestClient(c, nil, handler, nil)

ch := make(chan common.Hash, 1)
codeSyncer, err := NewSyncer(mockClient, clientDB, ch)
require.NoError(t, err)

// Pretend another worker is already processing this hash.
codeSyncer.inFlight.Store(codeHash, struct{}{})
Comment thread
powerslider marked this conversation as resolved.
Outdated

ch <- codeHash
close(ch)

require.NoError(t, codeSyncer.Sync(t.Context()))

it := customrawdb.NewCodeToFetchIterator(clientDB)
defer it.Release()
require.False(t, it.Next(), "stale code-to-fetch marker remained after sync")
require.NoError(t, it.Error())
})
}

// TestCodeSyncerDuplicateAddCodeNoMarkerLeak stresses the same invariant
// end-to-end: many producers hammer AddCode for one hash, and after sync
// no code-to-fetch markers may remain. Two variants:
//
// - "code-already-on-disk": code is pre-written, so each dequeue only
// needs to delete the marker.
// - "code-fetched-during-sync": the disk starts empty, the first dequeue
// fetches the code from the network and later duplicates only delete.
func TestCodeSyncerDuplicateAddCodeNoMarkerLeak(t *testing.T) {
tests := []struct {
name string
preWriteCode bool
}{
{name: "code-already-on-disk", preWriteCode: true},
{name: "code-fetched-during-sync", preWriteCode: false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) {
codeBytes := utils.RandomBytes(100)
codeHash := crypto.Keccak256Hash(codeBytes)

clientDB := rawdb.NewMemoryDatabase()
if tt.preWriteCode {
rawdb.WriteCode(clientDB, codeHash, codeBytes)
}

serverDB := memorydb.New()
rawdb.WriteCode(serverDB, codeHash, codeBytes)
handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats())
mockClient := client.NewTestClient(c, nil, handler, nil)

codeQueue, err := NewQueue(clientDB)
require.NoError(t, err)

codeSyncer, err := NewSyncer(mockClient, clientDB, codeQueue.CodeHashes())
require.NoError(t, err)

// Tight-loop AddCode from many producers maximises the chance of
// landing inside a worker's marker-cleanup window.
const (
numProducers = 8
Comment thread
alarso16 marked this conversation as resolved.
iterations = 50_000
)
var wg sync.WaitGroup
wg.Add(numProducers)
for range numProducers {
go func() {
defer wg.Done()
for range iterations {
if err := codeQueue.AddCode(t.Context(), []common.Hash{codeHash}); err != nil {
return
Comment thread
powerslider marked this conversation as resolved.
Outdated
}
}
}()
}
go func() {
wg.Wait()
_ = codeQueue.Finalize()
Comment thread
powerslider marked this conversation as resolved.
Outdated
}()

require.NoError(t, codeSyncer.Sync(t.Context()))
require.Equal(t, codeBytes, rawdb.ReadCode(clientDB, codeHash))

it := customrawdb.NewCodeToFetchIterator(clientDB)
defer it.Release()
require.False(t, it.Next(), "stale code-to-fetch marker remained after sync")
require.NoError(t, it.Error())
})
})
}
}

func TestCodeSyncerAddsMoreInProgressThanQueueSize(t *testing.T) {
t.Parallel()
numCodeSlices := 100
Expand Down
Loading