diff --git a/graft/evm/sync/code/syncer.go b/graft/evm/sync/code/syncer.go index ef923b067bb2..7cff02e3cbcf 100644 --- a/graft/evm/sync/code/syncer.go +++ b/graft/evm/sync/code/syncer.go @@ -38,8 +38,8 @@ type Syncer struct { numWorkers int codeHashesPerReq int // best-effort target size - final batch may be smaller - // inFlight tracks code hashes currently being processed to dedupe work - // across workers and across repeated queue submissions. + // inFlight ensures only one worker fetches each hash. The slow path + // skips it so it can always clean up markers that AddCode just rewrote. inFlight sync.Map // key: common.Hash, value: struct{} } @@ -133,26 +133,21 @@ func (c *Syncer) work(ctx context.Context) error { return nil } - // Deduplicate in-flight code hashes across workers first to avoid - // racing repeated HasCode() checks for the same hash. - if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded { - continue - } - - // After acquiring responsibility for this hash, re-check whether the code - // is already present locally. If so, clean up and release responsibility. + // Slow path: code already on disk. Idempotent and ungated, so any + // concurrent AddCode rewrite is re-cleaned on its next dequeue. if rawdb.HasCode(c.db, codeHash) { - // Best-effort cleanup of stale marker. batch := c.db.NewBatch() if err := customrawdb.DeleteCodeToFetch(batch, codeHash); err != nil { return fmt.Errorf("failed to delete stale code marker: %w", err) } - if err := batch.Write(); err != nil { return fmt.Errorf("failed to write batch for stale code marker: %w", err) } - // Release in-flight ownership since no network fetch is needed. - c.inFlight.Delete(codeHash) + continue + } + + // Fast path: dedupe concurrent network fetches for the same hash. + if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded { continue } diff --git a/graft/evm/sync/code/syncer_test.go b/graft/evm/sync/code/syncer_test.go index 822e180c7893..9ae15ff3dfc2 100644 --- a/graft/evm/sync/code/syncer_test.go +++ b/graft/evm/sync/code/syncer_test.go @@ -6,6 +6,7 @@ package code import ( "context" "errors" + "sync/atomic" "testing" "github.com/ava-labs/libevm/common" @@ -14,6 +15,7 @@ import ( "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/ethdb/memorydb" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/graft/evm/message" @@ -161,6 +163,185 @@ func TestCodeSyncerAddsInProgressCodeHashes(t *testing.T) { }) } +// TestCodeSyncerCleansMarkerRewrittenMidCleanup is a deterministic +// regression test for an orphan code-to-fetch marker. A wrapped client +// DB pauses the first Batch.Write after the delete commits. The test +// then rewrites the marker and enqueues a duplicate, forcing a sibling +// worker to handle a marker recreated mid-cleanup. +func TestCodeSyncerCleansMarkerRewrittenMidCleanup(t *testing.T) { + t.Parallel() + + messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) { + codeBytes := utils.RandomBytes(100) + codeHash := crypto.Keccak256Hash(codeBytes) + + // probeHash is a synchronization barrier. Its code is on disk so + // dequeueing it does not affect codeHash's marker. + probeBytes := utils.RandomBytes(100) + probeHash := crypto.Keccak256Hash(probeBytes) + + rawDB := rawdb.NewMemoryDatabase() + clientDB := newBlockingBatchDB(rawDB) + + rawdb.WriteCode(rawDB, codeHash, codeBytes) + require.NoError(t, customrawdb.WriteCodeToFetch(rawDB, codeHash)) + rawdb.WriteCode(rawDB, probeHash, probeBytes) + + serverDB := memorydb.New() + rawdb.WriteCode(serverDB, codeHash, codeBytes) + rawdb.WriteCode(serverDB, probeHash, probeBytes) + handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats()) + mockClient := client.NewTestClient(c, nil, handler, nil) + + ch := make(chan common.Hash) + codeSyncer, err := NewSyncer(mockClient, clientDB, ch, WithNumWorkers(2)) + require.NoError(t, err) + + syncErrCh := make(chan error, 1) + go func() { syncErrCh <- codeSyncer.Sync(t.Context()) }() + + // A worker takes codeHash, commits the marker delete, then pauses + // inside the wrapped Batch.Write. + ch <- codeHash + <-clientDB.blocked + + // Rewrite the marker (modelling a concurrent [Queue.AddCode]) and enqueue + // a duplicate for a sibling worker. + require.NoError(t, customrawdb.WriteCodeToFetch(rawDB, codeHash)) + ch <- codeHash + + // Barrier! This send returns only after the sibling has processed + // the duplicate and is back at the channel receive, so its decision + // is committed before we release the paused worker. + ch <- probeHash + + close(clientDB.release) + close(ch) + + require.NoError(t, <-syncErrCh) + + it := customrawdb.NewCodeToFetchIterator(rawDB) + defer it.Release() + require.False(t, it.Next(), "stale code-to-fetch marker remained after sync") + require.NoError(t, it.Error()) + }) +} + +// blockingBatchDB pauses inside the first Batch.Write after the commit +// lands. Subsequent writes are not blocked. +type blockingBatchDB struct { + ethdb.Database + primed atomic.Bool + blocked chan struct{} + release chan struct{} +} + +func newBlockingBatchDB(inner ethdb.Database) *blockingBatchDB { + return &blockingBatchDB{ + Database: inner, + blocked: make(chan struct{}), + release: make(chan struct{}), + } +} + +func (db *blockingBatchDB) NewBatch() ethdb.Batch { + return &blockingBatch{Batch: db.Database.NewBatch(), db: db} +} + +type blockingBatch struct { + ethdb.Batch + db *blockingBatchDB +} + +func (b *blockingBatch) Write() error { + err := b.Batch.Write() + if b.db.primed.CompareAndSwap(false, true) { + close(b.db.blocked) + <-b.db.release + } + return err +} + +// TestCodeSyncerDuplicateAddCodeNoMarkerLeak stresses the same invariant +// end-to-end: many producers hammer AddCode for one hash, and after sync +// no code-to-fetch markers may remain. Two variants: +// +// - "code-already-on-disk": code is pre-written, so each dequeue only +// needs to delete the marker. +// - "code-fetched-during-sync": the disk starts empty, the first dequeue +// fetches the code from the network and later duplicates only delete. +func TestCodeSyncerDuplicateAddCodeNoMarkerLeak(t *testing.T) { + tests := []struct { + name string + preWriteCode bool + }{ + {name: "code-already-on-disk", preWriteCode: true}, + {name: "code-fetched-during-sync", preWriteCode: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) { + codeBytes := utils.RandomBytes(100) + codeHash := crypto.Keccak256Hash(codeBytes) + + clientDB := rawdb.NewMemoryDatabase() + if tt.preWriteCode { + rawdb.WriteCode(clientDB, codeHash, codeBytes) + } + + serverDB := memorydb.New() + rawdb.WriteCode(serverDB, codeHash, codeBytes) + handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats()) + mockClient := client.NewTestClient(c, nil, handler, nil) + + codeQueue, err := NewQueue(clientDB) + require.NoError(t, err) + + const ( + numWorkers = 8 + numProducers = 8 + iterations = 50_000 + ) + codeSyncer, err := NewSyncer(mockClient, clientDB, codeQueue.CodeHashes(), WithNumWorkers(numWorkers)) + require.NoError(t, err) + + // Run the syncer in a goroutine so the main test can drive + // producers and Finalize from the foreground. + syncErrCh := make(chan error, 1) + go func() { + syncErrCh <- codeSyncer.Sync(t.Context()) + }() + + // Tight-loop AddCode from many producers maximises the chance + // of landing inside a worker's marker-cleanup window. + var producers errgroup.Group + for range numProducers { + producers.Go(func() error { + for range iterations { + if err := codeQueue.AddCode(t.Context(), []common.Hash{codeHash}); err != nil { + return err + } + } + return nil + }) + } + require.NoError(t, producers.Wait()) + require.NoError(t, codeQueue.Finalize()) + require.NoError(t, <-syncErrCh) + + require.Equal(t, codeBytes, rawdb.ReadCode(clientDB, codeHash)) + + it := customrawdb.NewCodeToFetchIterator(clientDB) + defer it.Release() + require.False(t, it.Next(), "stale code-to-fetch marker remained after sync") + require.NoError(t, it.Error()) + }) + }) + } +} + func TestCodeSyncerAddsMoreInProgressThanQueueSize(t *testing.T) { t.Parallel() numCodeSlices := 100