diff --git a/src/restic/worker/pool.go b/src/restic/worker/pool.go index d2afb3257..d2331f587 100644 --- a/src/restic/worker/pool.go +++ b/src/restic/worker/pool.go @@ -1,10 +1,5 @@ package worker -import ( - "fmt" - "sync" -) - // Job is one unit of work. It is given to a Func, and the returned result and // error are stored in Result and Error. type Job struct { @@ -20,33 +15,53 @@ type Func func(job Job, done <-chan struct{}) (result interface{}, err error) type Pool struct { f Func done chan struct{} - wg *sync.WaitGroup jobCh <-chan Job resCh chan<- Job + + numWorkers int + workersExit chan struct{} + allWorkersDone chan struct{} } // New returns a new worker pool with n goroutines, each running the function // f. The workers are started immediately. func New(n int, f Func, jobChan <-chan Job, resultChan chan<- Job) *Pool { p := &Pool{ - f: f, - done: make(chan struct{}), - wg: &sync.WaitGroup{}, - jobCh: jobChan, - resCh: resultChan, + f: f, + done: make(chan struct{}), + workersExit: make(chan struct{}), + allWorkersDone: make(chan struct{}), + numWorkers: n, + jobCh: jobChan, + resCh: resultChan, } for i := 0; i < n; i++ { - p.wg.Add(1) go p.runWorker(i) } + go p.waitForExit() + return p } +// waitForExit receives from p.workersExit until all worker functions have +// exited, then closes the result channel. +func (p *Pool) waitForExit() { + n := p.numWorkers + for n > 0 { + <-p.workersExit + n-- + } + close(p.allWorkersDone) + close(p.resCh) +} + // runWorker runs a worker function. func (p *Pool) runWorker(numWorker int) { - defer p.wg.Done() + defer func() { + p.workersExit <- struct{}{} + }() var ( // enable the input channel when starting up a new goroutine @@ -65,7 +80,6 @@ func (p *Pool) runWorker(numWorker int) { case job, ok = <-inCh: if !ok { - fmt.Printf("in channel closed, worker exiting\n") return } @@ -88,6 +102,5 @@ func (p *Pool) Cancel() { // Wait waits for all worker goroutines to terminate, afterwards the output // channel is closed. func (p *Pool) Wait() { - p.wg.Wait() - close(p.resCh) + <-p.allWorkersDone }