Skip to content
Draft
116 changes: 71 additions & 45 deletions bold/challenge/chain/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"

"github.com/offchainlabs/nitro/bold/api"
"github.com/offchainlabs/nitro/bold/api/db"
"github.com/offchainlabs/nitro/bold/challenge/tree"
"github.com/offchainlabs/nitro/bold/containers/events"
"github.com/offchainlabs/nitro/bold/containers/option"
"github.com/offchainlabs/nitro/bold/containers/threadsafe"
"github.com/offchainlabs/nitro/bold/protocol"
Expand Down Expand Up @@ -87,6 +89,7 @@ type Watcher struct {
// Track all if empty / nil.
trackChallengeParentAssertionHashes []protocol.AssertionHash
maxGetLogBlocks uint64
blockNotifier *events.Producer[*gethtypes.Header]
}

// New initializes a watcher service for frequently scanning the chain
Expand Down Expand Up @@ -124,6 +127,12 @@ func (w *Watcher) SetEdgeManager(em EdgeManager) {
w.edgeManager = em
}

// SetBlockNotifier sets a block notifier that the watcher will subscribe to
// for reactive event polling instead of using a fixed timer interval.
func (w *Watcher) SetBlockNotifier(notifier *events.Producer[*gethtypes.Header]) {
w.blockNotifier = notifier
}

