Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions graft/evm/sync/code/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Syncer struct {
numWorkers int
codeHashesPerReq int // best-effort target size - final batch may be smaller

// inFlight tracks code hashes currently being processed to dedupe work
// across workers and across repeated queue submissions.
// inFlight ensures only one worker fetches each hash. The slow path
// skips it so it can always clean up markers that AddCode just rewrote.
inFlight sync.Map // key: common.Hash, value: struct{}
}

Expand Down Expand Up @@ -133,26 +133,21 @@ func (c *Syncer) work(ctx context.Context) error {
return nil
}

// Deduplicate in-flight code hashes across workers first to avoid
// racing repeated HasCode() checks for the same hash.
if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded {
continue
}

// After acquiring responsibility for this hash, re-check whether the code
// is already present locally. If so, clean up and release responsibility.
// Slow path: code already on disk. Idempotent and ungated, so any
// concurrent AddCode rewrite is re-cleaned on its next dequeue.
if rawdb.HasCode(c.db, codeHash) {
// Best-effort cleanup of stale marker.
batch := c.db.NewBatch()
if err := customrawdb.DeleteCodeToFetch(batch, codeHash); err != nil {
return fmt.Errorf("failed to delete stale code marker: %w", err)
}

if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write batch for stale code marker: %w", err)
}
// Release in-flight ownership since no network fetch is needed.
c.inFlight.Delete(codeHash)
continue
}

// Fast path: dedupe concurrent network fetches for the same hash.
if _, loaded := c.inFlight.LoadOrStore(codeHash, struct{}{}); loaded {
continue
}

Expand Down
181 changes: 181 additions & 0 deletions graft/evm/sync/code/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package code
import (
"context"
"errors"
"sync/atomic"
"testing"

"github.com/ava-labs/libevm/common"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/ava-labs/libevm/ethdb"
"github.com/ava-labs/libevm/ethdb/memorydb"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/graft/evm/message"
Expand Down Expand Up @@ -161,6 +163,185 @@ func TestCodeSyncerAddsInProgressCodeHashes(t *testing.T) {
})
}

// TestCodeSyncerCleansMarkerRewrittenMidCleanup is a deterministic
// regression test for an orphan code-to-fetch marker. A wrapped client
// DB pauses the first Batch.Write after the delete commits. The test
// then rewrites the marker and enqueues a duplicate, forcing a sibling
// worker to handle a marker recreated mid-cleanup.
//
// The test passes only if no code-to-fetch marker is left in the
// underlying database once Sync has returned.
func TestCodeSyncerCleansMarkerRewrittenMidCleanup(t *testing.T) {
t.Parallel()

messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) {
codeBytes := utils.RandomBytes(100)
codeHash := crypto.Keccak256Hash(codeBytes)

// probeHash is a synchronization barrier. Its code is on disk so
// dequeueing it does not affect codeHash's marker.
probeBytes := utils.RandomBytes(100)
probeHash := crypto.Keccak256Hash(probeBytes)

// The syncer is handed clientDB (the blocking wrapper); the test itself
// reads and writes rawDB directly, so its own writes never hit the pause.
rawDB := rawdb.NewMemoryDatabase()
clientDB := newBlockingBatchDB(rawDB)

// Initial state: codeHash's code is already stored and its marker is
// still present, i.e. the marker is stale from the first dequeue on.
rawdb.WriteCode(rawDB, codeHash, codeBytes)
require.NoError(t, customrawdb.WriteCodeToFetch(rawDB, codeHash))
Comment thread
powerslider marked this conversation as resolved.
rawdb.WriteCode(rawDB, probeHash, probeBytes)

// serverDB backs the code request handler that the test client talks to.
serverDB := memorydb.New()
rawdb.WriteCode(serverDB, codeHash, codeBytes)
rawdb.WriteCode(serverDB, probeHash, probeBytes)
handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats())
mockClient := client.NewTestClient(c, nil, handler, nil)

// ch is unbuffered, so each send below returns only once a worker has
// received the hash. Two workers: one gets parked inside Write, the
// other is the sibling that must deal with the duplicate.
ch := make(chan common.Hash)
codeSyncer, err := NewSyncer(mockClient, clientDB, ch, WithNumWorkers(2))
require.NoError(t, err)

// Buffered by one so the Sync goroutine can deliver its result even if
// nobody is receiving yet.
syncErrCh := make(chan error, 1)
go func() { syncErrCh <- codeSyncer.Sync(t.Context()) }()

// A worker takes codeHash, commits the marker delete, then pauses
// inside the wrapped Batch.Write.
ch <- codeHash
<-clientDB.blocked

// Rewrite the marker (modelling a concurrent [Queue.AddCode]) and enqueue
// a duplicate for a sibling worker.
require.NoError(t, customrawdb.WriteCodeToFetch(rawDB, codeHash))
ch <- codeHash

// Barrier! This send returns only after the sibling has processed
// the duplicate and is back at the channel receive, so its decision
// is committed before we release the paused worker.
ch <- probeHash

// Let the paused worker return from Write, then close the input channel.
// Sync is expected to finish without error after that.
close(clientDB.release)
close(ch)

require.NoError(t, <-syncErrCh)

// Iterate the markers on the underlying DB: none may remain, including
// the one that was rewritten while the first cleanup was paused.
it := customrawdb.NewCodeToFetchIterator(rawDB)
defer it.Release()
require.False(t, it.Next(), "stale code-to-fetch marker remained after sync")
require.NoError(t, it.Error())
})
}

