From 2d513118926fa2651c1ed43e1716f308915512ab Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Thu, 30 Apr 2026 10:27:15 -0300 Subject: [PATCH 1/5] wait for log --- cmd/workflow/simulate/chain/evm/trigger.go | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/cmd/workflow/simulate/chain/evm/trigger.go b/cmd/workflow/simulate/chain/evm/trigger.go index 68c3f178..ad8a71f4 100644 --- a/cmd/workflow/simulate/chain/evm/trigger.go +++ b/cmd/workflow/simulate/chain/evm/trigger.go @@ -2,13 +2,17 @@ package evm import ( "context" + "errors" "fmt" "math" "math/big" "strconv" "strings" + "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" @@ -72,7 +76,9 @@ func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client) (*evmpb. return nil, fmt.Errorf("invalid event index: %w", err) } - return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, true) + timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + return fetchAndConvertLog(timeoutCtx, ethClient, txHash, eventIndex, true) } // GetEVMTriggerLogFromValues fetches a log given tx hash string and event index. @@ -99,7 +105,7 @@ func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, verbose bool) (*evmpb.Log, error) { receiptSpinner := ui.NewSpinner() receiptSpinner.Start(fmt.Sprintf("Fetching transaction receipt for %s...", txHash.Hex())) - txReceipt, err := ethClient.TransactionReceipt(ctx, txHash) + txReceipt, err := waitForTransactionReceipt(ctx, ethClient, txHash) receiptSpinner.Stop() if err != nil { return nil, fmt.Errorf("failed to fetch transaction receipt: %w", err) @@ -146,3 +152,22 @@ func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash } return pbLog, nil } + +func waitForTransactionReceipt(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash) (*types.Receipt, error) { + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + txReceipt, err := ethClient.TransactionReceipt(ctx, txHash) + if err == nil { + return txReceipt, nil + } + if !errors.Is(err, ethereum.NotFound) { + return nil, err + } + // tx not found yet, wait and retry + time.Sleep(3 * time.Second) + } +} From d89ab17698a2da63a2dd5681486fb5f5c34ec384 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Thu, 30 Apr 2026 10:31:34 -0300 Subject: [PATCH 2/5] move timeout --- cmd/workflow/simulate/chain/evm/trigger.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/workflow/simulate/chain/evm/trigger.go b/cmd/workflow/simulate/chain/evm/trigger.go index ad8a71f4..f1742c78 100644 --- a/cmd/workflow/simulate/chain/evm/trigger.go +++ b/cmd/workflow/simulate/chain/evm/trigger.go @@ -76,9 +76,7 @@ func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client) (*evmpb. return nil, fmt.Errorf("invalid event index: %w", err) } - timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - return fetchAndConvertLog(timeoutCtx, ethClient, txHash, eventIndex, true) + return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, true) } // GetEVMTriggerLogFromValues fetches a log given tx hash string and event index. @@ -105,11 +103,15 @@ func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, verbose bool) (*evmpb.Log, error) { receiptSpinner := ui.NewSpinner() receiptSpinner.Start(fmt.Sprintf("Fetching transaction receipt for %s...", txHash.Hex())) - txReceipt, err := waitForTransactionReceipt(ctx, ethClient, txHash) + + timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + txReceipt, err := waitForTransactionReceipt(timeoutCtx, ethClient, txHash) receiptSpinner.Stop() + cancel() if err != nil { return nil, fmt.Errorf("failed to fetch transaction receipt: %w", err) } + if eventIndex >= uint64(len(txReceipt.Logs)) { return nil, fmt.Errorf("event index %d out of range, transaction has %d log events", eventIndex, len(txReceipt.Logs)) } From 9057571bc1afc84406a514656c1703ec80a1fda8 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Thu, 30 Apr 2026 10:39:05 -0300 Subject: [PATCH 3/5] move cli to flag --- cmd/workflow/simulate/chain/evm/chaintype.go | 23 +++++++++++++++---- cmd/workflow/simulate/chain/evm/trigger.go | 15 +++++++----- .../simulate/chain/evm/trigger_test.go | 17 +++++++------- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/cmd/workflow/simulate/chain/evm/chaintype.go b/cmd/workflow/simulate/chain/evm/chaintype.go index 5cece5ef..224b1641 100644 --- a/cmd/workflow/simulate/chain/evm/chaintype.go +++ b/cmd/workflow/simulate/chain/evm/chaintype.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -33,6 +34,7 @@ func init() { }, []chain.CLIFlagDef{ {Name: TriggerInputTxHash, Description: "EVM trigger transaction hash (0x...)", FlagType: chain.CLIFlagString}, {Name: TriggerInputEventIndex, Description: "EVM trigger log index (0-based)", DefaultValue: "-1", FlagType: chain.CLIFlagInt}, + {Name: TriggerInputReceiptTimeout, Description: "Timeout for waiting on an EVM transaction receipt (e.g. 30s, 2m)", DefaultValue: "1m", FlagType: chain.CLIFlagString}, }) } @@ -266,8 +268,9 @@ func isHexString(s string) bool { // CLI input keys consumed from chain.TriggerParams.ChainTypeInputs. const ( - TriggerInputTxHash = "evm-tx-hash" - TriggerInputEventIndex = "evm-event-index" + TriggerInputTxHash = "evm-tx-hash" + TriggerInputEventIndex = "evm-event-index" + TriggerInputReceiptTimeout = "evm-receipt-timeout" ) func (ct *EVMChainType) CollectCLIInputs(v *viper.Viper) map[string]string { @@ -278,6 +281,9 @@ func (ct *EVMChainType) CollectCLIInputs(v *viper.Viper) map[string]string { if idx := v.GetInt(TriggerInputEventIndex); idx >= 0 { inputs[TriggerInputEventIndex] = strconv.Itoa(idx) } + if timeout := strings.TrimSpace(v.GetString(TriggerInputReceiptTimeout)); timeout != "" { + inputs[TriggerInputReceiptTimeout] = timeout + } return inputs } @@ -293,8 +299,17 @@ func (ct *EVMChainType) ResolveTriggerData(ctx context.Context, selector uint64, return nil, fmt.Errorf("invalid client type for EVM chain selector %d", selector) } + receiptTimeout := time.Minute // default + if raw := strings.TrimSpace(params.ChainTypeInputs[TriggerInputReceiptTimeout]); raw != "" { + d, err := time.ParseDuration(raw) + if err != nil { + return nil, fmt.Errorf("invalid --%s %q: %w", TriggerInputReceiptTimeout, raw, err) + } + receiptTimeout = d + } + if params.Interactive { - return GetEVMTriggerLog(ctx, client) + return GetEVMTriggerLog(ctx, client, receiptTimeout) } txHash := strings.TrimSpace(params.ChainTypeInputs[TriggerInputTxHash]) @@ -306,5 +321,5 @@ func (ct *EVMChainType) ResolveTriggerData(ctx context.Context, selector uint64, if err != nil { return nil, fmt.Errorf("invalid --evm-event-index %q: %w", eventIndexStr, err) } - return GetEVMTriggerLogFromValues(ctx, client, txHash, eventIndex) + return GetEVMTriggerLogFromValues(ctx, client, txHash, eventIndex, receiptTimeout) } diff --git a/cmd/workflow/simulate/chain/evm/trigger.go b/cmd/workflow/simulate/chain/evm/trigger.go index f1742c78..d5a674aa 100644 --- a/cmd/workflow/simulate/chain/evm/trigger.go +++ b/cmd/workflow/simulate/chain/evm/trigger.go @@ -22,7 +22,8 @@ import ( ) // GetEVMTriggerLog prompts user for EVM trigger data and fetches the log interactively. -func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client) (*evmpb.Log, error) { +// receiptTimeout controls how long to wait for the transaction receipt before giving up. +func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client, receiptTimeout time.Duration) (*evmpb.Log, error) { var txHashInput string var eventIndexInput string @@ -76,13 +77,14 @@ func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client) (*evmpb. return nil, fmt.Errorf("invalid event index: %w", err) } - return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, true) + return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, true, receiptTimeout) } // GetEVMTriggerLogFromValues fetches a log given tx hash string and event index. // Unlike GetEVMTriggerLog (interactive), this does not emit ui.Success messages // to keep non-interactive/CI output clean. -func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client, txHashStr string, eventIndex uint64) (*evmpb.Log, error) { +// receiptTimeout controls how long to wait for the transaction receipt before giving up. +func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client, txHashStr string, eventIndex uint64, receiptTimeout time.Duration) (*evmpb.Log, error) { txHashStr = strings.TrimSpace(txHashStr) if txHashStr == "" { return nil, fmt.Errorf("transaction hash cannot be empty") @@ -95,16 +97,17 @@ func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client } txHash := common.HexToHash(txHashStr) - return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, false) + return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, false, receiptTimeout) } // fetchAndConvertLog fetches a transaction receipt log and converts it to the protobuf format. // When verbose is true (interactive mode), ui.Success messages are emitted. -func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, verbose bool) (*evmpb.Log, error) { +// receiptTimeout controls how long to wait for the receipt before the context is cancelled. +func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, verbose bool, receiptTimeout time.Duration) (*evmpb.Log, error) { receiptSpinner := ui.NewSpinner() receiptSpinner.Start(fmt.Sprintf("Fetching transaction receipt for %s...", txHash.Hex())) - timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + timeoutCtx, cancel := context.WithTimeout(ctx, receiptTimeout) txReceipt, err := waitForTransactionReceipt(timeoutCtx, ethClient, txHash) receiptSpinner.Stop() cancel() diff --git a/cmd/workflow/simulate/chain/evm/trigger_test.go b/cmd/workflow/simulate/chain/evm/trigger_test.go index d176c97b..8cf059fe 100644 --- a/cmd/workflow/simulate/chain/evm/trigger_test.go +++ b/cmd/workflow/simulate/chain/evm/trigger_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -39,7 +40,7 @@ func TestGetEVMTriggerLogFromValues_Validation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - _, err := GetEVMTriggerLogFromValues(context.Background(), nil, tt.hash, 0) + _, err := GetEVMTriggerLogFromValues(context.Background(), nil, tt.hash, 0, time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), tt.errSub) }) @@ -156,7 +157,7 @@ func TestGetEVMTriggerLogFromValues_FetchError(t *testing.T) { txHash := "0x" + strings.Repeat("a", 64) m.errFor[strings.ToLower(txHash)] = fmt.Errorf("receipt not found") - _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0) + _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0, time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), "failed to fetch transaction receipt") } @@ -182,7 +183,7 @@ func TestGetEVMTriggerLogFromValues_EventIndexOutOfRange(t *testing.T) { }) m.receipts[strings.ToLower(txHash)] = rec - _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 5) + _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 5, time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), "event index 5 out of range") assert.Contains(t, err.Error(), "transaction has 1 log events") @@ -197,7 +198,7 @@ func TestGetEVMTriggerLogFromValues_ZeroLogs_OutOfRange(t *testing.T) { txHash := "0x" + strings.Repeat("c", 64) m.receipts[strings.ToLower(txHash)] = mkReceipt(hashFromHex(txHash), nil) - _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0) + _, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0, time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), "event index 0 out of range") assert.Contains(t, err.Error(), "transaction has 0 log events") @@ -229,7 +230,7 @@ func TestGetEVMTriggerLogFromValues_Success(t *testing.T) { }) m.receipts[strings.ToLower(txHash)] = rec - got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0) + got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0, time.Minute) require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, log0Addr.Bytes(), got.Address) @@ -262,7 +263,7 @@ func TestGetEVMTriggerLogFromValues_SuccessNoTopicsLeavesEventSigNil(t *testing. }) m.receipts[strings.ToLower(txHash)] = rec - got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0) + got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0, time.Minute) require.NoError(t, err) assert.Empty(t, got.Topics) assert.Nil(t, got.EventSig) @@ -271,7 +272,7 @@ func TestGetEVMTriggerLogFromValues_SuccessNoTopicsLeavesEventSigNil(t *testing. func TestGetEVMTriggerLogFromValues_NoRPCWhenHashInvalid(t *testing.T) { t.Parallel() // Pass nil client; validation should fire before any RPC attempt. - _, err := GetEVMTriggerLogFromValues(context.Background(), nil, "not-a-hash", 0) + _, err := GetEVMTriggerLogFromValues(context.Background(), nil, "not-a-hash", 0, time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), "must start with 0x") } @@ -295,7 +296,7 @@ func TestGetEVMTriggerLogFromValues_ZeroAddressLog(t *testing.T) { }) m.receipts[strings.ToLower(txHash)] = rec - got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0) + got, err := GetEVMTriggerLogFromValues(context.Background(), c, txHash, 0, time.Minute) require.NoError(t, err) assert.Len(t, got.Address, 20) // 20-byte address always } From cc0e188949a1599e807ed744813b1efd6097db0e Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Mon, 8 Jun 2026 17:25:05 -0300 Subject: [PATCH 4/5] match by filter not by tx hash --- cmd/workflow/simulate/chain/evm/chaintype.go | 38 +- cmd/workflow/simulate/chain/evm/trigger.go | 342 +++++++++++--- .../simulate/chain/evm/trigger_test.go | 417 +++++++++++++++++- cmd/workflow/simulate/chain/types.go | 9 + cmd/workflow/simulate/simulate.go | 8 +- internal/ui/spinner.go | 9 +- 6 files changed, 740 insertions(+), 83 deletions(-) diff --git a/cmd/workflow/simulate/chain/evm/chaintype.go b/cmd/workflow/simulate/chain/evm/chaintype.go index 224b1641..0a91ad8c 100644 --- a/cmd/workflow/simulate/chain/evm/chaintype.go +++ b/cmd/workflow/simulate/chain/evm/chaintype.go @@ -288,7 +288,8 @@ func (ct *EVMChainType) CollectCLIInputs(v *viper.Viper) map[string]string { } // ResolveTriggerData fetches the EVM log payload for the given selector from -// CLI-supplied or interactively-prompted inputs. +// CLI-supplied inputs, falling back to live log subscription in interactive mode +// when no replay tx hash is given. func (ct *EVMChainType) ResolveTriggerData(ctx context.Context, selector uint64, params chain.TriggerParams) (interface{}, error) { clientIface, ok := params.Clients[selector] if !ok { @@ -308,18 +309,37 @@ func (ct *EVMChainType) ResolveTriggerData(ctx context.Context, selector uint64, receiptTimeout = d } - if params.Interactive { - return GetEVMTriggerLog(ctx, client, receiptTimeout) - } - txHash := strings.TrimSpace(params.ChainTypeInputs[TriggerInputTxHash]) eventIndexStr := strings.TrimSpace(params.ChainTypeInputs[TriggerInputEventIndex]) - if txHash == "" || eventIndexStr == "" { + + // Replay path: explicit --evm-tx-hash. Works in both interactive and + // non-interactive modes for deterministic testing / CI. + if txHash != "" { + if eventIndexStr == "" { + return nil, fmt.Errorf("--evm-event-index is required when --evm-tx-hash is provided") + } + eventIndex, err := strconv.ParseUint(eventIndexStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid --evm-event-index %q: %w", eventIndexStr, err) + } + if params.Interactive { + printEVMTriggerReplayHeader(selector, txHash, eventIndex) + } + return GetEVMTriggerLogFromValues(ctx, client, txHash, eventIndex, receiptTimeout) + } + + if !params.Interactive { return nil, fmt.Errorf("--evm-tx-hash and --evm-event-index are required for EVM triggers in non-interactive mode") } - eventIndex, err := strconv.ParseUint(eventIndexStr, 10, 64) + + // Interactive wait-for-log path. + cfg, err := decodeLogTriggerConfig(params.TriggerPayload) if err != nil { - return nil, fmt.Errorf("invalid --evm-event-index %q: %w", eventIndexStr, err) + return nil, fmt.Errorf("failed to decode EVM log trigger config: %w", err) } - return GetEVMTriggerLogFromValues(ctx, client, txHash, eventIndex, receiptTimeout) + return WaitForEVMTriggerLog(ctx, client, WaitForLogConfig{ + Selector: selector, + Filter: cfg, + WorkflowName: params.WorkflowName, + }) } diff --git a/cmd/workflow/simulate/chain/evm/trigger.go b/cmd/workflow/simulate/chain/evm/trigger.go index d5a674aa..0359aeaa 100644 --- a/cmd/workflow/simulate/chain/evm/trigger.go +++ b/cmd/workflow/simulate/chain/evm/trigger.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "math/big" - "strconv" "strings" "time" @@ -14,75 +13,292 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "google.golang.org/protobuf/types/known/anypb" evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" + "github.com/smartcontractkit/cre-cli/internal/settings" "github.com/smartcontractkit/cre-cli/internal/ui" ) -// GetEVMTriggerLog prompts user for EVM trigger data and fetches the log interactively. -// receiptTimeout controls how long to wait for the transaction receipt before giving up. -func GetEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client, receiptTimeout time.Duration) (*evmpb.Log, error) { - var txHashInput string - var eventIndexInput string +// defaultWaitForLogPollInterval is how often WaitForEVMTriggerLog polls the RPC +// for new matching logs. Kept conservative so a typical wait doesn't hammer +// public RPCs. +const defaultWaitForLogPollInterval = 3 * time.Second - ui.Line() - if err := ui.InputForm([]ui.InputField{ - { - Title: "EVM Trigger Configuration", - Description: "Transaction hash for the EVM log event", - Placeholder: "0x...", - Value: &txHashInput, - Validate: func(s string) error { - s = strings.TrimSpace(s) - if s == "" { - return fmt.Errorf("transaction hash cannot be empty") - } - if !strings.HasPrefix(s, "0x") { - return fmt.Errorf("transaction hash must start with 0x") +// rescanOverlapBlocks is how many blocks behind the chain tip we re-scan on +// every poll iteration. Some public RPCs (notably load-balanced endpoints like +// publicnode) return a fresh block header from eth_blockNumber a few seconds +// before the same block's logs are visible to eth_getLogs. Without this +// overlap, we'd advance fromBlock past a block whose log hadn't been indexed +// yet and never re-check it. +const rescanOverlapBlocks = 5 + +// WaitForLogConfig describes a workflow's EVM log trigger subscription so the +// simulator can wait for the next matching on-chain event. +type WaitForLogConfig struct { + Selector uint64 + Filter *evmpb.FilterLogTriggerRequest + WorkflowName string + // PollInterval overrides the polling cadence for tests. Zero means default. + PollInterval time.Duration + // NowBlock overrides the initial "latest block" lookup for tests. When nil, + // HeaderByNumber(nil) is used. + NowBlock *big.Int +} + +// WaitForEVMTriggerLog blocks until a log matching the workflow's EVM log +// trigger config appears on chain, then converts and returns it. Cancel ctx +// (e.g. Ctrl+C) to abort the wait. +// +// Status output is written with plain ui.Print/ui.Dim rather than a bubbletea +// spinner. Bubble Tea puts the terminal in raw mode, which strips the OS +// translation of Ctrl+C into SIGINT — so signal.NotifyContext in the simulator +// would never fire while a spinner was active and the wait could not be +// interrupted. +func WaitForEVMTriggerLog(ctx context.Context, ethClient *ethclient.Client, cfg WaitForLogConfig) (*evmpb.Log, error) { + if cfg.Filter == nil || len(cfg.Filter.GetAddresses()) == 0 { + return nil, fmt.Errorf("EVM log trigger config is missing contract addresses; cannot wait for a matching event") + } + + addresses := make([]common.Address, 0, len(cfg.Filter.GetAddresses())) + for _, a := range cfg.Filter.GetAddresses() { + addresses = append(addresses, common.BytesToAddress(a)) + } + topics := topicsToFilter(cfg.Filter.GetTopics()) + + printEVMTriggerWaitHeader(cfg.Selector, addresses, topics, cfg.WorkflowName) + + poll := cfg.PollInterval + if poll <= 0 { + poll = defaultWaitForLogPollInterval + } + + var fromBlock *big.Int + if cfg.NowBlock != nil { + fromBlock = new(big.Int).Set(cfg.NowBlock) + } else { + head, err := ethClient.HeaderByNumber(ctx, nil) + if err != nil { + return nil, fmt.Errorf("failed to fetch latest block: %w", err) + } + fromBlock = new(big.Int).Set(head.Number) + } + ui.Dim(fmt.Sprintf("Listening for logs starting at block %s...", fromBlock.String())) + + ticker := time.NewTicker(poll) + defer ticker.Stop() + + // heartbeatEvery controls how often we print a "still waiting" line so the + // user knows the poller is alive on long waits. We use poll-iteration count + // instead of a separate timer to keep the loop single-threaded. + const heartbeatEvery = 10 + iter := 0 + + for { + head, err := ethClient.HeaderByNumber(ctx, nil) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + ui.Dim(fmt.Sprintf("RPC error fetching latest block: %v (retrying)", err)) + } else if head.Number.Cmp(fromBlock) >= 0 { + // Scan the closed range [fromBlock, head] in one query so we can't + // skip blocks that landed between polls. + query := ethereum.FilterQuery{ + FromBlock: new(big.Int).Set(fromBlock), + ToBlock: new(big.Int).Set(head.Number), + Addresses: addresses, + Topics: topics, + } + logs, err := ethClient.FilterLogs(ctx, query) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err } - if len(s) != 66 { - return fmt.Errorf("invalid transaction hash length: expected 66 characters, got %d", len(s)) + ui.Dim(fmt.Sprintf("RPC error scanning blocks %s..%s: %v (retrying)", fromBlock, head.Number, err)) + } else if len(logs) > 0 { + ui.Success(fmt.Sprintf("Matching EVM log event found at block %d (tx %s, index %d)", + logs[0].BlockNumber, logs[0].TxHash.Hex(), logs[0].Index)) + return convertGethLog(&logs[0]) + } else { + // Advance past the range we just successfully scanned, but + // keep a small overlap behind the chain tip so we re-check + // recent blocks on the next iteration. This guards against + // RPCs whose log index lags eth_blockNumber by a few seconds. + nextFrom := new(big.Int).Add(head.Number, big.NewInt(1)) + rescanFrom := new(big.Int).Sub(head.Number, big.NewInt(int64(rescanOverlapBlocks-1))) + if rescanFrom.Cmp(nextFrom) < 0 { + nextFrom = rescanFrom } - return nil - }, - }, - { - Title: "Event Index", - Description: "Log event index (0-based)", - Placeholder: "0", - Suggestions: []string{"0"}, - Value: &eventIndexInput, - Validate: func(s string) error { - if strings.TrimSpace(s) == "" { - return fmt.Errorf("event index cannot be empty") + if nextFrom.Cmp(fromBlock) > 0 { + fromBlock = nextFrom } - if _, err := strconv.ParseUint(strings.TrimSpace(s), 10, 32); err != nil { - return fmt.Errorf("invalid event index: must be a number") + iter++ + if iter%heartbeatEvery == 0 { + ui.Dim(fmt.Sprintf("Still waiting (scanned through block %s)...", head.Number.String())) } - return nil - }, - }, - }); err != nil { - return nil, fmt.Errorf("EVM trigger input cancelled: %w", err) + } + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + } } +} - txHashInput = strings.TrimSpace(txHashInput) - txHash := common.HexToHash(txHashInput) +// topicsToFilter converts the protobuf topic-values structure (4 slots of +// possible values per topic) into the [][]common.Hash form ethclient expects. +func topicsToFilter(in []*evmpb.TopicValues) [][]common.Hash { + if len(in) == 0 { + return nil + } + out := make([][]common.Hash, 0, len(in)) + for _, slot := range in { + vals := slot.GetValues() + if len(vals) == 0 { + out = append(out, nil) // wildcard for this slot + continue + } + hashes := make([]common.Hash, 0, len(vals)) + for _, v := range vals { + hashes = append(hashes, common.BytesToHash(v)) + } + out = append(out, hashes) + } + return out +} - eventIndexInput = strings.TrimSpace(eventIndexInput) - eventIndex, err := strconv.ParseUint(eventIndexInput, 10, 32) - if err != nil { - return nil, fmt.Errorf("invalid event index: %w", err) +// decodeLogTriggerConfig unmarshals the TriggerSubscription's Any payload into +// the EVM FilterLogTriggerRequest message. Returns an error if the payload is +// missing or of the wrong message type. +func decodeLogTriggerConfig(payload *anypb.Any) (*evmpb.FilterLogTriggerRequest, error) { + if payload == nil { + return nil, fmt.Errorf("trigger subscription has no payload") + } + cfg := &evmpb.FilterLogTriggerRequest{} + if err := payload.UnmarshalTo(cfg); err != nil { + return nil, fmt.Errorf("payload is not a FilterLogTriggerRequest: %w", err) } + return cfg, nil +} - return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, true, receiptTimeout) +// printEVMTriggerWaitHeader prints the structured "what we're waiting for" +// summary used in interactive mode when no replay tx hash is provided. +func printEVMTriggerWaitHeader(selector uint64, addresses []common.Address, topics [][]common.Hash, workflowName string) { + chainName := chainNameFromSelector(selector) + ui.Line() + ui.Print(ui.RenderBold("EVM log trigger selected.")) + ui.Print("") + ui.Print("Waiting for a matching EVM log event...") + ui.Print(fmt.Sprintf("Chain: %s", chainName)) + ui.Print(fmt.Sprintf("Contract: %s", formatAddresses(addresses))) + ui.Print(fmt.Sprintf("Event: %s", formatEventTopic(topics))) + // Surface any extra indexed-arg constraints (topics[1..3]) so users can + // tell when their tx isn't matching because of an indexed-arg filter, + // not just the event signature. + for i, slot := range extraTopicLines(topics) { + ui.Print(fmt.Sprintf("Topic[%d]: %s", i+1, slot)) + } + ui.Print("Press Ctrl+C to stop the simulation.") + ui.Print("") + ui.Print("If you already have a transaction hash, restart with:") + ui.Print(fmt.Sprintf(" %s", ui.RenderCommand(replayCommandHint(workflowName)))) + ui.Line() +} + +// extraTopicLines formats topics[1..3] for display, returning one string per +// non-empty constraint slot. Empty/wildcard slots between concrete ones are +// rendered as "(any)" so positional context is preserved. +func extraTopicLines(topics [][]common.Hash) []string { + if len(topics) <= 1 { + return nil + } + // Trim trailing wildcard slots so we don't print "(any)" lines that + // carry no information. + lastConcrete := -1 + for i := 1; i < len(topics); i++ { + if len(topics[i]) > 0 { + lastConcrete = i + } + } + if lastConcrete < 1 { + return nil + } + out := make([]string, 0, lastConcrete) + for i := 1; i <= lastConcrete; i++ { + if len(topics[i]) == 0 { + out = append(out, "(any)") + continue + } + parts := make([]string, 0, len(topics[i])) + for _, h := range topics[i] { + parts = append(parts, h.Hex()) + } + out = append(out, strings.Join(parts, ", ")) + } + return out +} + +// printEVMTriggerReplayHeader prints the matching summary for the replay path +// (interactive with --evm-tx-hash). Keeps both flows symmetric so users know +// what's about to happen and how to abort. +func printEVMTriggerReplayHeader(selector uint64, txHash string, eventIndex uint64) { + chainName := chainNameFromSelector(selector) + ui.Line() + ui.Print(ui.RenderBold("EVM log trigger selected.")) + ui.Print("") + ui.Print("Replaying log event from a known transaction...") + ui.Print(fmt.Sprintf("Chain: %s", chainName)) + ui.Print(fmt.Sprintf("Transaction: %s", txHash)) + ui.Print(fmt.Sprintf("Event index: %d", eventIndex)) + ui.Print("Press Ctrl+C to stop the simulation.") + ui.Line() +} + +func chainNameFromSelector(selector uint64) string { + if name, err := settings.GetChainNameByChainSelector(selector); err == nil && name != "" { + return name + } + return fmt.Sprintf("chain-selector %d", selector) +} + +func formatAddresses(addrs []common.Address) string { + if len(addrs) == 0 { + return "(any)" + } + parts := make([]string, 0, len(addrs)) + for _, a := range addrs { + parts = append(parts, a.Hex()) + } + return strings.Join(parts, ", ") +} + +// formatEventTopic renders topic[0] (the event signature hash) if it's set; a +// workflow's EVM log trigger always pins topic[0] to at least one event sig. +func formatEventTopic(topics [][]common.Hash) string { + if len(topics) == 0 || len(topics[0]) == 0 { + return "(any)" + } + parts := make([]string, 0, len(topics[0])) + for _, h := range topics[0] { + parts = append(parts, h.Hex()) + } + return strings.Join(parts, ", ") +} + +func replayCommandHint(workflowName string) string { + if strings.TrimSpace(workflowName) == "" { + workflowName = "" + } + return fmt.Sprintf("cre workflow simulate %s --evm-tx-hash 0x... --evm-event-index 0", workflowName) } // GetEVMTriggerLogFromValues fetches a log given tx hash string and event index. -// Unlike GetEVMTriggerLog (interactive), this does not emit ui.Success messages -// to keep non-interactive/CI output clean. +// Used by the deterministic replay path (both interactive and non-interactive). // receiptTimeout controls how long to wait for the transaction receipt before giving up. func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client, txHashStr string, eventIndex uint64, receiptTimeout time.Duration) (*evmpb.Log, error) { txHashStr = strings.TrimSpace(txHashStr) @@ -97,13 +313,12 @@ func GetEVMTriggerLogFromValues(ctx context.Context, ethClient *ethclient.Client } txHash := common.HexToHash(txHashStr) - return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, false, receiptTimeout) + return fetchAndConvertLog(ctx, ethClient, txHash, eventIndex, receiptTimeout) } // fetchAndConvertLog fetches a transaction receipt log and converts it to the protobuf format. -// When verbose is true (interactive mode), ui.Success messages are emitted. // receiptTimeout controls how long to wait for the receipt before the context is cancelled. -func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, verbose bool, receiptTimeout time.Duration) (*evmpb.Log, error) { +func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash common.Hash, eventIndex uint64, receiptTimeout time.Duration) (*evmpb.Log, error) { receiptSpinner := ui.NewSpinner() receiptSpinner.Start(fmt.Sprintf("Fetching transaction receipt for %s...", txHash.Hex())) @@ -119,29 +334,26 @@ func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash return nil, fmt.Errorf("event index %d out of range, transaction has %d log events", eventIndex, len(txReceipt.Logs)) } - log := txReceipt.Logs[eventIndex] - if verbose { - ui.Success(fmt.Sprintf("Found log event at index %d: contract=%s, topics=%d", eventIndex, log.Address.Hex(), len(log.Topics))) - } + return convertGethLog(txReceipt.Logs[eventIndex]) +} - var txIndex, logIndex uint32 +// convertGethLog converts a go-ethereum types.Log into the protobuf evm.Log +// the simulator's trigger pipeline expects. +func convertGethLog(log *types.Log) (*evmpb.Log, error) { if log.TxIndex > math.MaxUint32 { return nil, fmt.Errorf("transaction index %d exceeds uint32 maximum value", log.TxIndex) } - txIndex = uint32(log.TxIndex) // #nosec G115 -- validated above - if log.Index > math.MaxUint32 { return nil, fmt.Errorf("log index %d exceeds uint32 maximum value", log.Index) } - logIndex = uint32(log.Index) // #nosec G115 -- validated above pbLog := &evmpb.Log{ Address: log.Address.Bytes(), Data: log.Data, BlockHash: log.BlockHash.Bytes(), TxHash: log.TxHash.Bytes(), - TxIndex: txIndex, - Index: logIndex, + TxIndex: uint32(log.TxIndex), // #nosec G115 -- validated above + Index: uint32(log.Index), // #nosec G115 -- validated above Removed: log.Removed, BlockNumber: valuespb.NewBigIntFromInt(new(big.Int).SetUint64(log.BlockNumber)), } @@ -151,10 +363,6 @@ func fetchAndConvertLog(ctx context.Context, ethClient *ethclient.Client, txHash if len(log.Topics) > 0 { pbLog.EventSig = log.Topics[0].Bytes() } - - if verbose { - ui.Success(fmt.Sprintf("Created EVM trigger log for transaction %s, event %d", txHash.Hex(), eventIndex)) - } return pbLog, nil } diff --git a/cmd/workflow/simulate/chain/evm/trigger_test.go b/cmd/workflow/simulate/chain/evm/trigger_test.go index 8cf059fe..5ae06be3 100644 --- a/cmd/workflow/simulate/chain/evm/trigger_test.go +++ b/cmd/workflow/simulate/chain/evm/trigger_test.go @@ -8,6 +8,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "time" @@ -15,6 +16,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + + evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" + valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" ) const zero64 = "0x" + "0000000000000000000000000000000000000000000000000000000000000000" @@ -48,9 +53,13 @@ func TestGetEVMTriggerLogFromValues_Validation(t *testing.T) { } type mockRPC struct { - srv *httptest.Server - receipts map[string]*types.Receipt - errFor map[string]error + mu sync.Mutex + srv *httptest.Server + receipts map[string]*types.Receipt + errFor map[string]error + logs []*types.Log + headNumber uint64 + getLogsHook func() // optional callback invoked on each eth_getLogs call } func newMockRPC(t *testing.T) *mockRPC { @@ -90,6 +99,49 @@ func newMockRPC(t *testing.T) *mockRPC { resp["result"] = receiptToJSON(rec) case "eth_chainId": resp["result"] = "0x1" + case "eth_getBlockByNumber": + m.mu.Lock() + head := m.headNumber + m.mu.Unlock() + resp["result"] = map[string]any{ + "number": fmt.Sprintf("0x%x", head), + "hash": hashFromHex("0xab").Hex(), + "parentHash": hashFromHex("0xaa").Hex(), + "nonce": "0x0000000000000000", + "sha3Uncles": hashFromHex("0x00").Hex(), + "logsBloom": "0x" + strings.Repeat("00", 256), + "transactionsRoot": hashFromHex("0x00").Hex(), + "stateRoot": hashFromHex("0x00").Hex(), + "receiptsRoot": hashFromHex("0x00").Hex(), + "miner": "0x0000000000000000000000000000000000000000", + "difficulty": "0x0", + "extraData": "0x", + "size": "0x0", + "gasLimit": "0x0", + "gasUsed": "0x0", + "timestamp": "0x0", + "transactions": []string{}, + "uncles": []string{}, + "baseFeePerGas": "0x0", + } + case "eth_getLogs": + if m.getLogsHook != nil { + m.getLogsHook() + } + from, to := parseGetLogsRange(req.Params) + m.mu.Lock() + matching := make([]map[string]any, 0, len(m.logs)) + for _, l := range m.logs { + if from > 0 && l.BlockNumber < from { + continue + } + if to > 0 && l.BlockNumber > to { + continue + } + matching = append(matching, logToJSON(l)) + } + m.mu.Unlock() + resp["result"] = matching default: resp["error"] = map[string]any{"code": -32601, "message": "method not found"} } @@ -99,6 +151,53 @@ func newMockRPC(t *testing.T) *mockRPC { return m } +// parseGetLogsRange extracts the FromBlock/ToBlock from an eth_getLogs request +// param object. Returns 0 for missing or "latest"/"earliest" tags so the mock +// matches the relevant logs regardless. +func parseGetLogsRange(params []json.RawMessage) (uint64, uint64) { + if len(params) == 0 { + return 0, 0 + } + var arg struct { + FromBlock string `json:"fromBlock"` + ToBlock string `json:"toBlock"` + } + if err := json.Unmarshal(params[0], &arg); err != nil { + return 0, 0 + } + return parseHexBlock(arg.FromBlock), parseHexBlock(arg.ToBlock) +} + +func parseHexBlock(s string) uint64 { + s = strings.TrimSpace(s) + if !strings.HasPrefix(s, "0x") { + return 0 + } + v := new(big.Int) + if _, ok := v.SetString(strings.TrimPrefix(s, "0x"), 16); !ok { + return 0 + } + return v.Uint64() +} + +func logToJSON(l *types.Log) map[string]any { + tpcs := make([]string, 0, len(l.Topics)) + for _, t := range l.Topics { + tpcs = append(tpcs, t.Hex()) + } + return map[string]any{ + "address": l.Address.Hex(), + "topics": tpcs, + "data": "0x" + common.Bytes2Hex(l.Data), + "blockNumber": fmt.Sprintf("0x%x", l.BlockNumber), + "transactionHash": l.TxHash.Hex(), + "transactionIndex": fmt.Sprintf("0x%x", l.TxIndex), + "blockHash": l.BlockHash.Hex(), + "logIndex": fmt.Sprintf("0x%x", l.Index), + "removed": l.Removed, + } +} + func receiptToJSON(r *types.Receipt) map[string]any { logs := make([]map[string]any, 0, len(r.Logs)) for _, l := range r.Logs { @@ -277,6 +376,318 @@ func TestGetEVMTriggerLogFromValues_NoRPCWhenHashInvalid(t *testing.T) { assert.Contains(t, err.Error(), "must start with 0x") } +func TestDecodeLogTriggerConfig(t *testing.T) { + t.Parallel() + + t.Run("nil payload errors", func(t *testing.T) { + t.Parallel() + _, err := decodeLogTriggerConfig(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "no payload") + }) + + t.Run("wrong message type errors", func(t *testing.T) { + t.Parallel() + msg, err := anypb.New(&evmpb.Log{}) + require.NoError(t, err) + _, err = decodeLogTriggerConfig(msg) + require.Error(t, err) + assert.Contains(t, err.Error(), "FilterLogTriggerRequest") + }) + + t.Run("round-trip", func(t *testing.T) { + t.Parallel() + addr := addrFromHex("0x1111111111111111111111111111111111111111") + eventSig := hashFromHex("0x" + strings.Repeat("2", 64)) + msg, err := anypb.New(&evmpb.FilterLogTriggerRequest{ + Addresses: [][]byte{addr.Bytes()}, + Topics: []*evmpb.TopicValues{{Values: [][]byte{eventSig.Bytes()}}}, + }) + require.NoError(t, err) + + cfg, err := decodeLogTriggerConfig(msg) + require.NoError(t, err) + require.Len(t, cfg.GetAddresses(), 1) + assert.Equal(t, addr.Bytes(), cfg.GetAddresses()[0]) + require.Len(t, cfg.GetTopics(), 1) + assert.Equal(t, eventSig.Bytes(), cfg.GetTopics()[0].GetValues()[0]) + }) +} + +func TestTopicsToFilter(t *testing.T) { + t.Parallel() + + t.Run("nil returns nil", func(t *testing.T) { + t.Parallel() + assert.Nil(t, topicsToFilter(nil)) + }) + + t.Run("empty slot becomes wildcard", func(t *testing.T) { + t.Parallel() + sig := hashFromHex("0x" + strings.Repeat("a", 64)) + got := topicsToFilter([]*evmpb.TopicValues{ + {Values: [][]byte{sig.Bytes()}}, + {Values: nil}, + }) + require.Len(t, got, 2) + require.Len(t, got[0], 1) + assert.Equal(t, sig, got[0][0]) + assert.Nil(t, got[1]) + }) + + t.Run("multiple values per slot", func(t *testing.T) { + t.Parallel() + a := hashFromHex("0x" + strings.Repeat("a", 64)) + b := hashFromHex("0x" + strings.Repeat("b", 64)) + got := topicsToFilter([]*evmpb.TopicValues{{Values: [][]byte{a.Bytes(), b.Bytes()}}}) + require.Len(t, got, 1) + require.Len(t, got[0], 2) + assert.Equal(t, a, got[0][0]) + assert.Equal(t, b, got[0][1]) + }) +} + +func TestWaitForEVMTriggerLog_NoAddressesErrors(t *testing.T) { + t.Parallel() + _, err := WaitForEVMTriggerLog(context.Background(), nil, WaitForLogConfig{ + Filter: &evmpb.FilterLogTriggerRequest{}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "missing contract addresses") +} + +func TestWaitForEVMTriggerLog_ReturnsFirstMatchingLog(t *testing.T) { + t.Parallel() + + m := newMockRPC(t) + c := newEthClient(t, m.srv.URL) + defer c.Close() + + addr := addrFromHex("0xabcd000000000000000000000000000000000001") + sig := hashFromHex("0x" + strings.Repeat("c", 64)) + m.mu.Lock() + m.headNumber = 100 + m.logs = []*types.Log{ + { + Address: addr, + Topics: []common.Hash{sig}, + Data: []byte{0xab}, + BlockHash: hashFromHex("0xbb"), + TxHash: hashFromHex("0x" + strings.Repeat("d", 64)), + BlockNumber: 100, // matches the initial head so the first poll finds it + TxIndex: 2, + Index: 0, + }, + } + m.mu.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + got, err := WaitForEVMTriggerLog(ctx, c, WaitForLogConfig{ + Selector: 16015286601757825753, // ethereum-testnet-sepolia + Filter: &evmpb.FilterLogTriggerRequest{ + Addresses: [][]byte{addr.Bytes()}, + Topics: []*evmpb.TopicValues{{Values: [][]byte{sig.Bytes()}}}, + }, + PollInterval: 10 * time.Millisecond, + }) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, addr.Bytes(), got.Address) + require.Len(t, got.Topics, 1) + assert.Equal(t, sig.Bytes(), got.Topics[0]) + assert.Equal(t, sig.Bytes(), got.EventSig) +} + +// TestWaitForEVMTriggerLog_DoesNotSkipBlocksBetweenPolls validates that the +// poller correctly scans the inclusive [fromBlock, head] range across +// iterations. The bug we're guarding against: advancing fromBlock to the new +// head without first scanning the blocks between the previous head and the +// new head, which dropped events that landed in that window. +func TestWaitForEVMTriggerLog_DoesNotSkipBlocksBetweenPolls(t *testing.T) { + t.Parallel() + + m := newMockRPC(t) + c := newEthClient(t, m.srv.URL) + defer c.Close() + + addr := addrFromHex("0xabcd000000000000000000000000000000000003") + sig := hashFromHex("0x" + strings.Repeat("e", 64)) + + // Simulate the chain advancing by 5 blocks between two consecutive polls + // and the matching log living in one of those intermediate blocks. + m.mu.Lock() + m.headNumber = 100 + m.mu.Unlock() + matchingLog := &types.Log{ + Address: addr, + Topics: []common.Hash{sig}, + Data: []byte{0xaa}, + BlockHash: hashFromHex("0xbe"), + TxHash: hashFromHex("0x" + strings.Repeat("f", 64)), + BlockNumber: 103, // Falls between initial head (100) and post-jump head (105) + TxIndex: 0, + Index: 0, + } + + getLogsCalls := 0 + m.getLogsHook = func() { + getLogsCalls++ + // After the first poll completes (with no logs), jump head forward + // and surface the matching log. The next iteration must scan + // [101, 105], not [105, 105]. + if getLogsCalls == 1 { + m.mu.Lock() + m.headNumber = 105 + m.logs = []*types.Log{matchingLog} + m.mu.Unlock() + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + got, err := WaitForEVMTriggerLog(ctx, c, WaitForLogConfig{ + Selector: 16015286601757825753, + Filter: &evmpb.FilterLogTriggerRequest{ + Addresses: [][]byte{addr.Bytes()}, + Topics: []*evmpb.TopicValues{{Values: [][]byte{sig.Bytes()}}}, + }, + PollInterval: 10 * time.Millisecond, + }) + require.NoError(t, err) + require.NotNil(t, got) + require.NotNil(t, got.BlockNumber) + assert.Equal(t, uint64(103), valuespb.NewIntFromBigInt(got.BlockNumber).Uint64()) +} + +func TestWaitForEVMTriggerLog_CancelsOnContext(t *testing.T) { + t.Parallel() + + m := newMockRPC(t) + c := newEthClient(t, m.srv.URL) + defer c.Close() + + addr := addrFromHex("0xabcd000000000000000000000000000000000002") + m.mu.Lock() + m.headNumber = 50 + m.mu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // No matching logs are ever set; cancel quickly to confirm the wait loop + // exits when ctx is done (the Ctrl+C analogue). + go func() { + time.Sleep(80 * time.Millisecond) + cancel() + }() + + _, err := WaitForEVMTriggerLog(ctx, c, WaitForLogConfig{ + Selector: 16015286601757825753, + Filter: &evmpb.FilterLogTriggerRequest{ + Addresses: [][]byte{addr.Bytes()}, + }, + PollInterval: 20 * time.Millisecond, + }) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) +} + +// TestWaitForEVMTriggerLog_RescansForRPCLag simulates an RPC that publishes +// the block header before the log index, then catches up a couple of polls +// later. With the rescan overlap, the wait loop must re-scan blocks that +// initially returned empty and pick up the late-indexed log instead of +// advancing past them. +func TestWaitForEVMTriggerLog_RescansForRPCLag(t *testing.T) { + t.Parallel() + + m := newMockRPC(t) + c := newEthClient(t, m.srv.URL) + defer c.Close() + + addr := addrFromHex("0xabcd000000000000000000000000000000000004") + sig := hashFromHex("0x" + strings.Repeat("d", 64)) + + // Initial chain state: head at 200, no logs visible to eth_getLogs yet. + m.mu.Lock() + m.headNumber = 200 + m.mu.Unlock() + + lateLog := &types.Log{ + Address: addr, + Topics: []common.Hash{sig}, + Data: []byte{0xab}, + BlockHash: hashFromHex("0xbe"), + TxHash: hashFromHex("0x" + strings.Repeat("a", 64)), + BlockNumber: 200, // the log exists in the current tip block... + TxIndex: 0, + Index: 0, + } + + hookCalls := 0 + m.getLogsHook = func() { + hookCalls++ + // Publish the log only after the wait loop has already scanned and + // found nothing. Without rescan-overlap, the loop would advance past + // block 200 and never see this log. + if hookCalls == 2 { + m.mu.Lock() + m.logs = []*types.Log{lateLog} + m.mu.Unlock() + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + got, err := WaitForEVMTriggerLog(ctx, c, WaitForLogConfig{ + Selector: 16015286601757825753, + Filter: &evmpb.FilterLogTriggerRequest{ + Addresses: [][]byte{addr.Bytes()}, + Topics: []*evmpb.TopicValues{{Values: [][]byte{sig.Bytes()}}}, + }, + PollInterval: 10 * time.Millisecond, + }) + require.NoError(t, err) + require.NotNil(t, got) + require.NotNil(t, got.BlockNumber) + assert.Equal(t, uint64(200), valuespb.NewIntFromBigInt(got.BlockNumber).Uint64()) +} + +func TestExtraTopicLines(t *testing.T) { + t.Parallel() + + sig := hashFromHex("0x" + strings.Repeat("1", 64)) + indexed := hashFromHex("0x" + strings.Repeat("2", 64)) + + t.Run("nil or single-slot returns nil", func(t *testing.T) { + t.Parallel() + assert.Nil(t, extraTopicLines(nil)) + assert.Nil(t, extraTopicLines([][]common.Hash{{sig}})) + }) + + t.Run("trailing wildcards trimmed", func(t *testing.T) { + t.Parallel() + assert.Nil(t, extraTopicLines([][]common.Hash{{sig}, nil, nil})) + }) + + t.Run("intermediate wildcard rendered as any", func(t *testing.T) { + t.Parallel() + got := extraTopicLines([][]common.Hash{{sig}, nil, {indexed}}) + require.Len(t, got, 2) + assert.Equal(t, "(any)", got[0]) + assert.Equal(t, indexed.Hex(), got[1]) + }) +} + +func TestReplayCommandHint(t *testing.T) { + t.Parallel() + assert.Contains(t, replayCommandHint("limitator-workflow"), "cre workflow simulate limitator-workflow --evm-tx-hash") + assert.Contains(t, replayCommandHint(""), "") +} + func TestGetEVMTriggerLogFromValues_ZeroAddressLog(t *testing.T) { t.Parallel() m := newMockRPC(t) diff --git a/cmd/workflow/simulate/chain/types.go b/cmd/workflow/simulate/chain/types.go index 12f8c1cb..dc398780 100644 --- a/cmd/workflow/simulate/chain/types.go +++ b/cmd/workflow/simulate/chain/types.go @@ -1,6 +1,8 @@ package chain import ( + "google.golang.org/protobuf/types/known/anypb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink/v2/core/capabilities" ) @@ -53,4 +55,11 @@ type TriggerParams struct { Clients map[uint64]ChainClient Interactive bool ChainTypeInputs map[string]string + // TriggerPayload is the protobuf Any payload from the selected + // pb.TriggerSubscription. Chain types unmarshal it into their own + // trigger-config message (e.g. evm.FilterLogTriggerRequest) to learn what + // the workflow is actually subscribed to. + TriggerPayload *anypb.Any + // WorkflowName is used to render replay-command hints in interactive copy. + WorkflowName string } diff --git a/cmd/workflow/simulate/simulate.go b/cmd/workflow/simulate/simulate.go index fe95ed2b..eb5af07d 100644 --- a/cmd/workflow/simulate/simulate.go +++ b/cmd/workflow/simulate/simulate.go @@ -755,7 +755,7 @@ func makeBeforeStartInteractive(holder *TriggerInfoAndBeforeStart, inputs Inputs os.Exit(1) } - triggerData, err := getTriggerDataForChainType(ctx, ct, sel, inputs, true) + triggerData, err := getTriggerDataForChainType(ctx, ct, sel, holder.TriggerToRun, inputs, true) if err != nil { ui.Error(fmt.Sprintf("Failed to get %s trigger data: %v", name, err)) os.Exit(1) @@ -838,7 +838,7 @@ func makeBeforeStartNonInteractive(holder *TriggerInfoAndBeforeStart, inputs Inp os.Exit(1) } - triggerData, err := getTriggerDataForChainType(ctx, ct, sel, inputs, false) + triggerData, err := getTriggerDataForChainType(ctx, ct, sel, holder.TriggerToRun, inputs, false) if err != nil { ui.Error(fmt.Sprintf("Failed to get %s trigger data: %v", name, err)) os.Exit(1) @@ -947,11 +947,13 @@ func getHTTPTriggerPayload(invocationDir string) (*httptypedapi.Payload, error) // getTriggerDataForChainType resolves trigger data for a specific chain type. // Each chain type defines its own trigger data format. -func getTriggerDataForChainType(ctx context.Context, ct chain.ChainType, selector uint64, inputs Inputs, interactive bool) (interface{}, error) { +func getTriggerDataForChainType(ctx context.Context, ct chain.ChainType, selector uint64, triggerSub *pb.TriggerSubscription, inputs Inputs, interactive bool) (interface{}, error) { return ct.ResolveTriggerData(ctx, selector, chain.TriggerParams{ Clients: inputs.ChainTypeClients[ct.Name()], Interactive: interactive, ChainTypeInputs: inputs.ChainTypeInputs, + TriggerPayload: triggerSub.GetPayload(), + WorkflowName: inputs.WorkflowName, }) } diff --git a/internal/ui/spinner.go b/internal/ui/spinner.go index 6ecc2c25..39359824 100644 --- a/internal/ui/spinner.go +++ b/internal/ui/spinner.go @@ -3,6 +3,7 @@ package ui import ( "fmt" "os" + "strings" "sync" "github.com/charmbracelet/bubbles/spinner" @@ -129,7 +130,13 @@ func (s *Spinner) Start(message string) { s.quitCh = make(chan struct{}) model := newSpinnerModel(message) - s.program = tea.NewProgram(model, tea.WithOutput(os.Stderr)) + // Pass an empty reader so bubbletea does not take stdin and call + // term.MakeRaw on it. With raw mode on, the terminal driver no longer + // translates Ctrl+C into SIGINT — so signal.NotifyContext would never + // fire and long-running operations (e.g. waiting for an EVM log event) + // could not be interrupted. The spinner only needs its own ticker to + // animate; it never reads user input. + s.program = tea.NewProgram(model, tea.WithOutput(os.Stderr), tea.WithInput(strings.NewReader(""))) // Run the program in a goroutine go func() { From 2f0c157bad4560f2ea2baa51da9a863993e7c051 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Mon, 8 Jun 2026 17:33:55 -0300 Subject: [PATCH 5/5] lint and docs --- cmd/workflow/simulate/chain/evm/trigger.go | 8 +++--- .../simulate/chain/evm/trigger_test.go | 6 +++-- docs/cre_workflow_simulate.md | 27 ++++++++++--------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/cmd/workflow/simulate/chain/evm/trigger.go b/cmd/workflow/simulate/chain/evm/trigger.go index 0359aeaa..3f629f35 100644 --- a/cmd/workflow/simulate/chain/evm/trigger.go +++ b/cmd/workflow/simulate/chain/evm/trigger.go @@ -229,13 +229,13 @@ func extraTopicLines(topics [][]common.Hash) []string { return nil } out := make([]string, 0, lastConcrete) - for i := 1; i <= lastConcrete; i++ { - if len(topics[i]) == 0 { + for _, slot := range topics[1 : lastConcrete+1] { + if len(slot) == 0 { out = append(out, "(any)") continue } - parts := make([]string, 0, len(topics[i])) - for _, h := range topics[i] { + parts := make([]string, 0, len(slot)) + for _, h := range slot { parts = append(parts, h.Hex()) } out = append(out, strings.Join(parts, ", ")) diff --git a/cmd/workflow/simulate/chain/evm/trigger_test.go b/cmd/workflow/simulate/chain/evm/trigger_test.go index 5ae06be3..b04761ea 100644 --- a/cmd/workflow/simulate/chain/evm/trigger_test.go +++ b/cmd/workflow/simulate/chain/evm/trigger_test.go @@ -154,7 +154,7 @@ func newMockRPC(t *testing.T) *mockRPC { // parseGetLogsRange extracts the FromBlock/ToBlock from an eth_getLogs request // param object. Returns 0 for missing or "latest"/"earliest" tags so the mock // matches the relevant logs regardless. -func parseGetLogsRange(params []json.RawMessage) (uint64, uint64) { +func parseGetLogsRange(params []json.RawMessage) (fromBlock, toBlock uint64) { if len(params) == 0 { return 0, 0 } @@ -165,7 +165,9 @@ func parseGetLogsRange(params []json.RawMessage) (uint64, uint64) { if err := json.Unmarshal(params[0], &arg); err != nil { return 0, 0 } - return parseHexBlock(arg.FromBlock), parseHexBlock(arg.ToBlock) + fromBlock = parseHexBlock(arg.FromBlock) + toBlock = parseHexBlock(arg.ToBlock) + return fromBlock, toBlock } func parseHexBlock(s string) uint64 { diff --git a/docs/cre_workflow_simulate.md b/docs/cre_workflow_simulate.md index af5bd1fa..aa13da04 100644 --- a/docs/cre_workflow_simulate.md +++ b/docs/cre_workflow_simulate.md @@ -19,19 +19,20 @@ cre workflow simulate ./my-workflow ### Options ``` - --broadcast Broadcast transactions to the EVM (default: false) - --config string Override the config file path from workflow.yaml - --default-config Use the config path from workflow.yaml settings (default behavior) - -g, --engine-logs Enable non-fatal engine logging - --evm-event-index int EVM trigger log index (0-based) (default -1) - --evm-tx-hash string EVM trigger transaction hash (0x...) - -h, --help help for simulate - --http-payload string HTTP trigger payload as JSON string or path to JSON file (with or without @ prefix) - --limits string Production limits to enforce during simulation: 'default' for prod defaults, path to a limits JSON file (e.g. from 'cre workflow limits export'), or 'none' to disable (default "default") - --no-config Simulate without a config file - --skip-type-checks Skip TypeScript project typecheck during compilation (passes --skip-type-checks to cre-compile) - --trigger-index int Index of the trigger to run (0-based) (default -1) - --wasm string Path or URL to a pre-built WASM binary (skips compilation) + --broadcast Broadcast transactions to the EVM (default: false) + --config string Override the config file path from workflow.yaml + --default-config Use the config path from workflow.yaml settings (default behavior) + -g, --engine-logs Enable non-fatal engine logging + --evm-event-index int EVM trigger log index (0-based) (default -1) + --evm-receipt-timeout string Timeout for waiting on an EVM transaction receipt (e.g. 30s, 2m) (default "1m") + --evm-tx-hash string EVM trigger transaction hash (0x...) + -h, --help help for simulate + --http-payload string HTTP trigger payload as JSON string or path to JSON file (with or without @ prefix) + --limits string Production limits to enforce during simulation: 'default' for prod defaults, path to a limits JSON file (e.g. from 'cre workflow limits export'), or 'none' to disable (default "default") + --no-config Simulate without a config file + --skip-type-checks Skip TypeScript project typecheck during compilation (passes --skip-type-checks to cre-compile) + --trigger-index int Index of the trigger to run (0-based) (default -1) + --wasm string Path or URL to a pre-built WASM binary (skips compilation) ``` ### Options inherited from parent commands