From 3ac2b726f2c019a1985dbd21abc3b7ca1d2374ba Mon Sep 17 00:00:00 2001 From: psa Date: Tue, 6 Jun 2023 03:00:54 +0000 Subject: [PATCH] Fix bug in processing jobs on platforms without Docker (#1834) * Log incoming jobs. Log the full contents of the job protobuf to make debugging jobs easier * Ensure that the parallel executor always uses at least one thread. The caller may mis-calculate the number of CPUs as zero, in which case ensure that at least one thread is spawned. * Use runtime.NumCPU for CPU counts. For hosts without docker, GetHostInfo() returns a blank struct which has zero CPUs and causes downstream trouble. --------- Co-authored-by: Paul Armstrong Co-authored-by: Jason Song --- pkg/common/executor.go | 7 ++++++ pkg/common/executor_test.go | 11 +++++++++ pkg/model/workflow.go | 1 + pkg/runner/runner.go | 45 ++++++++++++++++++++++++++++++------- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/pkg/common/executor.go b/pkg/common/executor.go index c5b05f3..a5eb079 100644 --- a/pkg/common/executor.go +++ b/pkg/common/executor.go @@ -3,6 +3,8 @@ package common import ( "context" "fmt" + + log "github.com/sirupsen/logrus" ) // Warning that implements `error` but safe to ignore @@ -94,6 +96,11 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor { work := make(chan Executor, len(executors)) errs := make(chan error, len(executors)) + if 1 > parallel { + log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel) + parallel = 1 + } + for i := 0; i < parallel; i++ { go func(work <-chan Executor, errs chan<- error) { for executor := range work { diff --git a/pkg/common/executor_test.go b/pkg/common/executor_test.go index 7f691e4..e70c638 100644 --- a/pkg/common/executor_test.go +++ b/pkg/common/executor_test.go @@ -100,6 +100,17 @@ func TestNewParallelExecutor(t *testing.T) { assert.Equal(3, count, "should run all 3 executors") assert.Equal(2, maxCount, "should run at most 2 executors in parallel") assert.Nil(err) + + // Reset to test running the executor with 0 parallelism + count = 0 + activeCount = 0 + maxCount = 0 + + errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) + + assert.Equal(3, count, "should run all 3 executors") + assert.Equal(1, maxCount, "should run at most 1 executors in parallel") + assert.Nil(errSingle) } func TestNewParallelExecutorFailed(t *testing.T) { diff --git a/pkg/model/workflow.go b/pkg/model/workflow.go index e7d43e4..19edfb4 100644 --- a/pkg/model/workflow.go +++ b/pkg/model/workflow.go @@ -417,6 +417,7 @@ func (j *Job) GetMatrixes() ([]map[string]interface{}, error) { } } else { matrixes = append(matrixes, make(map[string]interface{})) + log.Debugf("Empty Strategy, matrixes=%v", matrixes) } return matrixes, nil } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index a47cf8b..e1d8d8a 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -5,11 +5,11 @@ import ( "encoding/json" "fmt" "os" + "runtime" log "github.com/sirupsen/logrus" "github.com/nektos/act/pkg/common" - "github.com/nektos/act/pkg/container" "github.com/nektos/act/pkg/model" ) @@ -103,15 +103,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { maxJobNameLen := 0 stagePipeline := make([]common.Executor, 0) + log.Debugf("Plan Stages: %v", plan.Stages) + for i := range plan.Stages { stage := plan.Stages[i] stagePipeline = append(stagePipeline, func(ctx context.Context) error { pipeline := make([]common.Executor, 0) for _, run := range stage.Runs { + log.Debugf("Stages Runs: %v", stage.Runs) stageExecutor := make([]common.Executor, 0) job := run.Job() + log.Debugf("Job.Name: %v", job.Name) + log.Debugf("Job.RawNeeds: %v", job.RawNeeds) + log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn) + log.Debugf("Job.Env: %v", job.Env) + log.Debugf("Job.If: %v", job.If) + for step := range job.Steps { + if nil != job.Steps[step] { + log.Debugf("Job.Steps: %v", job.Steps[step].String()) + } + } + log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes) + log.Debugf("Job.Services: %v", job.Services) + log.Debugf("Job.Strategy: %v", job.Strategy) + log.Debugf("Job.RawContainer: %v", job.RawContainer) + log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell) + log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory) + log.Debugf("Job.Outputs: %v", job.Outputs) + log.Debugf("Job.Uses: %v", job.Uses) + log.Debugf("Job.With: %v", job.With) + // log.Debugf("Job.RawSecrets: %v", job.RawSecrets) + log.Debugf("Job.Result: %v", job.Result) if job.Strategy != nil { + log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast) + log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel) + log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString) + log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString) + log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix) + strategyRc := runner.newRunContext(ctx, run, nil) if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil { log.Errorf("Error while evaluating matrix: %v", err) @@ -122,6 +152,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { if m, err := job.GetMatrixes(); err != nil { log.Errorf("Error while get job's matrix: %v", err) } else { + log.Debugf("Job Matrices: %v", m) + log.Debugf("Runner Matrices: %v", runner.config.Matrix) matrixes = selectMatrixes(m, runner.config.Matrix) } log.Debugf("Final matrix after applying user inclusions '%v'", matrixes) @@ -152,14 +184,11 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { } pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) } - var ncpu int - info, err := container.GetHostInfo(ctx) - if err != nil { - log.Errorf("failed to obtain container engine info: %s", err) - ncpu = 1 // sane default? - } else { - ncpu = info.NCPU + ncpu := runtime.NumCPU() + if 1 > ncpu { + ncpu = 1 } + log.Debugf("Detected CPUs: %d", ncpu) return common.NewParallelExecutor(ncpu, pipeline...)(ctx) }) }