Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions definition/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
return true
}

// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
//
// It only carries a script and an environment; it is referenced from a pipeline definition
// through the pipeline's OnError field.
type OnErrorTaskDef struct {
// Script is the list of shell commands that are executed if an error occurs in a "normal" task.
// It is read from the "script" key in YAML.
Script []string `yaml:"script"`

// Env sets/overrides environment variables for this task (takes precedence over pipeline environment).
// It is read from the "env" key in YAML.
Env map[string]string `yaml:"env"`
}

type PipelineDef struct {
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
Concurrency int `yaml:"concurrency"`
Expand All @@ -60,8 +69,19 @@ type PipelineDef struct {
// Env sets/overrides environment variables for all tasks (takes precedence over process environment)
Env map[string]string `yaml:"env"`

// Tasks is a map of task names to task definitions
Tasks map[string]TaskDef `yaml:"tasks"`

// Task to be executed if this pipeline fails, e.g. for notifications.
//
// In this task, you have the following variables set:
// - failedTaskName: Name of the failed task (key from pipelines.yml)
// - failedTaskExitCode: Exit code of the failed task
// - failedTaskError: Error message of the failed task
// - failedTaskStdout: Stdout of the failed task
// - failedTaskStderr: Stderr of the failed task
OnError *OnErrorTaskDef `yaml:"onError"`

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you agree to rename this to on_error to be consistent with the other fields?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes definitely!!


// SourcePath stores the source path where the pipeline was defined
SourcePath string
}
Expand Down
178 changes: 174 additions & 4 deletions prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prunner
import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -41,7 +42,7 @@ type PipelineRunner struct {
// externally, call requestPersist()
persistRequests chan struct{}

// Mutex for reading or writing jobs and job state
// Mutex for reading or writing pipeline definitions (defs), jobs and job state
mx sync.RWMutex
createTaskRunner func(j *PipelineJob) taskctl.Runner

Expand Down Expand Up @@ -103,7 +104,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
// It can be scheduled (in the waitListByPipeline of PipelineRunner),
// or currently running (jobsByID / jobsByPipeline in PipelineRunner).
type PipelineJob struct {
ID uuid.UUID
ID uuid.UUID
// Identifier of the pipeline (from the YAML file)
Pipeline string
Env map[string]string
Variables map[string]interface{}
Expand Down Expand Up @@ -194,6 +196,10 @@ var ErrJobNotFound = errors.New("job not found")
var errJobAlreadyCompleted = errors.New("job is already completed")
var ErrShuttingDown = errors.New("runner is shutting down")

// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
// "pipeline" is the pipeline ID from the YAML file.
//
// The returned PipelineJob is the individual execution context.
func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
r.mx.Lock()
defer r.mx.Unlock()
Expand Down Expand Up @@ -398,11 +404,169 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {
go func() {
defer r.wg.Done()
lastErr := job.sched.Schedule(graph)
if lastErr != nil {
r.RunJobErrorHandler(job)
}
r.JobCompleted(job.ID, lastErr)
}()
}
// RunJobErrorHandler executes the error handler of the job's pipeline, if one is configured.
//
// The error graph is obtained from buildErrorGraph. A nil graph (without an error) means that no
// error handling is configured, in which case nothing is run. Problems while building or running
// the error handler are only logged; nothing is reported back to the caller.
func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {
	errorGraph, buildErr := r.buildErrorGraph(job)
	if buildErr != nil {
		// The error handling itself failed; there is nothing left to do except report it.
		log.
			WithError(buildErr).
			WithField("jobID", job.ID).
			WithField("pipeline", job.Pipeline).
			Error("Failed to build error pipeline graph")
		return
	}
	if errorGraph == nil {
		// No graph and no error: no error handling is configured for this pipeline.
		return
	}

	// The error graph needs a freshly initialized scheduler (the old one is already shut down
	// if ContinueRunningTasksAfterFailure == false).
	r.mx.Lock()
	r.initScheduler(job)
	r.mx.Unlock()

	if runErr := job.sched.Schedule(errorGraph); runErr != nil {
		log.
			WithError(runErr).
			WithField("jobID", job.ID).
			WithField("pipeline", job.Pipeline).
			Error("Failed to run error handling for job")
		return
	}

	log.
		WithField("jobID", job.ID).
		WithField("pipeline", job.Pipeline).
		Info("error handling completed")
}

// OnErrorTaskName is the task name given to the on-error task that buildErrorGraph appends to a job.
const OnErrorTaskName = "on_error"

func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
Comment thread
hlubek marked this conversation as resolved.
r.mx.RLock()
defer r.mx.RUnlock()
pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline]
if !pipelineDefExists {
return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline)
}
onErrorTaskDef := pipelineDef.OnError
if onErrorTaskDef == nil {
// no error, but no error handling configured
return nil, nil
}

// we assume the 1st failed task (by end date) is the root cause, because this triggered a cascading abort then.
failedTask := findFirstFailedTaskByEndDate(job.Tasks)

failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout")
failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr")

onErrorVariables := make(map[string]interface{})
for key, value := range job.Variables {
onErrorVariables[key] = value
}
// TODO: find first failed task (by End Date)

if failedTask != nil {
onErrorVariables["failedTaskName"] = failedTask.Name
onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode
onErrorVariables["failedTaskError"] = failedTask.Error
onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)
} else {
onErrorVariables["failedTaskName"] = "task_not_identified_should_not_happen"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this can really happen and we should just not set these variables.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok fine with me :)

