Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 135 additions & 2 deletions control-plane/internal/cli/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func NewExecutionCommand() *cobra.Command {
cmd.AddCommand(newCancelExecutionCommand())
cmd.AddCommand(newPauseExecutionCommand())
cmd.AddCommand(newResumeExecutionCommand())
cmd.AddCommand(newRestartExecutionCommand())
return cmd
}

Expand All @@ -32,6 +33,11 @@ type executionActionOptions struct {
timeout time.Duration
jsonOutput bool
reason string
scope string
reuse string
fork bool
model string
input string
}

func defaultExecutionActionOptions() executionActionOptions {
Expand All @@ -42,6 +48,44 @@ func defaultExecutionActionOptions() executionActionOptions {
}
}

func newRestartExecutionCommand() *cobra.Command {
opts := defaultExecutionActionOptions()
opts.scope = "workflow"
opts.reuse = "succeeded-before"

cmd := &cobra.Command{
Use: "restart <execution_id>",
Short: "Restart a workflow from an execution point",
Long: "Start a new run from an existing execution. By default, restarts the containing workflow and reuses successful app.call outputs before that point.",
Args: cobra.ExactArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
body, err := buildRestartExecutionBody(opts)
if err != nil {
return err
}
_, err = runExecutionAction(executionActionConfig{
actionName: "restart",
successVerb: "restarted",
endpoint: "/api/v1/executions/%s/restart",
opts: &opts,
executionID: args[0],
withBody: true,
body: body,
})
return err
},
}

cmd.Flags().StringVar(&opts.scope, "scope", opts.scope, "Restart scope: workflow or execution")
cmd.Flags().StringVar(&opts.reuse, "reuse", opts.reuse, "Replay reuse mode: succeeded-before, all-succeeded, or none")
cmd.Flags().BoolVar(&opts.fork, "fork", false, "Mark this restart as a fork with intentional changes")
cmd.Flags().StringVar(&opts.model, "model", "", "Model override to send in restart context")
cmd.Flags().StringVar(&opts.input, "input", "", "JSON input override or @path to a JSON file")
cmd.Flags().StringVar(&opts.reason, "reason", "", "Reason for restarting the execution")
bindExecutionActionFlags(cmd, &opts)
return cmd
}

