Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions models/actions/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const (
// ActionArtifact is a file that is stored in the artifact storage.
type ActionArtifact struct {
ID int64 `xorm:"pk autoincr"`
RunID int64 `xorm:"index unique(runid_name_path)"` // The run id of the artifact
RunID int64 `xorm:"index unique(runid_attempt_name_path)"` // The run id of the artifact
RunAttemptID int64 `xorm:"index unique(runid_attempt_name_path) NOT NULL DEFAULT 0"`
RunnerID int64
RepoID int64 `xorm:"index"`
OwnerID int64
Expand All @@ -80,9 +81,9 @@ type ActionArtifact struct {
// * "application/pdf", "text/html", etc.: real content type of the artifact
ContentEncodingOrType string `xorm:"content_encoding"`

ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when runner uploads it
ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact when runner uploads it
Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
ArtifactPath string `xorm:"index unique(runid_attempt_name_path)"` // The path to the artifact when runner uploads it
ArtifactName string `xorm:"index unique(runid_attempt_name_path)"` // The name of the artifact when runner uploads it
Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated index"`
ExpiredUnix timeutil.TimeStamp `xorm:"index"` // The time when the artifact will be expired
Expand All @@ -92,12 +93,13 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa
if err := t.LoadJob(ctx); err != nil {
return nil, err
}
artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, artifactName, artifactPath)
artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, t.Job.RunAttemptID, artifactName, artifactPath)
if errors.Is(err, util.ErrNotExist) {
artifact := &ActionArtifact{
ArtifactName: artifactName,
ArtifactPath: artifactPath,
RunID: t.Job.RunID,
RunAttemptID: t.Job.RunAttemptID,
RunnerID: t.RunnerID,
RepoID: t.RepoID,
OwnerID: t.OwnerID,
Expand All @@ -122,9 +124,9 @@ func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPa
return artifact, nil
}

func getArtifactByNameAndPath(ctx context.Context, runID int64, name, fpath string) (*ActionArtifact, error) {
func getArtifactByNameAndPath(ctx context.Context, runID, runAttemptID int64, name, fpath string) (*ActionArtifact, error) {
var art ActionArtifact
has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ? AND artifact_path = ?", runID, name, fpath).Get(&art)
has, err := db.GetEngine(ctx).Where("run_id = ? AND run_attempt_id = ? AND artifact_name = ? AND artifact_path = ?", runID, runAttemptID, name, fpath).Get(&art)
if err != nil {
return nil, err
} else if !has {
Expand All @@ -144,6 +146,7 @@ type FindArtifactsOptions struct {
db.ListOptions
RepoID int64
RunID int64
RunAttemptID int64
ArtifactName string
Status int
FinalizedArtifactsV4 bool
Expand All @@ -163,6 +166,9 @@ func (opts FindArtifactsOptions) ToConds() builder.Cond {
if opts.RunID > 0 {
cond = cond.And(builder.Eq{"run_id": opts.RunID})
}
if opts.RunAttemptID > 0 {
cond = cond.And(builder.Eq{"run_attempt_id": opts.RunAttemptID})
}
if opts.ArtifactName != "" {
cond = cond.And(builder.Eq{"artifact_name": opts.ArtifactName})
}
Expand All @@ -185,6 +191,16 @@ type ActionArtifactMeta struct {
Status ArtifactStatus
}

// ListUploadedArtifactsMetaByRunAttempt returns all uploaded artifacts meta of a run attempt.
func ListUploadedArtifactsMetaByRunAttempt(ctx context.Context, repoID, runAttemptID int64) ([]*ActionArtifactMeta, error) {
arts := make([]*ActionArtifactMeta, 0, 10)
return arts, db.GetEngine(ctx).Table("action_artifact").
Where("repo_id=? AND run_attempt_id=? AND (status=? OR status=?)", repoID, runAttemptID, ArtifactStatusUploadConfirmed, ArtifactStatusExpired).
GroupBy("artifact_name").
Select("artifact_name, sum(file_size) as file_size, max(status) as status").
Find(&arts)
}

// ListUploadedArtifactsMeta returns all uploaded artifacts meta of a run
func ListUploadedArtifactsMeta(ctx context.Context, repoID, runID int64) ([]*ActionArtifactMeta, error) {
arts := make([]*ActionArtifactMeta, 0, 10)
Expand Down Expand Up @@ -222,6 +238,12 @@ func SetArtifactNeedDelete(ctx context.Context, runID int64, name string) error
return err
}

// SetArtifactNeedDeleteByRunAttempt sets an artifact to need-delete in a run attempt, cron job will delete it
func SetArtifactNeedDeleteByRunAttempt(ctx context.Context, runID, runAttemptID int64, name string) error {
_, err := db.GetEngine(ctx).Where("run_id=? AND run_attempt_id=? AND artifact_name=? AND status = ?", runID, runAttemptID, name, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusPendingDeletion})
return err
}

// SetArtifactDeleted sets an artifact to deleted
func SetArtifactDeleted(ctx context.Context, artifactID int64) error {
_, err := db.GetEngine(ctx).ID(artifactID).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusDeleted})
Expand Down
89 changes: 68 additions & 21 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,20 @@ type ActionRun struct {
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
RawConcurrency string // raw concurrency
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"`
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0

// Started and Stopped are identical to the latest attempt after ActionRunAttempt was introduced.
// When a rerun creates a new latest attempt, they are reset until the new attempt starts and stops.
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
// PreviousDuration is used for recording previous duration

// PreviousDuration is kept only for legacy runs created before ActionRunAttempt existed.
// New runs and reruns no longer update this field and use attempt-scoped durations instead.
PreviousDuration time.Duration
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`

LatestAttemptID int64 `xorm:"index NOT NULL DEFAULT 0"`

Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}

func init() {
Expand Down Expand Up @@ -160,6 +165,48 @@ func (run *ActionRun) Duration() time.Duration {
return d
}

// GetLatestAttempt returns
// - the latest attempt of the run
// - (nil, false, nil) for legacy runs that have no attempt records
func (run *ActionRun) GetLatestAttempt(ctx context.Context) (*ActionRunAttempt, bool, error) {
if run.LatestAttemptID == 0 {
return nil, false, nil
}
attempt, err := GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID)
if err != nil {
return nil, false, err
}
return attempt, true, nil
}

// GetLatestAttemptID returns
// - the latest attempt ID for attempt-based runs
// - 0 for legacy runs that were created before ActionRunAttempt existed
func (run *ActionRun) GetLatestAttemptID(ctx context.Context) (int64, error) {
if run.LatestAttemptID == 0 {
return 0, nil
}
attempt, _, err := run.GetLatestAttempt(ctx)
if err != nil {
return 0, fmt.Errorf("get latest run attempt: %w", err)
}
if attempt != nil {
return attempt.ID, nil
}
return 0, fmt.Errorf("run %d has no valid latest attempt", run.ID)
}

func (run *ActionRun) GetEffectiveConcurrency(ctx context.Context) (string, bool, error) {
attempt, has, err := run.GetLatestAttempt(ctx)
if err != nil {
return "", false, err
}
if has {
return attempt.ConcurrencyGroup, attempt.ConcurrencyCancel, nil
}
return "", false, nil
}

func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) {
if run.Event == webhook_module.HookEventPush {
var payload api.PushPayload
Expand Down Expand Up @@ -406,14 +453,16 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {

type ActionRunIndex db.ResourceIndex

func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
// GetConcurrentRunAttemptsAndJobs returns run attempts and jobs in the same concurrency group by statuses.
func GetConcurrentRunAttemptsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRunAttempt, []*ActionRunJob, error) {
attempts, err := db.Find[ActionRunAttempt](ctx, &FindRunAttemptOptions{
RepoID: repoID,
Statuses: status,
ConcurrencyGroup: concurrencyGroup,
Status: status,
ListOptions: db.ListOptionsAll,
})
if err != nil {
return nil, nil, fmt.Errorf("find runs: %w", err)
return nil, nil, fmt.Errorf("find run attempts: %w", err)
}

jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
Expand All @@ -425,36 +474,34 @@ func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGrou
return nil, nil, fmt.Errorf("find jobs: %w", err)
}

return runs, jobs, nil
return attempts, jobs, nil
}

func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
if actionRun.ConcurrencyGroup == "" {
func CancelPreviousJobsByRunConcurrency(ctx context.Context, attempt *ActionRunAttempt) ([]*ActionRunJob, error) {
if attempt.ConcurrencyGroup == "" {
return nil, nil
}

var jobsToCancel []*ActionRunJob

statusFindOption := []Status{StatusWaiting, StatusBlocked}
if actionRun.ConcurrencyCancel {
if attempt.ConcurrencyCancel {
statusFindOption = append(statusFindOption, StatusRunning)
}
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
attempts, jobs, err := GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, statusFindOption)
if err != nil {
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
}
jobsToCancel = append(jobsToCancel, jobs...)

// cancel runs in the same concurrency group
for _, run := range runs {
if run.ID == actionRun.ID {
for _, concurrentAttempt := range attempts {
if concurrentAttempt.RunID == attempt.RunID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
jobs, err := GetRunJobsByRunAndAttemptID(ctx, concurrentAttempt.RunID, concurrentAttempt.ID)
if err != nil {
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
return nil, fmt.Errorf("find run %d attempt %d jobs: %w", concurrentAttempt.RunID, concurrentAttempt.ID, err)
}
jobsToCancel = append(jobsToCancel, jobs...)
}
Expand Down
128 changes: 128 additions & 0 deletions models/actions/run_attempt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"context"
"fmt"
"slices"
"time"

"code.gitea.io/gitea/models/db"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)

// ActionRunAttempt represents a single execution attempt of an ActionRun.
type ActionRunAttempt struct {
ID int64
RepoID int64 `xorm:"index"`
RunID int64 `xorm:"index UNIQUE(run_attempt)"`
Run *ActionRun `xorm:"-"`
Attempt int64 `xorm:"UNIQUE(run_attempt)"`

TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"`

ConcurrencyGroup string
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`

Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp

Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}

func (*ActionRunAttempt) TableName() string {
return "action_run_attempt"
}

func init() {
db.RegisterModel(new(ActionRunAttempt))
}

func (attempt *ActionRunAttempt) Duration() time.Duration {
return calculateDuration(attempt.Started, attempt.Stopped, attempt.Status, attempt.Updated)
}

func (attempt *ActionRunAttempt) LoadAttributes(ctx context.Context) error {
if attempt == nil {
return nil
}
if attempt.TriggerUser == nil {
u, err := user_model.GetPossibleUserByID(ctx, attempt.TriggerUserID)
if err != nil {
return err
}
attempt.TriggerUser = u
}
return nil
}

func GetRunAttemptByRepoAndID(ctx context.Context, repoID, attemptID int64) (*ActionRunAttempt, error) {
var attempt ActionRunAttempt
has, err := db.GetEngine(ctx).Where("repo_id=? AND id=?", repoID, attemptID).Get(&attempt)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run attempt %d in repo %d: %w", attemptID, repoID, util.ErrNotExist)
}
return &attempt, nil
}

func GetRunAttemptByRunIDAndAttemptNum(ctx context.Context, runID, attemptNum int64) (*ActionRunAttempt, error) {
var attempt ActionRunAttempt
has, err := db.GetEngine(ctx).Where("run_id=? AND attempt=?", runID, attemptNum).Get(&attempt)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run attempt %d for run %d: %w", attemptNum, runID, util.ErrNotExist)
}
return &attempt, nil
}

func ListRunAttemptsByRunID(ctx context.Context, runID int64) ([]*ActionRunAttempt, error) {
return db.Find[ActionRunAttempt](ctx, &FindRunAttemptOptions{
RunID: runID,
ListOptions: db.ListOptionsAll,
})
}

func UpdateRunAttempt(ctx context.Context, attempt *ActionRunAttempt, cols ...string) error {
if slices.Contains(cols, "status") && attempt.Started.IsZero() && attempt.Status.IsRunning() {
attempt.Started = timeutil.TimeStampNow()
cols = append(cols, "started")
}

sess := db.GetEngine(ctx).ID(attempt.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
if _, err := sess.Update(attempt); err != nil {
return err
}

// Only status/timing changes on an attempt need to update the latest run.
if len(cols) > 0 && !slices.Contains(cols, "status") && !slices.Contains(cols, "started") && !slices.Contains(cols, "stopped") {
return nil
}

run, err := GetRunByRepoAndID(ctx, attempt.RepoID, attempt.RunID)
if err != nil {
return err
}
if run.LatestAttemptID != attempt.ID {
log.Warn("run %d cannot be updated by an old attempt %d", run.LatestAttemptID, attempt.ID)
return nil
}

run.Status = attempt.Status
run.Started = attempt.Started
run.Stopped = attempt.Stopped
return UpdateRun(ctx, run, "status", "started", "stopped")
}
Loading
Loading