fix(actions): deadlock between PrepareRunAndInsert and UpdateTaskByState (#37692) (#37718)

This commit is contained in:
Zettat123
2026-05-15 23:02:14 -06:00
committed by GitHub
parent 1d5163133b
commit 7b82ded82a
8 changed files with 65 additions and 39 deletions
+28 -37
View File
@@ -7,7 +7,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"slices"
"strings" "strings"
"time" "time"
@@ -197,30 +196,34 @@ func (run *ActionRun) IsSchedule() bool {
} }
// UpdateRepoRunsNumbers updates the number of runs and closed runs of a repository. // UpdateRepoRunsNumbers updates the number of runs and closed runs of a repository.
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { // Callers MUST invoke this from outside any transaction that has X-locked action_run rows for the same repo, otherwise, transaction deadlock
_, err := db.GetEngine(ctx).ID(repo.ID). func UpdateRepoRunsNumbers(ctx context.Context, repoID int64) {
NoAutoTime(). if db.InTransaction(ctx) {
Cols("num_action_runs", "num_closed_action_runs"). setting.PanicInDevOrTesting("UpdateRepoRunsNumbers must not be called inside a transaction")
SetExpr("num_action_runs", }
builder.Select("count(*)").From("action_run").
Where(builder.Eq{"repo_id": repo.ID}), e := db.GetEngine(ctx)
).
SetExpr("num_closed_action_runs", numActionRuns, err := e.Where("repo_id = ?", repoID).Count(new(ActionRun))
builder.Select("count(*)").From("action_run"). if err != nil {
Where(builder.Eq{ log.Error("UpdateRepoRunsNumbers count num_action_runs for repo %d: %v", repoID, err)
"repo_id": repo.ID, return
}.And( }
builder.In("status",
StatusSuccess, numClosedActionRuns, err := e.Where("repo_id = ?", repoID).
StatusFailure, In("status", StatusSuccess, StatusFailure, StatusCancelled, StatusSkipped).
StatusCancelled, Count(new(ActionRun))
StatusSkipped, if err != nil {
), log.Error("UpdateRepoRunsNumbers count num_closed_action_runs for repo %d: %v", repoID, err)
), return
), }
).
Update(repo) if _, err := e.ID(repoID).Cols("num_action_runs", "num_closed_action_runs").NoAutoTime().Update(&repo_model.Repository{
return err NumActionRuns: int(numActionRuns),
NumClosedActionRuns: int(numClosedActionRuns),
}); err != nil {
log.Error("UpdateRepoRunsNumbers update repo %d: %v", repoID, err)
}
} }
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event. // CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
@@ -388,18 +391,6 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
// It's impossible that the run is not found, since Gitea never deletes runs. // It's impossible that the run is not found, since Gitea never deletes runs.
} }
if run.Status != 0 || slices.Contains(cols, "status") {
if run.RepoID == 0 {
setting.PanicInDevOrTesting("RepoID should not be 0")
}
if err = run.LoadRepo(ctx); err != nil {
return err
}
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
return nil return nil
} }
+1 -2
View File
@@ -29,8 +29,7 @@ func TestUpdateRepoRunsNumbers(t *testing.T) {
assert.Equal(t, 2, repo.NumClosedActionRuns) assert.Equal(t, 2, repo.NumClosedActionRuns)
// now update will correct them, only num_actionr_runs and num_closed_action_runs should be updated // now update will correct them, only num_actionr_runs and num_closed_action_runs should be updated
err = UpdateRepoRunsNumbers(t.Context(), repo) UpdateRepoRunsNumbers(t.Context(), repo.ID)
assert.NoError(t, err)
repo = unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4}) repo = unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4})
assert.Equal(t, 5, repo.NumActionRuns) assert.Equal(t, 5, repo.NumActionRuns)
assert.Equal(t, 3, repo.NumClosedActionRuns) assert.Equal(t, 3, repo.NumClosedActionRuns)
+2
View File
@@ -246,6 +246,8 @@ func DeleteRun(ctx context.Context, run *actions_model.ActionRun) error {
return err return err
} }
actions_model.UpdateRepoRunsNumbers(ctx, repoID)
// Delete files on storage // Delete files on storage
for _, tas := range tasks { for _, tas := range tasks {
removeTaskLog(ctx, tas) removeTaskLog(ctx, tas)
+23
View File
@@ -12,6 +12,7 @@ import (
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
@@ -62,6 +63,9 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event) jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
if len(jobs) > 0 {
actions_model.UpdateRepoRunsNumbers(ctx, repoID)
}
EmitJobsIfReadyByJobs(jobs) EmitJobsIfReadyByJobs(jobs)
return err return err
} }
@@ -69,6 +73,9 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error { func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo) jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
if len(jobs) > 0 {
actions_model.UpdateRepoRunsNumbers(ctx, repo.ID)
}
EmitJobsIfReadyByJobs(jobs) EmitJobsIfReadyByJobs(jobs)
return err return err
} }
@@ -176,6 +183,16 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
} }
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
// Recompute counters post-commit for every repo whose runs may have flipped done-status.
reconcileRepos := make(container.Set[int64])
for _, job := range jobs {
reconcileRepos.Add(job.RepoID)
}
for repoID := range reconcileRepos {
actions_model.UpdateRepoRunsNumbers(ctx, repoID)
}
EmitJobsIfReadyByJobs(jobs) EmitJobsIfReadyByJobs(jobs)
return nil return nil
@@ -197,6 +214,7 @@ func CancelAbandonedJobs(ctx context.Context) error {
// Collect one job per run to send workflow run status update // Collect one job per run to send workflow run status update
updatedRuns := map[int64]*actions_model.ActionRunJob{} updatedRuns := map[int64]*actions_model.ActionRunJob{}
updatedJobs := []*actions_model.ActionRunJob{} updatedJobs := []*actions_model.ActionRunJob{}
updatedRepoIDs := make(container.Set[int64])
for _, job := range jobs { for _, job := range jobs {
job.Status = actions_model.StatusCancelled job.Status = actions_model.StatusCancelled
@@ -213,6 +231,7 @@ func CancelAbandonedJobs(ctx context.Context) error {
updated = n > 0 updated = n > 0
if updated && job.Run.Status.IsDone() { if updated && job.Run.Status.IsDone() {
updatedRuns[job.RunID] = job updatedRuns[job.RunID] = job
updatedRepoIDs.Add(job.RepoID)
} }
return nil return nil
}); err != nil { }); err != nil {
@@ -234,5 +253,9 @@ func CancelAbandonedJobs(ctx context.Context) error {
} }
EmitJobsIfReadyByJobs(updatedJobs) EmitJobsIfReadyByJobs(updatedJobs)
for repoID := range updatedRepoIDs {
actions_model.UpdateRepoRunsNumbers(ctx, repoID)
}
return nil return nil
} }
+3
View File
@@ -248,6 +248,9 @@ func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_m
return return
} }
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
// Recomputes the repository's num_action_runs / num_closed_action_runs counters since the run's status changed
actions_model.UpdateRepoRunsNumbers(ctx, job.RepoID)
} }
type jobStatusResolver struct { type jobStatusResolver struct {
+3
View File
@@ -124,6 +124,9 @@ func prepareRunRerun(ctx context.Context, repo *repo_model.Repository, run *acti
job.Run = run job.Run = run
} }
// Recomputes the repository's num_action_runs / num_closed_action_runs counters since the run's status changed
actions_model.UpdateRepoRunsNumbers(ctx, run.RepoID)
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
return run.Status == actions_model.StatusBlocked, nil return run.Status == actions_model.StatusBlocked, nil
+3
View File
@@ -52,6 +52,9 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model
notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil) notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
} }
// Recomputes the repository's num_action_runs / num_closed_action_runs counters since a new run is created
actions_model.UpdateRepoRunsNumbers(ctx, run.RepoID)
return nil return nil
} }
+2
View File
@@ -105,6 +105,8 @@ func fixUnfinishedRunStatus(ctx context.Context, logger log.Logger, autofix bool
} }
fixed++ fixed++
actions_model.UpdateRepoRunsNumbers(ctx, run.RepoID)
return nil return nil
}, },
) )