feat: run jobs in parallel (#1003)

* feat: run jobs in parallel

This changes fixes and restructures the parallel execution of jobs.
The previous changes limiting the parallel execution did break this
and allowed only one job in parallel.

While we run #CPU jobs in parallel now, the jobs added per job-matrix
add to this. So we might over-commit to the capacity, but at least
it is limited.

* fix: correctly build job pipeline

The job pipeline should just append all required pipeline steps.
The parallelism will be handled by the ParallelExecutor and we
shouldn't handle it during building the pipelines.

Also this adds a test, that the ParallelExecutor does run
a limited amount of parallel goroutines.

* test: correct test implementation

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Markus Wolf 2022-02-25 19:47:16 +01:00 committed by GitHub
parent c24cfc72f4
commit 18792f9620
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 31 deletions

View file

@ -91,27 +91,33 @@ func NewErrorExecutor(err error) Executor {
} }
// NewParallelExecutor creates a new executor from a parallel of other executors // NewParallelExecutor creates a new executor from a parallel of other executors
func NewParallelExecutor(executors ...Executor) Executor { func NewParallelExecutor(parallel int, executors ...Executor) Executor {
return func(ctx context.Context) error { return func(ctx context.Context) error {
errChan := make(chan error) work := make(chan Executor, len(executors))
errs := make(chan error, len(executors))
for _, executor := range executors { for i := 0; i < parallel; i++ {
e := executor go func(work <-chan Executor, errs chan<- error) {
go func() { for executor := range work {
err := e.ChannelError(errChan)(ctx) errs <- executor(ctx)
if err != nil {
log.Fatal(err)
} }
}() }(work, errs)
} }
for i := 0; i < len(executors); i++ {
work <- executors[i]
}
close(work)
// Executor waits all executors to cleanup these resources. // Executor waits all executors to cleanup these resources.
var firstErr error var firstErr error
for i := 0; i < len(executors); i++ { for i := 0; i < len(executors); i++ {
if err := <-errChan; err != nil && firstErr == nil { err := <-errs
if firstErr == nil {
firstErr = err firstErr = err
} }
} }
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
return err return err
} }
@ -119,14 +125,6 @@ func NewParallelExecutor(executors ...Executor) Executor {
} }
} }
// ChannelError sends error to errChan rather than returning error
func (e Executor) ChannelError(errChan chan error) Executor {
return func(ctx context.Context) error {
errChan <- e(ctx)
return nil
}
}
// Then runs another executor if this executor succeeds // Then runs another executor if this executor succeeds
func (e Executor) Then(then Executor) Executor { func (e Executor) Then(then Executor) Executor {
return func(ctx context.Context) error { return func(ctx context.Context) error {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -79,14 +80,25 @@ func TestNewParallelExecutor(t *testing.T) {
ctx := context.Background() ctx := context.Background()
count := 0 count := 0
activeCount := 0
maxCount := 0
emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
count++ count++
activeCount++
if activeCount > maxCount {
maxCount = activeCount
}
time.Sleep(2 * time.Second)
activeCount--
return nil return nil
}) })
err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)(ctx) err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(2, count)
assert.Equal(3, count, "should run all 3 executors")
assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
assert.Nil(err) assert.Nil(err)
} }
@ -101,7 +113,7 @@ func TestNewParallelExecutorFailed(t *testing.T) {
count++ count++
return fmt.Errorf("fake error") return fmt.Errorf("fake error")
}) })
err := NewParallelExecutor(errorWorkflow)(ctx) err := NewParallelExecutor(1, errorWorkflow)(ctx)
assert.Equal(1, count) assert.Equal(1, count)
assert.ErrorIs(context.Canceled, err) assert.ErrorIs(context.Canceled, err)
} }
@ -123,7 +135,7 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
count++ count++
return errExpected return errExpected
}) })
err := NewParallelExecutor(errorWorkflow, successWorkflow, successWorkflow)(ctx) err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)
assert.Equal(3, count) assert.Equal(3, count)
assert.Error(errExpected, err) assert.Error(errExpected, err)
} }

View file

@ -121,8 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
stage := plan.Stages[i] stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error { stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0) pipeline := make([]common.Executor, 0)
stageExecutor := make([]common.Executor, 0)
for r, run := range stage.Runs { for r, run := range stage.Runs {
stageExecutor := make([]common.Executor, 0)
job := run.Job() job := run.Job()
if job.Strategy != nil { if job.Strategy != nil {
strategyRc := runner.newRunContext(run, nil) strategyRc := runner.newRunContext(run, nil)
@ -140,7 +140,6 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxParallel = len(matrixes) maxParallel = len(matrixes)
} }
b := 0
for i, matrix := range matrixes { for i, matrix := range matrixes {
rc := runner.newRunContext(run, matrix) rc := runner.newRunContext(run, matrix)
rc.JobName = rc.Name rc.JobName = rc.Name
@ -167,15 +166,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
return nil return nil
})(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets))) })(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets)))
}) })
b++
if b == maxParallel {
pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...))
stageExecutor = make([]common.Executor, 0)
b = 0
}
} }
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
} }
return common.NewPipelineExecutor(pipeline...)(ctx) return common.NewParallelExecutor(runtime.NumCPU(), pipeline...)(ctx)
}) })
} }