diff --git a/.tool-versions b/.tool-versions index 9a45258b..71d117e2 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,4 +1,4 @@ -golang 1.25.5 +golang 1.26.2 golangci-lint 2.11.2 goreleaser 2.0.1 python 3.10.5 diff --git a/cmd/common/compile.go b/cmd/common/compile.go index 2456ec77..d500e6e9 100644 --- a/cmd/common/compile.go +++ b/cmd/common/compile.go @@ -1,6 +1,7 @@ package common import ( + "context" "errors" "fmt" "os" @@ -33,7 +34,7 @@ type WorkflowCompileOptions struct { } // getBuildCmd returns a single step that builds the workflow and returns the WASM bytes. -func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCompileOptions) (func() ([]byte, error), error) { +func getBuildCmd(ctx context.Context, workflowRootFolder, mainFile, language string, opts WorkflowCompileOptions) (func() ([]byte, error), error) { tmpPath := filepath.Join(workflowRootFolder, ".cre_build_tmp.wasm") switch language { case constants.WorkflowLanguageTypeScript: @@ -41,7 +42,7 @@ func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCom if opts.SkipTypeChecks { args = append(args, SkipTypeChecksFlag) } - cmd := exec.Command("bun", args...) + cmd := exec.CommandContext(ctx, "bun", args...) cmd.Dir = workflowRootFolder return func() ([]byte, error) { out, err := cmd.CombinedOutput() @@ -67,7 +68,7 @@ func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCom if opts.StripSymbols { ldflags = "-buildid= -w -s" } - cmd := exec.Command( + cmd := exec.CommandContext(ctx, "go", "build", "-o", tmpPath, "-trimpath", @@ -92,7 +93,7 @@ func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCom if err != nil { return nil, err } - makeCmd := exec.Command("make", "build") + makeCmd := exec.CommandContext(ctx, "make", "build") makeCmd.Dir = makeRoot builtPath := filepath.Join(makeRoot, defaultWasmOutput) return func() ([]byte, error) { @@ -108,7 +109,7 @@ func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCom if opts.StripSymbols { ldflags = "-buildid= -w -s" } - cmd := exec.Command( + cmd := exec.CommandContext(ctx, "go", "build", "-o", tmpPath, "-trimpath", @@ -135,7 +136,7 @@ func getBuildCmd(workflowRootFolder, mainFile, language string, opts WorkflowCom // opts.StripSymbols: for Go builds, true strips debug symbols (deploy); false keeps them (simulate). // opts.SkipTypeChecks: for TypeScript, passes SkipTypeChecksFlag to cre-compile. // For custom Makefile WASM builds, StripSymbols and SkipTypeChecks have no effect. -func CompileWorkflowToWasm(workflowPath string, opts WorkflowCompileOptions) ([]byte, error) { +func CompileWorkflowToWasm(ctx context.Context, workflowPath string, opts WorkflowCompileOptions) ([]byte, error) { workflowRootFolder, workflowMainFile, err := WorkflowPathRootAndMain(workflowPath) if err != nil { return nil, fmt.Errorf("workflow path: %w", err) @@ -167,7 +168,7 @@ func CompileWorkflowToWasm(workflowPath string, opts WorkflowCompileOptions) ([] return nil, fmt.Errorf("unsupported workflow language for file %s", workflowMainFile) } - buildStep, err := getBuildCmd(workflowRootFolder, workflowMainFile, language, opts) + buildStep, err := getBuildCmd(ctx, workflowRootFolder, workflowMainFile, language, opts) if err != nil { return nil, err } diff --git a/cmd/common/compile_test.go b/cmd/common/compile_test.go index fdc7dc3d..fa4aa42c 100644 --- a/cmd/common/compile_test.go +++ b/cmd/common/compile_test.go @@ -2,6 +2,7 @@ package common import ( "bytes" + "context" "io" "os" "os/exec" @@ -47,21 +48,21 @@ func TestFindMakefileRoot(t *testing.T) { func TestCompileWorkflowToWasm_Go_Success(t *testing.T) { t.Run("basic_workflow", func(t *testing.T) { path := deployTestdataPath("basic_workflow", "main.go") - wasm, err := CompileWorkflowToWasm(path, WorkflowCompileOptions{StripSymbols: true}) + wasm, err := CompileWorkflowToWasm(context.Background(), path, WorkflowCompileOptions{StripSymbols: true}) require.NoError(t, err) assert.NotEmpty(t, wasm) }) t.Run("configless_workflow", func(t *testing.T) { path := deployTestdataPath("configless_workflow", "main.go") - wasm, err := CompileWorkflowToWasm(path, WorkflowCompileOptions{StripSymbols: true}) + wasm, err := CompileWorkflowToWasm(context.Background(), path, WorkflowCompileOptions{StripSymbols: true}) require.NoError(t, err) assert.NotEmpty(t, wasm) }) t.Run("missing_go_mod", func(t *testing.T) { path := deployTestdataPath("missing_go_mod", "main.go") - wasm, err := CompileWorkflowToWasm(path, WorkflowCompileOptions{StripSymbols: true}) + wasm, err := CompileWorkflowToWasm(context.Background(), path, WorkflowCompileOptions{StripSymbols: true}) require.NoError(t, err) assert.NotEmpty(t, wasm) }) @@ -69,7 +70,7 @@ func TestCompileWorkflowToWasm_Go_Success(t *testing.T) { func TestCompileWorkflowToWasm_Go_Malformed_Fails(t *testing.T) { path := deployTestdataPath("malformed_workflow", "main.go") - _, err := CompileWorkflowToWasm(path, WorkflowCompileOptions{StripSymbols: true}) + _, err := CompileWorkflowToWasm(context.Background(), path, WorkflowCompileOptions{StripSymbols: true}) require.Error(t, err) assert.Contains(t, err.Error(), "failed to compile workflow") assert.Contains(t, err.Error(), "undefined: sdk.RemovedFunctionThatFailsCompilation") @@ -80,7 +81,7 @@ func TestCompileWorkflowToWasm_Wasm_Success(t *testing.T) { _ = os.Remove(wasmPath) t.Cleanup(func() { _ = os.Remove(wasmPath) }) - wasm, err := CompileWorkflowToWasm(wasmPath, WorkflowCompileOptions{StripSymbols: true}) + wasm, err := CompileWorkflowToWasm(context.Background(), wasmPath, WorkflowCompileOptions{StripSymbols: true}) require.NoError(t, err) assert.NotEmpty(t, wasm) @@ -96,14 +97,14 @@ func TestCompileWorkflowToWasm_Wasm_Fails(t *testing.T) { wasmPath := filepath.Join(wasmDir, "workflow.wasm") require.NoError(t, os.WriteFile(wasmPath, []byte("not really wasm"), 0600)) - _, err := CompileWorkflowToWasm(wasmPath, WorkflowCompileOptions{StripSymbols: true}) + _, err := CompileWorkflowToWasm(context.Background(), wasmPath, WorkflowCompileOptions{StripSymbols: true}) require.Error(t, err) assert.Contains(t, err.Error(), "no Makefile found") }) t.Run("make_build_fails", func(t *testing.T) { path := deployTestdataPath("wasm_make_fails", "wasm", "workflow.wasm") - _, err := CompileWorkflowToWasm(path, WorkflowCompileOptions{StripSymbols: true}) + _, err := CompileWorkflowToWasm(context.Background(), path, WorkflowCompileOptions{StripSymbols: true}) require.Error(t, err) assert.Contains(t, err.Error(), "failed to compile workflow") assert.Contains(t, err.Error(), "build output:") @@ -138,7 +139,7 @@ func TestCompileWorkflowToWasm_TS_Success(t *testing.T) { "include": ["main.ts"] } `), 0600)) - wasm, err := CompileWorkflowToWasm(mainPath, WorkflowCompileOptions{StripSymbols: true}) + wasm, err := CompileWorkflowToWasm(context.Background(), mainPath, WorkflowCompileOptions{StripSymbols: true}) if err != nil { t.Skipf("TS compile failed (published cre-sdk may lack full layout): %v", err) } diff --git a/cmd/common/fetch.go b/cmd/common/fetch.go index 5f8ee4f4..bc5b69e0 100644 --- a/cmd/common/fetch.go +++ b/cmd/common/fetch.go @@ -1,6 +1,7 @@ package common import ( + "context" "fmt" "io" "net/http" @@ -28,8 +29,13 @@ func IsURL(s string) bool { } // FetchURL performs an HTTP GET and returns the response body bytes. -func FetchURL(url string) ([]byte, error) { - resp, err := http.Get(url) //nolint:gosec,noctx +func FetchURL(ctx context.Context, url string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("HTTP GET %s: %w", url, err) + } + + resp, err := http.DefaultClient.Do(req) //nolint:gosec if err != nil { return nil, fmt.Errorf("HTTP GET %s: %w", url, err) } diff --git a/cmd/common/fetch_test.go b/cmd/common/fetch_test.go index 10a6ade6..0291f3dd 100644 --- a/cmd/common/fetch_test.go +++ b/cmd/common/fetch_test.go @@ -1,6 +1,7 @@ package common import ( + "context" "net/http" "net/http/httptest" "testing" @@ -42,7 +43,7 @@ func TestFetchURL(t *testing.T) { })) defer srv.Close() - data, err := FetchURL(srv.URL) + data, err := FetchURL(context.Background(), srv.URL) require.NoError(t, err) assert.Equal(t, body, data) }) @@ -53,13 +54,13 @@ func TestFetchURL(t *testing.T) { })) defer srv.Close() - _, err := FetchURL(srv.URL) + _, err := FetchURL(context.Background(), srv.URL) require.Error(t, err) assert.Contains(t, err.Error(), "returned status 404") }) t.Run("unreachable host", func(t *testing.T) { - _, err := FetchURL("http://127.0.0.1:1") + _, err := FetchURL(context.Background(), "http://127.0.0.1:1") require.Error(t, err) }) } diff --git a/cmd/workflow/build/build.go b/cmd/workflow/build/build.go index f92f6973..63b79edd 100644 --- a/cmd/workflow/build/build.go +++ b/cmd/workflow/build/build.go @@ -1,6 +1,7 @@ package build import ( + "context" "fmt" "os" "path/filepath" @@ -26,7 +27,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { outputPath, _ := cmd.Flags().GetString("output") skipTypeChecks, _ := cmd.Flags().GetBool(cmdcommon.SkipTypeChecksCLIFlag) - return execute(args[0], outputPath, skipTypeChecks) + return execute(cmd.Context(), args[0], outputPath, skipTypeChecks) }, } buildCmd.Flags().StringP("output", "o", "", "Output file path for the compiled WASM binary (default: /binary.wasm)") @@ -34,7 +35,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command { return buildCmd } -func execute(workflowFolder, outputPath string, skipTypeChecks bool) error { +func execute(ctx context.Context, workflowFolder, outputPath string, skipTypeChecks bool) error { workflowDir, err := filepath.Abs(workflowFolder) if err != nil { return fmt.Errorf("resolve workflow folder: %w", err) @@ -60,7 +61,7 @@ func execute(workflowFolder, outputPath string, skipTypeChecks bool) error { outputPath = cmdcommon.EnsureWasmExtension(outputPath) ui.Dim("Compiling workflow...") - wasmBytes, err := cmdcommon.CompileWorkflowToWasm(resolvedPath, cmdcommon.WorkflowCompileOptions{ + wasmBytes, err := cmdcommon.CompileWorkflowToWasm(ctx, resolvedPath, cmdcommon.WorkflowCompileOptions{ StripSymbols: true, SkipTypeChecks: skipTypeChecks, }) diff --git a/cmd/workflow/deploy/artifacts.go b/cmd/workflow/deploy/artifacts.go index 070b1421..1bcb6467 100644 --- a/cmd/workflow/deploy/artifacts.go +++ b/cmd/workflow/deploy/artifacts.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "fmt" "github.com/smartcontractkit/cre-cli/internal/client/graphqlclient" @@ -8,7 +9,11 @@ import ( "github.com/smartcontractkit/cre-cli/internal/ui" ) -func (h *handler) uploadArtifacts() error { +func (h *handler) uploadArtifacts(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + if h.workflowArtifact == nil { return fmt.Errorf("workflowArtifact is nil") } @@ -46,7 +51,7 @@ func (h *handler) uploadArtifacts() error { if !binaryFromURL { ui.Success(fmt.Sprintf("Loaded binary from: %s", h.inputs.OutputPath)) binaryResp, err := storageClient.UploadArtifactWithRetriesAndGetURL( - workflowID, storageclient.ArtifactTypeBinary, binaryData, "application/octet-stream") + ctx, workflowID, storageclient.ArtifactTypeBinary, binaryData, "application/octet-stream") if err != nil { return fmt.Errorf("uploading binary artifact: %w", err) } @@ -59,7 +64,7 @@ func (h *handler) uploadArtifacts() error { ui.Success(fmt.Sprintf("Loaded config from: %s", h.inputs.ConfigPath)) var err error configURL, err = storageClient.UploadArtifactWithRetriesAndGetURL( - workflowID, storageclient.ArtifactTypeConfig, configData, "text/plain") + ctx, workflowID, storageclient.ArtifactTypeConfig, configData, "text/plain") if err != nil { return fmt.Errorf("uploading config artifact: %w", err) } diff --git a/cmd/workflow/deploy/artifacts_test.go b/cmd/workflow/deploy/artifacts_test.go index 24833d9c..a70e6405 100644 --- a/cmd/workflow/deploy/artifacts_test.go +++ b/cmd/workflow/deploy/artifacts_test.go @@ -2,6 +2,7 @@ package deploy import ( //nolint:gosec + "context" "encoding/json" "errors" "io" @@ -99,7 +100,7 @@ func TestUpload_SuccessAndErrorCases(t *testing.T) { ConfigData: []byte("configdata"), WorkflowID: "workflow-id", } - err := h.uploadArtifacts() + err := h.uploadArtifacts(context.Background()) require.NoError(t, err) require.Equal(t, "http://origin/get", h.inputs.BinaryURL) require.Equal(t, "http://origin/get", *h.inputs.ConfigURL) @@ -110,12 +111,12 @@ func TestUpload_SuccessAndErrorCases(t *testing.T) { ConfigData: nil, WorkflowID: "workflow-id", } - err = h.uploadArtifacts() + err = h.uploadArtifacts(context.Background()) require.NoError(t, err) // Error: workflowArtifact is nil h.workflowArtifact = nil - err = h.uploadArtifacts() + err = h.uploadArtifacts(context.Background()) require.ErrorContains(t, err, "workflowArtifact is nil") // Error: empty BinaryData @@ -124,7 +125,7 @@ func TestUpload_SuccessAndErrorCases(t *testing.T) { ConfigData: []byte("configdata"), WorkflowID: "workflow-id", } - err = h.uploadArtifacts() + err = h.uploadArtifacts(context.Background()) require.ErrorContains(t, err, "uploading binary artifact: content is empty for artifactType BINARY") // Error: workflowID is empty @@ -133,7 +134,7 @@ func TestUpload_SuccessAndErrorCases(t *testing.T) { ConfigData: []byte("configdata"), WorkflowID: "", } - err = h.uploadArtifacts() + err = h.uploadArtifacts(context.Background()) require.ErrorContains(t, err, "workflowID is empty") } @@ -174,7 +175,7 @@ func TestUploadArtifactToStorageService_OriginError(t *testing.T) { ConfigData: []byte("configdata"), WorkflowID: "workflow-id", } - err := h.uploadArtifacts() + err := h.uploadArtifacts(context.Background()) require.ErrorContains(t, err, "upload to origin") } @@ -240,7 +241,7 @@ func TestUploadArtifactToStorageService_AlreadyExistsError(t *testing.T) { ConfigData: []byte("configdata"), WorkflowID: "workflow-id", } - err := h.uploadArtifacts() + err := h.uploadArtifacts(context.Background()) require.NoError(t, err) require.Equal(t, "http://origin/get", h.inputs.BinaryURL) require.Equal(t, "http://origin/get", *h.inputs.ConfigURL) @@ -291,7 +292,7 @@ func TestUpload_UsesResolvedWorkflowOwnerForPresignedUrls(t *testing.T) { WorkflowID: "workflow-id", } - err := h.uploadArtifacts() + err := h.uploadArtifacts(context.Background()) require.NoError(t, err) require.NotEmpty(t, ownersUsed) for _, owner := range ownersUsed { diff --git a/cmd/workflow/deploy/auto_link.go b/cmd/workflow/deploy/auto_link.go index 7e393dc3..e350033f 100644 --- a/cmd/workflow/deploy/auto_link.go +++ b/cmd/workflow/deploy/auto_link.go @@ -22,10 +22,10 @@ const ( ) // ensureOwnerLinkedOrFail checks if the owner is linked and attempts auto-link if needed -func (h *handler) ensureOwnerLinkedOrFail(onChain *settings.OnChainRegistry) error { +func (h *handler) ensureOwnerLinkedOrFail(ctx context.Context, onChain *settings.OnChainRegistry) error { ownerAddr := common.HexToAddress(h.inputs.WorkflowOwner) - linked, err := h.wrc.IsOwnerLinked(h.execCtx, ownerAddr) + linked, err := h.wrc.IsOwnerLinked(ctx, ownerAddr) if err != nil { return fmt.Errorf("failed to check owner link status: %w", err) } @@ -34,7 +34,7 @@ func (h *handler) ensureOwnerLinkedOrFail(onChain *settings.OnChainRegistry) err if linked { // Owner is linked on contract, now verify it's linked to the current user's account - linkedToCurrentUser, err := h.checkLinkStatusViaGraphQL(ownerAddr) + linkedToCurrentUser, err := h.checkLinkStatusViaGraphQL(ctx, ownerAddr) if err != nil { return fmt.Errorf("failed to validate key ownership: %w", err) } @@ -48,14 +48,14 @@ func (h *handler) ensureOwnerLinkedOrFail(onChain *settings.OnChainRegistry) err } ui.Dim(fmt.Sprintf("Owner not linked. Attempting auto-link: owner=%s", ownerAddr.Hex())) - if err := h.tryAutoLink(onChain); err != nil { + if err := h.tryAutoLink(ctx, onChain); err != nil { return fmt.Errorf("auto-link attempt failed: %w", err) } ui.Success(fmt.Sprintf("Auto-link successful: owner=%s", ownerAddr.Hex())) // Wait for linking process to complete - if err := h.waitForBackendLinkProcessing(ownerAddr); err != nil { + if err := h.waitForBackendLinkProcessing(ctx, ownerAddr); err != nil { return fmt.Errorf("linking process failed: %w", err) } @@ -63,17 +63,17 @@ func (h *handler) ensureOwnerLinkedOrFail(onChain *settings.OnChainRegistry) err } // autoLinkMSIGAndExit handles MSIG auto-link and exits if manual intervention is needed -func (h *handler) autoLinkMSIGAndExit(onChain *settings.OnChainRegistry) (halt bool, err error) { +func (h *handler) autoLinkMSIGAndExit(ctx context.Context, onChain *settings.OnChainRegistry) (halt bool, err error) { ownerAddr := common.HexToAddress(h.inputs.WorkflowOwner) - linked, err := h.wrc.IsOwnerLinked(h.execCtx, ownerAddr) + linked, err := h.wrc.IsOwnerLinked(ctx, ownerAddr) if err != nil { return false, fmt.Errorf("failed to check owner link status: %w", err) } if linked { // Owner is linked on contract, now verify it's linked to the current user's account - linkedToCurrentUser, err := h.checkLinkStatusViaGraphQL(ownerAddr) + linkedToCurrentUser, err := h.checkLinkStatusViaGraphQL(ctx, ownerAddr) if err != nil { return false, fmt.Errorf("failed to validate MSIG key ownership: %w", err) } @@ -89,7 +89,7 @@ func (h *handler) autoLinkMSIGAndExit(onChain *settings.OnChainRegistry) (halt b ui.Dim(fmt.Sprintf("MSIG workflow owner link status: owner=%s, linked=%v", ownerAddr.Hex(), linked)) ui.Dim(fmt.Sprintf("MSIG owner: attempting auto-link... owner=%s", ownerAddr.Hex())) - if err := h.tryAutoLink(onChain); err != nil { + if err := h.tryAutoLink(ctx, onChain); err != nil { return false, fmt.Errorf("MSIG auto-link attempt failed: %w", err) } @@ -98,7 +98,7 @@ func (h *handler) autoLinkMSIGAndExit(onChain *settings.OnChainRegistry) (halt b } // tryAutoLink executes the auto-link process using the link-key command -func (h *handler) tryAutoLink(onChain *settings.OnChainRegistry) error { +func (h *handler) tryAutoLink(ctx context.Context, onChain *settings.OnChainRegistry) error { rtx := &runtime.Context{ Settings: h.settings, Credentials: h.credentials, @@ -107,7 +107,7 @@ func (h *handler) tryAutoLink(onChain *settings.OnChainRegistry) error { EnvironmentSet: h.environmentSet, } - return linkkey.Exec(h.execCtx, rtx, linkkey.Inputs{ + return linkkey.Exec(ctx, rtx, linkkey.Inputs{ WorkflowOwner: h.inputs.WorkflowOwner, WorkflowRegistryContractAddress: onChain.Address(), WorkflowOwnerLabel: h.inputs.OwnerLabel, @@ -115,7 +115,7 @@ func (h *handler) tryAutoLink(onChain *settings.OnChainRegistry) error { } // checkLinkStatusViaGraphQL checks if the owner is linked and verified by querying the service -func (h *handler) checkLinkStatusViaGraphQL(ownerAddr common.Address) (bool, error) { +func (h *handler) checkLinkStatusViaGraphQL(ctx context.Context, ownerAddr common.Address) (bool, error) { const query = ` query { listWorkflowOwners(filters: { linkStatus: LINKED_ONLY }) { @@ -137,7 +137,7 @@ func (h *handler) checkLinkStatusViaGraphQL(ownerAddr common.Address) (bool, err } gql := graphqlclient.New(h.credentials, h.environmentSet, h.log) - if err := gql.Execute(context.Background(), req, &resp); err != nil { + if err := gql.Execute(ctx, req, &resp); err != nil { return false, fmt.Errorf("GraphQL query failed: %w", err) } @@ -169,7 +169,7 @@ func (h *handler) checkLinkStatusViaGraphQL(ownerAddr common.Address) (bool, err } // waitForBackendLinkProcessing polls the service until the link is processed -func (h *handler) waitForBackendLinkProcessing(ownerAddr common.Address) error { +func (h *handler) waitForBackendLinkProcessing(ctx context.Context, ownerAddr common.Address) error { const maxAttempts = 5 const retryDelay = 3 * time.Second const initialBlockWait = 36 * time.Second // Wait for 3 block confirmations (~12s per block) @@ -181,11 +181,15 @@ func (h *handler) waitForBackendLinkProcessing(ownerAddr common.Address) error { ui.Line() // Wait for 3 block confirmations before polling - time.Sleep(initialBlockWait) + select { + case <-time.After(initialBlockWait): + case <-ctx.Done(): + return ctx.Err() + } err := retry.Do( func() error { - linked, err := h.checkLinkStatusViaGraphQL(ownerAddr) + linked, err := h.checkLinkStatusViaGraphQL(ctx, ownerAddr) if err != nil { h.log.Warn().Err(err).Msg("Failed to check link status") return err // Return error to trigger retry @@ -199,6 +203,7 @@ func (h *handler) waitForBackendLinkProcessing(ownerAddr common.Address) error { retry.Delay(retryDelay), retry.DelayType(retry.FixedDelay), // Use fixed 3s delay between retries retry.LastErrorOnly(true), + retry.Context(ctx), retry.OnRetry(func(n uint, err error) { h.log.Debug().Uint("attempt", n+1).Uint("maxAttempts", maxAttempts).Err(err).Msg("Retrying link status check") ui.Dim(fmt.Sprintf(" Waiting for verification... (attempt %d/%d)", n+1, maxAttempts)) diff --git a/cmd/workflow/deploy/auto_link_test.go b/cmd/workflow/deploy/auto_link_test.go index e192ccfa..f12d767b 100644 --- a/cmd/workflow/deploy/auto_link_test.go +++ b/cmd/workflow/deploy/auto_link_test.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -164,7 +165,7 @@ func TestCheckLinkStatusViaGraphQL(t *testing.T) { // Test the function ownerAddr := common.HexToAddress(tt.ownerAddress) - result, err := h.checkLinkStatusViaGraphQL(ownerAddr) + result, err := h.checkLinkStatusViaGraphQL(context.Background(), ownerAddr) if tt.expectError { assert.Error(t, err) @@ -335,7 +336,7 @@ func TestWaitForBackendLinkProcessing(t *testing.T) { // Test the function ownerAddr := common.HexToAddress(tt.ownerAddress) - err := h.waitForBackendLinkProcessing(ownerAddr) + err := h.waitForBackendLinkProcessing(context.Background(), ownerAddr) if tt.expectError { assert.Error(t, err) diff --git a/cmd/workflow/deploy/compile.go b/cmd/workflow/deploy/compile.go index ecb3c064..ffa42bdf 100644 --- a/cmd/workflow/deploy/compile.go +++ b/cmd/workflow/deploy/compile.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "fmt" "os" @@ -9,7 +10,7 @@ import ( "github.com/smartcontractkit/cre-cli/internal/ui" ) -func (h *handler) Compile() error { +func (h *handler) Compile(ctx context.Context) error { if !h.validated { return fmt.Errorf("handler h.inputs not validated") } @@ -67,7 +68,7 @@ func (h *handler) Compile() error { h.runtimeContext.Workflow.Language = cmdcommon.GetWorkflowLanguage(workflowMainFile) } - wasmFile, err = cmdcommon.CompileWorkflowToWasm(resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ + wasmFile, err = cmdcommon.CompileWorkflowToWasm(ctx, resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ StripSymbols: true, SkipTypeChecks: h.inputs.SkipTypeChecks, }) diff --git a/cmd/workflow/deploy/compile_test.go b/cmd/workflow/deploy/compile_test.go index 149ac19a..b8abe6ee 100644 --- a/cmd/workflow/deploy/compile_test.go +++ b/cmd/workflow/deploy/compile_test.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "encoding/base64" "errors" "io" @@ -266,12 +267,11 @@ func runCompile(simulatedEnvironment *chainsim.SimulatedEnvironment, inputs Inpu handler.settings = ctx.Settings handler.inputs = inputs - err := handler.ValidateInputs() - if err != nil { + if err := handler.ValidateInputs(); err != nil { return err } - return handler.Compile() + return handler.Compile(context.Background()) } // outputPathWithExtensions returns the path with .wasm.br.b64 appended as in Compile(). @@ -286,7 +286,7 @@ func outputPathWithExtensions(path string) string { // file content equals CompileWorkflowToWasm(workflowPath) + brotli + base64. func assertCompileOutputMatchesUnderlying(t *testing.T, simulatedEnvironment *chainsim.SimulatedEnvironment, inputs Inputs, ownerType string) { t.Helper() - wasm, err := cmdcommon.CompileWorkflowToWasm(inputs.WorkflowPath, cmdcommon.WorkflowCompileOptions{ + wasm, err := cmdcommon.CompileWorkflowToWasm(context.Background(), inputs.WorkflowPath, cmdcommon.WorkflowCompileOptions{ StripSymbols: true, SkipTypeChecks: inputs.SkipTypeChecks, }) @@ -435,7 +435,7 @@ func TestCompileWithWasmPath(t *testing.T) { handler.validated = true // Compile() with URL wasm should return nil (skips compile entirely). - err := handler.Compile() + err := handler.Compile(context.Background()) require.NoError(t, err) }) diff --git a/cmd/workflow/deploy/deploy.go b/cmd/workflow/deploy/deploy.go index 34f68f9b..11952c21 100644 --- a/cmd/workflow/deploy/deploy.go +++ b/cmd/workflow/deploy/deploy.go @@ -73,8 +73,6 @@ type handler struct { // existingWorkflowStatus stores the status of an existing workflow when updating. // nil means this is a new workflow, otherwise it contains the current status (0=active, 1=paused). existingWorkflowStatus *uint8 - - execCtx context.Context } var defaultOutputPath = "./binary.wasm.br.b64" @@ -211,8 +209,6 @@ func (h *handler) Execute(ctx context.Context) error { return fmt.Errorf("handler inputs not validated") } - h.execCtx = ctx - deployAccess, err := h.credentials.GetDeploymentAccessStatus() if err != nil { return fmt.Errorf("failed to check deployment access: %w", err) @@ -222,23 +218,27 @@ func (h *handler) Execute(ctx context.Context) error { return h.accessRequester.PromptAndSubmitRequest(ctx) } - adapter, err := newRegistryDeployStrategy(h.runtimeContext.ResolvedRegistry, h) + adapter, err := newRegistryDeployStrategy(ctx, h.runtimeContext.ResolvedRegistry, h) if err != nil { return err } - if err := h.prepareArtifacts(); err != nil { + if err := h.prepareArtifacts(ctx); err != nil { + return err + } + + if err := ctx.Err(); err != nil { return err } - if err := adapter.RunPreDeployChecks(); err != nil { + if err := adapter.RunPreDeployChecks(ctx); err != nil { if errors.Is(err, errDeployHalted) { return nil } return err } - exists, existingStatus, err := adapter.CheckWorkflowExists( + exists, existingStatus, err := adapter.CheckWorkflowExists(ctx, h.inputs.WorkflowOwner, h.inputs.WorkflowName, h.inputs.WorkflowTag, @@ -259,11 +259,11 @@ func (h *handler) Execute(ctx context.Context) error { ui.Line() ui.Dim("Uploading files...") - if err := h.uploadArtifacts(); err != nil { + if err := h.uploadArtifacts(ctx); err != nil { return fmt.Errorf("failed to upload workflow: %w", err) } - err = adapter.Upsert() + err = adapter.Upsert(ctx) if err == nil { warnIfPausedWorkflowUpdate(h.existingWorkflowStatus) } @@ -273,7 +273,11 @@ func (h *handler) Execute(ctx context.Context) error { // prepareArtifacts handles compile/fetch, artifact preparation, and hashing. // Artifact upload is deferred to the deploy service so it runs after any // existing-workflow update confirmation. -func (h *handler) prepareArtifacts() error { +func (h *handler) prepareArtifacts(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + workflowcommon.DisplayWorkflowDetails( h.settings, h.runtimeContext, @@ -285,14 +289,14 @@ func (h *handler) prepareArtifacts() error { if cmdcommon.IsURL(h.inputs.WasmPath) { h.inputs.BinaryURL = h.inputs.WasmPath ui.Dim("Fetching binary from URL for workflow ID computation...") - fetched, err := cmdcommon.FetchURL(h.inputs.WasmPath) + fetched, err := cmdcommon.FetchURL(ctx, h.inputs.WasmPath) if err != nil { return fmt.Errorf("failed to fetch binary from URL: %w", err) } h.urlBinaryData = fetched ui.Success(fmt.Sprintf("Using binary URL: %s", h.inputs.WasmPath)) } else { - if err := h.Compile(); err != nil { + if err := h.Compile(ctx); err != nil { return fmt.Errorf("failed to compile workflow: %w", err) } } @@ -302,7 +306,7 @@ func (h *handler) prepareArtifacts() error { h.inputs.ConfigURL = &url h.inputs.ConfigPath = "" ui.Dim("Fetching config from URL for workflow ID computation...") - fetched, err := cmdcommon.FetchURL(url) + fetched, err := cmdcommon.FetchURL(ctx, url) if err != nil { return fmt.Errorf("failed to fetch config from URL: %w", err) } diff --git a/cmd/workflow/deploy/private_registry_test.go b/cmd/workflow/deploy/private_registry_test.go index db0ed61b..098b669b 100644 --- a/cmd/workflow/deploy/private_registry_test.go +++ b/cmd/workflow/deploy/private_registry_test.go @@ -313,10 +313,9 @@ func TestCheckWorkflowExists_PrivateRegistry(t *testing.T) { defer gqlServer.Close() h.environmentSet.GraphQLURL = gqlServer.URL - h.execCtx = context.Background() strategy := newPrivateRegistryDeployStrategy(h) - exists, status, err := strategy.CheckWorkflowExists("", "jnowak-workflow-test-v5", "", tt.workflowID) + exists, status, err := strategy.CheckWorkflowExists(context.Background(), "", "jnowak-workflow-test-v5", "", tt.workflowID) if tt.wantErr { require.Error(t, err) if tt.errMsg != "" { diff --git a/cmd/workflow/deploy/register.go b/cmd/workflow/deploy/register.go index 29c6ac67..1d884bde 100644 --- a/cmd/workflow/deploy/register.go +++ b/cmd/workflow/deploy/register.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "encoding/hex" "fmt" "time" @@ -14,7 +15,7 @@ import ( "github.com/smartcontractkit/cre-cli/internal/ui" ) -func (h *handler) upsert(onChain *settings.OnChainRegistry) error { +func (h *handler) upsert(ctx context.Context, onChain *settings.OnChainRegistry) error { if !h.validated { return fmt.Errorf("handler inputs not validated") } @@ -23,7 +24,7 @@ func (h *handler) upsert(onChain *settings.OnChainRegistry) error { if err != nil { return err } - return h.handleUpsert(params, onChain) + return h.handleUpsert(ctx, params, onChain) } func (h *handler) prepareUpsertParams() (client.RegisterWorkflowV2Parameters, error) { @@ -53,11 +54,11 @@ func (h *handler) prepareUpsertParams() (client.RegisterWorkflowV2Parameters, er }, nil } -func (h *handler) handleUpsert(params client.RegisterWorkflowV2Parameters, onChain *settings.OnChainRegistry) error { +func (h *handler) handleUpsert(ctx context.Context, params client.RegisterWorkflowV2Parameters, onChain *settings.OnChainRegistry) error { workflowName := h.inputs.WorkflowName workflowTag := h.inputs.WorkflowTag h.log.Debug().Interface("Workflow parameters", params).Msg("Registering workflow...") - txOut, err := h.wrc.UpsertWorkflow(h.execCtx, params) + txOut, err := h.wrc.UpsertWorkflow(ctx, params) if err != nil { return fmt.Errorf("failed to register workflow: %w", err) } diff --git a/cmd/workflow/deploy/register_test.go b/cmd/workflow/deploy/register_test.go index b039aaf5..60e296ea 100644 --- a/cmd/workflow/deploy/register_test.go +++ b/cmd/workflow/deploy/register_test.go @@ -56,18 +56,15 @@ func TestWorkflowUpsert(t *testing.T) { err = handler.ValidateInputs() require.NoError(t, err) - wfArt := workflowArtifact{ + handler.workflowArtifact = &workflowArtifact{ BinaryData: []byte("0x1234"), ConfigData: []byte("config"), WorkflowID: "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", } - handler.workflowArtifact = &wfArt - handler.execCtx = context.Background() - onChain, err := settings.AsOnChain(ctx.ResolvedRegistry, "test") require.NoError(t, err) - err = handler.upsert(onChain) + err = handler.upsert(context.Background(), onChain) require.NoError(t, err) }) } diff --git a/cmd/workflow/deploy/registry_deploy_strategy.go b/cmd/workflow/deploy/registry_deploy_strategy.go index 34a5c431..419d4c41 100644 --- a/cmd/workflow/deploy/registry_deploy_strategy.go +++ b/cmd/workflow/deploy/registry_deploy_strategy.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "errors" "github.com/smartcontractkit/cre-cli/internal/settings" @@ -22,23 +23,23 @@ type registryDeployStrategy interface { // RunPreDeployChecks validates readiness and runs registry-specific // prechecks (ownership linking, duplicate detection, etc.). // Return errDeployHalted to stop the deploy without returning an error. - RunPreDeployChecks() error + RunPreDeployChecks(ctx context.Context) error // CheckWorkflowExists returns whether a same-name workflow exists for this // registry target and includes the existing workflow status for updates. // When the existing workflow ID matches workflowID, exists is true and // errWorkflowUnchanged is returned to block redeployment of identical artifacts. - CheckWorkflowExists(workflowOwner, workflowName, workflowTag, workflowID string) (bool, *uint8, error) + CheckWorkflowExists(ctx context.Context, workflowOwner, workflowName, workflowTag, workflowID string) (bool, *uint8, error) // Upsert registers or updates the workflow in the target registry // and displays the result. - Upsert() error + Upsert(ctx context.Context) error } // newRegistryDeployStrategy returns the appropriate strategy for the given target. -func newRegistryDeployStrategy(resolvedRegistry settings.ResolvedRegistry, h *handler) (registryDeployStrategy, error) { +func newRegistryDeployStrategy(ctx context.Context, resolvedRegistry settings.ResolvedRegistry, h *handler) (registryDeployStrategy, error) { if resolvedRegistry.Type() == settings.RegistryTypeOffChain { return newPrivateRegistryDeployStrategy(h), nil } - return newOnchainRegistryDeployStrategy(h) + return newOnchainRegistryDeployStrategy(ctx, h) } diff --git a/cmd/workflow/deploy/registry_deploy_strategy_onchain.go b/cmd/workflow/deploy/registry_deploy_strategy_onchain.go index 032de838..cd1eafc8 100644 --- a/cmd/workflow/deploy/registry_deploy_strategy_onchain.go +++ b/cmd/workflow/deploy/registry_deploy_strategy_onchain.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "fmt" "sync" @@ -23,7 +24,7 @@ type onchainRegistryDeployStrategy struct { initErr error } -func newOnchainRegistryDeployStrategy(h *handler) (*onchainRegistryDeployStrategy, error) { +func newOnchainRegistryDeployStrategy(ctx context.Context, h *handler) (*onchainRegistryDeployStrategy, error) { onChain, err := settings.AsOnChain(h.runtimeContext.ResolvedRegistry, "deploy") if err != nil { return nil, err @@ -33,7 +34,7 @@ func newOnchainRegistryDeployStrategy(h *handler) (*onchainRegistryDeployStrateg a.wg.Add(1) go func() { defer a.wg.Done() - wrc, err := h.clientFactory.NewWorkflowRegistryV2Client(h.execCtx) + wrc, err := h.clientFactory.NewWorkflowRegistryV2Client(ctx) if err != nil { a.initErr = fmt.Errorf("failed to create workflow registry client: %w", err) return @@ -44,10 +45,27 @@ func newOnchainRegistryDeployStrategy(h *handler) (*onchainRegistryDeployStrateg return a, nil } -func (a *onchainRegistryDeployStrategy) RunPreDeployChecks() error { +func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (a *onchainRegistryDeployStrategy) RunPreDeployChecks(ctx context.Context) error { h := a.h - a.wg.Wait() + if err := waitWithContext(ctx, &a.wg); err != nil { + return err + } if a.initErr != nil { return a.initErr } @@ -55,7 +73,7 @@ func (a *onchainRegistryDeployStrategy) RunPreDeployChecks() error { ui.Line() ui.Dim("Verifying ownership...") if h.settings.Workflow.UserWorkflowSettings.WorkflowOwnerType == constants.WorkflowOwnerTypeMSIG { - halt, err := h.autoLinkMSIGAndExit(a.onChain) + halt, err := h.autoLinkMSIGAndExit(ctx, a.onChain) if err != nil { return fmt.Errorf("failed to check/handle MSIG owner link status: %w", err) } @@ -63,7 +81,7 @@ func (a *onchainRegistryDeployStrategy) RunPreDeployChecks() error { return errDeployHalted } } else { - if err := h.ensureOwnerLinkedOrFail(a.onChain); err != nil { + if err := h.ensureOwnerLinkedOrFail(ctx, a.onChain); err != nil { return err } } @@ -71,8 +89,8 @@ func (a *onchainRegistryDeployStrategy) RunPreDeployChecks() error { return nil } -func (a *onchainRegistryDeployStrategy) CheckWorkflowExists(workflowOwner, workflowName, workflowTag, workflowID string) (bool, *uint8, error) { - workflow, err := a.wrc.GetWorkflow(a.h.execCtx, common.HexToAddress(workflowOwner), workflowName, workflowTag) +func (a *onchainRegistryDeployStrategy) CheckWorkflowExists(ctx context.Context, workflowOwner, workflowName, workflowTag, workflowID string) (bool, *uint8, error) { + workflow, err := a.wrc.GetWorkflow(ctx, common.HexToAddress(workflowOwner), workflowName, workflowTag) if err != nil { return false, nil, err } @@ -88,11 +106,11 @@ func (a *onchainRegistryDeployStrategy) CheckWorkflowExists(workflowOwner, workf return false, nil, nil } -func (a *onchainRegistryDeployStrategy) Upsert() error { +func (a *onchainRegistryDeployStrategy) Upsert(ctx context.Context) error { h := a.h if err := checkUserDonLimitBeforeDeploy( - h.execCtx, + ctx, a.wrc, a.wrc, common.HexToAddress(h.inputs.WorkflowOwner), @@ -106,7 +124,7 @@ func (a *onchainRegistryDeployStrategy) Upsert() error { ui.Line() ui.Dim("Preparing deployment transaction...") - if err := h.upsert(a.onChain); err != nil { + if err := h.upsert(ctx, a.onChain); err != nil { return fmt.Errorf("failed to register workflow: %w", err) } return nil diff --git a/cmd/workflow/deploy/registry_deploy_strategy_private.go b/cmd/workflow/deploy/registry_deploy_strategy_private.go index 9909800e..1369fd8a 100644 --- a/cmd/workflow/deploy/registry_deploy_strategy_private.go +++ b/cmd/workflow/deploy/registry_deploy_strategy_private.go @@ -1,6 +1,7 @@ package deploy import ( + "context" "fmt" "strings" @@ -27,14 +28,14 @@ func (a *privateRegistryDeployStrategy) ensureClient() { } } -func (a *privateRegistryDeployStrategy) RunPreDeployChecks() error { +func (a *privateRegistryDeployStrategy) RunPreDeployChecks(_ context.Context) error { return nil } -func (a *privateRegistryDeployStrategy) CheckWorkflowExists(_, workflowName, _, workflowID string) (bool, *uint8, error) { +func (a *privateRegistryDeployStrategy) CheckWorkflowExists(ctx context.Context, _, workflowName, _, workflowID string) (bool, *uint8, error) { a.ensureClient() - workflow, err := a.prc.GetWorkflowByName(a.h.execCtx, workflowName) + workflow, err := a.prc.GetWorkflowByName(ctx, workflowName) if err == nil { if workflow.WorkflowID == workflowID { return true, offchainStatusToUint8(workflow.Status), fmt.Errorf("workflow with id %s is already registered and unchanged; re-deployment skipped: %w", workflowID, errWorkflowUnchanged) @@ -48,7 +49,7 @@ func (a *privateRegistryDeployStrategy) CheckWorkflowExists(_, workflowName, _, return false, nil, err } -func (a *privateRegistryDeployStrategy) Upsert() error { +func (a *privateRegistryDeployStrategy) Upsert(ctx context.Context) error { a.ensureClient() h := a.h @@ -57,7 +58,7 @@ func (a *privateRegistryDeployStrategy) Upsert() error { ui.Line() ui.Dim(fmt.Sprintf("Registering workflow in private registry (workflowID: %s)...", input.WorkflowID)) - result, err := a.prc.UpsertWorkflowInRegistry(a.h.execCtx, input) + result, err := a.prc.UpsertWorkflowInRegistry(ctx, input) if err != nil { return fmt.Errorf("failed to register workflow in private registry: %w", err) } diff --git a/cmd/workflow/hash/hash.go b/cmd/workflow/hash/hash.go index 49533870..7efdb434 100644 --- a/cmd/workflow/hash/hash.go +++ b/cmd/workflow/hash/hash.go @@ -1,6 +1,7 @@ package hash import ( + "context" "fmt" "os" "strings" @@ -62,7 +63,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command { DerivedOwner: runtimeContext.DerivedWorkflowOwner, } - return Execute(inputs) + return Execute(cmd.Context(), inputs) }, } @@ -81,8 +82,8 @@ func New(runtimeContext *runtime.Context) *cobra.Command { return hashCmd } -func Execute(inputs Inputs) error { - rawBinary, err := loadBinary(inputs.WasmPath, inputs.WorkflowPath, inputs.SkipTypeChecks) +func Execute(ctx context.Context, inputs Inputs) error { + rawBinary, err := loadBinary(ctx, inputs.WasmPath, inputs.WorkflowPath, inputs.SkipTypeChecks) if err != nil { return err } @@ -92,7 +93,7 @@ func Execute(inputs Inputs) error { return fmt.Errorf("failed to compress binary: %w", err) } - config, err := loadConfig(inputs.ConfigPath) + config, err := loadConfig(ctx, inputs.ConfigPath) if err != nil { return err } @@ -190,11 +191,11 @@ func isPrivateRegistryID(deploymentRegistry string) bool { return strings.EqualFold(deploymentRegistry, "private") } -func loadBinary(wasmFlag, workflowPathFromSettings string, skipTypeChecks bool) ([]byte, error) { +func loadBinary(ctx context.Context, wasmFlag, workflowPathFromSettings string, skipTypeChecks bool) ([]byte, error) { if wasmFlag != "" { if cmdcommon.IsURL(wasmFlag) { ui.Dim("Fetching WASM binary from URL...") - data, err := cmdcommon.FetchURL(wasmFlag) + data, err := cmdcommon.FetchURL(ctx, wasmFlag) if err != nil { return nil, fmt.Errorf("failed to fetch WASM from URL: %w", err) } @@ -221,7 +222,7 @@ func loadBinary(wasmFlag, workflowPathFromSettings string, skipTypeChecks bool) spinner := ui.NewSpinner() spinner.Start("Compiling workflow...") - wasmBytes, err := cmdcommon.CompileWorkflowToWasm(resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ + wasmBytes, err := cmdcommon.CompileWorkflowToWasm(ctx, resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ StripSymbols: true, SkipTypeChecks: skipTypeChecks, }) @@ -235,13 +236,13 @@ func loadBinary(wasmFlag, workflowPathFromSettings string, skipTypeChecks bool) return wasmBytes, nil } -func loadConfig(configPath string) ([]byte, error) { +func loadConfig(ctx context.Context, configPath string) ([]byte, error) { if configPath == "" { return nil, nil } if cmdcommon.IsURL(configPath) { ui.Dim("Fetching config from URL...") - data, err := cmdcommon.FetchURL(configPath) + data, err := cmdcommon.FetchURL(ctx, configPath) if err != nil { return nil, fmt.Errorf("failed to fetch config from URL: %w", err) } diff --git a/cmd/workflow/hash/hash_test.go b/cmd/workflow/hash/hash_test.go index 0ed08a82..14809402 100644 --- a/cmd/workflow/hash/hash_test.go +++ b/cmd/workflow/hash/hash_test.go @@ -1,6 +1,7 @@ package hash import ( + "context" "crypto/sha256" "encoding/hex" "io" @@ -80,7 +81,7 @@ func TestExecute_WithForUser(t *testing.T) { WorkflowName: "test-workflow", } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.NoError(t, err) } @@ -94,7 +95,7 @@ func TestExecute_WithoutForUser_UsesPrivateKey(t *testing.T) { PrivateKey: testPrivateKey, } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.NoError(t, err) } @@ -107,7 +108,7 @@ func TestExecute_WithoutForUser_NoKey_Errors(t *testing.T) { WorkflowName: "test-workflow", } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.Error(t, err) assert.Contains(t, err.Error(), "--public_key") } @@ -173,7 +174,7 @@ func TestExecute_HashesAreDeterministic(t *testing.T) { "workflow ID should start with version byte 00") // Running Execute should succeed (hashes are printed via ui, verified above) - err = Execute(inputs) + err = Execute(context.Background(), inputs) require.NoError(t, err) } @@ -187,7 +188,7 @@ func TestExecute_EmptyConfig(t *testing.T) { WorkflowName: "test-workflow", } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.NoError(t, err) } @@ -201,7 +202,7 @@ func TestExecute_OffChainRequiresPublicKey(t *testing.T) { RegistryType: settings.RegistryTypeOffChain, } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.Error(t, err) assert.Contains(t, err.Error(), "--public_key") } @@ -218,7 +219,7 @@ func TestExecute_OffChainUsesPublicKey(t *testing.T) { DerivedOwner: testDerivedOwner, } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.NoError(t, err) } @@ -233,7 +234,7 @@ func TestExecute_OffChainUsesDerivedOwner(t *testing.T) { DerivedOwner: testDerivedOwner, } - err := Execute(inputs) + err := Execute(context.Background(), inputs) require.NoError(t, err) } diff --git a/cmd/workflow/simulate/simulate.go b/cmd/workflow/simulate/simulate.go index 7d0ca445..fe95ed2b 100644 --- a/cmd/workflow/simulate/simulate.go +++ b/cmd/workflow/simulate/simulate.go @@ -90,7 +90,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command { if err != nil { return err } - return handler.Execute(inputs) + return handler.Execute(cmd.Context(), inputs) }, } @@ -252,14 +252,14 @@ func (h *handler) ValidateInputs(inputs Inputs) error { return nil } -func (h *handler) Execute(inputs Inputs) error { +func (h *handler) Execute(ctx context.Context, inputs Inputs) error { var wasmFileBinary []byte var err error if inputs.WasmPath != "" { if cmdcommon.IsURL(inputs.WasmPath) { ui.Dim("Fetching WASM binary from URL...") - wasmFileBinary, err = cmdcommon.FetchURL(inputs.WasmPath) + wasmFileBinary, err = cmdcommon.FetchURL(ctx, inputs.WasmPath) if err != nil { return fmt.Errorf("failed to fetch WASM from URL: %w", err) } @@ -298,7 +298,7 @@ func (h *handler) Execute(inputs Inputs) error { spinner := ui.NewSpinner() spinner.Start("Compiling workflow...") - wasmFileBinary, err = cmdcommon.CompileWorkflowToWasm(resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ + wasmFileBinary, err = cmdcommon.CompileWorkflowToWasm(ctx, resolvedWorkflowPath, cmdcommon.WorkflowCompileOptions{ StripSymbols: false, SkipTypeChecks: inputs.SkipTypeChecks, }) @@ -343,7 +343,7 @@ func (h *handler) Execute(inputs Inputs) error { var config []byte if cmdcommon.IsURL(inputs.ConfigPath) { ui.Dim("Fetching config from URL...") - config, err = cmdcommon.FetchURL(inputs.ConfigPath) + config, err = cmdcommon.FetchURL(ctx, inputs.ConfigPath) if err != nil { return fmt.Errorf("failed to fetch config from URL: %w", err) } diff --git a/cmd/workflow/simulate/simulate_test.go b/cmd/workflow/simulate/simulate_test.go index 847d7ae2..4879b8f3 100644 --- a/cmd/workflow/simulate/simulate_test.go +++ b/cmd/workflow/simulate/simulate_test.go @@ -98,7 +98,7 @@ func TestBlankWorkflowSimulation(t *testing.T) { require.NoError(t, err) // Execute the simulation. We expect this to compile the workflow and run the simulator successfully. - err = handler.Execute(inputs) + err = handler.Execute(context.Background(), inputs) require.NoError(t, err, "Execute should not return an error") } diff --git a/internal/client/storageclient/storageclient.go b/internal/client/storageclient/storageclient.go index de16d790..9a798684 100644 --- a/internal/client/storageclient/storageclient.go +++ b/internal/client/storageclient/storageclient.go @@ -69,15 +69,15 @@ func (c *Client) SetHTTPTimeout(timeout time.Duration) { c.httpTimeout = timeout } -func (c *Client) CreateServiceContextWithTimeout() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), c.serviceTimeout) //nolint:gosec // G118 -- cancel is deferred by all callers +func (c *Client) CreateServiceContextWithTimeout(parent context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, c.serviceTimeout) //nolint:gosec // G118 -- cancel is deferred by all callers } -func (c *Client) CreateHttpContextWithTimeout() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), c.httpTimeout) //nolint:gosec // G118 -- cancel is deferred by all callers +func (c *Client) CreateHttpContextWithTimeout(parent context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, c.httpTimeout) //nolint:gosec // G118 -- cancel is deferred by all callers } -func (c *Client) GeneratePostUrlForArtifact(workflowId string, artifactType ArtifactType, content []byte) (GeneratePresignedPostUrlForArtifactResponse, error) { +func (c *Client) GeneratePostUrlForArtifact(ctx context.Context, workflowId string, artifactType ArtifactType, content []byte) (GeneratePresignedPostUrlForArtifactResponse, error) { const mutation = ` mutation GeneratePresignedPostUrlForArtifact($artifact: GeneratePresignedPostUrlRequest!) { generatePresignedPostUrlForArtifact(artifact: $artifact) { @@ -102,7 +102,7 @@ mutation GeneratePresignedPostUrlForArtifact($artifact: GeneratePresignedPostUrl GeneratePresignedPostUrlForArtifact GeneratePresignedPostUrlForArtifactResponse `json:"generatePresignedPostUrlForArtifact"` } - ctx, cancel := c.CreateServiceContextWithTimeout() + ctx, cancel := c.CreateServiceContextWithTimeout(ctx) defer cancel() if err := c.graphql. @@ -116,7 +116,7 @@ mutation GeneratePresignedPostUrlForArtifact($artifact: GeneratePresignedPostUrl return container.GeneratePresignedPostUrlForArtifact, nil } -func (c *Client) GenerateUnsignedGetUrlForArtifact(workflowId string, artifactType ArtifactType) (GenerateUnsignedGetUrlForArtifactResponse, error) { +func (c *Client) GenerateUnsignedGetUrlForArtifact(ctx context.Context, workflowId string, artifactType ArtifactType) (GenerateUnsignedGetUrlForArtifactResponse, error) { const mutation = ` mutation GenerateUnsignedGetUrlForArtifact($artifact: GenerateUnsignedGetUrlRequest!) { generateUnsignedGetUrlForArtifact(artifact: $artifact) { @@ -134,7 +134,7 @@ mutation GenerateUnsignedGetUrlForArtifact($artifact: GenerateUnsignedGetUrlRequ GenerateUnsignedGetUrlForArtifact GenerateUnsignedGetUrlForArtifactResponse `json:"generateUnsignedGetUrlForArtifact"` } - ctx, cancel := c.CreateServiceContextWithTimeout() + ctx, cancel := c.CreateServiceContextWithTimeout(ctx) defer cancel() if err := c.graphql. @@ -154,7 +154,7 @@ func calculateContentHash(content []byte) string { return contentHash } -func (c *Client) UploadToOrigin(g GeneratePresignedPostUrlForArtifactResponse, content []byte, contentType string) error { +func (c *Client) UploadToOrigin(ctx context.Context, g GeneratePresignedPostUrlForArtifactResponse, content []byte, contentType string) error { c.log.Debug().Str("URL", g.PresignedPostURL).Msg("Uploading content to origin") var b bytes.Buffer @@ -197,7 +197,7 @@ func (c *Client) UploadToOrigin(g GeneratePresignedPostUrlForArtifactResponse, c return err } - ctx, cancel := c.CreateHttpContextWithTimeout() + ctx, cancel := c.CreateHttpContextWithTimeout(ctx) defer cancel() httpReq, err := http.NewRequestWithContext(ctx, "POST", g.PresignedPostURL, &b) @@ -231,10 +231,14 @@ func (c *Client) UploadToOrigin(g GeneratePresignedPostUrlForArtifactResponse, c } func (c *Client) UploadArtifactWithRetriesAndGetURL( + ctx context.Context, workflowID string, artifactType ArtifactType, content []byte, contentType string) (GenerateUnsignedGetUrlForArtifactResponse, error) { + if err := ctx.Err(); err != nil { + return GenerateUnsignedGetUrlForArtifactResponse{}, err + } if len(workflowID) == 0 { return GenerateUnsignedGetUrlForArtifactResponse{}, fmt.Errorf("workflowID is empty") } @@ -251,7 +255,7 @@ func (c *Client) UploadArtifactWithRetriesAndGetURL( err := retry.Do( func() error { var err error - g, err = c.GeneratePostUrlForArtifact(workflowID, artifactType, content) + g, err = c.GeneratePostUrlForArtifact(ctx, workflowID, artifactType, content) if err != nil { if strings.Contains(err.Error(), "already exists") { shouldUpload = false @@ -264,6 +268,7 @@ func (c *Client) UploadArtifactWithRetriesAndGetURL( }, retry.Attempts(3), retry.LastErrorOnly(true), + retry.Context(ctx), ) if err != nil { c.log.Error().Err(err).Msg("Failed to generate presigned post URL for artifact") @@ -276,10 +281,11 @@ func (c *Client) UploadArtifactWithRetriesAndGetURL( if shouldUpload { err = retry.Do( func() error { - return c.UploadToOrigin(g, content, contentType) + return c.UploadToOrigin(ctx, g, content, contentType) }, retry.Attempts(3), retry.LastErrorOnly(true), + retry.Context(ctx), ) if err != nil { c.log.Error().Err(err).Msg("Failed to upload content to origin") @@ -290,7 +296,7 @@ func (c *Client) UploadArtifactWithRetriesAndGetURL( var g2 GenerateUnsignedGetUrlForArtifactResponse err = retry.Do( func() error { - g2, err = c.GenerateUnsignedGetUrlForArtifact(workflowID, artifactType) + g2, err = c.GenerateUnsignedGetUrlForArtifact(ctx, workflowID, artifactType) if err != nil { return fmt.Errorf("generate unsigned get url: %w", err) } @@ -298,6 +304,7 @@ func (c *Client) UploadArtifactWithRetriesAndGetURL( }, retry.Attempts(3), retry.LastErrorOnly(true), + retry.Context(ctx), ) if err != nil { c.log.Error().Err(err).Msg("Failed to generate unsigned get URL for artifact")