refactor: move autoremove into the jobexecutor (#1463)

* refactor: move autoremove into the jobexecutor

breaking: docker container are removed after job exit

* reduce complexity

* remove linter exception

* reduce cyclic complexity

* fix: always allow 1 min for stopping and removing the runner, even if we were cancelled

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
ChristopherHX 2022-12-06 16:45:06 +01:00 committed by GitHub
parent 7073eac240
commit d9fe63ec24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 41 deletions

View file

@ -95,21 +95,16 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
} }
postExecutor = postExecutor.Finally(func(ctx context.Context) error { postExecutor = postExecutor.Finally(func(ctx context.Context) error {
logger := common.Logger(ctx)
jobError := common.JobError(ctx) jobError := common.JobError(ctx)
if jobError != nil { var err error
info.result("failure") if rc.Config.AutoRemove || jobError == nil {
logger.WithField("jobResult", "failure").Infof("\U0001F3C1 Job failed") // always allow 1 min for stopping and removing the runner, even if we were cancelled
} else { ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
err := info.stopContainer()(ctx) defer cancel()
if err != nil { err = info.stopContainer()(ctx)
}
setJobResult(ctx, info, rc, jobError == nil)
return err return err
}
info.result("success")
logger.WithField("jobResult", "success").Infof("\U0001F3C1 Job succeeded")
}
return nil
}) })
pipeline := make([]common.Executor, 0) pipeline := make([]common.Executor, 0)
@ -122,7 +117,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
if ctx.Err() == context.Canceled { if ctx.Err() == context.Canceled {
// in case of an aborted run, we still should execute the // in case of an aborted run, we still should execute the
// post steps to allow cleanup. // post steps to allow cleanup.
ctx, cancel = context.WithTimeout(WithJobLogger(context.Background(), rc.Run.JobID, rc.String(), rc.Config, &rc.Masks, rc.Matrix), 5*time.Minute) ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute)
defer cancel() defer cancel()
} }
return postExecutor(ctx) return postExecutor(ctx)
@ -131,6 +126,18 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
Finally(info.closeContainer())) Finally(info.closeContainer()))
} }
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
logger := common.Logger(ctx)
jobResult := "success"
jobResultMessage := "succeeded"
if !success {
jobResult = "failure"
jobResultMessage = "failed"
}
info.result(jobResult)
logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage)
}
func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor { func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor {
return func(ctx context.Context) error { return func(ctx context.Context) error {
ctx = withStepLogger(ctx, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String()) ctx = withStepLogger(ctx, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String())

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -77,18 +76,15 @@ func New(runnerConfig *Config) (Runner, error) {
} }
// NewPlanExecutor ... // NewPlanExecutor ...
//
//nolint:gocyclo
func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen := 0 maxJobNameLen := 0
stagePipeline := make([]common.Executor, 0) stagePipeline := make([]common.Executor, 0)
for i := range plan.Stages { for i := range plan.Stages {
s := i
stage := plan.Stages[i] stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error { stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0) pipeline := make([]common.Executor, 0)
for r, run := range stage.Runs { for _, run := range stage.Runs {
stageExecutor := make([]common.Executor, 0) stageExecutor := make([]common.Executor, 0)
job := run.Job() job := run.Job()
@ -123,29 +119,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen = len(rc.String()) maxJobNameLen = len(rc.String())
} }
stageExecutor = append(stageExecutor, func(ctx context.Context) error { stageExecutor = append(stageExecutor, func(ctx context.Context) error {
logger := common.Logger(ctx)
jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String()) jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String())
return rc.Executor().Finally(func(ctx context.Context) error { return rc.Executor()(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
isLastRunningContainer := func(currentStage int, currentRun int) bool {
return currentStage == len(plan.Stages)-1 && currentRun == len(stage.Runs)-1
}
if runner.config.AutoRemove && isLastRunningContainer(s, r) {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}
log.Infof("Cleaning up container for job %s", rc.JobName)
if err := rc.stopJobContainer()(ctx); err != nil {
logger.Errorf("Error while cleaning container: %v", err)
}
}
return nil
})(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
}) })
} }
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))