Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions models/actions/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const (
// ActionArtifact is a file that is stored in the artifact storage.
type ActionArtifact struct {
ID int64 `xorm:"pk autoincr"`
RunID int64 `xorm:"index unique(runid_name_path)"` // The run id of the artifact
RunID int64 `xorm:"index unique(runid_attempt_name_path)"` // The run id of the artifact
RunAttemptID int64 `xorm:"index unique(runid_attempt_name_path) NOT NULL DEFAULT 0"`
RunnerID int64
RepoID int64 `xorm:"index"`
OwnerID int64
Expand All @@ -80,9 +81,9 @@ type ActionArtifact struct {
// * "application/pdf", "text/html", etc.: real content type of the artifact
ContentEncodingOrType string `xorm:"content_encoding"`

ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when runner uploads it
ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact when runner uploads it
Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
ArtifactPath string `xorm:"index unique(runid_attempt_name_path)"` // The path to the artifact when runner uploads it
ArtifactName string `xorm:"index unique(runid_attempt_name_path)"` // The name of the artifact when runner uploads it
Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated index"`
ExpiredUnix timeutil.TimeStamp `xorm:"index"` // The time when the artifact will be expired
Expand All @@ -92,12 +93,13 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa
if err := t.LoadJob(ctx); err != nil {
return nil, err
}
artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, artifactName, artifactPath)
artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, t.Job.RunAttemptID, artifactName, artifactPath)
if errors.Is(err, util.ErrNotExist) {
artifact := &ActionArtifact{
ArtifactName: artifactName,
ArtifactPath: artifactPath,
RunID: t.Job.RunID,
RunAttemptID: t.Job.RunAttemptID,
RunnerID: t.RunnerID,
RepoID: t.RepoID,
OwnerID: t.OwnerID,
Expand All @@ -122,9 +124,9 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa
return artifact, nil
}

func getArtifactByNameAndPath(ctx context.Context, runID int64, name, fpath string) (*ActionArtifact, error) {
func getArtifactByNameAndPath(ctx context.Context, runID, runAttemptID int64, name, fpath string) (*ActionArtifact, error) {
var art ActionArtifact
has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ? AND artifact_path = ?", runID, name, fpath).Get(&art)
has, err := db.GetEngine(ctx).Where("run_id = ? AND run_attempt_id = ? AND artifact_name = ? AND artifact_path = ?", runID, runAttemptID, name, fpath).Get(&art)
if err != nil {
return nil, err
} else if !has {
Expand All @@ -144,6 +146,7 @@ type FindArtifactsOptions struct {
db.ListOptions
RepoID int64
RunID int64
RunAttemptID int64
ArtifactName string
Status int
FinalizedArtifactsV4 bool
Expand All @@ -163,6 +166,9 @@ func (opts FindArtifactsOptions) ToConds() builder.Cond {
if opts.RunID > 0 {
cond = cond.And(builder.Eq{"run_id": opts.RunID})
}
if opts.RunAttemptID > 0 {
cond = cond.And(builder.Eq{"run_attempt_id": opts.RunAttemptID})
}
if opts.ArtifactName != "" {
cond = cond.And(builder.Eq{"artifact_name": opts.ArtifactName})
}
Expand All @@ -185,6 +191,16 @@ type ActionArtifactMeta struct {
Status ArtifactStatus
}

// ListUploadedArtifactsMetaByRunAttempt returns all uploaded artifacts meta of a run attempt.
func ListUploadedArtifactsMetaByRunAttempt(ctx context.Context, repoID, runAttemptID int64) ([]*ActionArtifactMeta, error) {
arts := make([]*ActionArtifactMeta, 0, 10)
return arts, db.GetEngine(ctx).Table("action_artifact").
Where("repo_id=? AND run_attempt_id=? AND (status=? OR status=?)", repoID, runAttemptID, ArtifactStatusUploadConfirmed, ArtifactStatusExpired).
GroupBy("artifact_name").
Select("artifact_name, sum(file_size) as file_size, max(status) as status").
Find(&arts)
}

// ListUploadedArtifactsMeta returns all uploaded artifacts meta of a run
func ListUploadedArtifactsMeta(ctx context.Context, repoID, runID int64) ([]*ActionArtifactMeta, error) {
arts := make([]*ActionArtifactMeta, 0, 10)
Expand Down Expand Up @@ -222,6 +238,12 @@ func SetArtifactNeedDelete(ctx context.Context, runID int64, name string) error
return err
}

// SetArtifactNeedDeleteByRunAttempt sets an artifact to need-delete in a run attempt, cron job will delete it
func SetArtifactNeedDeleteByRunAttempt(ctx context.Context, runID, runAttemptID int64, name string) error {
_, err := db.GetEngine(ctx).Where("run_id=? AND run_attempt_id=? AND artifact_name=? AND status = ?", runID, runAttemptID, name, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusPendingDeletion})
return err
}

// SetArtifactDeleted sets an artifact to deleted
func SetArtifactDeleted(ctx context.Context, artifactID int64) error {
_, err := db.GetEngine(ctx).ID(artifactID).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusDeleted})
Expand Down
63 changes: 45 additions & 18 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
LatestAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"`
RawConcurrency string // raw concurrency
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"`
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
// PreviousDuration is used for recording previous duration
// PreviousDuration is a cached duration of the attempt immediately preceding the latest attempt.
PreviousDuration time.Duration
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
Expand Down Expand Up @@ -156,6 +155,31 @@ func (run *ActionRun) Duration() time.Duration {
return calculateDuration(run.Started, run.Stopped, run.Status) + run.PreviousDuration
}

func (run *ActionRun) GetLatestAttempt(ctx context.Context) (*RunAttempt, bool, error) {
if run.LatestAttemptID == 0 {
return nil, false, nil
}
attempt, err := GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID)
if errors.Is(err, util.ErrNotExist) {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
return attempt, true, nil
}

func (run *ActionRun) GetEffectiveConcurrency(ctx context.Context) (string, bool, error) {
attempt, has, err := run.GetLatestAttempt(ctx)
if err != nil {
return "", false, err
}
if has {
return attempt.ConcurrencyGroup, attempt.ConcurrencyCancel, nil
}
return "", false, nil
}

func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) {
if run.Event == webhook_module.HookEventPush {
var payload api.PushPayload
Expand Down Expand Up @@ -402,14 +426,19 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {

type ActionRunIndex db.ResourceIndex

func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
// GetConcurrentRunAttemptsAndJobs returns run attempts and jobs in the same concurrency group
// whose statuses match the requested non-done statuses.
// Under the current data model, only the latest attempt of a run may remain in a non-done state,
// so filtering by attempt status is sufficient and does not need an extra join back to action_run.
func GetConcurrentRunAttemptsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*RunAttempt, []*ActionRunJob, error) {
attempts, err := db.Find[RunAttempt](ctx, &FindRunAttemptOptions{
RepoID: repoID,
Statuses: status,
ConcurrencyGroup: concurrencyGroup,
Status: status,
ListOptions: db.ListOptionsAll,
})
if err != nil {
return nil, nil, fmt.Errorf("find runs: %w", err)
return nil, nil, fmt.Errorf("find run attempts: %w", err)
}

jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
Expand All @@ -421,36 +450,34 @@ func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGrou
return nil, nil, fmt.Errorf("find jobs: %w", err)
}

return runs, jobs, nil
return attempts, jobs, nil
}

func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
if actionRun.ConcurrencyGroup == "" {
func CancelPreviousJobsByRunConcurrency(ctx context.Context, attempt *RunAttempt) ([]*ActionRunJob, error) {
if attempt.ConcurrencyGroup == "" {
return nil, nil
}

var jobsToCancel []*ActionRunJob

statusFindOption := []Status{StatusWaiting, StatusBlocked}
if actionRun.ConcurrencyCancel {
if attempt.ConcurrencyCancel {
statusFindOption = append(statusFindOption, StatusRunning)
}
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
attempts, jobs, err := GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, statusFindOption)
if err != nil {
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
}
jobsToCancel = append(jobsToCancel, jobs...)

// cancel runs in the same concurrency group
for _, run := range runs {
if run.ID == actionRun.ID {
for _, concurrentAttempt := range attempts {
if concurrentAttempt.RunID == attempt.RunID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
jobs, err := GetRunJobsByRunAndAttemptID(ctx, concurrentAttempt.RunID, concurrentAttempt.ID)
if err != nil {
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
return nil, fmt.Errorf("find run %d attempt %d jobs: %w", concurrentAttempt.RunID, concurrentAttempt.ID, err)
}
jobsToCancel = append(jobsToCancel, jobs...)
}
Expand Down
118 changes: 118 additions & 0 deletions models/actions/run_attempt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"context"
"fmt"
"slices"
"time"

"code.gitea.io/gitea/models/db"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)

// RunAttempt represents a single execution attempt of an ActionRun.
type RunAttempt struct {
ID int64
RepoID int64 `xorm:"index"`
RunID int64 `xorm:"index UNIQUE(run_attempt)"`
Run *ActionRun `xorm:"-"`
Attempt int64 `xorm:"UNIQUE(run_attempt)"`

TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"`

ConcurrencyGroup string
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`

Status Status `xorm:"index"`
Started timeutil.TimeStamp
RunStartedAt timeutil.TimeStamp
Stopped timeutil.TimeStamp

Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}

func (*RunAttempt) TableName() string {
return "action_run_attempt"
}

func init() {
db.RegisterModel(new(RunAttempt))
}

func (attempt *RunAttempt) Duration() time.Duration {
return calculateDuration(attempt.Started, attempt.Stopped, attempt.Status)
}

func GetRunAttemptByRepoAndID(ctx context.Context, repoID, attemptID int64) (*RunAttempt, error) {
var attempt RunAttempt
has, err := db.GetEngine(ctx).Where("repo_id=? AND id=?", repoID, attemptID).Get(&attempt)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run attempt %d in repo %d: %w", attemptID, repoID, util.ErrNotExist)
}
return &attempt, nil
}

func GetLatestAttemptByRunID(ctx context.Context, runID int64) (*RunAttempt, bool, error) {
var attempt RunAttempt
has, err := db.GetEngine(ctx).Where("run_id=?", runID).Desc("attempt").Get(&attempt)
if err != nil {
return nil, false, err
} else if !has {
return nil, false, nil
}
return &attempt, true, nil
}

func GetRunAttemptByRunIDAndAttemptNum(ctx context.Context, runID, attemptNum int64) (*RunAttempt, error) {
var attempt RunAttempt
has, err := db.GetEngine(ctx).Where("run_id=? AND attempt=?", runID, attemptNum).Get(&attempt)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run attempt %d for run %d: %w", attemptNum, runID, util.ErrNotExist)
}
return &attempt, nil
}

func ListRunAttemptsByRunID(ctx context.Context, runID int64) ([]*RunAttempt, error) {
return db.Find[RunAttempt](ctx, &FindRunAttemptOptions{
RunID: runID,
ListOptions: db.ListOptionsAll,
})
}

func UpdateRunAttempt(ctx context.Context, attempt *RunAttempt, cols ...string) error {
sess := db.GetEngine(ctx).ID(attempt.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
if _, err := sess.Update(attempt); err != nil {
return err
}

if len(cols) > 0 && !slices.Contains(cols, "status") && !slices.Contains(cols, "started") && !slices.Contains(cols, "stopped") {
return nil
}

run, err := GetRunByRepoAndID(ctx, attempt.RepoID, attempt.RunID)
if err != nil {
return err
}
if run.LatestAttemptID != attempt.ID {
return nil
}

run.Status = attempt.Status
run.Started = attempt.Started
run.Stopped = attempt.Stopped
return UpdateRun(ctx, run, "status", "started", "stopped")
}
Loading