onErrorVariables["failedTaskExitCode"] = "99"
onErrorVariables["failedTaskError"] = "task_not_identified_should_not_happen"
onErrorVariables["failedTaskStdout"] = "task_not_identified_should_not_happen"
onErrorVariables["failedTaskStderr"] = "task_not_identified_should_not_happen"
}

onErrorJobTask := jobTask{
TaskDef: definition.TaskDef{
Script: onErrorTaskDef.Script,
// AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log)
AllowFailure: false,
Env: onErrorTaskDef.Env,
},
Name: OnErrorTaskName,
Status: toStatus(scheduler.StatusWaiting),
}
job.Tasks = append(job.Tasks, onErrorJobTask)
Comment thread
hlubek marked this conversation as resolved.

return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables)
}

// readTaskOutputBestEffort returns the stored output named outputName (e.g. "stdout" or "stderr")
// of the given task of the given job.
//
// It is best effort: it returns nil if job or task is nil or if the output store cannot provide a
// reader, and it returns whatever io.ReadAll delivered if reading fails midway. Failures are only
// logged at debug level.
func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte {
	if task == nil || job == nil {
		return nil
	}

	rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName)
	if err != nil {
		log.
			WithField("component", "runner").
			WithField("jobID", job.ID.String()).
			WithField("pipeline", job.Pipeline).
			WithField("failedTaskName", task.Name).
			WithField("outputName", outputName).
			WithError(err).
			Debug("Could not create reader for output of failed task")
		return nil
	}
	defer func() {
		// A close error is irrelevant for a best-effort read.
		_ = rc.Close()
	}()

	outputAsBytes, err := io.ReadAll(rc)
	if err != nil {
		log.
			WithField("component", "runner").
			WithField("jobID", job.ID.String()).
			WithField("pipeline", job.Pipeline).
			WithField("failedTaskName", task.Name).
			WithField("outputName", outputName).
			WithError(err).
			Debug("Could not read output of task")
	}

	// On a read error this is the data that was read before the error occurred.
	return outputAsBytes
}

// FindFirstFailedTaskByEndDate returns the first failed task ordered by End Date
// A task is considered failed if it has errored or has a non-zero exit code
func findFirstFailedTaskByEndDate(tasks jobTasks) *jobTask {
Comment thread
hlubek marked this conversation as resolved.
Outdated
var firstFailedTask *jobTask

for i := range tasks {
task := &tasks[i]

// Check if the task failed (has an error or non-zero exit code)
if task.Errored {
// If this is our first failed task or this one ended earlier than our current earliest
if firstFailedTask == nil {
// we did not see any failed task yet. remember this one as the 1st failed task.
firstFailedTask = task
} else if firstFailedTask.End != nil && task.End != nil && task.End.Before(*firstFailedTask.End) {
// this task has failed EARLIER than the one we already remembered.
firstFailedTask = task
}
}
}

return firstFailedTask
}

// HandleTaskChange will be called when the task state changes in the task runner (taskctl)
// it is short-lived and updates our JobTask state accordingly.
func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
r.mx.Lock()
defer r.mx.Unlock()
Expand Down Expand Up @@ -437,11 +601,16 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
jt.Error = t.Error
}

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
if jt.End == nil {
Comment thread
hlubek marked this conversation as resolved.
Outdated
// Remember ending time in case of error (we need this to identify the correct onError hook)
now := time.Now()
jt.End = &now
}
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
log.
Expand Down Expand Up @@ -810,6 +979,7 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error {
// Wait for all running jobs to have called JobCompleted
r.wg.Wait()

// TODO This is not safe to do outside of the requestPersist loop, since we might have a save in progress. So we need to wait until the save loop is finished before calling SaveToStore.
Comment thread
hlubek marked this conversation as resolved.
Outdated
// Do a final save to include the state of recently completed jobs
r.SaveToStore()
}()
Expand Down
Loading
Loading