// blockingBatchDB is a test double around an ethdb.Database whose batches
// park inside the first Write call. The underlying write has already been
// applied when the pause begins; every Write after the first one runs
// straight through.
type blockingBatchDB struct {
	ethdb.Database

	// primed flips to true on the first Write so that only one call pauses.
	primed atomic.Bool
	// blocked is closed once the first Write has been applied and is about
	// to park; release is the channel the parked Write waits on.
	blocked, release chan struct{}
}

// newBlockingBatchDB wraps inner and allocates the two unbuffered signal
// channels. The primed flag is left at its zero value (false), so the first
// batch Write on the returned DB is the one that pauses.
func newBlockingBatchDB(inner ethdb.Database) *blockingBatchDB {
	wrapped := new(blockingBatchDB)
	wrapped.Database = inner
	wrapped.blocked = make(chan struct{})
	wrapped.release = make(chan struct{})
	return wrapped
}

// NewBatch creates a batch on the embedded database and wraps it in a
// blockingBatch that points back at db, so the batch's Write can consult
// db's pause state.
func (db *blockingBatchDB) NewBatch() ethdb.Batch {
	inner := db.Database.NewBatch()
	return &blockingBatch{Batch: inner, db: db}
}

// blockingBatch decorates an ethdb.Batch with a back-reference to the
// blockingBatchDB that created it, which gives Write access to the shared
// primed flag and the blocked/release channels.
type blockingBatch struct {
	ethdb.Batch // underlying batch; promoted methods pass straight through

	db *blockingBatchDB // owner holding the pause state
}

// Write applies the underlying batch first and only then decides whether to
// pause. The single caller that wins the primed compare-and-swap closes
// blocked and waits on release; every other call returns immediately. The
// result of the underlying Write is returned unchanged in both cases.
func (b *blockingBatch) Write() error {
	writeErr := b.Batch.Write()

	owner := b.db
	if !owner.primed.CompareAndSwap(false, true) {
		// Some Write has already claimed the pause; do not block.
		return writeErr
	}

	// First Write: announce that the commit has landed, then park until
	// the release channel is closed.
	close(owner.blocked)
	<-owner.release
	return writeErr
}

// TestCodeSyncerDuplicateAddCodeNoMarkerLeak stresses the same invariant
// end-to-end: many producers hammer AddCode for one hash, and after sync
// no code-to-fetch markers may remain. Two variants:
//
// - "code-already-on-disk": code is pre-written, so each dequeue only
// needs to delete the marker.
// - "code-fetched-during-sync": the disk starts empty, the first dequeue
// fetches the code from the network and later duplicates only delete.
//
// Both variants additionally assert that the code bytes are present in the
// client database at the end.
func TestCodeSyncerDuplicateAddCodeNoMarkerLeak(t *testing.T) {
tests := []struct {
name string
preWriteCode bool
}{
{name: "code-already-on-disk", preWriteCode: true},
{name: "code-fetched-during-sync", preWriteCode: false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
messagetest.ForEachCodec(t, func(c codec.Manager, _ message.LeafsRequestType) {
codeBytes := utils.RandomBytes(100)
codeHash := crypto.Keccak256Hash(codeBytes)

// Only the on-disk variant seeds the client with the code up front.
clientDB := rawdb.NewMemoryDatabase()
if tt.preWriteCode {
rawdb.WriteCode(clientDB, codeHash, codeBytes)
}

// The server side always holds the code, so the fetch variant has
// something to retrieve through the test client.
serverDB := memorydb.New()
rawdb.WriteCode(serverDB, codeHash, codeBytes)
handler := handlers.NewCodeRequestHandler(serverDB, c, handlerstats.NewNoopHandlerStats())
mockClient := client.NewTestClient(c, nil, handler, nil)

codeQueue, err := NewQueue(clientDB)
require.NoError(t, err)

// Load shape: numProducers goroutines each issue `iterations` AddCode
// calls for the same hash, drained by numWorkers syncer workers.
const (
numWorkers = 8
numProducers = 8
Comment thread
alarso16 marked this conversation as resolved.
iterations = 50_000
)
codeSyncer, err := NewSyncer(mockClient, clientDB, codeQueue.CodeHashes(), WithNumWorkers(numWorkers))
require.NoError(t, err)

// Run the syncer in a goroutine so the main test can drive
// producers and Finalize from the foreground.
syncErrCh := make(chan error, 1)
go func() {
syncErrCh <- codeSyncer.Sync(t.Context())
}()

// Tight-loop AddCode from many producers maximises the chance
// of landing inside a worker's marker-cleanup window.
var producers errgroup.Group
for range numProducers {
producers.Go(func() error {
for range iterations {
if err := codeQueue.AddCode(t.Context(), []common.Hash{codeHash}); err != nil {
return err
}
}
return nil
})
}
// All producers must finish before the queue is finalized.
// NOTE(review): Finalize presumably closes the queue's hash channel so
// that Sync can return — confirm against Queue.
require.NoError(t, producers.Wait())
require.NoError(t, codeQueue.Finalize())
require.NoError(t, <-syncErrCh)

// In both variants the code must be on the client's disk at the end.
require.Equal(t, codeBytes, rawdb.ReadCode(clientDB, codeHash))

// No code-to-fetch marker may survive the sync.
it := customrawdb.NewCodeToFetchIterator(clientDB)
defer it.Release()
require.False(t, it.Next(), "stale code-to-fetch marker remained after sync")
require.NoError(t, it.Error())
})
})
}
}

func TestCodeSyncerAddsMoreInProgressThanQueueSize(t *testing.T) {
t.Parallel()
numCodeSlices := 100
Expand Down
Loading