func newCancelExecutionCommand() *cobra.Command {
opts := defaultExecutionActionOptions()

Expand Down Expand Up @@ -129,6 +173,8 @@ type executionActionConfig struct {
opts *executionActionOptions
executionID string
withReason bool
withBody bool
body map[string]interface{}
}

func runExecutionAction(cfg executionActionConfig) (map[string]any, error) {
Expand All @@ -139,7 +185,25 @@ func runExecutionAction(cfg executionActionConfig) (map[string]any, error) {
server = strings.TrimSuffix(server, "/")

var bodyBytes []byte
if cfg.withReason {
if cfg.withBody {
payload := map[string]interface{}{}
for key, value := range cfg.body {
switch typed := value.(type) {
case string:
if strings.TrimSpace(typed) != "" {
payload[key] = typed
}
case nil:
default:
payload[key] = value
}
}
encoded, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("encode payload: %w", err)
}
bodyBytes = encoded
} else if cfg.withReason {
payload := map[string]string{}
if strings.TrimSpace(cfg.opts.reason) != "" {
payload["reason"] = cfg.opts.reason
Expand All @@ -165,7 +229,7 @@ func runExecutionAction(cfg executionActionConfig) (map[string]any, error) {
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
if cfg.withReason {
if cfg.withReason || cfg.withBody {
req.Header.Set("Content-Type", "application/json")
}
if cfg.opts.token != "" {
Expand Down Expand Up @@ -200,9 +264,78 @@ func runExecutionAction(cfg executionActionConfig) (map[string]any, error) {
return parsed, nil
}

func buildRestartExecutionBody(opts executionActionOptions) (map[string]interface{}, error) {
body := map[string]interface{}{
"scope": opts.scope,
"reuse": opts.reuse,
}
if strings.TrimSpace(opts.reason) != "" {
body["reason"] = opts.reason
}
if opts.fork {
body["fork"] = true
}
context := map[string]interface{}{}
if strings.TrimSpace(opts.model) != "" {
context["model"] = strings.TrimSpace(opts.model)
}
if len(context) > 0 {
body["context"] = context
}
if strings.TrimSpace(opts.input) != "" {
input, err := parseRestartInput(opts.input)
if err != nil {
return nil, err
}
body["input"] = input
}
return body, nil
}

func parseRestartInput(value string) (map[string]interface{}, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return nil, nil
}
raw := []byte(trimmed)
if strings.HasPrefix(trimmed, "@") {
path := strings.TrimSpace(strings.TrimPrefix(trimmed, "@"))
if path == "" {
return nil, fmt.Errorf("--input @path requires a file path")
}
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("read input file %q: %w", path, err)
}
raw = data
}
var input map[string]interface{}
if err := json.Unmarshal(raw, &input); err != nil {
return nil, fmt.Errorf("parse --input JSON: %w", err)
}
return input, nil
}

func printExecutionActionHumanOutput(parsed map[string]any, successVerb string) {
executionID, _ := parsed["execution_id"].(string)
previousStatus, _ := parsed["previous_status"].(string)
newRunID, _ := parsed["run_id"].(string)
sourceRunID, _ := parsed["source_run_id"].(string)
sourceExecutionID, _ := parsed["source_execution_id"].(string)
reuse, _ := parsed["replay_mode"].(string)

if successVerb == "restarted" && newRunID != "" {
if sourceRunID != "" && sourceExecutionID != "" {
fmt.Printf("Restarted run %s from %s\n", sourceRunID, sourceExecutionID)
} else if executionID != "" {
fmt.Printf("Execution %s restarted\n", executionID)
}
fmt.Printf("New run: %s\n", newRunID)
if reuse != "" {
fmt.Printf("Reuse: %s\n", reuse)
}
return
}

if executionID != "" && previousStatus != "" {
fmt.Printf("Execution %s %s (was: %s)\n", executionID, successVerb, previousStatus)
Expand Down
52 changes: 52 additions & 0 deletions control-plane/internal/cli/execution_additional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -220,3 +221,54 @@ func TestResumeExecutionCommand(t *testing.T) {
})
require.Contains(t, output, "Execution ex-9 resumed")
}

func TestRestartExecutionCommandPostsRestartBody(t *testing.T) {
inputFile := filepath.Join(t.TempDir(), "input.json")
require.NoError(t, os.WriteFile(inputFile, []byte(`{"topic":"restart"}`), 0o644))

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/api/v1/executions/ex-10/restart", r.URL.Path)
require.Equal(t, "Bearer restart-token", r.Header.Get("Authorization"))
require.Equal(t, "application/json", r.Header.Get("Content-Type"))
var body map[string]any
require.NoError(t, json.NewDecoder(r.Body).Decode(&body))
require.Equal(t, "workflow", body["scope"])
require.Equal(t, "none", body["reuse"])
require.Equal(t, true, body["fork"])
require.Equal(t, "try another model", body["reason"])
require.Equal(t, map[string]any{"model": "google/gemini-3.1-flash-lite"}, body["context"])
require.Equal(t, map[string]any{"topic": "restart"}, body["input"])
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"execution_id":"new-root",
"run_id":"run-new",
"source_run_id":"run-old",
"source_execution_id":"ex-10",
"replay_mode":"none"
}`))
}))
defer server.Close()

cmd := newRestartExecutionCommand()
cmd.SetArgs([]string{
"ex-10",
"--server", server.URL,
"--token", "restart-token",
"--reuse", "none",
"--fork",
"--model", "google/gemini-3.1-flash-lite",
"--reason", "try another model",
"--input", "@" + inputFile,
})

output := captureOutput(t, func() {
require.NoError(t, cmd.Execute())
})
require.Contains(t, output, "Restarted run run-old from ex-10")
require.Contains(t, output, "New run: run-new")
require.Contains(t, output, "Reuse: none")

cmd = newRestartExecutionCommand()
cmd.SetArgs([]string{"ex-10", "--input", "@"})
require.ErrorContains(t, cmd.Execute(), "--input @path requires a file path")
}
Loading
Loading