From 2f5cdb929765eb7cd2662b470cc59071012d5bb1 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Sun, 5 Apr 2026 18:39:08 -0600 Subject: [PATCH 01/33] add run_attempt --- models/actions/artifact.go | 36 +- models/actions/run.go | 63 ++- models/actions/run_attempt.go | 118 +++++ models/actions/run_attempt_list.go | 75 ++++ models/actions/run_job.go | 122 +++-- models/actions/run_job_list.go | 4 + models/actions/run_list.go | 6 - models/actions/task.go | 1 - models/migrations/migrations.go | 1 + models/migrations/v1_26/v331.go | 113 +++++ options/locale/locale_en-US.json | 2 + routers/api/actions/artifacts.go | 13 +- routers/api/actions/artifacts_chunks.go | 3 +- routers/api/actions/artifactsv4.go | 17 +- routers/api/v1/repo/action.go | 2 +- routers/common/actions.go | 5 +- routers/web/repo/actions/view.go | 422 ++++++++++++------ routers/web/web.go | 5 + services/actions/approve.go | 68 +++ services/actions/cleanup.go | 6 +- services/actions/clear_tasks.go | 40 +- services/actions/concurrency.go | 23 +- services/actions/context.go | 31 +- services/actions/job_emitter.go | 151 ++++--- services/actions/rerun.go | 194 +------- services/actions/rerun_plan.go | 296 ++++++++++++ services/actions/rerun_test.go | 61 +-- services/actions/run.go | 64 ++- services/actions/task.go | 6 +- services/convert/convert.go | 11 +- templates/repo/actions/view.tmpl | 3 +- templates/repo/actions/view_component.tmpl | 7 +- tests/integration/actions_concurrency_test.go | 130 ++++-- tests/integration/actions_rerun_test.go | 5 +- tests/integration/actions_route_test.go | 2 +- tests/integration/api_actions_run_test.go | 28 +- web_src/js/components/ActionRunJobView.vue | 6 +- web_src/js/components/ActionRunView.ts | 10 +- web_src/js/components/RepoActionView.vue | 177 +++++++- web_src/js/features/repo-actions.ts | 6 +- web_src/js/modules/gitea-actions.ts | 18 + 41 files changed, 1712 insertions(+), 639 deletions(-) create mode 100644 models/actions/run_attempt.go create mode 100644 models/actions/run_attempt_list.go create mode 100644 models/migrations/v1_26/v331.go create mode 100644 services/actions/approve.go create mode 100644 services/actions/rerun_plan.go diff --git a/models/actions/artifact.go b/models/actions/artifact.go index d61afb2aed47b..47115c577c4e0 100644 --- a/models/actions/artifact.go +++ b/models/actions/artifact.go @@ -61,7 +61,8 @@ const ( // ActionArtifact is a file that is stored in the artifact storage. type ActionArtifact struct { ID int64 `xorm:"pk autoincr"` - RunID int64 `xorm:"index unique(runid_name_path)"` // The run id of the artifact + RunID int64 `xorm:"index unique(runid_attempt_name_path)"` // The run id of the artifact + RunAttemptID int64 `xorm:"index unique(runid_attempt_name_path) NOT NULL DEFAULT 0"` RunnerID int64 RepoID int64 `xorm:"index"` OwnerID int64 @@ -80,9 +81,9 @@ type ActionArtifact struct { // * "application/pdf", "text/html", etc.: real content type of the artifact ContentEncodingOrType string `xorm:"content_encoding"` - ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when runner uploads it - ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact when runner uploads it - Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete + ArtifactPath string `xorm:"index unique(runid_attempt_name_path)"` // The path to the artifact when runner uploads it + ArtifactName string `xorm:"index unique(runid_attempt_name_path)"` // The name of the artifact when runner uploads it + Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete CreatedUnix timeutil.TimeStamp `xorm:"created"` UpdatedUnix timeutil.TimeStamp `xorm:"updated index"` ExpiredUnix timeutil.TimeStamp `xorm:"index"` // The time when the artifact will be expired @@ -92,12 +93,13 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa if err := t.LoadJob(ctx); err != nil { return nil, err } - artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, artifactName, artifactPath) + artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, t.Job.RunAttemptID, artifactName, artifactPath) if errors.Is(err, util.ErrNotExist) { artifact := &ActionArtifact{ ArtifactName: artifactName, ArtifactPath: artifactPath, RunID: t.Job.RunID, + RunAttemptID: t.Job.RunAttemptID, RunnerID: t.RunnerID, RepoID: t.RepoID, OwnerID: t.OwnerID, @@ -122,9 +124,9 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa return artifact, nil } -func getArtifactByNameAndPath(ctx context.Context, runID int64, name, fpath string) (*ActionArtifact, error) { +func getArtifactByNameAndPath(ctx context.Context, runID, runAttemptID int64, name, fpath string) (*ActionArtifact, error) { var art ActionArtifact - has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ? AND artifact_path = ?", runID, name, fpath).Get(&art) + has, err := db.GetEngine(ctx).Where("run_id = ? AND run_attempt_id = ? AND artifact_name = ? AND artifact_path = ?", runID, runAttemptID, name, fpath).Get(&art) if err != nil { return nil, err } else if !has { @@ -144,6 +146,7 @@ type FindArtifactsOptions struct { db.ListOptions RepoID int64 RunID int64 + RunAttemptID int64 ArtifactName string Status int FinalizedArtifactsV4 bool @@ -163,6 +166,9 @@ func (opts FindArtifactsOptions) ToConds() builder.Cond { if opts.RunID > 0 { cond = cond.And(builder.Eq{"run_id": opts.RunID}) } + if opts.RunAttemptID > 0 { + cond = cond.And(builder.Eq{"run_attempt_id": opts.RunAttemptID}) + } if opts.ArtifactName != "" { cond = cond.And(builder.Eq{"artifact_name": opts.ArtifactName}) } @@ -185,6 +191,16 @@ type ActionArtifactMeta struct { Status ArtifactStatus } +// ListUploadedArtifactsMetaByRunAttempt returns all uploaded artifacts meta of a run attempt. +func ListUploadedArtifactsMetaByRunAttempt(ctx context.Context, repoID, runAttemptID int64) ([]*ActionArtifactMeta, error) { + arts := make([]*ActionArtifactMeta, 0, 10) + return arts, db.GetEngine(ctx).Table("action_artifact"). + Where("repo_id=? AND run_attempt_id=? AND (status=? OR status=?)", repoID, runAttemptID, ArtifactStatusUploadConfirmed, ArtifactStatusExpired). + GroupBy("artifact_name"). + Select("artifact_name, sum(file_size) as file_size, max(status) as status"). + Find(&arts) +} + // ListUploadedArtifactsMeta returns all uploaded artifacts meta of a run func ListUploadedArtifactsMeta(ctx context.Context, repoID, runID int64) ([]*ActionArtifactMeta, error) { arts := make([]*ActionArtifactMeta, 0, 10) @@ -222,6 +238,12 @@ func SetArtifactNeedDelete(ctx context.Context, runID int64, name string) error return err } +// SetArtifactNeedDeleteByRunAttempt sets an artifact to need-delete in a run attempt, cron job will delete it +func SetArtifactNeedDeleteByRunAttempt(ctx context.Context, runID, runAttemptID int64, name string) error { + _, err := db.GetEngine(ctx).Where("run_id=? AND run_attempt_id=? AND artifact_name=? AND status = ?", runID, runAttemptID, name, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusPendingDeletion}) + return err +} + // SetArtifactDeleted sets an artifact to deleted func SetArtifactDeleted(ctx context.Context, artifactID int64) error { _, err := db.GetEngine(ctx).ID(artifactID).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusDeleted}) diff --git a/models/actions/run.go b/models/actions/run.go index 27958d6fb6fba..bc9ab96e0556b 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -49,13 +49,12 @@ type ActionRun struct { TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + LatestAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"` RawConcurrency string // raw concurrency - ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` - ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp - // PreviousDuration is used for recording previous duration + // PreviousDuration is a cached duration of the attempt immediately preceding the latest attempt. PreviousDuration time.Duration Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated"` @@ -156,6 +155,31 @@ func (run *ActionRun) Duration() time.Duration { return calculateDuration(run.Started, run.Stopped, run.Status) + run.PreviousDuration } +func (run *ActionRun) GetLatestAttempt(ctx context.Context) (*RunAttempt, bool, error) { + if run.LatestAttemptID == 0 { + return nil, false, nil + } + attempt, err := GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID) + if errors.Is(err, util.ErrNotExist) { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + return attempt, true, nil +} + +func (run *ActionRun) GetEffectiveConcurrency(ctx context.Context) (string, bool, error) { + attempt, has, err := run.GetLatestAttempt(ctx) + if err != nil { + return "", false, err + } + if has { + return attempt.ConcurrencyGroup, attempt.ConcurrencyCancel, nil + } + return "", false, nil +} + func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) { if run.Event == webhook_module.HookEventPush { var payload api.PushPayload @@ -402,14 +426,19 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { type ActionRunIndex db.ResourceIndex -func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) { - runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ +// GetConcurrentRunAttemptsAndJobs returns run attempts and jobs in the same concurrency group +// whose statuses match the requested non-done statuses. +// Under the current data model, only the latest attempt of a run may remain in a non-done state, +// so filtering by attempt status is sufficient and does not need an extra join back to action_run. +func GetConcurrentRunAttemptsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*RunAttempt, []*ActionRunJob, error) { + attempts, err := db.Find[RunAttempt](ctx, &FindRunAttemptOptions{ RepoID: repoID, + Statuses: status, ConcurrencyGroup: concurrencyGroup, - Status: status, + ListOptions: db.ListOptionsAll, }) if err != nil { - return nil, nil, fmt.Errorf("find runs: %w", err) + return nil, nil, fmt.Errorf("find run attempts: %w", err) } jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ @@ -421,36 +450,34 @@ func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGrou return nil, nil, fmt.Errorf("find jobs: %w", err) } - return runs, jobs, nil + return attempts, jobs, nil } -func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) { - if actionRun.ConcurrencyGroup == "" { +func CancelPreviousJobsByRunConcurrency(ctx context.Context, attempt *RunAttempt) ([]*ActionRunJob, error) { + if attempt.ConcurrencyGroup == "" { return nil, nil } var jobsToCancel []*ActionRunJob statusFindOption := []Status{StatusWaiting, StatusBlocked} - if actionRun.ConcurrencyCancel { + if attempt.ConcurrencyCancel { statusFindOption = append(statusFindOption, StatusRunning) } - runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption) + attempts, jobs, err := GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, statusFindOption) if err != nil { return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) } jobsToCancel = append(jobsToCancel, jobs...) // cancel runs in the same concurrency group - for _, run := range runs { - if run.ID == actionRun.ID { + for _, concurrentAttempt := range attempts { + if concurrentAttempt.RunID == attempt.RunID { continue } - jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ - RunID: run.ID, - }) + jobs, err := GetRunJobsByRunAndAttemptID(ctx, concurrentAttempt.RunID, concurrentAttempt.ID) if err != nil { - return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + return nil, fmt.Errorf("find run %d attempt %d jobs: %w", concurrentAttempt.RunID, concurrentAttempt.ID, err) } jobsToCancel = append(jobsToCancel, jobs...) } diff --git a/models/actions/run_attempt.go b/models/actions/run_attempt.go new file mode 100644 index 0000000000000..24d5525e9d631 --- /dev/null +++ b/models/actions/run_attempt.go @@ -0,0 +1,118 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "slices" + "time" + + "code.gitea.io/gitea/models/db" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +// RunAttempt represents a single execution attempt of an ActionRun. +type RunAttempt struct { + ID int64 + RepoID int64 `xorm:"index"` + RunID int64 `xorm:"index UNIQUE(run_attempt)"` + Run *ActionRun `xorm:"-"` + Attempt int64 `xorm:"UNIQUE(run_attempt)"` + + TriggerUserID int64 `xorm:"index"` + TriggerUser *user_model.User `xorm:"-"` + + ConcurrencyGroup string + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` + + Status Status `xorm:"index"` + Started timeutil.TimeStamp + RunStartedAt timeutil.TimeStamp + Stopped timeutil.TimeStamp + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func (*RunAttempt) TableName() string { + return "action_run_attempt" +} + +func init() { + db.RegisterModel(new(RunAttempt)) +} + +func (attempt *RunAttempt) Duration() time.Duration { + return calculateDuration(attempt.Started, attempt.Stopped, attempt.Status) +} + +func GetRunAttemptByRepoAndID(ctx context.Context, repoID, attemptID int64) (*RunAttempt, error) { + var attempt RunAttempt + has, err := db.GetEngine(ctx).Where("repo_id=? AND id=?", repoID, attemptID).Get(&attempt) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("run attempt %d in repo %d: %w", attemptID, repoID, util.ErrNotExist) + } + return &attempt, nil +} + +func GetLatestAttemptByRunID(ctx context.Context, runID int64) (*RunAttempt, bool, error) { + var attempt RunAttempt + has, err := db.GetEngine(ctx).Where("run_id=?", runID).Desc("attempt").Get(&attempt) + if err != nil { + return nil, false, err + } else if !has { + return nil, false, nil + } + return &attempt, true, nil +} + +func GetRunAttemptByRunIDAndAttemptNum(ctx context.Context, runID, attemptNum int64) (*RunAttempt, error) { + var attempt RunAttempt + has, err := db.GetEngine(ctx).Where("run_id=? AND attempt=?", runID, attemptNum).Get(&attempt) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("run attempt %d for run %d: %w", attemptNum, runID, util.ErrNotExist) + } + return &attempt, nil +} + +func ListRunAttemptsByRunID(ctx context.Context, runID int64) ([]*RunAttempt, error) { + return db.Find[RunAttempt](ctx, &FindRunAttemptOptions{ + RunID: runID, + ListOptions: db.ListOptionsAll, + }) +} + +func UpdateRunAttempt(ctx context.Context, attempt *RunAttempt, cols ...string) error { + sess := db.GetEngine(ctx).ID(attempt.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + if _, err := sess.Update(attempt); err != nil { + return err + } + + if len(cols) > 0 && !slices.Contains(cols, "status") && !slices.Contains(cols, "started") && !slices.Contains(cols, "stopped") { + return nil + } + + run, err := GetRunByRepoAndID(ctx, attempt.RepoID, attempt.RunID) + if err != nil { + return err + } + if run.LatestAttemptID != attempt.ID { + return nil + } + + run.Status = attempt.Status + run.Started = attempt.Started + run.Stopped = attempt.Stopped + return UpdateRun(ctx, run, "status", "started", "stopped") +} diff --git a/models/actions/run_attempt_list.go b/models/actions/run_attempt_list.go new file mode 100644 index 0000000000000..9cedaead72592 --- /dev/null +++ b/models/actions/run_attempt_list.go @@ -0,0 +1,75 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" + + "xorm.io/builder" +) + +type RunAttemptList []*RunAttempt + +// GetUserIDs returns a slice of user's id +func (attempts RunAttemptList) GetUserIDs() []int64 { + return container.FilterSlice(attempts, func(attempt *RunAttempt) (int64, bool) { + return attempt.TriggerUserID, true + }) +} + +func (attempts RunAttemptList) LoadTriggerUser(ctx context.Context) error { + userIDs := attempts.GetUserIDs() + users := make(map[int64]*user_model.User, len(userIDs)) + if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil { + return err + } + for _, attempt := range attempts { + if attempt.TriggerUserID == user_model.ActionsUserID { + attempt.TriggerUser = user_model.NewActionsUser() + } else { + attempt.TriggerUser = users[attempt.TriggerUserID] + if attempt.TriggerUser == nil { + attempt.TriggerUser = user_model.NewGhostUser() + } + } + } + return nil +} + +type FindRunAttemptOptions struct { + db.ListOptions + RepoID int64 + RunID int64 + Attempt int64 + Statuses []Status + ConcurrencyGroup string +} + +func (opts FindRunAttemptOptions) ToConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"`action_run_attempt`.repo_id": opts.RepoID}) + } + if opts.RunID > 0 { + cond = cond.And(builder.Eq{"`action_run_attempt`.run_id": opts.RunID}) + } + if opts.Attempt > 0 { + cond = cond.And(builder.Eq{"`action_run_attempt`.attempt": opts.Attempt}) + } + if len(opts.Statuses) > 0 { + cond = cond.And(builder.In("`action_run_attempt`.status", opts.Statuses)) + } + if opts.ConcurrencyGroup != "" { + cond = cond.And(builder.Eq{"`action_run_attempt`.concurrency_group": opts.ConcurrencyGroup}) + } + return cond +} + +func (opts FindRunAttemptOptions) ToOrders() string { + return "`action_run_attempt`.`attempt` DESC" +} diff --git a/models/actions/run_job.go b/models/actions/run_job.go index f89f4e9f876a3..d0baa0a467326 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -27,6 +27,7 @@ const MaxJobNumPerRun = 256 type ActionRunJob struct { ID int64 RunID int64 `xorm:"index"` + RunAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"` Run *ActionRun `xorm:"-"` RepoID int64 `xorm:"index(repo_concurrency)"` Repo *repo_model.Repository `xorm:"-"` @@ -34,7 +35,7 @@ type ActionRunJob struct { CommitSHA string `xorm:"index"` IsForkPullRequest bool Name string `xorm:"VARCHAR(255)"` - Attempt int64 + Attempt int64 // redundant attempt number copied from the owning RunAttempt for compatibility // WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse // it should contain exactly one job with global workflow fields for this model @@ -43,8 +44,11 @@ type ActionRunJob struct { JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id Needs []string `xorm:"JSON TEXT"` RunsOn []string `xorm:"JSON TEXT"` - TaskID int64 // the latest task of the job - Status Status `xorm:"index"` + + TaskID int64 // the latest task created by this job in its own attempt + SourceTaskID int64 `xorm:"NOT NULL DEFAULT 0"` // SourceTaskID points to a historical task when this job reuses an earlier attempt's result. + + Status Status `xorm:"index"` RawConcurrency string // raw concurrency from job YAML's "concurrency" section @@ -75,6 +79,13 @@ func (job *ActionRunJob) Duration() time.Duration { return calculateDuration(job.Started, job.Stopped, job.Status) } +func (job *ActionRunJob) EffectiveTaskID() int64 { + if job.TaskID > 0 { + return job.TaskID + } + return job.SourceTaskID +} + func (job *ActionRunJob) LoadRun(ctx context.Context) error { if job.Run == nil { run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID) @@ -152,7 +163,30 @@ func GetRunJobByRunAndID(ctx context.Context, runID, jobID int64) (*ActionRunJob return &job, nil } +// GetRunJobsByRunID returns the current jobs for a run. +// It prefers the latest attempt when one exists, and falls back to legacy jobs with run_attempt_id=0 for runs created before RunAttempt existed. func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) { + if run, ok, err := db.GetByID[ActionRun](ctx, runID); err != nil { + return nil, err + } else if ok && run.LatestAttemptID > 0 { + jobs, err := GetRunJobsByRunAndAttemptID(ctx, runID, run.LatestAttemptID) + if err != nil { + return nil, err + } + if len(jobs) > 0 { + return jobs, nil + } + } + + var jobs []*ActionRunJob + if err := db.GetEngine(ctx).Where("run_id=? AND run_attempt_id=0", runID).OrderBy("id").Find(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + +// GetAllRunJobsByRunID returns all jobs for a run across all attempts. +func GetAllRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) { var jobs []*ActionRunJob if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil { return nil, err @@ -160,6 +194,16 @@ func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) return jobs, nil } +// GetRunJobsByRunAndAttemptID returns jobs for a run within a specific attempt. +// runAttemptID may be 0 to address legacy jobs that were created before RunAttempt existed and therefore have no attempt association. +func GetRunJobsByRunAndAttemptID(ctx context.Context, runID, runAttemptID int64) (ActionJobList, error) { + var jobs []*ActionRunJob + if err := db.GetEngine(ctx).Where("run_id=? AND run_attempt_id=?", runID, runAttemptID).OrderBy("id").Find(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) { e := db.GetEngine(ctx) @@ -196,25 +240,46 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } { - // Other goroutines may aggregate the status of the run and update it too. - // So we need load the run and its jobs before updating the run. - run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID) - if err != nil { - return 0, err - } - jobs, err := GetRunJobsByRunID(ctx, job.RunID) - if err != nil { - return 0, err - } - run.Status = AggregateJobStatus(jobs) - if run.Started.IsZero() && run.Status.IsRunning() { - run.Started = timeutil.TimeStampNow() - } - if run.Stopped.IsZero() && run.Status.IsDone() { - run.Stopped = timeutil.TimeStampNow() - } - if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { - return 0, fmt.Errorf("update run %d: %w", run.ID, err) + // Other goroutines may aggregate the status of the attempt/run and update it too. + // So we need to load the current jobs before updating the aggregate state. + if job.RunAttemptID > 0 { + attempt, err := GetRunAttemptByRepoAndID(ctx, job.RepoID, job.RunAttemptID) + if err != nil { + return 0, err + } + jobs, err := GetRunJobsByRunAndAttemptID(ctx, job.RunID, job.RunAttemptID) + if err != nil { + return 0, err + } + attempt.Status = AggregateJobStatus(jobs) + if attempt.Started.IsZero() && attempt.Status.IsRunning() { + attempt.Started = timeutil.TimeStampNow() + } + if attempt.Stopped.IsZero() && attempt.Status.IsDone() { + attempt.Stopped = timeutil.TimeStampNow() + } + if err := UpdateRunAttempt(ctx, attempt, "status", "started", "stopped"); err != nil { + return 0, fmt.Errorf("update run attempt %d: %w", attempt.ID, err) + } + } else { + run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID) + if err != nil { + return 0, err + } + jobs, err := GetRunJobsByRunID(ctx, job.RunID) + if err != nil { + return 0, err + } + run.Status = AggregateJobStatus(jobs) + if run.Started.IsZero() && run.Status.IsRunning() { + run.Started = timeutil.TimeStampNow() + } + if run.Stopped.IsZero() && run.Status.IsDone() { + run.Stopped = timeutil.TimeStampNow() + } + if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { + return 0, fmt.Errorf("update run %d: %w", run.ID, err) + } } } @@ -269,7 +334,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) if job.ConcurrencyCancel { statusFindOption = append(statusFindOption, StatusRunning) } - runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption) + attempts, jobs, err := GetConcurrentRunAttemptsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption) if err != nil { return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) } @@ -277,12 +342,13 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) jobsToCancel = append(jobsToCancel, jobs...) // cancel runs in the same concurrency group - for _, run := range runs { - jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ - RunID: run.ID, - }) + for _, attempt := range attempts { + if attempt.ID == job.RunAttemptID { + continue + } + jobs, err := GetRunJobsByRunAndAttemptID(ctx, attempt.RunID, attempt.ID) if err != nil { - return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + return nil, fmt.Errorf("find run %d attempt %d jobs: %w", attempt.RunID, attempt.ID, err) } jobsToCancel = append(jobsToCancel, jobs...) } diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go index 10f76d3641b6f..9f8156d8b0cc3 100644 --- a/models/actions/run_job_list.go +++ b/models/actions/run_job_list.go @@ -70,6 +70,7 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err type FindRunJobOptions struct { db.ListOptions RunID int64 + RunAttemptID int64 RepoID int64 OwnerID int64 CommitSHA string @@ -83,6 +84,9 @@ func (opts FindRunJobOptions) ToConds() builder.Cond { if opts.RunID > 0 { cond = cond.And(builder.Eq{"`action_run_job`.run_id": opts.RunID}) } + if opts.RunAttemptID > 0 { + cond = cond.And(builder.Eq{"`action_run_job`.run_attempt_id": opts.RunAttemptID}) + } if opts.RepoID > 0 { cond = cond.And(builder.Eq{"`action_run_job`.repo_id": opts.RepoID}) } diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 2628c4712f59f..af28e627a7869 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -102,12 +102,6 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.CommitSHA != "" { cond = cond.And(builder.Eq{"`action_run`.commit_sha": opts.CommitSHA}) } - if len(opts.ConcurrencyGroup) > 0 { - if opts.RepoID == 0 { - panic("Invalid FindRunOptions: repo_id is required") - } - cond = cond.And(builder.Eq{"`action_run`.concurrency_group": opts.ConcurrencyGroup}) - } return cond } diff --git a/models/actions/task.go b/models/actions/task.go index e092d6fbbd948..14c843cf982d9 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -273,7 +273,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } now := timeutil.TimeStampNow() - job.Attempt++ job.Started = now job.Status = StatusRunning diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index db74ff78d5040..ac796146e1d5a 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -405,6 +405,7 @@ func prepareMigrationTasks() []*migration { newMigration(328, "Add TokenPermissions column to ActionRunJob", v1_26.AddTokenPermissionsToActionRunJob), newMigration(329, "Add unique constraint for user badge", v1_26.AddUniqueIndexForUserBadge), newMigration(330, "Add name column to webhook", v1_26.AddNameToWebhook), + newMigration(331, "Add RunAttempt model and related action fields", v1_26.AddRunAttemptModel), } return preparedMigrations } diff --git a/models/migrations/v1_26/v331.go b/models/migrations/v1_26/v331.go new file mode 100644 index 0000000000000..20d1466bcb821 --- /dev/null +++ b/models/migrations/v1_26/v331.go @@ -0,0 +1,113 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_26 + +import ( + "context" + + "code.gitea.io/gitea/models/migrations/base" + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +type actionRunAttempt struct { + ID int64 + RepoID int64 `xorm:"index"` + RunID int64 `xorm:"index UNIQUE(run_attempt)"` + Attempt int64 `xorm:"UNIQUE(run_attempt)"` + TriggerUserID int64 `xorm:"index"` + Status int `xorm:"index"` + Started timeutil.TimeStamp + RunStartedAt timeutil.TimeStamp + Stopped timeutil.TimeStamp + ConcurrencyGroup string + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func (actionRunAttempt) TableName() string { + return "action_run_attempt" +} + +type actionArtifact struct { + ID int64 `xorm:"pk autoincr"` + RunID int64 `xorm:"index unique(runid_attempt_name_path)"` + RunAttemptID int64 `xorm:"index unique(runid_attempt_name_path) NOT NULL DEFAULT 0"` + RunnerID int64 + RepoID int64 `xorm:"index"` + OwnerID int64 + CommitSHA string + StoragePath string + FileSize int64 + FileCompressedSize int64 + ContentEncoding string `xorm:"content_encoding"` + ArtifactPath string `xorm:"index unique(runid_attempt_name_path)"` + ArtifactName string `xorm:"index unique(runid_attempt_name_path)"` + Status int `xorm:"index"` + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated index"` + ExpiredUnix timeutil.TimeStamp `xorm:"index"` +} + +func (actionArtifact) TableName() string { + return "action_artifact" +} + +// AddRunAttemptModel adds the RunAttempt table and the supporting ActionRun/ActionRunJob fields. +func AddRunAttemptModel(x *xorm.Engine) error { + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(actionRunAttempt)); err != nil { + return err + } + + type ActionRun struct { + LatestAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"` + } + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(ActionRun)); err != nil { + return err + } + + type ActionRunJob struct { + RunAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"` + SourceTaskID int64 `xorm:"NOT NULL DEFAULT 0"` + } + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(ActionRunJob)); err != nil { + return err + } + + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(actionArtifact)); err != nil { + return err + } + + if err := base.RecreateTables(new(actionArtifact))(x); err != nil { + return err + } + + concurrencyColumns := make([]string, 0, 2) + for _, col := range []string{"concurrency_group", "concurrency_cancel"} { + exist, err := x.Dialect().IsColumnExist(x.DB(), context.Background(), "action_run", col) + if err != nil { + return err + } + if exist { + concurrencyColumns = append(concurrencyColumns, col) + } + } + if len(concurrencyColumns) == 0 { + return nil + } + + sess := x.NewSession() + defer sess.Close() + return base.DropTableColumns(sess, "action_run", concurrencyColumns...) +} diff --git a/options/locale/locale_en-US.json b/options/locale/locale_en-US.json index acb5a25087c57..cc783daaee211 100644 --- a/options/locale/locale_en-US.json +++ b/options/locale/locale_en-US.json @@ -3755,6 +3755,8 @@ "actions.runs.workflow_graph": "Workflow Graph", "actions.runs.summary": "Summary", "actions.runs.all_jobs": "All jobs", + "actions.runs.attempt": "Attempt", + "actions.runs.latest_attempt": "Latest attempt", "actions.runs.triggered_via": "Triggered via %s", "actions.runs.total_duration": "Total duration:", "actions.workflow.disable": "Disable Workflow", diff --git a/routers/api/actions/artifacts.go b/routers/api/actions/artifacts.go index 13cbecb5cd031..68756bce947f4 100644 --- a/routers/api/actions/artifacts.go +++ b/routers/api/actions/artifacts.go @@ -310,7 +310,7 @@ func (ar artifactRoutes) confirmUploadArtifact(ctx *ArtifactContext) { ctx.HTTPError(http.StatusBadRequest, "Error artifact name is empty") return } - if err := mergeChunksForRun(ctx, ar.fs, runID, artifactName); err != nil { + if err := mergeChunksForRun(ctx, ar.fs, runID, ctx.ActionTask.Job.RunAttemptID, artifactName); err != nil { log.Error("Error merge chunks: %v", err) ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks") return @@ -338,8 +338,9 @@ func (ar artifactRoutes) listArtifacts(ctx *ArtifactContext) { } artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{ - RunID: runID, - Status: int(actions.ArtifactStatusUploadConfirmed), + RunID: runID, + RunAttemptID: ctx.ActionTask.Job.RunAttemptID, + Status: int(actions.ArtifactStatusUploadConfirmed), }) if err != nil { log.Error("Error getting artifacts: %v", err) @@ -404,6 +405,7 @@ func (ar artifactRoutes) getDownloadArtifactURL(ctx *ArtifactContext) { artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{ RunID: runID, + RunAttemptID: ctx.ActionTask.Job.RunAttemptID, ArtifactName: itemPath, Status: int(actions.ArtifactStatusUploadConfirmed), }) @@ -477,6 +479,11 @@ func (ar artifactRoutes) downloadArtifact(ctx *ArtifactContext) { ctx.HTTPError(http.StatusBadRequest) return } + if ctx.ActionTask.Job.RunAttemptID > 0 && artifact.RunAttemptID != ctx.ActionTask.Job.RunAttemptID { + log.Error("Error mismatch runAttemptID and artifactID, task: %v, artifact: %v", ctx.ActionTask.Job.RunAttemptID, artifactID) + ctx.HTTPError(http.StatusBadRequest) + return + } if artifact.Status != actions.ArtifactStatusUploadConfirmed { log.Error("Error artifact not found: %s", artifact.Status.ToString()) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") diff --git a/routers/api/actions/artifacts_chunks.go b/routers/api/actions/artifacts_chunks.go index 8d04c689221a6..a2718f9d12b80 100644 --- a/routers/api/actions/artifacts_chunks.go +++ b/routers/api/actions/artifacts_chunks.go @@ -257,10 +257,11 @@ func listOrderedChunksForArtifact(st storage.ObjectStorage, runID, artifactID in return emptyListAsError(chunks) } -func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error { +func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID, runAttemptID int64, artifactName string) error { // read all db artifacts by name artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{ RunID: runID, + RunAttemptID: runAttemptID, ArtifactName: artifactName, }) if err != nil { diff --git a/routers/api/actions/artifactsv4.go b/routers/api/actions/artifactsv4.go index e86645cb0cf1b..d9effcc064be2 100644 --- a/routers/api/actions/artifactsv4.go +++ b/routers/api/actions/artifactsv4.go @@ -266,9 +266,9 @@ func (r *artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (* return task, artifactName, true } -func (r *artifactV4Routes) getArtifactByName(ctx *ArtifactContext, runID int64, name string) (*actions_model.ActionArtifact, error) { +func (r *artifactV4Routes) getArtifactByName(ctx *ArtifactContext, runID, runAttemptID int64, name string) (*actions_model.ActionArtifact, error) { var art actions_model.ActionArtifact - has, err := db.GetEngine(ctx).Where(builder.Eq{"run_id": runID, "artifact_name": name}, builder.Like{"content_encoding", "%/%"}).Get(&art) + has, err := db.GetEngine(ctx).Where(builder.Eq{"run_id": runID, "run_attempt_id": runAttemptID, "artifact_name": name}, builder.Like{"content_encoding", "%/%"}).Get(&art) if err != nil { return nil, err } else if !has { @@ -388,7 +388,7 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) { switch comp { case "block", "appendBlock": // get artifact by name - artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName) + artifact, err := r.getArtifactByName(ctx, task.Job.RunID, task.Job.RunAttemptID, artifactName) if err != nil { log.Error("Error artifact not found: %v", err) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") @@ -475,7 +475,7 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) { } // get artifact by name - artifact, err := r.getArtifactByName(ctx, runID, req.Name) + artifact, err := r.getArtifactByName(ctx, runID, ctx.ActionTask.Job.RunAttemptID, req.Name) if err != nil { log.Error("Error artifact not found: %v", err) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") @@ -589,6 +589,7 @@ func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) { artifacts, err := db.Find[actions_model.ActionArtifact](ctx, actions_model.FindArtifactsOptions{ RunID: runID, + RunAttemptID: ctx.ActionTask.Job.RunAttemptID, Status: int(actions_model.ArtifactStatusUploadConfirmed), FinalizedArtifactsV4: true, }) @@ -642,7 +643,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) { artifactName := req.Name // get artifact by name - artifact, err := r.getArtifactByName(ctx, runID, artifactName) + artifact, err := r.getArtifactByName(ctx, runID, ctx.ActionTask.Job.RunAttemptID, artifactName) if err != nil { log.Error("Error artifact not found: %v", err) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") @@ -676,7 +677,7 @@ func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) { } // get artifact by name - artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName) + artifact, err := r.getArtifactByName(ctx, task.Job.RunID, task.Job.RunAttemptID, artifactName) if err != nil { log.Error("Error artifact not found: %v", err) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") @@ -707,14 +708,14 @@ func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) { } // get artifact by name - artifact, err := r.getArtifactByName(ctx, runID, req.Name) + artifact, err := r.getArtifactByName(ctx, runID, ctx.ActionTask.Job.RunAttemptID, req.Name) if err != nil { log.Error("Error artifact not found: %v", err) ctx.HTTPError(http.StatusNotFound, "Error artifact not found") return } - err = actions_model.SetArtifactNeedDelete(ctx, runID, req.Name) + err = actions_model.SetArtifactNeedDeleteByRunAttempt(ctx, runID, ctx.ActionTask.Job.RunAttemptID, req.Name) if err != nil { log.Error("Error deleting artifacts: %v", err) ctx.HTTPError(http.StatusInternalServerError, err.Error()) diff --git a/routers/api/v1/repo/action.go b/routers/api/v1/repo/action.go index 7ac8a10575cfb..bebd86c471899 100644 --- a/routers/api/v1/repo/action.go +++ b/routers/api/v1/repo/action.go @@ -1367,7 +1367,7 @@ func RerunWorkflowJob(ctx *context.APIContext) { } targetJob := jobs[jobIdx] - if err := actions_service.RerunWorkflowRunJobs(ctx, ctx.Repo.Repository, run, actions_service.GetAllRerunJobs(targetJob, jobs)); err != nil { + if err := actions_service.RerunWorkflowRunJobs(ctx, ctx.Repo.Repository, run, []*actions_model.ActionRunJob{targetJob}); err != nil { handleWorkflowRerunError(ctx, err) return } diff --git a/routers/common/actions.go b/routers/common/actions.go index 4eb7078db6754..2b83e5d84235d 100644 --- a/routers/common/actions.go +++ b/routers/common/actions.go @@ -31,7 +31,8 @@ func DownloadActionsRunJobLogs(ctx *context.Base, ctxRepo *repo_model.Repository return util.NewNotExistErrorf("job not found") } - if curJob.TaskID == 0 { + taskID := curJob.EffectiveTaskID() + if taskID == 0 { return util.NewNotExistErrorf("job not started") } @@ -39,7 +40,7 @@ func DownloadActionsRunJobLogs(ctx *context.Base, ctxRepo *repo_model.Repository return fmt.Errorf("LoadRun: %w", err) } - task, err := actions_model.GetTaskByID(ctx, curJob.TaskID) + task, err := actions_model.GetTaskByID(ctx, taskID) if err != nil { return fmt.Errorf("GetTaskByID: %w", err) } diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index f92df685fda13..adb0b9ec04d5e 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -203,9 +203,26 @@ func View(ctx *context_module.Context) { if ctx.Written() { return } - ctx.Data["RunID"] = run.ID - ctx.Data["JobID"] = ctx.PathParamInt64("job") // it can be 0 when no job (e.g.: run summary view) - ctx.Data["ActionsURL"] = ctx.Repo.RepoLink + "/actions" + run.Repo = ctx.Repo.Repository + + jobID := ctx.PathParamInt64("job") + ctx.Data["JobID"] = jobID // it can be 0 when no job (e.g.: run summary view) + + switch { + case ctx.PathParamInt64("attempt") > 0: + attempt, err := actions_model.GetRunAttemptByRunIDAndAttemptNum(ctx, run.ID, ctx.PathParamInt64("attempt")) + if err != nil { + ctx.NotFoundOrServerError("GetRunAttemptByRunIDAndAttempt", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return + } + ctx.Data["ViewURL"] = getRunViewLink(run, attempt) + case jobID > 0: + ctx.Data["ViewURL"] = fmt.Sprintf("%s/jobs/%d", run.Link(), jobID) + default: + ctx.Data["ViewURL"] = run.Link() + } ctx.HTML(http.StatusOK, tplViewActions) } @@ -258,22 +275,27 @@ type ViewResponse struct { State struct { Run struct { - RepoID int64 `json:"repoId"` - Link string `json:"link"` - Title string `json:"title"` - TitleHTML template.HTML `json:"titleHTML"` - Status string `json:"status"` - CanCancel bool `json:"canCancel"` - CanApprove bool `json:"canApprove"` // the run needs an approval and the doer has permission to approve - CanRerun bool `json:"canRerun"` - CanRerunFailed bool `json:"canRerunFailed"` - CanDeleteArtifact bool `json:"canDeleteArtifact"` - Done bool `json:"done"` - WorkflowID string `json:"workflowID"` - WorkflowLink string `json:"workflowLink"` - IsSchedule bool `json:"isSchedule"` - Jobs []*ViewJob `json:"jobs"` - Commit ViewCommit `json:"commit"` + RepoID int64 `json:"repoId"` + Link string `json:"link"` + ViewLink string `json:"viewLink"` + Title string `json:"title"` + TitleHTML template.HTML `json:"titleHTML"` + Status string `json:"status"` + CanCancel bool `json:"canCancel"` + CanApprove bool `json:"canApprove"` // the run needs an approval and the doer has permission to approve + CanRerun bool `json:"canRerun"` + CanRerunFailed bool `json:"canRerunFailed"` + CanDeleteArtifact bool `json:"canDeleteArtifact"` + Done bool `json:"done"` + WorkflowID string `json:"workflowID"` + WorkflowLink string `json:"workflowLink"` + IsSchedule bool `json:"isSchedule"` + RunAttempt int64 `json:"runAttempt"` + IsLatestAttempt bool `json:"isLatestAttempt"` + ReadOnlyAttemptView bool `json:"readOnlyAttemptView"` + Attempts []*ViewRunAttempt `json:"attempts"` + Jobs []*ViewJob `json:"jobs"` + Commit ViewCommit `json:"commit"` // Summary view: run duration and trigger time/event Duration string `json:"duration"` TriggeredAt int64 `json:"triggeredAt"` // unix seconds for relative time @@ -292,6 +314,7 @@ type ViewResponse struct { type ViewJob struct { ID int64 `json:"id"` + Link string `json:"link"` JobID string `json:"jobId,omitempty"` Name string `json:"name"` Status string `json:"status"` @@ -300,6 +323,18 @@ type ViewJob struct { Needs []string `json:"needs,omitempty"` } +type ViewRunAttempt struct { + Attempt int64 `json:"attempt"` + Status string `json:"status"` + Done bool `json:"done"` + Link string `json:"link"` + Current bool `json:"current"` + Latest bool `json:"latest"` + TriggeredAt int64 `json:"triggeredAt"` + TriggerUserName string `json:"triggerUserName"` + TriggerUserLink string `json:"triggerUserLink"` +} + type ViewCommit struct { ShortSha string `json:"shortSHA"` Link string `json:"link"` @@ -352,8 +387,23 @@ func getActionsViewArtifacts(ctx context.Context, repoID, runID int64) (artifact return artifactsViewItems, nil } +func getActionsViewArtifactsByAttempt(ctx context.Context, repoID, runAttemptID int64) (artifactsViewItems []*ArtifactsViewItem, err error) { + artifacts, err := actions_model.ListUploadedArtifactsMetaByRunAttempt(ctx, repoID, runAttemptID) + if err != nil { + return nil, err + } + for _, art := range artifacts { + artifactsViewItems = append(artifactsViewItems, &ArtifactsViewItem{ + Name: art.ArtifactName, + Size: art.FileSize, + Status: util.Iif(art.Status == actions_model.ArtifactStatusExpired, "expired", "completed"), + }) + } + return artifactsViewItems, nil +} + func ViewPost(ctx *context_module.Context) { - run, jobs := getCurrentRunJobsByPathParam(ctx) + run, attempt, isLatestAttempt, jobs := getCurrentRunJobsByPathParam(ctx) if ctx.Written() { return } @@ -363,7 +413,7 @@ func ViewPost(ctx *context_module.Context) { } resp := &ViewResponse{} - fillViewRunResponseSummary(ctx, resp, run, jobs) + fillViewRunResponseSummary(ctx, resp, run, attempt, isLatestAttempt, jobs) if ctx.Written() { return } @@ -374,23 +424,34 @@ func ViewPost(ctx *context_module.Context) { ctx.JSON(http.StatusOK, resp) } -func fillViewRunResponseSummary(ctx *context_module.Context, resp *ViewResponse, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob) { - var err error - resp.Artifacts, err = getActionsViewArtifacts(ctx, ctx.Repo.Repository.ID, run.ID) - if err != nil { - ctx.ServerError("getActionsViewArtifacts", err) - return - } - +func fillViewRunResponseSummary(ctx *context_module.Context, resp *ViewResponse, run *actions_model.ActionRun, attempt *actions_model.RunAttempt, isLatestAttempt bool, jobs []*actions_model.ActionRunJob) { resp.State.Run.RepoID = ctx.Repo.Repository.ID // the title for the "run" is from the commit message resp.State.Run.Title = run.Title resp.State.Run.TitleHTML = templates.NewRenderUtils(ctx).RenderCommitMessage(run.Title, ctx.Repo.Repository) resp.State.Run.Link = run.Link() - resp.State.Run.CanCancel = !run.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions) - resp.State.Run.CanApprove = run.NeedApproval && ctx.Repo.CanWrite(unit.TypeActions) - resp.State.Run.CanRerun = run.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions) - resp.State.Run.CanDeleteArtifact = run.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions) + resp.State.Run.ViewLink = getRunViewLink(run, attempt) + if attempt != nil { + resp.State.Run.RunAttempt = attempt.Attempt + } + resp.State.Run.IsLatestAttempt = isLatestAttempt + resp.State.Run.ReadOnlyAttemptView = attempt != nil && !isLatestAttempt + resp.State.Run.Attempts = make([]*ViewRunAttempt, 0) + if attempt != nil { + resp.State.Run.Status = attempt.Status.String() + resp.State.Run.Done = attempt.Status.IsDone() + resp.State.Run.Duration = attempt.Duration().String() + resp.State.Run.TriggeredAt = attempt.Created.AsTime().Unix() + } else { + resp.State.Run.Status = run.Status.String() + resp.State.Run.Done = run.Status.IsDone() + resp.State.Run.Duration = run.Duration().String() + resp.State.Run.TriggeredAt = run.Created.AsTime().Unix() + } + resp.State.Run.CanCancel = isLatestAttempt && !resp.State.Run.Done && ctx.Repo.CanWrite(unit.TypeActions) + resp.State.Run.CanApprove = isLatestAttempt && run.NeedApproval && ctx.Repo.CanWrite(unit.TypeActions) + resp.State.Run.CanRerun = isLatestAttempt && resp.State.Run.Done && ctx.Repo.CanWrite(unit.TypeActions) + resp.State.Run.CanDeleteArtifact = ctx.Repo.CanWrite(unit.TypeActions) if resp.State.Run.CanRerun { for _, job := range jobs { if job.Status == actions_model.StatusFailure || job.Status == actions_model.StatusCancelled { @@ -399,15 +460,16 @@ func fillViewRunResponseSummary(ctx *context_module.Context, resp *ViewResponse, } } } - resp.State.Run.Done = run.Status.IsDone() resp.State.Run.WorkflowID = run.WorkflowID - resp.State.Run.WorkflowLink = run.WorkflowLink() + if isLatestAttempt { + resp.State.Run.WorkflowLink = run.WorkflowLink() + } resp.State.Run.IsSchedule = run.IsSchedule() resp.State.Run.Jobs = make([]*ViewJob, 0, len(jobs)) // marshal to '[]' instead fo 'null' in json - resp.State.Run.Status = run.Status.String() for _, v := range jobs { resp.State.Run.Jobs = append(resp.State.Run.Jobs, &ViewJob{ ID: v.ID, + Link: fmt.Sprintf("%s/jobs/%d", run.Link(), v.ID), JobID: v.JobID, Name: v.Name, Status: v.Status.String(), @@ -417,6 +479,29 @@ func fillViewRunResponseSummary(ctx *context_module.Context, resp *ViewResponse, }) } + attempts, err := actions_model.ListRunAttemptsByRunID(ctx, run.ID) + if err != nil { + ctx.ServerError("ListRunAttemptsByRunID", err) + return + } + if err := actions_model.RunAttemptList(attempts).LoadTriggerUser(ctx); err != nil { + ctx.ServerError("LoadTriggerUser", err) + return + } + for _, runAttempt := range attempts { + resp.State.Run.Attempts = append(resp.State.Run.Attempts, &ViewRunAttempt{ + Attempt: runAttempt.Attempt, + Status: runAttempt.Status.String(), + Done: runAttempt.Status.IsDone(), + Link: getRunViewLink(run, runAttempt), + Current: attempt != nil && runAttempt.ID == attempt.ID, + Latest: run.LatestAttemptID > 0 && runAttempt.ID == run.LatestAttemptID, + TriggeredAt: runAttempt.Created.AsTime().Unix(), + TriggerUserName: runAttempt.TriggerUser.GetDisplayName(), + TriggerUserLink: runAttempt.TriggerUser.HomeLink(), + }) + } + pusher := ViewUser{ DisplayName: run.TriggerUser.GetDisplayName(), Link: run.TriggerUser.HomeLink(), @@ -441,9 +526,18 @@ func fillViewRunResponseSummary(ctx *context_module.Context, resp *ViewResponse, Pusher: pusher, Branch: branch, } - resp.State.Run.Duration = run.Duration().String() - resp.State.Run.TriggeredAt = run.Created.AsTime().Unix() resp.State.Run.TriggerEvent = run.TriggerEvent + + switch { + case attempt != nil: + resp.Artifacts, err = getActionsViewArtifactsByAttempt(ctx, ctx.Repo.Repository.ID, attempt.ID) + case run.LatestAttemptID == 0: + resp.Artifacts, err = getActionsViewArtifacts(ctx, ctx.Repo.Repository.ID, run.ID) + } + if err != nil { + ctx.ServerError("get view artifacts", err) + return + } } func fillViewRunResponseCurrentJob(ctx *context_module.Context, resp *ViewResponse, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob) { @@ -457,9 +551,9 @@ func fillViewRunResponseCurrentJob(ctx *context_module.Context, resp *ViewRespon } var task *actions_model.ActionTask - if current.TaskID > 0 { + if effectiveTaskID := current.EffectiveTaskID(); effectiveTaskID > 0 { var err error - task, err = actions_model.GetTaskByID(ctx, current.TaskID) + task, err = actions_model.GetTaskByID(ctx, effectiveTaskID) if err != nil { ctx.ServerError("actions_model.GetTaskByID", err) return @@ -587,13 +681,24 @@ func checkRunRerunAllowed(ctx *context_module.Context, run *actions_model.Action return true } +func checkLatestAttempt(ctx *context_module.Context, attempt *actions_model.RunAttempt, isLatestAttempt bool) bool { + if attempt != nil && !isLatestAttempt { + ctx.NotFound(nil) + return false + } + return true +} + // Rerun will rerun jobs in the given run // If jobIDStr is a blank string, it means rerun all jobs func Rerun(ctx *context_module.Context) { - run, jobs := getCurrentRunJobsByPathParam(ctx) + run, attempt, isLatestAttempt, jobs := getCurrentRunJobsByPathParam(ctx) if ctx.Written() { return } + if !checkLatestAttempt(ctx, attempt, isLatestAttempt) { + return + } if !checkRunRerunAllowed(ctx, run) { return } @@ -606,9 +711,7 @@ func Rerun(ctx *context_module.Context) { var jobsToRerun []*actions_model.ActionRunJob if currentJob != nil { - jobsToRerun = actions_service.GetAllRerunJobs(currentJob, jobs) - } else { - jobsToRerun = jobs + jobsToRerun = []*actions_model.ActionRunJob{currentJob} } if err := actions_service.RerunWorkflowRunJobs(ctx, ctx.Repo.Repository, run, jobsToRerun); err != nil { @@ -616,15 +719,18 @@ func Rerun(ctx *context_module.Context) { return } - ctx.JSONOK() + ctx.JSONRedirect(run.Link()) } // RerunFailed reruns all failed jobs in the given run func RerunFailed(ctx *context_module.Context) { - run, jobs := getCurrentRunJobsByPathParam(ctx) + run, attempt, isLatestAttempt, jobs := getCurrentRunJobsByPathParam(ctx) if ctx.Written() { return } + if !checkLatestAttempt(ctx, attempt, isLatestAttempt) { + return + } if !checkRunRerunAllowed(ctx, run) { return } @@ -634,7 +740,7 @@ func RerunFailed(ctx *context_module.Context) { return } - ctx.JSONOK() + ctx.JSONRedirect(run.Link()) } func Logs(ctx *context_module.Context) { @@ -652,10 +758,13 @@ func Logs(ctx *context_module.Context) { } func Cancel(ctx *context_module.Context) { - run, jobs := getCurrentRunJobsByPathParam(ctx) + run, attempt, isLatestAttempt, jobs := getCurrentRunJobsByPathParam(ctx) if ctx.Written() { return } + if !checkLatestAttempt(ctx, attempt, isLatestAttempt) { + return + } var updatedJobs []*actions_model.ActionRunJob @@ -690,78 +799,14 @@ func Approve(ctx *context_module.Context) { if ctx.Written() { return } - approveRuns(ctx, []int64{run.ID}) - if ctx.Written() { - return - } - - ctx.JSONOK() -} - -func approveRuns(ctx *context_module.Context, runIDs []int64) { - doer := ctx.Doer - repo := ctx.Repo.Repository - - updatedJobs := make([]*actions_model.ActionRunJob, 0) - runMap := make(map[int64]*actions_model.ActionRun, len(runIDs)) - runJobs := make(map[int64][]*actions_model.ActionRunJob, len(runIDs)) - - err := db.WithTx(ctx, func(ctx context.Context) (err error) { - for _, runID := range runIDs { - run, err := actions_model.GetRunByRepoAndID(ctx, repo.ID, runID) - if err != nil { - return err - } - runMap[run.ID] = run - run.Repo = repo - run.NeedApproval = false - run.ApprovedBy = doer.ID - if err := actions_model.UpdateRun(ctx, run, "need_approval", "approved_by"); err != nil { - return err - } - jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) - if err != nil { - return err - } - runJobs[run.ID] = jobs - for _, job := range jobs { - job.Status, err = actions_service.PrepareToStartJobWithConcurrency(ctx, job) - if err != nil { - return err - } - if job.Status == actions_model.StatusWaiting { - n, err := actions_model.UpdateRunJob(ctx, job, nil, "status") - if err != nil { - return err - } - if n > 0 { - updatedJobs = append(updatedJobs, job) - } - } - } - } - return nil - }) - if err != nil { - ctx.NotFoundOrServerError("approveRuns", func(err error) bool { + if err := actions_service.ApproveRuns(ctx, ctx.Repo.Repository, ctx.Doer, []int64{run.ID}); err != nil { + ctx.NotFoundOrServerError("ApproveRuns", func(err error) bool { return errors.Is(err, util.ErrNotExist) }, err) return } - for runID, run := range runMap { - actions_service.CreateCommitStatusForRunJobs(ctx, run, runJobs[runID]...) - } - - if len(updatedJobs) > 0 { - job := updatedJobs[0] - actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) - } - - for _, job := range updatedJobs { - _ = job.LoadAttributes(ctx) - notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) - } + ctx.JSONOK() } func Delete(ctx *context_module.Context) { @@ -783,28 +828,127 @@ func Delete(ctx *context_module.Context) { ctx.JSONOK() } -// getRunJobs loads the run and its jobs for runID +func getRunViewLink(run *actions_model.ActionRun, attempt *actions_model.RunAttempt) string { + if attempt == nil || run.LatestAttemptID == attempt.ID { + return run.Link() + } + return fmt.Sprintf("%s/attempts/%d", run.Link(), attempt.Attempt) +} + +// getCurrentRunJobsByPathParam resolves the current run view context from path parameters, including the run, optional attempt, and jobs to render. // Any error will be written to the ctx, empty jobs will also result in 404 error, then the return values are all nil. -func getCurrentRunJobsByPathParam(ctx *context_module.Context) (*actions_model.ActionRun, []*actions_model.ActionRunJob) { +func getCurrentRunJobsByPathParam(ctx *context_module.Context) (*actions_model.ActionRun, *actions_model.RunAttempt, bool, []*actions_model.ActionRunJob) { run := getCurrentRunByPathParam(ctx) if ctx.Written() { - return nil, nil + return nil, nil, false, nil } run.Repo = ctx.Repo.Repository - jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) + + var err error + var selectedJob *actions_model.ActionRunJob + if ctx.PathParam("job") != "" { + jobID := ctx.PathParamInt64("job") + selectedJob, err = actions_model.GetRunJobByRunAndID(ctx, run.ID, jobID) + if err != nil { + ctx.NotFoundOrServerError("GetRunJobByRepoAndID", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return nil, nil, false, nil + } + } + + attemptNum := ctx.PathParamInt64("attempt") + var attempt *actions_model.RunAttempt + var isLatestAttempt bool + switch { + case attemptNum > 0: + attempt, err = actions_model.GetRunAttemptByRunIDAndAttemptNum(ctx, run.ID, attemptNum) + if err != nil { + ctx.NotFoundOrServerError("GetRunAttemptByRunIDAndAttempt", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return nil, nil, false, nil + } + isLatestAttempt = run.LatestAttemptID > 0 && run.LatestAttemptID == attempt.ID + case selectedJob != nil && selectedJob.RunAttemptID > 0: + attempt, err = actions_model.GetRunAttemptByRepoAndID(ctx, selectedJob.RepoID, selectedJob.RunAttemptID) + if err != nil { + ctx.NotFoundOrServerError("GetRunAttemptByRepoAndID", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return nil, nil, false, nil + } + isLatestAttempt = run.LatestAttemptID > 0 && run.LatestAttemptID == attempt.ID + default: + attempt, _, err = run.GetLatestAttempt(ctx) + if err != nil { + ctx.NotFoundOrServerError("GetLatestAttempt", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return nil, nil, false, nil + } + isLatestAttempt = true + } + + var jobs []*actions_model.ActionRunJob + switch { + case attempt != nil: + jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, attempt.ID) + case selectedJob != nil && selectedJob.RunAttemptID == 0: + jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, 0) + default: + jobs, err = actions_model.GetRunJobsByRunID(ctx, run.ID) + } if err != nil { - ctx.ServerError("GetRunJobsByRunID", err) - return nil, nil + ctx.ServerError("get current jobs", err) + return nil, nil, false, nil } if len(jobs) == 0 { ctx.NotFound(nil) - return nil, nil + return nil, nil, false, nil } for _, job := range jobs { job.Run = run } - return run, jobs + return run, attempt, isLatestAttempt, jobs +} + +// getArtifactAttemptByQuery resolves the artifact attempt from the request. +// It returns the explicitly requested attempt when the `attempt` query parameter is present, +// otherwise it falls back to the run's latest attempt for attempt-scoped artifact access. +func getArtifactAttemptByQuery(ctx *context_module.Context, run *actions_model.ActionRun) (*actions_model.RunAttempt, error) { + if ctx.FormString("attempt") != "" { + attemptNum := ctx.FormInt64("attempt") + if attemptNum > 0 { + attempt, err := actions_model.GetRunAttemptByRunIDAndAttemptNum(ctx, run.ID, attemptNum) + if err != nil { + return nil, err + } + return attempt, nil + } + return nil, util.ErrNotExist + } + + if run.LatestAttemptID == 0 { + return nil, nil //nolint:nilnil // return nil to indicate that artifact access should fall back to legacy run-scoped data + } + attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID) + if err != nil { + return nil, err + } + return attempt, nil +} + +func getArtifactsByRunAndAttempt(ctx *context_module.Context, run *actions_model.ActionRun, attempt *actions_model.RunAttempt, artifactName string) ([]*actions_model.ActionArtifact, error) { + opts := actions_model.FindArtifactsOptions{ + RunID: run.ID, + ArtifactName: artifactName, + } + if attempt != nil { + opts.RunAttemptID = attempt.ID + } + return db.Find[actions_model.ActionArtifact](ctx, opts) } func ArtifactsDeleteView(ctx *context_module.Context) { @@ -812,8 +956,20 @@ func ArtifactsDeleteView(ctx *context_module.Context) { if ctx.Written() { return } + attempt, err := getArtifactAttemptByQuery(ctx, run) + if err != nil { + ctx.NotFoundOrServerError("GetRunAttemptByRunIDAndAttempt", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return + } artifactName := ctx.PathParam("artifact_name") - if err := actions_model.SetArtifactNeedDelete(ctx, run.ID, artifactName); err != nil { + if attempt != nil { + err = actions_model.SetArtifactNeedDeleteByRunAttempt(ctx, run.ID, attempt.ID, artifactName) + } else { + err = actions_model.SetArtifactNeedDelete(ctx, run.ID, artifactName) + } + if err != nil { ctx.ServerError("SetArtifactNeedDelete", err) return } @@ -825,12 +981,16 @@ func ArtifactsDownloadView(ctx *context_module.Context) { if ctx.Written() { return } + attempt, err := getArtifactAttemptByQuery(ctx, run) + if err != nil { + ctx.NotFoundOrServerError("GetRunAttemptByRunIDAndAttempt", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) + return + } artifactName := ctx.PathParam("artifact_name") - artifacts, err := db.Find[actions_model.ActionArtifact](ctx, actions_model.FindArtifactsOptions{ - RunID: run.ID, - ArtifactName: artifactName, - }) + artifacts, err := getArtifactsByRunAndAttempt(ctx, run, attempt, artifactName) if err != nil { ctx.ServerError("FindArtifacts", err) return @@ -929,8 +1089,10 @@ func ApproveAllChecks(ctx *context_module.Context) { return } - approveRuns(ctx, runIDs) - if ctx.Written() { + if err := actions_service.ApproveRuns(ctx, repo, ctx.Doer, runIDs); err != nil { + ctx.NotFoundOrServerError("ApproveRuns", func(err error) bool { + return errors.Is(err, util.ErrNotExist) + }, err) return } diff --git a/routers/web/web.go b/routers/web/web.go index f85c2f7501151..7d2c98dfabf87 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -1534,6 +1534,11 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) { m.Combo(""). Get(actions.View). Post(web.Bind(actions.ViewRequest{}), actions.ViewPost) + m.Group("/attempts/{attempt}", func() { + m.Combo(""). + Get(actions.View). + Post(web.Bind(actions.ViewRequest{}), actions.ViewPost) + }) m.Group("/jobs/{job}", func() { m.Combo(""). Get(actions.View). diff --git a/services/actions/approve.go b/services/actions/approve.go new file mode 100644 index 0000000000000..837fbffe77139 --- /dev/null +++ b/services/actions/approve.go @@ -0,0 +1,68 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" +) + +func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_model.User, runIDs []int64) error { + updatedJobs := make([]*actions_model.ActionRunJob, 0) + cancelledConcurrencyJobs := make([]*actions_model.ActionRunJob, 0) + + err := db.WithTx(ctx, func(ctx context.Context) (err error) { + for _, runID := range runIDs { + run, err := actions_model.GetRunByRepoAndID(ctx, repo.ID, runID) + if err != nil { + return err + } + run.Repo = repo + run.NeedApproval = false + run.ApprovedBy = doer.ID + if err := actions_model.UpdateRun(ctx, run, "need_approval", "approved_by"); err != nil { + return err + } + jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) // GetRunJobsByRunID returns the latest attempt's jobs here + if err != nil { + return err + } + for _, job := range jobs { + if len(job.Needs) > 0 { + continue + } + var jobsToCancel []*actions_model.ActionRunJob + job.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, job) + if err != nil { + return err + } + cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...) + if job.Status == actions_model.StatusWaiting { + n, err := actions_model.UpdateRunJob(ctx, job, nil, "status") + if err != nil { + return err + } + if n > 0 { + updatedJobs = append(updatedJobs, job) + } + } + } + } + return nil + }) + if err != nil { + return err + } + + notifyWorkflowJobStatusUpdate(ctx, updatedJobs) + notifyWorkflowJobStatusUpdate(ctx, cancelledConcurrencyJobs) + + EmitJobsIfReadyByJobs(cancelledConcurrencyJobs) + + return nil +} diff --git a/services/actions/cleanup.go b/services/actions/cleanup.go index d0cc63e538872..25b4531759790 100644 --- a/services/actions/cleanup.go +++ b/services/actions/cleanup.go @@ -179,7 +179,7 @@ func DeleteRun(ctx context.Context, run *actions_model.ActionRun) error { repoID := run.RepoID - jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) + jobs, err := actions_model.GetAllRunJobsByRunID(ctx, run.ID) if err != nil { return err } @@ -207,6 +207,10 @@ func DeleteRun(ctx context.Context, run *actions_model.ActionRun) error { RepoID: repoID, ID: run.ID, }) + recordsToDelete = append(recordsToDelete, &actions_model.RunAttempt{ + RepoID: repoID, + RunID: run.ID, + }) recordsToDelete = append(recordsToDelete, &actions_model.ActionRunJob{ RepoID: repoID, RunID: run.ID, diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index c71f63e7d17b7..8f1f4fa4a0f45 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -83,61 +83,59 @@ func shouldBlockJobByConcurrency(ctx context.Context, job *actions_model.ActionR return false, nil } - runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) + attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) if err != nil { return false, fmt.Errorf("GetConcurrentRunsAndJobs: %w", err) } - return len(runs) > 0 || len(jobs) > 0, nil + return len(attempts) > 0 || len(jobs) > 0, nil } // PrepareToStartJobWithConcurrency prepares a job to start by its evaluated concurrency group and cancelling previous jobs if necessary. -// It returns the new status of the job (either StatusBlocked or StatusWaiting) and any error encountered during the process. -func PrepareToStartJobWithConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (actions_model.Status, error) { +// It returns the new status of the job (either StatusBlocked or StatusWaiting), any cancelled jobs, and any error encountered during the process. +func PrepareToStartJobWithConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (actions_model.Status, []*actions_model.ActionRunJob, error) { shouldBlock, err := shouldBlockJobByConcurrency(ctx, job) if err != nil { - return actions_model.StatusBlocked, err + return actions_model.StatusBlocked, nil, err } // even if the current job is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group jobs, err := actions_model.CancelPreviousJobsByJobConcurrency(ctx, job) if err != nil { - return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByJobConcurrency: %w", err) + return actions_model.StatusBlocked, nil, fmt.Errorf("CancelPreviousJobsByJobConcurrency: %w", err) } - notifyWorkflowJobStatusUpdate(ctx, jobs) - return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil + return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), jobs, nil } -func shouldBlockRunByConcurrency(ctx context.Context, actionRun *actions_model.ActionRun) (bool, error) { - if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel { +func shouldBlockRunByConcurrency(ctx context.Context, attempt *actions_model.RunAttempt) (bool, error) { + if attempt.ConcurrencyGroup == "" || attempt.ConcurrencyCancel { return false, nil } - runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) + attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) if err != nil { return false, fmt.Errorf("find concurrent runs and jobs: %w", err) } - return len(runs) > 0 || len(jobs) > 0, nil + return len(attempts) > 0 || len(jobs) > 0, nil } -// PrepareToStartRunWithConcurrency prepares a run to start by its evaluated concurrency group and cancelling previous jobs if necessary. -// It returns the new status of the run (either StatusBlocked or StatusWaiting) and any error encountered during the process. -func PrepareToStartRunWithConcurrency(ctx context.Context, run *actions_model.ActionRun) (actions_model.Status, error) { - shouldBlock, err := shouldBlockRunByConcurrency(ctx, run) +// PrepareToStartRunWithConcurrency prepares a run attempt to start by its evaluated concurrency group and cancelling previous jobs if necessary. +// It returns the new status of the run attempt (either StatusBlocked or StatusWaiting), any cancelled jobs, and any error encountered during the process. +func PrepareToStartRunWithConcurrency(ctx context.Context, attempt *actions_model.RunAttempt) (actions_model.Status, []*actions_model.ActionRunJob, error) { + shouldBlock, err := shouldBlockRunByConcurrency(ctx, attempt) if err != nil { - return actions_model.StatusBlocked, err + return actions_model.StatusBlocked, nil, err } // even if the current run is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group - jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, run) + jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, attempt) if err != nil { - return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByRunConcurrency: %w", err) + return actions_model.StatusBlocked, nil, fmt.Errorf("CancelPreviousJobsByRunConcurrency: %w", err) } - notifyWorkflowJobStatusUpdate(ctx, jobs) - return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil + return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), jobs, nil } func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go index 878e5c483bfee..8ddc99c8899b7 100644 --- a/services/actions/concurrency.go +++ b/services/actions/concurrency.go @@ -5,6 +5,7 @@ package actions import ( "context" + "errors" "fmt" actions_model "code.gitea.io/gitea/models/actions" @@ -16,16 +17,20 @@ import ( "go.yaml.in/yaml/v4" ) -// EvaluateRunConcurrencyFillModel evaluates the expressions in a run-level (workflow) concurrency, -// and fills the run's model fields with `concurrency.group` and `concurrency.cancel-in-progress`. +// EvaluateRunConcurrencyFillModel evaluates the expressions in a run-level (workflow) concurrency +// and fills the run attempt model with the evaluated `concurrency.group` and +// `concurrency.cancel-in-progress` values. // Workflow-level concurrency doesn't depend on the job outputs, so it can always be evaluated if there is no syntax error. // See https://docs.github.com/en/actions/reference/workflows-and-actions/workflow-syntax#concurrency -func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, wfRawConcurrency *act_model.RawConcurrency, vars map[string]string, inputs map[string]any) error { +func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.RunAttempt, wfRawConcurrency *act_model.RawConcurrency, vars map[string]string, inputs map[string]any) error { + if attempt == nil { + return errors.New("run attempt is nil") + } if err := run.LoadAttributes(ctx); err != nil { return fmt.Errorf("run LoadAttributes: %w", err) } - actionsRunCtx := GenerateGiteaContext(run, nil) + actionsRunCtx := GenerateGiteaContext(ctx, run, attempt, nil) jobResults := map[string]*jobparser.JobResult{"": {}} if inputs == nil { var err error @@ -35,12 +40,8 @@ func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.Act } } - rawConcurrency, err := yaml.Marshal(wfRawConcurrency) - if err != nil { - return fmt.Errorf("marshal raw concurrency: %w", err) - } - run.RawConcurrency = string(rawConcurrency) - run.ConcurrencyGroup, run.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(wfRawConcurrency, "", nil, actionsRunCtx, jobResults, vars, inputs) + var err error + attempt.ConcurrencyGroup, attempt.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(wfRawConcurrency, "", nil, actionsRunCtx, jobResults, vars, inputs) if err != nil { return fmt.Errorf("evaluate concurrency: %w", err) } @@ -81,7 +82,7 @@ func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.Act return fmt.Errorf("unmarshal raw concurrency: %w", err) } - actionsJobCtx := GenerateGiteaContext(run, actionRunJob) + actionsJobCtx := GenerateGiteaContext(ctx, run, nil, actionRunJob) jobResults, err := findJobNeedsAndFillJobResults(ctx, actionRunJob) if err != nil { diff --git a/services/actions/context.go b/services/actions/context.go index 626ae6ee6bfbb..66d85850fc364 100644 --- a/services/actions/context.go +++ b/services/actions/context.go @@ -23,8 +23,12 @@ import ( type GiteaContext map[string]any // GenerateGiteaContext generate the gitea context without token and gitea_runtime_token -// job can be nil when generating a context for parsing workflow-level expressions -func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) GiteaContext { +// attempt and job can be nil when generating a context for parsing workflow-level expressions. +// The run_attempt value is resolved with the following precedence: +// 1. job.Attempt when a job context is available +// 2. attempt.Attempt when an explicit attempt context is available +// 3. the run's latest attempt as a fallback +func GenerateGiteaContext(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.RunAttempt, job *actions_model.ActionRunJob) GiteaContext { event := map[string]any{} _ = json.Unmarshal([]byte(run.EventPayload), &event) @@ -73,7 +77,7 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio "repository_owner": run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat. "repositoryUrl": run.Repo.HTMLURL(), // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git. "retention_days": "", // string, The number of days that workflow run logs and artifacts are kept. - "run_id": "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run. + "run_id": strconv.FormatInt(run.ID, 10), // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run. "run_number": strconv.FormatInt(run.Index, 10), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run. "run_attempt": "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run. "secret_source": "Actions", // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces. @@ -89,8 +93,13 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio if job != nil { gitContext["job"] = job.JobID - gitContext["run_id"] = strconv.FormatInt(job.RunID, 10) gitContext["run_attempt"] = strconv.FormatInt(job.Attempt, 10) + } else if attempt != nil { + gitContext["run_attempt"] = strconv.FormatInt(attempt.Attempt, 10) + } else { + if attempt, has, err := run.GetLatestAttempt(ctx); err == nil && has { + gitContext["run_attempt"] = strconv.FormatInt(attempt.Attempt, 10) + } } return gitContext @@ -108,7 +117,12 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st } needs := container.SetOf(job.Needs...) - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: job.RunID}) + findOpts := actions_model.FindRunJobOptions{RunID: job.RunID} + if job.RunAttemptID > 0 { + findOpts = actions_model.FindRunJobOptions{RunAttemptID: job.RunAttemptID} + } + + jobs, err := db.Find[actions_model.ActionRunJob](ctx, findOpts) if err != nil { return nil, fmt.Errorf("FindRunJobs: %w", err) } @@ -125,11 +139,12 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st } var jobOutputs map[string]string for _, job := range jobsWithSameID { - if job.TaskID == 0 || !job.Status.IsDone() { - // it shouldn't happen, or the job has been rerun + taskID := job.EffectiveTaskID() + if taskID == 0 || !job.Status.IsDone() { + // it shouldn't happen continue } - got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID) + got, err := actions_model.FindTaskOutputByTaskID(ctx, taskID) if err != nil { return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err) } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 20a4f81eabb91..cb9bc7f85bc4a 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -70,26 +70,40 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { if err != nil { return fmt.Errorf("get action run: %w", err) } - var jobs, updatedJobs []*actions_model.ActionRunJob + attempt, has, err := run.GetLatestAttempt(ctx) + if err != nil { + return fmt.Errorf("get latest run attempt: %w", err) + } + if !has { + // This should not happen for newly created or rerun runs because they always create a new attempt. + return fmt.Errorf("run %d has no latest attempt", run.ID) + } + var jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob if err := db.WithTx(ctx, func(ctx context.Context) error { // check jobs of the current run - if js, ujs, err := checkJobsOfRun(ctx, run); err != nil { + if js, ujs, cjs, err := checkJobsOfCurrentRunAttempt(ctx, run, attempt); err != nil { return err } else { jobs = append(jobs, js...) updatedJobs = append(updatedJobs, ujs...) + cancelledJobs = append(cancelledJobs, cjs...) } - if js, ujs, err := checkRunConcurrency(ctx, run); err != nil { + if js, ujs, cjs, err := checkRunConcurrency(ctx, run); err != nil { return err } else { jobs = append(jobs, js...) updatedJobs = append(updatedJobs, ujs...) + cancelledJobs = append(cancelledJobs, cjs...) } return nil }); err != nil { return err } - CreateCommitStatusForRunJobs(ctx, run, jobs...) + notifyWorkflowJobStatusUpdate(ctx, cancelledJobs) + EmitJobsIfReadyByJobs(cancelledJobs) + if err := createCommitStatusesForJobsByRun(ctx, jobs); err != nil { + return err + } for _, job := range updatedJobs { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) @@ -120,99 +134,130 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { return nil } -// findBlockedRunByConcurrency finds the blocked concurrent run in a repo and returns `nil, nil` when there is no blocked run. -func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.ActionRun, error) { +func createCommitStatusesForJobsByRun(ctx context.Context, jobs []*actions_model.ActionRunJob) error { + runJobs := make(map[int64][]*actions_model.ActionRunJob) + for _, job := range jobs { + runJobs[job.RunID] = append(runJobs[job.RunID], job) + } + + for jobRunID, jobList := range runJobs { + run, err := actions_model.GetRunByRepoAndID(ctx, jobList[0].RepoID, jobRunID) + if err != nil { + return fmt.Errorf("get action run %d: %w", jobRunID, err) + } + CreateCommitStatusForRunJobs(ctx, run, jobList...) + } + return nil +} + +// findBlockedRunAttemptByConcurrency finds the blocked concurrent run attempt in a repo and returns `nil, nil` when there is no blocked run attempt. +func findBlockedRunAttemptByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.RunAttempt, error) { if concurrencyGroup == "" { return nil, nil //nolint:nilnil // return nil to indicate that no blocked run exists } - cRuns, cJobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked}) + cAttempts, cJobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked}) if err != nil { return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) } - // There can be at most one blocked run or job - var concurrentRun *actions_model.ActionRun - if len(cRuns) > 0 { - concurrentRun = cRuns[0] - } else if len(cJobs) > 0 { - jobRun, exist, err := db.GetByID[actions_model.ActionRun](ctx, cJobs[0].RunID) - if !exist { - return nil, fmt.Errorf("run %d does not exist", cJobs[0].RunID) - } + if len(cAttempts) > 0 { + return cAttempts[0], nil + } + if len(cJobs) > 0 { + attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, cJobs[0].RepoID, cJobs[0].RunAttemptID) if err != nil { - return nil, fmt.Errorf("get run by job %d: %w", cJobs[0].ID, err) + return nil, fmt.Errorf("get run attempt by job %d: %w", cJobs[0].ID, err) } - concurrentRun = jobRun + return attempt, nil } - return concurrentRun, nil + return nil, nil //nolint:nilnil // return nil to indicate that no blocked run attempt exists } -func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { +func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) { checkedConcurrencyGroup := make(container.Set[string]) // check run (workflow-level) concurrency - if run.ConcurrencyGroup != "" { - concurrentRun, err := findBlockedRunByConcurrency(ctx, run.RepoID, run.ConcurrencyGroup) + runConcurrencyGroup, _, err := run.GetEffectiveConcurrency(ctx) + if err != nil { + return nil, nil, nil, fmt.Errorf("load run concurrency: %w", err) + } + if runConcurrencyGroup != "" { + concurrentAttempt, err := findBlockedRunAttemptByConcurrency(ctx, run.RepoID, runConcurrencyGroup) if err != nil { - return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + return nil, nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) } - if concurrentRun != nil && !concurrentRun.NeedApproval { - js, ujs, err := checkJobsOfRun(ctx, concurrentRun) + if concurrentAttempt != nil { + concurrentRun, err := actions_model.GetRunByRepoAndID(ctx, concurrentAttempt.RepoID, concurrentAttempt.RunID) if err != nil { - return nil, nil, err + return nil, nil, nil, fmt.Errorf("get run by attempt %d: %w", concurrentAttempt.ID, err) + } + if !concurrentRun.NeedApproval { + js, ujs, cjs, err := checkJobsOfCurrentRunAttempt(ctx, concurrentRun, concurrentAttempt) + if err != nil { + return nil, nil, nil, err + } + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) + cancelledJobs = append(cancelledJobs, cjs...) } - jobs = append(jobs, js...) - updatedJobs = append(updatedJobs, ujs...) } - checkedConcurrencyGroup.Add(run.ConcurrencyGroup) + checkedConcurrencyGroup.Add(runConcurrencyGroup) } // check job concurrency runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) if err != nil { - return nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + return nil, nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) } for _, job := range runJobs { if !job.Status.IsDone() { continue } - if job.ConcurrencyGroup == "" && checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) { + if job.ConcurrencyGroup == "" || checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) { continue } - concurrentRun, err := findBlockedRunByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup) + concurrentAttempt, err := findBlockedRunAttemptByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup) if err != nil { - return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + return nil, nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) } - if concurrentRun != nil && !concurrentRun.NeedApproval { - js, ujs, err := checkJobsOfRun(ctx, concurrentRun) + if concurrentAttempt != nil { + concurrentRun, err := actions_model.GetRunByRepoAndID(ctx, concurrentAttempt.RepoID, concurrentAttempt.RunID) if err != nil { - return nil, nil, err + return nil, nil, nil, fmt.Errorf("get run by attempt %d: %w", concurrentAttempt.ID, err) + } + if !concurrentRun.NeedApproval { + js, ujs, cjs, err := checkJobsOfCurrentRunAttempt(ctx, concurrentRun, concurrentAttempt) + if err != nil { + return nil, nil, nil, err + } + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) + cancelledJobs = append(cancelledJobs, cjs...) } - jobs = append(jobs, js...) - updatedJobs = append(updatedJobs, ujs...) } checkedConcurrencyGroup.Add(job.ConcurrencyGroup) } - return jobs, updatedJobs, nil + return jobs, updatedJobs, cancelledJobs, nil } -func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { - jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) +func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.RunAttempt) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) { + jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, attempt.ID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } vars, err := actions_model.GetVariablesOfRun(ctx, run) if err != nil { - return nil, nil, err + return nil, nil, nil, err } + resolver := newJobStatusResolver(jobs, vars) if err = db.WithTx(ctx, func(ctx context.Context) error { for _, job := range jobs { job.Run = run } - updates := newJobStatusResolver(jobs, vars).Resolve(ctx) + updates := resolver.Resolve(ctx) for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status @@ -226,10 +271,10 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up } return nil }); err != nil { - return nil, nil, err + return nil, nil, nil, err } - return jobs, updatedJobs, nil + return jobs, updatedJobs, resolver.cancelledJobs, nil } func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_model.ActionRunJob) { @@ -242,10 +287,11 @@ func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_m } type jobStatusResolver struct { - statuses map[int64]actions_model.Status - needs map[int64][]int64 - jobMap map[int64]*actions_model.ActionRunJob - vars map[string]string + statuses map[int64]actions_model.Status + needs map[int64][]int64 + jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string + cancelledJobs []*actions_model.ActionRunJob } func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { @@ -344,9 +390,12 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped) if newStatus == actions_model.StatusWaiting { - newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) + var cancelledJobs []*actions_model.ActionRunJob + newStatus, cancelledJobs, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) if err != nil { log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err) + } else { + r.cancelledJobs = append(r.cancelledJobs, cancelledJobs...) } } diff --git a/services/actions/rerun.go b/services/actions/rerun.go index 1596d9bfc5a7f..0e386c9aece4e 100644 --- a/services/actions/rerun.go +++ b/services/actions/rerun.go @@ -5,213 +5,31 @@ package actions import ( "context" - "fmt" actions_model "code.gitea.io/gitea/models/actions" - "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" - "code.gitea.io/gitea/models/unit" - "code.gitea.io/gitea/modules/container" - "code.gitea.io/gitea/modules/util" - notify_service "code.gitea.io/gitea/services/notify" - - "github.com/nektos/act/pkg/model" - "go.yaml.in/yaml/v4" - "xorm.io/builder" ) -// GetFailedRerunJobs returns all failed jobs and their downstream dependent jobs that need to be rerun +// GetFailedRerunJobs returns the failed or cancelled jobs in a run. func GetFailedRerunJobs(allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob { - rerunJobIDSet := make(container.Set[int64]) var jobsToRerun []*actions_model.ActionRunJob for _, job := range allJobs { if job.Status == actions_model.StatusFailure || job.Status == actions_model.StatusCancelled { - for _, j := range GetAllRerunJobs(job, allJobs) { - if !rerunJobIDSet.Contains(j.ID) { - rerunJobIDSet.Add(j.ID) - jobsToRerun = append(jobsToRerun, j) - } - } + jobsToRerun = append(jobsToRerun, job) } } return jobsToRerun } -// GetAllRerunJobs returns the target job and all jobs that transitively depend on it. -// Downstream jobs are included regardless of their current status. -func GetAllRerunJobs(job *actions_model.ActionRunJob, allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob { - rerunJobs := []*actions_model.ActionRunJob{job} - rerunJobsIDSet := make(container.Set[string]) - rerunJobsIDSet.Add(job.JobID) - - for { - found := false - for _, j := range allJobs { - if rerunJobsIDSet.Contains(j.JobID) { - continue - } - for _, need := range j.Needs { - if rerunJobsIDSet.Contains(need) { - found = true - rerunJobs = append(rerunJobs, j) - rerunJobsIDSet.Add(j.JobID) - break - } - } - } - if !found { - break - } - } - - return rerunJobs -} - -// prepareRunRerun validates the run, resets its state, handles concurrency, persists the -// updated run, and fires a status-update notification. -// It returns isRunBlocked (true when the run itself is held by a concurrency group). -func prepareRunRerun(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob) (isRunBlocked bool, err error) { - if !run.Status.IsDone() { - return false, util.NewInvalidArgumentErrorf("this workflow run is not done") - } - - cfgUnit := repo.MustGetUnit(ctx, unit.TypeActions) - - // Rerun is not allowed when workflow is disabled. - cfg := cfgUnit.ActionsConfig() - if cfg.IsWorkflowDisabled(run.WorkflowID) { - return false, util.NewInvalidArgumentErrorf("workflow %s is disabled", run.WorkflowID) - } - - // Reset run's timestamps and status. - run.PreviousDuration = run.Duration() - run.Started = 0 - run.Stopped = 0 - run.Status = actions_model.StatusWaiting - - vars, err := actions_model.GetVariablesOfRun(ctx, run) - if err != nil { - return false, fmt.Errorf("get run %d variables: %w", run.ID, err) - } - - if run.RawConcurrency != "" { - var rawConcurrency model.RawConcurrency - if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil { - return false, fmt.Errorf("unmarshal raw concurrency: %w", err) - } - - if err := EvaluateRunConcurrencyFillModel(ctx, run, &rawConcurrency, vars, nil); err != nil { - return false, err - } - - run.Status, err = PrepareToStartRunWithConcurrency(ctx, run) - if err != nil { - return false, err - } - } - - if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil { - return false, err - } - - if err := run.LoadAttributes(ctx); err != nil { - return false, err - } - - for _, job := range jobs { - job.Run = run - } - - notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) - - return run.Status == actions_model.StatusBlocked, nil -} - // RerunWorkflowRunJobs reruns the given jobs of a workflow run. -// jobsToRerun must include all jobs to be rerun (the target job and its transitively dependent jobs). -// A job is blocked (waiting for dependencies) if the run itself is blocked or if any of its -// needs are also being rerun. +// An empty jobsToRerun means rerunning the whole run. Otherwise jobsToRerun contains only the user-requested target jobs; +// downstream dependent jobs are expanded internally while building the rerun plan. func RerunWorkflowRunJobs(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, jobsToRerun []*actions_model.ActionRunJob) error { - if len(jobsToRerun) == 0 { - return nil - } - - isRunBlocked, err := prepareRunRerun(ctx, repo, run, jobsToRerun) - if err != nil { - return err - } - - rerunJobIDs := make(container.Set[string]) - for _, j := range jobsToRerun { - rerunJobIDs.Add(j.JobID) - } - - for _, job := range jobsToRerun { - shouldBlockJob := isRunBlocked - if !shouldBlockJob { - for _, need := range job.Needs { - if rerunJobIDs.Contains(need) { - shouldBlockJob = true - break - } - } - } - if err := rerunWorkflowJob(ctx, job, shouldBlockJob); err != nil { - return err - } - } - - return nil -} - -func rerunWorkflowJob(ctx context.Context, job *actions_model.ActionRunJob, shouldBlock bool) error { - status := job.Status - if !status.IsDone() { - return nil - } - - job.TaskID = 0 - job.Status = util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting) - job.Started = 0 - job.Stopped = 0 - job.ConcurrencyGroup = "" - job.ConcurrencyCancel = false - job.IsConcurrencyEvaluated = false - - if err := job.LoadRun(ctx); err != nil { - return err - } - if err := job.Run.LoadAttributes(ctx); err != nil { - return err - } - - vars, err := actions_model.GetVariablesOfRun(ctx, job.Run) + plan, err := buildRerunPlan(ctx, repo, run, jobsToRerun) if err != nil { - return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) - } - - if job.RawConcurrency != "" && !shouldBlock { - if err := EvaluateJobConcurrencyFillModel(ctx, job.Run, job, vars, nil); err != nil { - return fmt.Errorf("evaluate job concurrency: %w", err) - } - - job.Status, err = PrepareToStartJobWithConcurrency(ctx, job) - if err != nil { - return err - } - } - - if err := db.WithTx(ctx, func(ctx context.Context) error { - updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"} - _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...) - return err - }); err != nil { return err } - - CreateCommitStatusForRunJobs(ctx, job.Run, job) - notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) - return nil + return execRerunPlan(ctx, plan) } diff --git a/services/actions/rerun_plan.go b/services/actions/rerun_plan.go new file mode 100644 index 0000000000000..a48f431e4a9a0 --- /dev/null +++ b/services/actions/rerun_plan.go @@ -0,0 +1,296 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "slices" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unit" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/util" + notify_service "code.gitea.io/gitea/services/notify" + + "github.com/nektos/act/pkg/model" + "go.yaml.in/yaml/v4" +) + +type rerunPlan struct { + run *actions_model.ActionRun + templateJobs actions_model.ActionJobList + rerunJobIDs container.Set[string] + vars map[string]string + previousDuration time.Duration + newAttempt *actions_model.RunAttempt +} + +func buildRerunPlan(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, jobsToRerun []*actions_model.ActionRunJob) (*rerunPlan, error) { + if !run.Status.IsDone() { + return nil, util.NewInvalidArgumentErrorf("this workflow run is not done") + } + if run.RepoID != repo.ID { + return nil, util.NewInvalidArgumentErrorf("run %d does not belong to repo %d", run.ID, repo.ID) + } + for _, job := range jobsToRerun { + if job.RunID != run.ID { + return nil, util.NewInvalidArgumentErrorf("job %d does not belong to workflow run %d", job.ID, run.ID) + } + } + + if err := run.LoadAttributes(ctx); err != nil { + return nil, err + } + + cfgUnit := repo.MustGetUnit(ctx, unit.TypeActions) + cfg := cfgUnit.ActionsConfig() + if cfg.IsWorkflowDisabled(run.WorkflowID) { + return nil, util.NewInvalidArgumentErrorf("workflow %s is disabled", run.WorkflowID) + } + + templateAttempt, hasTemplateAttempt, err := run.GetLatestAttempt(ctx) + if err != nil { + return nil, err + } + + var templateJobs actions_model.ActionJobList + if hasTemplateAttempt { + templateJobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, templateAttempt.ID) + } else { + templateJobs, err = actions_model.GetRunJobsByRunID(ctx, run.ID) + } + if err != nil { + return nil, fmt.Errorf("load template jobs: %w", err) + } + + plan := &rerunPlan{ + run: run, + templateJobs: templateJobs, + } + if len(templateJobs) == 0 { + return plan, nil + } + + if err := plan.expandRerunJobIDs(jobsToRerun); err != nil { + return nil, err + } + + plan.vars, err = actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return nil, fmt.Errorf("get run %d variables: %w", run.ID, err) + } + + plan.previousDuration = run.Duration() + if hasTemplateAttempt { + plan.previousDuration = templateAttempt.Duration() + } + + attemptNum := int64(1) + if hasTemplateAttempt { + attemptNum = templateAttempt.Attempt + 1 + } + plan.newAttempt = &actions_model.RunAttempt{ + RepoID: run.RepoID, + RunID: run.ID, + Attempt: attemptNum, + TriggerUserID: run.TriggerUserID, + Status: actions_model.StatusWaiting, + } + + if run.RawConcurrency != "" { + var rawConcurrency model.RawConcurrency + if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil { + return nil, fmt.Errorf("unmarshal raw concurrency: %w", err) + } + if err := EvaluateRunConcurrencyFillModel(ctx, run, plan.newAttempt, &rawConcurrency, plan.vars, nil); err != nil { + return nil, err + } + } + + return plan, nil +} + +func execRerunPlan(ctx context.Context, plan *rerunPlan) error { + if len(plan.templateJobs) == 0 { + return nil + } + + var newJobs actions_model.ActionJobList + var cancelledConcurrencyJobs []*actions_model.ActionRunJob + + err := db.WithTx(ctx, func(ctx context.Context) error { + newAttemptStatus, jobsToCancel, err := PrepareToStartRunWithConcurrency(ctx, plan.newAttempt) + if err != nil { + return err + } + cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...) + plan.newAttempt.Status = newAttemptStatus + shouldBlock := newAttemptStatus == actions_model.StatusBlocked + + if err := db.Insert(ctx, plan.newAttempt); err != nil { + return err + } + + hasWaitingJobs := false + newJobs = make(actions_model.ActionJobList, 0, len(plan.templateJobs)) + for _, templateJob := range plan.templateJobs { + newJob := cloneRunJobForAttempt(templateJob, plan.newAttempt) + if plan.rerunJobIDs.Contains(templateJob.JobID) { + shouldBlockJob := shouldBlock || plan.hasRerunDependency(templateJob) + + newJob.Status = util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting) + newJob.TaskID = 0 + newJob.SourceTaskID = 0 + newJob.Started = 0 + newJob.Stopped = 0 + newJob.ConcurrencyGroup = "" + newJob.ConcurrencyCancel = false + newJob.IsConcurrencyEvaluated = false + + if newJob.RawConcurrency != "" && !shouldBlockJob { + if err := EvaluateJobConcurrencyFillModel(ctx, plan.run, newJob, plan.vars, nil); err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + newJob.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, newJob) + if err != nil { + return fmt.Errorf("prepare to start job with concurrency: %w", err) + } + cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...) + } + } else { + newJob.TaskID = 0 + newJob.SourceTaskID = templateJob.EffectiveTaskID() + newJob.Started = templateJob.Started + newJob.Stopped = templateJob.Stopped + } + + if err := db.Insert(ctx, newJob); err != nil { + return err + } + hasWaitingJobs = hasWaitingJobs || newJob.Status == actions_model.StatusWaiting + newJobs = append(newJobs, newJob) + } + + plan.newAttempt.Status = actions_model.AggregateJobStatus(newJobs) + if err := actions_model.UpdateRunAttempt(ctx, plan.newAttempt, "status"); err != nil { + return err + } + + plan.run.PreviousDuration = plan.previousDuration + plan.run.Started = 0 + plan.run.Stopped = 0 + plan.run.Status = plan.newAttempt.Status + plan.run.LatestAttemptID = plan.newAttempt.ID + if err := actions_model.UpdateRun(ctx, plan.run, "previous_duration", "started", "stopped", "status", "latest_attempt_id"); err != nil { + return err + } + + if hasWaitingJobs { + if err := actions_model.IncreaseTaskVersion(ctx, plan.run.OwnerID, plan.run.RepoID); err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + if err := plan.run.LoadAttributes(ctx); err != nil { + return err + } + for _, job := range newJobs { + job.Run = plan.run + } + + notifyWorkflowJobStatusUpdate(ctx, cancelledConcurrencyJobs) + EmitJobsIfReadyByJobs(cancelledConcurrencyJobs) + + CreateCommitStatusForRunJobs(ctx, plan.run, newJobs...) + notify_service.WorkflowRunStatusUpdate(ctx, plan.run.Repo, plan.run.TriggerUser, plan.run) + for _, job := range newJobs { + notify_service.WorkflowJobStatusUpdate(ctx, plan.run.Repo, plan.run.TriggerUser, job, nil) + } + + return nil +} + +func (p *rerunPlan) expandRerunJobIDs(jobsToRerun []*actions_model.ActionRunJob) error { + templateJobIDs := make(container.Set[string]) + for _, job := range p.templateJobs { + templateJobIDs.Add(job.JobID) + } + + if len(jobsToRerun) == 0 { + p.rerunJobIDs = templateJobIDs + return nil + } + + rerunJobIDs := make(container.Set[string]) + for _, job := range jobsToRerun { + if !templateJobIDs.Contains(job.JobID) { + return util.NewInvalidArgumentErrorf("job %q does not exist in the latest attempt", job.JobID) + } + rerunJobIDs.Add(job.JobID) + } + + for { + found := false + for _, job := range p.templateJobs { + if rerunJobIDs.Contains(job.JobID) { + continue + } + for _, need := range job.Needs { + if rerunJobIDs.Contains(need) { + found = true + rerunJobIDs.Add(job.JobID) + break + } + } + } + if !found { + break + } + } + + p.rerunJobIDs = rerunJobIDs + return nil +} + +func (p *rerunPlan) hasRerunDependency(job *actions_model.ActionRunJob) bool { + for _, need := range job.Needs { + if p.rerunJobIDs.Contains(need) { + return true + } + } + return false +} + +func cloneRunJobForAttempt(templateJob *actions_model.ActionRunJob, attempt *actions_model.RunAttempt) *actions_model.ActionRunJob { + return &actions_model.ActionRunJob{ + RunID: templateJob.RunID, + RunAttemptID: attempt.ID, + RepoID: templateJob.RepoID, + OwnerID: templateJob.OwnerID, + CommitSHA: templateJob.CommitSHA, + IsForkPullRequest: templateJob.IsForkPullRequest, + Name: templateJob.Name, + Attempt: attempt.Attempt, + WorkflowPayload: slices.Clone(templateJob.WorkflowPayload), + JobID: templateJob.JobID, + Needs: slices.Clone(templateJob.Needs), + RunsOn: slices.Clone(templateJob.RunsOn), + Status: templateJob.Status, + RawConcurrency: templateJob.RawConcurrency, + IsConcurrencyEvaluated: templateJob.IsConcurrencyEvaluated, + ConcurrencyGroup: templateJob.ConcurrencyGroup, + ConcurrencyCancel: templateJob.ConcurrencyCancel, + TokenPermissions: templateJob.TokenPermissions, + } +} diff --git a/services/actions/rerun_test.go b/services/actions/rerun_test.go index 3b4dc5483f424..24642a7d62156 100644 --- a/services/actions/rerun_test.go +++ b/services/actions/rerun_test.go @@ -14,44 +14,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetAllRerunJobs(t *testing.T) { - job1 := &actions_model.ActionRunJob{JobID: "job1"} - job2 := &actions_model.ActionRunJob{JobID: "job2", Needs: []string{"job1"}} - job3 := &actions_model.ActionRunJob{JobID: "job3", Needs: []string{"job2"}} - job4 := &actions_model.ActionRunJob{JobID: "job4", Needs: []string{"job2", "job3"}} - - jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4} - - testCases := []struct { - job *actions_model.ActionRunJob - rerunJobs []*actions_model.ActionRunJob - }{ - { - job1, - []*actions_model.ActionRunJob{job1, job2, job3, job4}, - }, - { - job2, - []*actions_model.ActionRunJob{job2, job3, job4}, - }, - { - job3, - []*actions_model.ActionRunJob{job3, job4}, - }, - { - job4, - []*actions_model.ActionRunJob{job4}, - }, - } - - for _, tc := range testCases { - rerunJobs := GetAllRerunJobs(tc.job, jobs) - assert.ElementsMatch(t, tc.rerunJobs, rerunJobs) - } -} - func TestGetFailedRerunJobs(t *testing.T) { - // IDs must be non-zero to distinguish jobs in the dedup set. makeJob := func(id int64, jobID string, status actions_model.Status, needs ...string) *actions_model.ActionRunJob { return &actions_model.ActionRunJob{ID: id, JobID: jobID, Status: status, Needs: needs} } @@ -73,8 +36,7 @@ func TestGetFailedRerunJobs(t *testing.T) { assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result) }) - t.Run("failed job pulls in downstream dependents", func(t *testing.T) { - // job1 failed; job2 depends on job1 (skipped); job3 depends on job2 (skipped) + t.Run("failed job does not pull in downstream dependents", func(t *testing.T) { job1 := makeJob(1, "job1", actions_model.StatusFailure) job2 := makeJob(2, "job2", actions_model.StatusSkipped, "job1") job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job2") @@ -82,12 +44,10 @@ func TestGetFailedRerunJobs(t *testing.T) { jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4} result := GetFailedRerunJobs(jobs) - assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3}, result) + assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result) }) - t.Run("multiple independent failed jobs each pull in their own dependents", func(t *testing.T) { - // job1 failed -> job3 depends on job1 - // job2 failed -> job4 depends on job2 + t.Run("multiple failed jobs are returned directly", func(t *testing.T) { job1 := makeJob(1, "job1", actions_model.StatusFailure) job2 := makeJob(2, "job2", actions_model.StatusFailure) job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job1") @@ -95,30 +55,27 @@ func TestGetFailedRerunJobs(t *testing.T) { jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4} result := GetFailedRerunJobs(jobs) - assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3, job4}, result) + assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result) }) - t.Run("shared downstream dependent is not duplicated", func(t *testing.T) { - // job1 and job2 both failed; job3 depends on both + t.Run("shared downstream dependent is not included", func(t *testing.T) { job1 := makeJob(1, "job1", actions_model.StatusFailure) job2 := makeJob(2, "job2", actions_model.StatusFailure) job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job1", "job2") jobs := []*actions_model.ActionRunJob{job1, job2, job3} result := GetFailedRerunJobs(jobs) - assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3}, result) - assert.Len(t, result, 3) // job3 must appear exactly once + assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result) + assert.Len(t, result, 2) }) - t.Run("successful downstream job of a failed job is still included", func(t *testing.T) { - // job1 failed; job2 succeeded but depends on job1 — downstream is always rerun - // regardless of its own status (GetAllRerunJobs includes all transitive dependents) + t.Run("successful downstream job of a failed job is not included", func(t *testing.T) { job1 := makeJob(1, "job1", actions_model.StatusFailure) job2 := makeJob(2, "job2", actions_model.StatusSuccess, "job1") jobs := []*actions_model.ActionRunJob{job1, job2} result := GetFailedRerunJobs(jobs) - assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result) + assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result) }) } diff --git a/services/actions/run.go b/services/actions/run.go index e9fcdcaf43d60..6a48fecb300d9 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -13,6 +13,7 @@ import ( "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" + act_model "github.com/nektos/act/pkg/model" "go.yaml.in/yaml/v4" ) @@ -35,13 +36,14 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model } if wfRawConcurrency != nil { - err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars, inputsWithDefaults) + rawConcurrency, err := yaml.Marshal(wfRawConcurrency) if err != nil { - return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err) + return fmt.Errorf("marshal raw concurrency: %w", err) } + run.RawConcurrency = string(rawConcurrency) } - giteaCtx := GenerateGiteaContext(run, nil) + giteaCtx := GenerateGiteaContext(ctx, run, nil, nil) jobs, err := jobparser.Parse(content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithInputs(inputsWithDefaults)) if err != nil { @@ -52,7 +54,7 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model run.Title = jobs[0].RunName } - if err = InsertRun(ctx, run, jobs, vars, inputsWithDefaults); err != nil { + if err = InsertRun(ctx, run, jobs, vars, inputsWithDefaults, wfRawConcurrency); err != nil { return fmt.Errorf("InsertRun: %w", err) } @@ -74,8 +76,9 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model // InsertRun inserts a run // The title will be cut off at 255 characters if it's longer than 255 characters. -func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow, vars map[string]string, inputs map[string]any) error { - return db.WithTx(ctx, func(ctx context.Context) error { +func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow, vars map[string]string, inputs map[string]any, wfRawConcurrency *act_model.RawConcurrency) error { + var cancelledConcurrencyJobs []*actions_model.ActionRunJob + err := db.WithTx(ctx, func(ctx context.Context) error { index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) if err != nil { return err @@ -83,16 +86,45 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar run.Index = index run.Title = util.EllipsisDisplayString(run.Title, 255) + runAttempt := &actions_model.RunAttempt{ + RepoID: run.RepoID, + RunID: run.ID, + Attempt: 1, + TriggerUserID: run.TriggerUserID, + Status: actions_model.StatusWaiting, + Started: 0, + RunStartedAt: 0, + Stopped: 0, + } + var jobsToCancel []*actions_model.ActionRunJob + if wfRawConcurrency != nil { + if err := EvaluateRunConcurrencyFillModel(ctx, run, runAttempt, wfRawConcurrency, vars, inputs); err != nil { + return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err) + } + } + // check run (workflow-level) concurrency - run.Status, err = PrepareToStartRunWithConcurrency(ctx, run) + runAttempt.Status, jobsToCancel, err = PrepareToStartRunWithConcurrency(ctx, runAttempt) if err != nil { return err } + cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...) + run.Status = runAttempt.Status if err := db.Insert(ctx, run); err != nil { return err } + runAttempt.RunID = run.ID + if err := db.Insert(ctx, runAttempt); err != nil { + return err + } + + run.LatestAttemptID = runAttempt.ID + if err := actions_model.UpdateRun(ctx, run, "latest_attempt_id"); err != nil { + return err + } + if err := run.LoadRepo(ctx); err != nil { return err } @@ -117,11 +149,13 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar job.Name = util.EllipsisDisplayString(job.Name, 255) runJob := &actions_model.ActionRunJob{ RunID: run.ID, + RunAttemptID: runAttempt.ID, RepoID: run.RepoID, OwnerID: run.OwnerID, CommitSHA: run.CommitSHA, IsForkPullRequest: run.IsForkPullRequest, Name: job.Name, + Attempt: runAttempt.Attempt, WorkflowPayload: payload, JobID: id, Needs: needs, @@ -152,10 +186,11 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar // If a job needs other jobs ("needs" is not empty), its status is set to StatusBlocked at the entry of the loop // No need to check job concurrency for a blocked job (it will be checked by job emitter later) if runJob.Status == actions_model.StatusWaiting { - runJob.Status, err = PrepareToStartJobWithConcurrency(ctx, runJob) + runJob.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, runJob) if err != nil { return fmt.Errorf("prepare to start job with concurrency: %w", err) } + cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...) } } @@ -167,10 +202,11 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar runJobs = append(runJobs, runJob) } - run.Status = actions_model.AggregateJobStatus(runJobs) - if err := actions_model.UpdateRun(ctx, run, "status"); err != nil { + runAttempt.Status = actions_model.AggregateJobStatus(runJobs) + if err := actions_model.UpdateRunAttempt(ctx, runAttempt, "status"); err != nil { return err } + run.Status = runAttempt.Status // if there is a job in the waiting status, increase tasks version. if hasWaitingJobs { @@ -181,4 +217,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar return nil }) + if err != nil { + return err + } + + notifyWorkflowJobStatusUpdate(ctx, cancelledConcurrencyJobs) + EmitJobsIfReadyByJobs(cancelledConcurrencyJobs) + + return nil } diff --git a/services/actions/task.go b/services/actions/task.go index 2cb10b6cd8f49..383c92c38912a 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -78,7 +78,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv return fmt.Errorf("findTaskNeeds: %w", err) } - taskContext, err := generateTaskContext(t) + taskContext, err := generateTaskContext(ctx, t) if err != nil { return fmt.Errorf("generateTaskContext: %w", err) } @@ -112,13 +112,13 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv return task, true, nil } -func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) { +func generateTaskContext(ctx context.Context, t *actions_model.ActionTask) (*structpb.Struct, error) { giteaRuntimeToken, err := CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID) if err != nil { return nil, err } - gitCtx := GenerateGiteaContext(t.Job.Run, t.Job) + gitCtx := GenerateGiteaContext(ctx, t.Job.Run, nil, t.Job) gitCtx["token"] = t.Token gitCtx["gitea_runtime_token"] = giteaRuntimeToken diff --git a/services/convert/convert.go b/services/convert/convert.go index 71d2ecb33339f..a4b5257605a66 100644 --- a/services/convert/convert.go +++ b/services/convert/convert.go @@ -252,12 +252,19 @@ func ToActionWorkflowRun(ctx context.Context, repo *repo_model.Repository, run * if err != nil { return nil, err } + var runAttempt int64 + if attempt, has, err := run.GetLatestAttempt(ctx); err != nil { + return nil, err + } else if has { + runAttempt = attempt.Attempt + } status, conclusion := ToActionsStatus(run.Status) return &api.ActionWorkflowRun{ ID: run.ID, URL: fmt.Sprintf("%s/actions/runs/%d", repo.APIURL(), run.ID), HTMLURL: run.HTMLURL(), RunNumber: run.Index, + RunAttempt: runAttempt, StartedAt: run.Started.AsLocalTime(), CompletedAt: run.Stopped.AsLocalTime(), Event: string(run.Event), @@ -329,9 +336,9 @@ func ToActionWorkflowJob(ctx context.Context, repo *repo_model.Repository, task var runnerName string var steps []*api.ActionWorkflowStep - if job.TaskID != 0 { + if effectiveTaskID := job.EffectiveTaskID(); effectiveTaskID != 0 { if task == nil { - task, _, err = db.GetByID[actions_model.ActionTask](ctx, job.TaskID) + task, _, err = db.GetByID[actions_model.ActionTask](ctx, effectiveTaskID) if err != nil { return nil, err } diff --git a/templates/repo/actions/view.tmpl b/templates/repo/actions/view.tmpl index 1eb84a9b93773..c98e02f8d77d4 100644 --- a/templates/repo/actions/view.tmpl +++ b/templates/repo/actions/view.tmpl @@ -3,9 +3,8 @@
{{template "repo/header" .}} {{template "repo/actions/view_component" (dict - "RunID" .RunID "JobID" .JobID - "ActionsURL" .ActionsURL + "ViewURL" .ViewURL )}}
diff --git a/templates/repo/actions/view_component.tmpl b/templates/repo/actions/view_component.tmpl index 405e9cfb4b111..246934ed9aa13 100644 --- a/templates/repo/actions/view_component.tmpl +++ b/templates/repo/actions/view_component.tmpl @@ -1,13 +1,14 @@ -
; }>(); const store = props.store; @@ -270,8 +269,7 @@ async function fetchJobData(abortController: AbortController): Promise // for example: make cursor=null means the first time to fetch logs, cursor=eof means no more logs, etc return {step: idx, cursor: it.cursor, expanded: it.expanded}; }); - const url = `${props.actionsUrl}/runs/${props.runId}/jobs/${props.jobId}`; - const resp = await POST(url, { + const resp = await POST(props.viewUrl, { signal: abortController.signal, data: {logCursors}, }); diff --git a/web_src/js/components/ActionRunView.ts b/web_src/js/components/ActionRunView.ts index 133b7263ebacf..56702b838c042 100644 --- a/web_src/js/components/ActionRunView.ts +++ b/web_src/js/components/ActionRunView.ts @@ -91,6 +91,7 @@ export function createEmptyActionsRun(): ActionsRun { return { repoId: 0, link: '', + viewLink: '', title: '', titleHTML: '', status: '' as ActionsRunStatus, // do not show the status before initialized, otherwise it would show an incorrect "error" icon @@ -103,6 +104,10 @@ export function createEmptyActionsRun(): ActionsRun { workflowID: '', workflowLink: '', isSchedule: false, + runAttempt: 0, + isLatestAttempt: true, + readOnlyAttemptView: false, + attempts: [], duration: '', triggeredAt: 0, triggerEvent: '', @@ -125,7 +130,7 @@ export function createEmptyActionsRun(): ActionsRun { }; } -export function createActionRunViewStore(actionsUrl: string, runId: number) { +export function createActionRunViewStore(viewUrl: string) { let loadingAbortController: AbortController | null = null; let intervalID: IntervalId | null = null; const viewData = reactive({ @@ -137,8 +142,7 @@ export function createActionRunViewStore(actionsUrl: string, runId: number) { const abortController = new AbortController(); loadingAbortController = abortController; try { - const url = `${actionsUrl}/runs/${runId}`; - const resp = await POST(url, {signal: abortController.signal, data: {}}); + const resp = await POST(viewUrl, {signal: abortController.signal, data: {}}); const runResp = await resp.json(); if (loadingAbortController !== abortController) return; diff --git a/web_src/js/components/RepoActionView.vue b/web_src/js/components/RepoActionView.vue index ee8b4880029a9..ae063a839c702 100644 --- a/web_src/js/components/RepoActionView.vue +++ b/web_src/js/components/RepoActionView.vue @@ -12,16 +12,23 @@ defineOptions({ }); const props = defineProps<{ - runId: number; jobId: number; - actionsUrl: string; + viewUrl: string; locale: Record; }>(); const locale = props.locale; -const store = createActionRunViewStore(props.actionsUrl, props.runId); +const store = createActionRunViewStore(props.viewUrl); const {currentRun: run , runArtifacts: artifacts} = toRefs(store.viewData); +function formatAttemptTitle(attempt: {attempt: number; latest: boolean}) { + return attempt.latest ? `${locale.latestAttempt} #${attempt.attempt}` : `${locale.attempt} #${attempt.attempt}`; +} + +function getArtifactActionSuffix() { + return !run.value.isLatestAttempt && run.value.runAttempt > 0 ? `?attempt=${run.value.runAttempt}` : ''; +} + function cancelRun() { POST(`${run.value.link}/cancel`); } @@ -32,7 +39,7 @@ function approveRun() { async function deleteArtifact(name: string) { if (!window.confirm(locale.confirmDeleteArtifact.replace('%s', name))) return; - await DELETE(`${run.value.link}/artifacts/${encodeURIComponent(name)}`); + await DELETE(`${run.value.link}/artifacts/${encodeURIComponent(name)}${getArtifactActionSuffix()}`); await store.forceReloadCurrentRun(); } @@ -71,10 +78,50 @@ async function deleteArtifact(name: string) { {{ locale.rerun_all }} +
- {{ run.workflowID }}: + + {{ run.workflowID }} + {{ run.workflowID }} + : + @@ -93,7 +140,7 @@ async function deleteArtifact(name: string) {