diff --git a/vms/saevm/cchain/BUILD.bazel b/vms/saevm/cchain/BUILD.bazel index e7c59c44b3c8..93dafa002d4f 100644 --- a/vms/saevm/cchain/BUILD.bazel +++ b/vms/saevm/cchain/BUILD.bazel @@ -17,12 +17,14 @@ go_library( name = "cchain", srcs = [ "api.go", + "gossip.go", "hooks.go", "vm.go", ], importpath = "github.com/ava-labs/avalanchego/vms/saevm/cchain", deps = [ "//api", + "//api/metrics", "//database", "//database/prefixdb", "//graft/coreth/core/extstate", @@ -30,9 +32,12 @@ go_library( "//graft/evm/constants", "//graft/evm/utils/rpc", "//ids", + "//network/p2p", + "//network/p2p/gossip", "//snow", "//snow/engine/common", "//snow/engine/snowman/block", + "//utils/bloom", "//utils/constants", "//utils/formatting", "//utils/formatting/address", @@ -71,6 +76,7 @@ go_test( name = "cchain_test", srcs = [ "api_test.go", + "gossip_test.go", "hooks_test.go", "vm_test.go", ], @@ -85,12 +91,14 @@ go_test( "//ids", "//snow", "//snow/engine/common", - "//snow/engine/enginetest", "//snow/engine/snowman/block", "//snow/snowtest", + "//snow/validators", + "//snow/validators/validatorstest", "//utils/crypto/secp256k1", "//utils/logging", "//utils/set", + "//version", "//vms/components/avax", "//vms/saevm/blocks", "//vms/saevm/cchain/tx", diff --git a/vms/saevm/cchain/api.go b/vms/saevm/cchain/api.go index 649c23bab932..2c20202e0bfc 100644 --- a/vms/saevm/cchain/api.go +++ b/vms/saevm/cchain/api.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/api" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/formatting" @@ -32,9 +33,10 @@ import ( // The type is unexported but its methods are exported because gorilla RPC // reflects on them to dispatch requests. type service struct { - ctx *snow.Context - txpool *txpool.Txpool - state *state.State + ctx *snow.Context + txpool *txpool.Txpool + pushGossiper *gossip.PushGossiper[*gossipTx] + state *state.State chainAlias string hrp string @@ -44,6 +46,7 @@ type service struct { func newService( ctx *snow.Context, pool *txpool.Txpool, + pushGossiper *gossip.PushGossiper[*gossipTx], db *state.State, ) (*service, error) { chainAlias, err := ctx.BCLookup.PrimaryAlias(ctx.ChainID) @@ -58,9 +61,10 @@ func newService( } return &service{ - ctx: ctx, - txpool: pool, - state: db, + ctx: ctx, + txpool: pool, + pushGossiper: pushGossiper, + state: db, chainAlias: chainAlias, hrp: hrp, @@ -220,11 +224,12 @@ func (s *service) IssueTx(_ *http.Request, a *api.FormattedTx, r *api.JSONTxID) return fmt.Errorf("parsing transaction: %w", err) } - if err := s.txpool.Add(t); err != nil { + if err := s.txpool.Add(t); err != nil && !errors.Is(err, txpool.ErrAlreadyKnown) { return fmt.Errorf("%w: %w", errIssuingTx, err) } - // TODO(StephenButtolph): Push gossip the tx. + // Even if already in the pool from a peer's gossip, push it to peers. + s.pushGossiper.Add(toGossipTx(t)) r.TxID = t.ID() return nil diff --git a/vms/saevm/cchain/gossip.go b/vms/saevm/cchain/gossip.go new file mode 100644 index 000000000000..f6a12eaab376 --- /dev/null +++ b/vms/saevm/cchain/gossip.go @@ -0,0 +1,57 @@ +// Copyright (C) 2019, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package cchain + +import ( + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/vms/saevm/cchain/tx" + "github.com/ava-labs/avalanchego/vms/saevm/cchain/txpool" +) + +var _ gossip.Gossipable = (*gossipTx)(nil) + +type gossipTx tx.Tx + +func toGossipTx(t *tx.Tx) *gossipTx { + return (*gossipTx)(t) +} + +func (t *gossipTx) GossipID() ids.ID { return t.toTx().ID() } +func (t *gossipTx) toTx() *tx.Tx { return (*tx.Tx)(t) } + +var _ gossip.Marshaller[*gossipTx] = gossipMarshaller{} + +type gossipMarshaller struct{} + +func (gossipMarshaller) MarshalGossip(t *gossipTx) ([]byte, error) { + return t.toTx().Bytes() +} + +func (gossipMarshaller) UnmarshalGossip(b []byte) (*gossipTx, error) { + t, err := tx.Parse(b) + return toGossipTx(t), err +} + +var _ gossip.Set[*gossipTx] = (*gossipTxPool)(nil) + +type gossipTxPool struct { + *txpool.Txpool +} + +func newGossipTxPool(pool *txpool.Txpool) *gossipTxPool { + return &gossipTxPool{Txpool: pool} +} + +func (p *gossipTxPool) Add(t *gossipTx) error { + return p.Txpool.Add(t.toTx()) +} + +func (p *gossipTxPool) Iterate(f func(*gossipTx) bool) { + for t := range p.Txpool.Iter() { + if !f(toGossipTx(t)) { + return + } + } +} diff --git a/vms/saevm/cchain/gossip_test.go b/vms/saevm/cchain/gossip_test.go new file mode 100644 index 000000000000..bb5955855311 --- /dev/null +++ b/vms/saevm/cchain/gossip_test.go @@ -0,0 +1,74 @@ +// Copyright (C) 2019, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package cchain + +import ( + "testing" + + "github.com/ava-labs/libevm/libevm/options" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/saevm/cchain/tx" + "github.com/ava-labs/avalanchego/vms/saevm/cchain/tx/txtest" + "github.com/ava-labs/avalanchego/vms/saevm/saetest" +) + +// TestPushGossip verifies that a cross-chain transaction issued to an API node +// is push-gossiped to a validator for block building. +func TestPushGossip(t *testing.T) { + var ( + sk = txtest.NewKey(t) + withAlloc = options.Func[sutConfig](func(c *sutConfig) { + c.genesis.Alloc = saetest.MaxAllocFor(sk.EthAddress()) + }) + vdrID = ids.GenerateTestNodeID() + vdrs = set.Of(vdrID) + ) + apiCtx, api := newSUT(t, withAlloc, withValidators(vdrs)) + vdrCtx, vdr := newSUT(t, withAlloc, withNodeID(vdrID), withValidators(vdrs)) + saetest.Connect(t, api, vdr) + + w := newWallet(sk, api.snowCtx, api.Client) + stx := w.newMinimalTx(t) + require.NoErrorf(t, api.IssueTx(apiCtx, stx), "%T.IssueTx()", api.Client) + + blk := vdr.runConsensusLoop(vdrCtx, t) + if diff := cmp.Diff([]*tx.Tx{stx}, blockTxs(t, blk), txtest.CmpOpt()); diff != "" { + t.Errorf("%T built by validator after gossip (-want +got):\n%s", blk, diff) + } +} + +// TestPullGossip verifies that a validator will share a cross-chain transaction +// via pull gossip to another connected validator. +// +// The API node is only transitively connected to vdrB, so vdrB can only learn +// about the transaction by pulling it from vdrA. +func TestPullGossip(t *testing.T) { + var ( + sk = txtest.NewKey(t) + withAlloc = options.Func[sutConfig](func(c *sutConfig) { + c.genesis.Alloc = saetest.MaxAllocFor(sk.EthAddress()) + }) + vdrIDA = ids.GenerateTestNodeID() + vdrIDB = ids.GenerateTestNodeID() + vdrs = set.Of(vdrIDA, vdrIDB) + ) + apiCtx, api := newSUT(t, withAlloc, withValidators(vdrs)) + _, vdrA := newSUT(t, withAlloc, withNodeID(vdrIDA), withValidators(vdrs)) + vdrBCtx, vdrB := newSUT(t, withAlloc, withNodeID(vdrIDB), withValidators(vdrs)) + saetest.Connect(t, api, vdrA) + saetest.Connect(t, vdrA, vdrB) + + w := newWallet(sk, api.snowCtx, api.Client) + stx := w.newMinimalTx(t) + require.NoErrorf(t, api.IssueTx(apiCtx, stx), "%T.IssueTx()", api.Client) + + blk := vdrB.runConsensusLoop(vdrBCtx, t) + if diff := cmp.Diff([]*tx.Tx{stx}, blockTxs(t, blk), txtest.CmpOpt()); diff != "" { + t.Errorf("%T built by vdrB after gossip (-want +got):\n%s", blk, diff) + } +} diff --git a/vms/saevm/cchain/vm.go b/vms/saevm/cchain/vm.go index 5cca2dd7354a..ea4f7cce1e6d 100644 --- a/vms/saevm/cchain/vm.go +++ b/vms/saevm/cchain/vm.go @@ -13,16 +13,22 @@ import ( "fmt" "net/http" "slices" + "sync" + "time" "github.com/ava-labs/libevm/core" "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/txpool/legacypool" "github.com/ava-labs/libevm/triedb" + "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/graft/evm/utils/rpc" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/vms/evm/database" "github.com/ava-labs/avalanchego/vms/saevm/cchain/state" "github.com/ava-labs/avalanchego/vms/saevm/cchain/txpool" @@ -36,9 +42,14 @@ import ( type VM struct { *sae.VM // created by [VM.Initialize] - ctx *snow.Context - state *state.State - txpool *txpool.Txpool + // gossip frequencies are configurable to speed up testing. + pullGossipPeriod time.Duration + pushGossipPeriod time.Duration + + ctx *snow.Context + state *state.State + txpool *txpool.Txpool + pushGossiper *gossip.PushGossiper[*gossipTx] // onClose are executed in reverse order during [VM.Shutdown]. If a resource // depends on another resource, it MUST be added AFTER the resource it @@ -126,6 +137,61 @@ func (v *VM) Initialize( v.txpool.Close() return nil }) + + reg, err := metrics.MakeAndRegister(snowCtx.Metrics, "cchain") + if err != nil { + return fmt.Errorf("making metrics: %w", err) + } + bloomMetrics, err := bloom.NewMetrics("gossip_bloom", reg) + if err != nil { + return fmt.Errorf("creating gossip bloom metrics: %w", err) + } + gossipSet, err := gossip.NewBloomSet( + newGossipTxPool(v.txpool), + gossip.BloomSetConfig{ + Metrics: bloomMetrics, + }, + ) + if err != nil { + return fmt.Errorf("creating gossip bloom set: %w", err) + } + gossipHandler, pullGossiper, pushGossiper, err := gossip.NewSystem( + snowCtx.NodeID, + v.Network, + v.ValidatorPeers, + gossipSet, + gossipMarshaller{}, + gossip.SystemConfig{ + Log: snowCtx.Log, + Registry: reg, + Namespace: "gossip", + HandlerID: p2p.AtomicTxGossipHandlerID, + RequestPeriod: v.pullGossipPeriod, + }, + ) + if err != nil { + return fmt.Errorf("creating cross-chain tx gossip system: %w", err) + } + v.pushGossiper = pushGossiper + + if err := v.VM.AddHandler(p2p.AtomicTxGossipHandlerID, gossipHandler); err != nil { + return fmt.Errorf("registering cross-chain tx gossip handler: %w", err) + } + + gossipCtx, cancelGossip := context.WithCancel(context.Background()) + var gossipWG sync.WaitGroup + gossipWG.Go(func() { + gossip.Every(gossipCtx, snowCtx.Log, pullGossiper, v.pullGossipPeriod) + }) + gossipWG.Go(func() { + gossip.Every(gossipCtx, snowCtx.Log, pushGossiper, v.pushGossipPeriod) + }) + v.onClose = append(v.onClose, func(context.Context) error { + cancelGossip() + gossipWG.Wait() + return nil + }) + return nil } @@ -142,7 +208,7 @@ func (v *VM) CreateHandlers(ctx context.Context) (map[string]http.Handler, error return nil, fmt.Errorf("creating SAE handlers: %w", err) } - service, err := newService(v.ctx, v.txpool, v.state) + service, err := newService(v.ctx, v.txpool, v.pushGossiper, v.state) if err != nil { return nil, fmt.Errorf("creating avax service: %w", err) } diff --git a/vms/saevm/cchain/vm_test.go b/vms/saevm/cchain/vm_test.go index 026bb52735ac..beb083ea9440 100644 --- a/vms/saevm/cchain/vm_test.go +++ b/vms/saevm/cchain/vm_test.go @@ -10,6 +10,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core" @@ -30,11 +31,14 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/customtypes" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/snowtest" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/snow/validators/validatorstest" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/components/avax" "github.com/ava-labs/avalanchego/vms/saevm/blocks" "github.com/ava-labs/avalanchego/vms/saevm/cchain/tx" @@ -62,16 +66,37 @@ type SUT struct { snowCtx *snow.Context memory *atomic.Memory + sender *saetest.Sender ethclient *ethclient.Client } +// Implements [saetest.Peer] +func (s *SUT) NodeID() ids.NodeID { return s.snowCtx.NodeID } +func (s *SUT) Sender() *saetest.Sender { return s.sender } + type ( sutConfig struct { - genesis core.Genesis + genesis core.Genesis + nodeID ids.NodeID + validators set.Set[ids.NodeID] } sutOption = options.Option[sutConfig] ) +// withNodeID overrides the SUT's randomly generated NodeID. +func withNodeID(id ids.NodeID) sutOption { + return options.Func[sutConfig](func(c *sutConfig) { + c.nodeID = id + }) +} + +// withValidators adds each NodeID to the validator set with weight 1. +func withValidators(vdrs set.Set[ids.NodeID]) sutOption { + return options.Func[sutConfig](func(c *sutConfig) { + c.validators = vdrs + }) +} + // newSUT initializes a cchain [VM], transitions it to [snow.NormalOp], and // mounts its HTTP handlers behind a local [httptest.Server] at the paths // [NewClient] expects. @@ -79,7 +104,10 @@ func newSUT(tb testing.TB, opts ...sutOption) (context.Context, *SUT) { tb.Helper() var ( - vm = &VM{} + vm = &VM{ + pullGossipPeriod: 100 * time.Millisecond, + pushGossipPeriod: 100 * time.Millisecond, + } db = memdb.New() cfg = options.ApplyTo(&sutConfig{ genesis: core.Genesis{ @@ -88,6 +116,7 @@ func newSUT(tb testing.TB, opts ...sutOption) (context.Context, *SUT) { Difficulty: big.NewInt(0), // irrelevant but required to marshal Alloc: types.GenesisAlloc{}, }, + nodeID: ids.GenerateTestNodeID(), }, opts...) ) @@ -95,21 +124,30 @@ func newSUT(tb testing.TB, opts ...sutOption) (context.Context, *SUT) { // [atomic.SharedMemory.Apply] writes to the VM DB. memory := atomic.NewMemory(prefixdb.New([]byte("sharedmemory"), db)) snowCtx := snowtest.Context(tb, snowtest.CChainID) + snowCtx.NodeID = cfg.nodeID snowCtx.SharedMemory = memory.NewSharedMemory(snowtest.CChainID) log := saetest.NewTBLogger(tb, logging.Debug) snowCtx.Log = log + vdrState, ok := snowCtx.ValidatorState.(*validatorstest.State) + require.Truef(tb, ok, "unexpected type %T for snowCtx.ValidatorState", snowCtx.ValidatorState) + vdrState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, cfg.validators.Len()) + for id := range cfg.validators { + validatorSet[id] = &validators.GetValidatorOutput{ + NodeID: id, + Weight: 1, + } + } + return validatorSet, nil + } + chainDB := prefixdb.New([]byte("chain"), db) genesisBytes, err := json.Marshal(cfg.genesis) require.NoErrorf(tb, err, "json.Marshal(%T)", cfg.genesis) - // The SAE mempool may push gossip transactions when they are issued. - appSender := &enginetest.Sender{ - SendAppGossipF: func(context.Context, snowcommon.SendConfig, []byte) error { - return nil - }, - } + appSender := saetest.NewSender(tb, cfg.validators) ctx := log.CancelOnError(tb.Context()) require.NoErrorf(tb, vm.Initialize( @@ -130,6 +168,10 @@ func newSUT(tb testing.TB, opts ...sutOption) (context.Context, *SUT) { }) require.NoErrorf(tb, vm.SetState(ctx, snow.NormalOp), "%T.SetState(%s)", vm, snow.NormalOp) + // Avalanchego marks the local node as connected so that p2p protocols don't + // need to treat our node as a special case. + require.NoErrorf(tb, vm.Connected(ctx, snowCtx.NodeID, version.Current), "%T.Connected(%s)", vm, snowCtx.NodeID) + handlers, err := vm.CreateHandlers(ctx) require.NoErrorf(tb, err, "%T.CreateHandlers()", vm) @@ -146,13 +188,16 @@ func newSUT(tb testing.TB, opts ...sutOption) (context.Context, *SUT) { require.NoErrorf(tb, err, "rpc.Dial(%s)", wsURI) tb.Cleanup(ethRPCClient.Close) - return ctx, &SUT{ + sut := &SUT{ VM: vm, Client: NewClient(server.URL), snowCtx: snowCtx, memory: memory, + sender: appSender, ethclient: ethclient.NewClient(ethRPCClient), } + appSender.SetSelf(sut) + return ctx, sut } // assertUTXOsExist asserts that the shared memory between peerChainID and the diff --git a/vms/saevm/sae/BUILD.bazel b/vms/saevm/sae/BUILD.bazel index 01e2e67eb609..d51d24d150c1 100644 --- a/vms/saevm/sae/BUILD.bazel +++ b/vms/saevm/sae/BUILD.bazel @@ -89,7 +89,6 @@ go_test( "//snow", "//snow/consensus/snowman", "//snow/engine/common", - "//snow/engine/enginetest", "//snow/engine/snowman/block", "//snow/snowtest", "//snow/validators", @@ -97,7 +96,6 @@ go_test( "//utils", "//utils/logging", "//utils/set", - "//utils/timer/mockable", "//version", "//vms/saevm/adaptor", "//vms/saevm/blocks", diff --git a/vms/saevm/sae/networked_test.go b/vms/saevm/sae/networked_test.go index 19d9927dfa18..0a0d01927b00 100644 --- a/vms/saevm/sae/networked_test.go +++ b/vms/saevm/sae/networked_test.go @@ -4,25 +4,18 @@ package sae import ( - "context" "iter" - "maps" - "slices" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/avalanchego/utils/timer/mockable" - "github.com/ava-labs/avalanchego/version" + "github.com/ava-labs/avalanchego/vms/saevm/saetest" ) type networkedSUTs struct { - validators, nonValidators map[ids.NodeID]*SUT + validators, nonValidators []*SUT } // newNetworkedSUTs creates a network of SUTs with the specified number of @@ -35,213 +28,53 @@ type networkedSUTs struct { func newNetworkedSUTs(tb testing.TB, numValidators, numNonValidators int) *networkedSUTs { tb.Helper() + vdrIDs := set.NewSet[ids.NodeID](numValidators) + for range numValidators { + vdrIDs.Add(ids.GenerateTestNodeID()) + } + net := &networkedSUTs{ - validators: make(map[ids.NodeID]*SUT, numValidators), - nonValidators: make(map[ids.NodeID]*SUT, numNonValidators), + validators: make([]*SUT, 0, numValidators), + nonValidators: make([]*SUT, 0, numNonValidators), } const numAccounts = 1 - for range numValidators { - _, sut := newSUT(tb, numAccounts) - net.validators[sut.nodeID()] = sut + for id := range vdrIDs { + _, sut := newSUT(tb, numAccounts, withNodeID(id), withValidators(vdrIDs)) + net.validators = append(net.validators, sut) } for range numNonValidators { - _, sut := newSUT(tb, numAccounts) - net.nonValidators[sut.nodeID()] = sut + _, sut := newSUT(tb, numAccounts, withValidators(vdrIDs)) + net.nonValidators = append(net.nonValidators, sut) } - // To sanity check that the nodes agree on the genesis block, otherwise the + // Sanity check that the nodes agree on the genesis block, otherwise the // network will exhibit very weird behavior. _, expectedSUT := newSUT(tb, numAccounts) - for selfID, sut := range net.allNodes() { require.Equalf(tb, expectedSUT.genesis.ID(), sut.genesis.ID(), "genesis ID for node %s", selfID) + } - sut.validators.GetValidatorSetF = net.getValidatorSet - - s := sut.sender - node := net.node(tb, selfID) - s.SendAppRequestF = node.SendAppRequest - s.SendAppResponseF = node.SendAppResponse - s.SendAppErrorF = node.SendAppError - s.SendAppGossipF = node.SendAppGossip - - // Connect all the peers _after_ setting up the sender functions. - defer node.markConnectedToPeers(tb) + // Fully connect the validator clique. + saetest.Connect(tb, net.validators...) + // Connect each non-validator only to the validators, mirroring production + // where non-validators don't attempt to connect to each other. + for _, nv := range net.nonValidators { + saetest.ConnectTo(tb, nv, net.validators...) } return net } func (net *networkedSUTs) allNodes() iter.Seq2[ids.NodeID, *SUT] { return func(yield func(ids.NodeID, *SUT) bool) { - for id, sut := range net.validators { - if !yield(id, sut) { + for _, sut := range net.validators { + if !yield(sut.NodeID(), sut) { return } } - for id, sut := range net.nonValidators { - if !yield(id, sut) { - return - } - } - } -} - -func (net *networkedSUTs) allValidators() []*SUT { - return slices.Collect(maps.Values(net.validators)) -} - -func (net *networkedSUTs) allNonValidators() []*SUT { - return slices.Collect(maps.Values(net.nonValidators)) -} - -// getValidatorSet implements the [validators.State.GetValidatorSet] method, -// returning all validators in the network, each with weight 1. -func (net *networkedSUTs) getValidatorSet(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { - vs := make(map[ids.NodeID]*validators.GetValidatorOutput) - for id, sut := range net.validators { - vs[id] = &validators.GetValidatorOutput{ - NodeID: id, - PublicKey: sut.rawVM.snowCtx.PublicKey, - Weight: 1, - } - } - return vs, nil -} - -func (net *networkedSUTs) sutByID(tb testing.TB, id ids.NodeID) *SUT { - tb.Helper() - if sut, ok := net.validators[id]; ok { - return sut - } - sut, ok := net.nonValidators[id] - require.Truef(tb, ok, "Node %s is neither a validator nor non-validator", id) - return sut -} - -// A node couples an [SUT] with all of its peers as defined by a [networkedSUTs] -// instance. -type node struct { - *SUT - tb testing.TB - peers struct { - all, validators, nonValidators map[ids.NodeID]*SUT - } -} - -func (net *networkedSUTs) node(tb testing.TB, selfID ids.NodeID) *node { - tb.Helper() - n := &node{ - SUT: net.sutByID(tb, selfID), - tb: tb, - } - n.peers.validators = net.validators - - n.peers.all = maps.Clone(net.validators) - n.peers.all[selfID] = n.SUT - // As in production, only validators connect to non-validators. - if _, ok := net.validators[selfID]; ok { - maps.Copy(n.peers.all, net.nonValidators) - n.peers.nonValidators = net.nonValidators - } - return n -} - -// peer returns the [SUT] with the given ID, asserting that it is in fact a peer -// of the current node. Requests for non-peer IDs will therefore result in a -// test failure but will allow the call site to continue if desired. -func (n *node) peer(id ids.NodeID, when string) (*SUT, bool) { - if !assert.Containsf(n.tb, n.peers.all, id, "unknown peer %s in %s", id, when) { - return nil, false - } - return n.peers.all[id], true -} - -// Although [node] implements the [common.AppSender] interface, this is only to -// have local confirmation of matching signatures. Methods are actually accessed -// via [enginetest.Sender]s in the same goroutine. As a result, they deliver -// every message in a new goroutine to prevent re-entrant calls, which would -// result in a deadlock. -var _ common.AppSender = (*node)(nil) - -func (n *node) SendAppRequest(ctx context.Context, to set.Set[ids.NodeID], requestID uint32, msg []byte) error { - go func() { - for peerID := range to { - p, ok := n.peer(peerID, "SendAppRequest") - if !ok { - continue - } - assert.NoErrorf(n.tb, p.AppRequest(ctx, n.nodeID(), requestID, mockable.MaxTime, msg), "AppRequest(ctx, %s, ...)", peerID) - } - }() - return nil -} - -func (n *node) SendAppResponse(ctx context.Context, peerID ids.NodeID, requestID uint32, msg []byte) error { - go func() { - p, ok := n.peer(peerID, "SendAppResponse") - if !ok { - return - } - assert.NoErrorf(n.tb, p.AppResponse(ctx, n.nodeID(), requestID, msg), "AppResponse(ctx, %s, ...)", peerID) - }() - return nil -} - -func (n *node) SendAppError(ctx context.Context, peerID ids.NodeID, requestID uint32, code int32, msg string) error { - go func() { - p, ok := n.peer(peerID, "SendAppError") - if !ok { - return - } - appErr := &common.AppError{ - Code: code, - Message: msg, - } - assert.NoErrorf(n.tb, p.AppRequestFailed(ctx, n.nodeID(), requestID, appErr), "AppRequestFailed(ctx, %s, ...)", peerID) - }() - return nil -} - -func (n *node) SendAppGossip(ctx context.Context, to common.SendConfig, msg []byte) error { - go func() { - var sent set.Set[ids.NodeID] - for peerID := range to.NodeIDs { - p, ok := n.peer(peerID, "SendAppGossip") - if !ok { + for _, sut := range net.nonValidators { + if !yield(sut.NodeID(), sut) { return } - assert.NoErrorf(n.tb, p.AppGossip(ctx, n.nodeID(), msg), "AppGossip(ctx, %s, ...)", peerID) - sent.Add(peerID) - } - - send := func(peers map[ids.NodeID]*SUT, count int) error { - for peerID, peer := range peers { - if count <= 0 { - break - } - if sent.Contains(peerID) { - continue - } - if err := peer.AppGossip(ctx, n.nodeID(), msg); err != nil { - return err - } - - sent.Add(peerID) - count-- - } - return nil } - - assert.NoError(n.tb, send(n.peers.validators, to.Validators), "sending to validators") - assert.NoError(n.tb, send(n.peers.nonValidators, to.NonValidators), "sending to non-validators") - assert.NoError(n.tb, send(n.peers.all, to.Peers), "sending to peers") - }() - return nil -} - -func (n *node) markConnectedToPeers(tb testing.TB) { - tb.Helper() - for peerID := range n.peers.all { - require.NoErrorf(tb, n.Connected(tb.Context(), peerID, version.Current), "%T.Connected(%s)", n.SUT, peerID) } } diff --git a/vms/saevm/sae/vm_test.go b/vms/saevm/sae/vm_test.go index f24dd81ff6ce..41846bfb50fc 100644 --- a/vms/saevm/sae/vm_test.go +++ b/vms/saevm/sae/vm_test.go @@ -41,12 +41,13 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/snowtest" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/saevm/adaptor" "github.com/ava-labs/avalanchego/vms/saevm/blocks" "github.com/ava-labs/avalanchego/vms/saevm/blocks/blockstest" @@ -84,10 +85,13 @@ type SUT struct { hooks *hookstest.Stub logger *saetest.TBLogger - validators *validatorstest.State - sender *enginetest.Sender + sender *saetest.Sender } +// Implements [saetest.Peer]. +func (s *SUT) NodeID() ids.NodeID { return s.rawVM.snowCtx.NodeID } +func (s *SUT) Sender() *saetest.Sender { return s.sender } + type ( sutConfig struct { hooks *hookstest.Stub @@ -96,10 +100,26 @@ type ( genesis core.Genesis db database.Database precompiles map[common.Address]libevm.PrecompiledContract + nodeID ids.NodeID + validators set.Set[ids.NodeID] } sutOption = options.Option[sutConfig] ) +// withNodeID overrides the SUT's randomly generated NodeID. +func withNodeID(id ids.NodeID) sutOption { + return options.Func[sutConfig](func(c *sutConfig) { + c.nodeID = id + }) +} + +// withValidators adds each NodeID to the validator set with weight 1. +func withValidators(vdrs set.Set[ids.NodeID]) sutOption { + return options.Func[sutConfig](func(c *sutConfig) { + c.validators = vdrs + }) +} + // chainID is made a global to keep it constant across multiple SUTs. var chainID = ids.GenerateTestID() @@ -126,7 +146,8 @@ func newSUT(tb testing.TB, numAccounts uint, opts ...sutOption) (context.Context Timestamp: saeparams.TauSeconds, Difficulty: big.NewInt(0), // irrelevant but required }, - db: memdb.New(), + db: memdb.New(), + nodeID: ids.GenerateTestNodeID(), }, opts...) vm := NewSinceGenesis(conf.hooks, conf.vmConfig) @@ -140,13 +161,22 @@ func newSUT(tb testing.TB, numAccounts uint, opts ...sutOption) (context.Context ctx := logger.CancelOnError(tb.Context()) snowCtx := snowtest.Context(tb, chainID) snowCtx.Log = logger - - sender := &enginetest.Sender{ - SendAppGossipF: func(context.Context, snowcommon.SendConfig, []byte) error { - return nil - }, + snowCtx.NodeID = conf.nodeID + vdrState, ok := snowCtx.ValidatorState.(*validatorstest.State) + require.Truef(tb, ok, "unexpected type %T for snowCtx.ValidatorState", snowCtx.ValidatorState) + vdrState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, conf.validators.Len()) + for id := range conf.validators { + validatorSet[id] = &validators.GetValidatorOutput{ + NodeID: id, + Weight: 1, + } + } + return validatorSet, nil } + sender := saetest.NewSender(tb, conf.validators) + require.NoError(tb, snow.Initialize( ctx, snowCtx, @@ -171,11 +201,12 @@ func newSUT(tb testing.TB, numAccounts uint, opts ...sutOption) (context.Context require.NoError(tb, vm.last.accepted.Load().WaitUntilExecuted(ctx), "{last-accepted block}.WaitUntilExecuted()") }) - rpcClient, ethClient := dialRPC(ctx, tb, snow) + // Avalanchego marks the local node as connected so that p2p protocols + // don't need to treat our node as a special case. + require.NoErrorf(tb, snow.Connected(ctx, snowCtx.NodeID, version.Current), "Connected(%s)", snowCtx.NodeID) - validators, ok := snowCtx.ValidatorState.(*validatorstest.State) - require.Truef(tb, ok, "unexpected type %T for snowCtx.ValidatorState", snowCtx.ValidatorState) - return ctx, &SUT{ + rpcClient, ethClient := dialRPC(ctx, tb, snow) + sut := &SUT{ ChainVM: snow, Client: ethClient, rpcClient: rpcClient, @@ -189,9 +220,10 @@ func newSUT(tb testing.TB, numAccounts uint, opts ...sutOption) (context.Context hooks: conf.hooks, logger: logger, - validators: validators, - sender: sender, + sender: sender, } + sender.SetSelf(sut) + return ctx, sut } func dialRPC(ctx context.Context, tb testing.TB, snow block.ChainVM) (*rpc.Client, *ethclient.Client) { @@ -325,10 +357,6 @@ func registerPrecompiles(tb testing.TB, precompiles map[common.Address]libevm.Pr h.Register(tb) } -func (s *SUT) nodeID() ids.NodeID { - return s.rawVM.snowCtx.NodeID -} - // context returns a [context.Context], derived from the [testing.TB], that is // cancelled if the SUT's default logger receives a log at [logging.Error] or // higher. @@ -968,7 +996,7 @@ func requireReceiveTx(tb testing.TB, nodes []*SUT, txHash common.Hash) { for _, sut := range nodes { assert.Eventuallyf(tb, func() bool { return sut.rawVM.mempool.Has(ids.ID(txHash)) - }, 5*time.Second, 100*time.Millisecond, "tx %x not gossiped to node %s", txHash, sut.nodeID()) + }, 5*time.Second, 100*time.Millisecond, "tx %x not gossiped to node %s", txHash, sut.NodeID()) } if tb.Failed() { tb.FailNow() @@ -978,7 +1006,7 @@ func requireReceiveTx(tb testing.TB, nodes []*SUT, txHash common.Hash) { func requireNotReceiveTx(tb testing.TB, nodes []*SUT, txHash common.Hash) { tb.Helper() for _, sut := range nodes { - assert.False(tb, sut.rawVM.mempool.Has(ids.ID(txHash)), "tx %x was gossiped to node %s", txHash, sut.nodeID()) + assert.False(tb, sut.rawVM.mempool.Has(ids.ID(txHash)), "tx %x was gossiped to node %s", txHash, sut.NodeID()) } if tb.Failed() { tb.FailNow() @@ -988,8 +1016,7 @@ func requireNotReceiveTx(tb testing.TB, nodes []*SUT, txHash common.Hash) { func TestGossip(t *testing.T) { n := newNetworkedSUTs(t, 2, 2) - nonValidators := n.allNonValidators() - api := nonValidators[0] + api := n.nonValidators[0] tx := api.wallet.SetNonceAndSign(t, 0, &types.DynamicFeeTx{ To: &common.Address{}, Gas: params.TxGas, @@ -997,8 +1024,8 @@ func TestGossip(t *testing.T) { Value: big.NewInt(1), }) api.mustSendTx(t, tx) - requireReceiveTx(t, n.allValidators(), tx.Hash()) - requireNotReceiveTx(t, nonValidators[1:], tx.Hash()) + requireReceiveTx(t, n.validators, tx.Hash()) + requireNotReceiveTx(t, n.nonValidators[1:], tx.Hash()) } func TestBlockSources(t *testing.T) { diff --git a/vms/saevm/saetest/BUILD.bazel b/vms/saevm/saetest/BUILD.bazel index 04f7460ac8c6..3022cb7dce3c 100644 --- a/vms/saevm/saetest/BUILD.bazel +++ b/vms/saevm/saetest/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "goleak.go", "heightdb.go", "logging.go", + "network.go", "saetest.go", "wallet.go", ], @@ -15,8 +16,14 @@ go_library( visibility = ["//visibility:public"], deps = [ "//database", + "//ids", + "//snow/engine/common", + "//snow/validators", "//utils/lock", "//utils/logging", + "//utils/set", + "//utils/timer/mockable", + "//version", "//vms/saevm/saedb", "//vms/saevm/types", "@com_github_ava_labs_libevm//common", @@ -26,9 +33,11 @@ go_library( "@com_github_ava_labs_libevm//crypto", "@com_github_ava_labs_libevm//event", "@com_github_ava_labs_libevm//libevm/ethtest", + "@com_github_ava_labs_libevm//libevm/eventual", "@com_github_ava_labs_libevm//params", "@com_github_ava_labs_libevm//trie", "@com_github_holiman_uint256//:uint256", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/vms/saevm/saetest/network.go b/vms/saevm/saetest/network.go new file mode 100644 index 000000000000..0bf403f32ae1 --- /dev/null +++ b/vms/saevm/saetest/network.go @@ -0,0 +1,217 @@ +// Copyright (C) 2019, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package saetest + +import ( + "context" + "sync" + "testing" + + "github.com/ava-labs/libevm/libevm/eventual" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/utils/timer/mockable" + "github.com/ava-labs/avalanchego/version" +) + +// Peer defines the minimal surface for using the [Sender] and [Connect] +// helpers. +type Peer interface { + common.AppHandler + validators.Connector + NodeID() ids.NodeID + Sender() *Sender +} + +// Sender is a test [common.AppSender] that routes messages between in-process +// peers registered via [Sender.AddPeer]. Like the production avalanchego +// instance, each call is delivered in its own goroutine. +type Sender struct { + tb testing.TB + vdrs set.Set[ids.NodeID] + + self eventual.Value[common.AppHandler] + selfID ids.NodeID + + peersLock sync.RWMutex + peers map[ids.NodeID]common.AppHandler +} + +// NewSender returns a [Sender] whose validator-set sampling is driven by vdrs. +func NewSender(tb testing.TB, vdrs set.Set[ids.NodeID]) *Sender { + return &Sender{ + tb: tb, + self: eventual.New[common.AppHandler](), + vdrs: vdrs, + peers: make(map[ids.NodeID]common.AppHandler), + } +} + +// SetSelf binds the sender to the local node. It MUST be called before any +// other peer's handler is invoked, since [Sender] uses self's NodeID as the +// source of every routed message. +func (s *Sender) SetSelf(self Peer) { + s.selfID = self.NodeID() + s.self.Put(self) +} + +// AddPeer registers peer so that messages addressed to peer.NodeID() are +// delivered to it. +func (s *Sender) AddPeer(peer Peer) { + s.peersLock.Lock() + defer s.peersLock.Unlock() + + s.peers[peer.NodeID()] = peer +} + +func (s *Sender) SendAppRequest(_ context.Context, to set.Set[ids.NodeID], requestID uint32, b []byte) error { + go s.sendAppRequest(to, requestID, b) + return nil +} + +func (s *Sender) SendAppResponse(_ context.Context, to ids.NodeID, requestID uint32, b []byte) error { + go s.sendAppResponse(to, requestID, b) + return nil +} + +func (s *Sender) SendAppError(_ context.Context, nodeID ids.NodeID, requestID uint32, code int32, message string) error { + go s.sendAppError(nodeID, requestID, code, message) + return nil +} + +func (s *Sender) SendAppGossip(_ context.Context, c common.SendConfig, b []byte) error { + go s.sendAppGossip(c, b) + return nil +} + +func (s *Sender) sendAppRequest(to set.Set[ids.NodeID], requestID uint32, b []byte) { + ctx := s.tb.Context() + self, selfID := s.getSelf() + for peerID := range to { + if peer, ok := s.getPeer(peerID); ok { + assert.NoErrorf(s.tb, peer.AppRequest(ctx, selfID, requestID, mockable.MaxTime, b), "%T.AppRequest(%s)", peer, selfID) + } else { + assert.NoErrorf(s.tb, self.AppRequestFailed(ctx, peerID, requestID, common.ErrTimeout), "%T.AppRequestFailed(%s)", self, peerID) + } + } +} + +func (s *Sender) sendAppResponse(to ids.NodeID, requestID uint32, b []byte) { + peer, ok := s.getPeer(to) + if !ok { + return + } + ctx := s.tb.Context() + _, selfID := s.getSelf() + assert.NoErrorf(s.tb, peer.AppResponse(ctx, selfID, requestID, b), "%T.AppResponse(%s)", peer, selfID) +} + +func (s *Sender) sendAppError(to ids.NodeID, requestID uint32, code int32, message string) { + peer, ok := s.getPeer(to) + if !ok { + return + } + ctx := s.tb.Context() + _, selfID := s.getSelf() + appErr := &common.AppError{ + Code: code, + Message: message, + } + assert.NoErrorf(s.tb, peer.AppRequestFailed(ctx, selfID, requestID, appErr), "%T.AppRequestFailed(%s)", peer, selfID) +} + +func (s *Sender) sendAppGossip(c common.SendConfig, b []byte) { + var ( + ctx = s.tb.Context() + _, selfID = s.getSelf() + ) + for _, peer := range s.sample(c) { + assert.NoErrorf(s.tb, peer.AppGossip(ctx, selfID, b), "%T.AppGossip(%s)", peer, selfID) + } +} + +func (s *Sender) sample(c common.SendConfig) []common.AppHandler { + var ( + sent set.Set[ids.NodeID] + toSend []common.AppHandler + ) + if self, selfID := s.getSelf(); c.NodeIDs.Contains(selfID) { + sent.Add(selfID) + toSend = append(toSend, self) + } + + s.peersLock.RLock() + defer s.peersLock.RUnlock() + + add := func(count int, allow func(peerID ids.NodeID) bool) { + for peerID, peer := range s.peers { + if count <= 0 { + break + } + if !sent.Contains(peerID) && allow(peerID) { + sent.Add(peerID) + toSend = append(toSend, peer) + count-- + } + } + } + add(c.NodeIDs.Len(), c.NodeIDs.Contains) + add(c.Validators, s.vdrs.Contains) + add(c.NonValidators, func(peerID ids.NodeID) bool { + return !s.vdrs.Contains(peerID) + }) + add(c.Peers, func(ids.NodeID) bool { return true }) + return toSend +} + +func (s *Sender) getSelf() (common.AppHandler, ids.NodeID) { + self := s.self.Peek() // ensure SetSelf is called before accessing selfID + return self, s.selfID +} + +func (s *Sender) getPeer(peerID ids.NodeID) (common.AppHandler, bool) { + if self, selfID := s.getSelf(); selfID == peerID { + return self, true + } + + s.peersLock.RLock() + defer s.peersLock.RUnlock() + + peer, ok := s.peers[peerID] + return peer, ok +} + +// Connect wires every given pair of peers together, marking both sides as +// connected. +func Connect[P Peer](tb testing.TB, peers ...P) { + tb.Helper() + + for i, peer := range peers { + ConnectTo(tb, peer, peers[:i]...) + } +} + +// ConnectTo wires self to each peer, marking both sides as connected. +func ConnectTo[P Peer](tb testing.TB, self P, peers ...P) { + tb.Helper() + + ctx := tb.Context() + var ( + selfID = self.NodeID() + selfSender = self.Sender() + ) + for _, peer := range peers { + selfSender.AddPeer(peer) + peer.Sender().AddPeer(self) + + dstID := peer.NodeID() + require.NoErrorf(tb, self.Connected(ctx, dstID, version.Current), "%T.Connected(%s)", self, dstID) + require.NoErrorf(tb, peer.Connected(ctx, selfID, version.Current), "%T.Connected(%s)", peer, selfID) + } +}