// AvgBlockTime returns the average time for block creation.
func (w *Watcher) AvgBlockTime() time.Duration {
return w.averageTimeForBlockCreation
Expand Down Expand Up @@ -245,58 +254,75 @@ func (w *Watcher) Start(ctx context.Context) {
}

fromBlock = toBlock
ticker := time.NewTicker(w.pollEventsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
toBlock, err := w.chain.DesiredHeaderU64(ctx)
if err != nil {
log.Error("Could not get latest header", "err", err)
continue
}
// AssertionChain's rpcHeadBlockNumber is set to finalized and this might occur due to l1 backends of load balancer
// not being in consensus wrt finalized. In which case we ignore and continue
if fromBlock > toBlock {
continue
}
if fromBlock == toBlock {
w.initialSyncCompleted.Store(true)
continue
}
// Get a challenge manager instance and filterer.
challengeManager := w.chain.SpecChallengeManager()
filterer, err = retry.UntilSucceeds(ctx, func() (*challengeV2gen.EdgeChallengeManagerFilterer, error) {
return challengeV2gen.NewEdgeChallengeManagerFilterer(challengeManager.Address(), w.backend)
})
if err != nil {
log.Error("Could not get challenge manager filterer", "err", err)
w.initialSyncCompleted.Store(true)
if w.blockNotifier != nil {
sub := w.blockNotifier.Subscribe()
for {
if _, done := sub.Next(ctx); done {
return
}
filterOpts := &bind.FilterOpts{
Start: fromBlock,
End: &toBlock,
Context: ctx,
}
if err = w.checkForEdgeAdded(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge added", "err", err)
continue
}
if err = w.checkForEdgeConfirmedByOneStepProof(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge confirmed by osp", "err", err)
continue
}
if err = w.checkForEdgeConfirmedByTime(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge confirmed by time", "err", err)
continue
fromBlock = w.processNewBlocks(ctx, fromBlock)
}
} else {
ticker := time.NewTicker(w.pollEventsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fromBlock = w.processNewBlocks(ctx, fromBlock)
case <-ctx.Done():
return
}
fromBlock = toBlock
case <-ctx.Done():
return
}
}
}

// processNewBlocks polls for new edge events since fromBlock and returns the
// updated fromBlock value.
func (w *Watcher) processNewBlocks(ctx context.Context, fromBlock uint64) uint64 {
toBlock, err := w.chain.DesiredHeaderU64(ctx)
if err != nil {
log.Error("Could not get latest header", "err", err)
return fromBlock
}
// AssertionChain's rpcHeadBlockNumber is set to finalized and this might occur due to l1 backends of load balancer
// not being in consensus wrt finalized. In which case we ignore and continue
if fromBlock > toBlock {
return fromBlock
}
if fromBlock == toBlock {
w.initialSyncCompleted.Store(true)
return fromBlock
}
// Get a challenge manager instance and filterer.
challengeManager := w.chain.SpecChallengeManager()
filterer, err := retry.UntilSucceeds(ctx, func() (*challengeV2gen.EdgeChallengeManagerFilterer, error) {
return challengeV2gen.NewEdgeChallengeManagerFilterer(challengeManager.Address(), w.backend)
})
if err != nil {
log.Error("Could not get challenge manager filterer", "err", err)
return fromBlock
}
filterOpts := &bind.FilterOpts{
Start: fromBlock,
End: &toBlock,
Context: ctx,
}
if err = w.checkForEdgeAdded(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge added", "err", err)
return fromBlock
}
if err = w.checkForEdgeConfirmedByOneStepProof(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge confirmed by osp", "err", err)
return fromBlock
}
if err = w.checkForEdgeConfirmedByTime(ctx, filterer, filterOpts); err != nil {
log.Error("Could not check for edge confirmed by time", "err", err)
return fromBlock
}
return toBlock
}

// GetRoyalEdges returns all royal, tracked edges in the watcher by assertion
// hash.
func (w *Watcher) GetRoyalEdges(ctx context.Context) (map[protocol.AssertionHash][]*api.JsonTrackedRoyalEdge, error) {
Expand Down
1 change: 1 addition & 0 deletions bold/challenge/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func New(
o(m)
}
m.watcher.SetEdgeManager(m)
m.watcher.SetBlockNotifier(m.newBlockNotifier)
m.assertionManager.SetRivalHandler(m)
log.Info("Setting up challenge manager",
"name", m.name,
Expand Down
3 changes: 1 addition & 2 deletions bold/challenge/tracker/challenge_confirmation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ccoveille/go-safecast"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -339,7 +338,7 @@ func (cc *challengeConfirmer) waitForTxToBeSafe(

// This is to handle the case where the transaction is mined in a block, but then the block is reorged.
// In this case, we want to wait for the transaction to be mined again.
receiptLatest, err := bind.WaitMined(ctx, backend, tx)
receiptLatest, err := protocol.WaitMined(ctx, backend, tx)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions bold/challenge/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func (et *Tracker) Spawn(ctx context.Context) {
}
if err := et.Act(ctx); err != nil {
log.Error("Could not act with edge tracker", append(fields, "err", err)...)
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return
}
}
}
}
Expand Down
21 changes: 14 additions & 7 deletions bold/containers/events/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Producer[T any] struct {
doneListener chan subId // channel to listen for IDs of subscriptions to be remove.
broadcastTimeout time.Duration // maximum duration to wait for an event to be sent.
nextId subId // monotonically increasing id for stable subscription identification
stopped chan struct{} // closed when the producer shuts down
}

type ProducerOpt[T any] func(*Producer[T])
Expand All @@ -47,6 +48,7 @@ func NewProducer[T any](opts ...ProducerOpt[T]) *Producer[T] {
subscriptionBufferSize: defaultSubscriptionBufferSize,
doneListener: make(chan subId, 100),
broadcastTimeout: defaultBroadcastTimeout,
stopped: make(chan struct{}),
}
for _, opt := range opts {
opt(producer)
Expand All @@ -73,6 +75,7 @@ func (ep *Producer[T]) Start(ctx context.Context) {
}
ep.Unlock()
case <-ctx.Done():
close(ep.stopped)
close(ep.doneListener)
ep.subs = nil
return
Expand All @@ -86,9 +89,10 @@ func (ep *Producer[T]) Subscribe() *Subscription[T] {
ep.Lock()
defer ep.Unlock()
sub := &Subscription[T]{
id: ep.nextId, // Assign a stable, monotonically increasing ID
events: make(chan T),
done: ep.doneListener,
id: ep.nextId, // Assign a stable, monotonically increasing ID
events: make(chan T),
done: ep.doneListener,
stopped: ep.stopped,
}
ep.nextId++
ep.subs = append(ep.subs, sub)
Expand All @@ -106,6 +110,7 @@ func (ep *Producer[T]) Broadcast(ctx context.Context, event T) {
go func(listener *Subscription[T]) {
select {
case listener.events <- event:
case <-listener.stopped:
case <-time.After(ep.broadcastTimeout):
case <-ctx.Done():
}
Expand All @@ -118,9 +123,10 @@ type subId int
// Subscription defines a generic handle to a subscription of
// events from a producer.
type Subscription[T any] struct {
id subId
events chan T
done chan subId
id subId
events chan T
done chan subId
stopped <-chan struct{}
}

// Next waits for the next event or context cancelation, returning the event or an error.
Expand All @@ -130,9 +136,10 @@ func (es *Subscription[T]) Next(ctx context.Context) (T, bool) {
select {
case ev := <-es.events:
return ev, false
case <-es.stopped:
return zeroVal, true
case <-ctx.Done():
es.done <- es.id
close(es.events)
return zeroVal, true
}
}
Expand Down
4 changes: 2 additions & 2 deletions bold/protocol/sol/transact.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (a *AssertionChain) transact(
}
ctxWaitMined, cancelWaitMined := context.WithTimeout(ctx, time.Minute)
defer cancelWaitMined()
receipt, err := bind.WaitMined(ctxWaitMined, backend, tx)
receipt, err := protocol.WaitMined(ctxWaitMined, backend, tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (a *AssertionChain) waitForTxToBeSafe(

// This is to handle the case where the transaction is mined in a block, but then the block is reorged.
// In this case, we want to wait for the transaction to be mined again.
receiptLatest, err := bind.WaitMined(ctx, backend, tx)
receiptLatest, err := protocol.WaitMined(ctx, backend, tx)
if err != nil {
return nil, err
}
Expand Down
47 changes: 47 additions & 0 deletions bold/protocol/wait_mined.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023-2026, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package protocol

import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

// WaitMined waits for a transaction to be mined by subscribing to new head
// notifications from the backend. This is faster than bind.WaitMined's
// hardcoded 1s polling because ChainBackend always supports head
// subscriptions. Falls back to bind.WaitMined if the subscription fails.
func WaitMined(ctx context.Context, b ChainBackend, tx *types.Transaction) (*types.Receipt, error) {
txHash := tx.Hash()
heads := make(chan *types.Header, 1)
sub, err := b.SubscribeNewHead(ctx, heads)
if err != nil {
log.Warn("Could not subscribe to new heads for WaitMined, falling back to polling", "err", err)
return bind.WaitMined(ctx, b, tx)
}
defer sub.Unsubscribe()

for {
receipt, err := b.TransactionReceipt(ctx, txHash)
if err == nil {
return receipt, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-sub.Err():
if err != nil {
return nil, fmt.Errorf("head subscription error while waiting for tx: %w", err)
}
return nil, errors.New("head subscription closed unexpectedly")
case <-heads:
}
}
}
41 changes: 39 additions & 2 deletions bold/testing/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/pkg/errors"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -50,7 +51,14 @@ type committer interface {
Commit() common.Hash
}

// WaitForTx to be mined. This method will trigger .Commit() on a simulated backend.
type headSubscriber interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}

// WaitForTx waits for a transaction to be mined. It triggers .Commit() on
// simulated backends. If the backend supports head subscriptions, those are
// used for near-instant notification; otherwise it falls back to
// bind.WaitMined (1-second polling).
func WaitForTx(ctx context.Context, be bind.DeployBackend, tx *types.Transaction) error {
if simulated, ok := be.(committer); ok {
simulated.Commit()
Expand All @@ -59,7 +67,36 @@ func WaitForTx(ctx context.Context, be bind.DeployBackend, tx *types.Transaction
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

if subscriber, ok := be.(headSubscriber); ok {
return waitMinedWithSubscription(ctx, be, subscriber, tx)
}
_, err := bind.WaitMined(ctx, be, tx)

return err
}

func waitMinedWithSubscription(ctx context.Context, be bind.DeployBackend, subscriber headSubscriber, tx *types.Transaction) error {
heads := make(chan *types.Header, 8)
sub, err := subscriber.SubscribeNewHead(ctx, heads)
if err != nil {
_, err = bind.WaitMined(ctx, be, tx)
return err
}
defer sub.Unsubscribe()

for {
receipt, err := be.TransactionReceipt(ctx, tx.Hash())
if err == nil && receipt != nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-sub.Err():
// Subscription failed; fall back to polling.
_, err = bind.WaitMined(ctx, be, tx)
return err
case <-heads:
case <-time.After(time.Second):
}
}
}
Loading