Allow cancelling parallel workers on files

This commit is contained in:
Alexander Neumann 2015-07-04 18:20:37 +02:00
parent 0b531210eb
commit 14d252dfba
2 changed files with 9 additions and 10 deletions

View file

@ -16,10 +16,14 @@ func closeIfOpen(ch chan struct{}) {
} }
} }
// ParallelWorkFunc gets one file ID to work on. If an error is returned,
// processing stops. If done is closed, the function should return.
type ParallelWorkFunc func(id string, done <-chan struct{}) error
// FilesInParallel runs n workers of f in parallel, on the IDs that // FilesInParallel runs n workers of f in parallel, on the IDs that
// repo.List(t) yield. If f returns an error, the process is aborted and the // repo.List(t) yield. If f returns an error, the process is aborted and the
// first error is returned. // first error is returned.
func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f func(backend.ID) error) error { func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f ParallelWorkFunc) error {
done := make(chan struct{}) done := make(chan struct{})
defer closeIfOpen(done) defer closeIfOpen(done)
@ -36,17 +40,12 @@ func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f func(backend
for { for {
select { select {
case item, ok := <-ch: case id, ok := <-ch:
if !ok { if !ok {
return return
} }
id, err := backend.ParseID(item) err := f(id, done)
if err == nil {
err = f(id)
}
if err != nil { if err != nil {
closeIfOpen(done) closeIfOpen(done)
errors <- err errors <- err

View file

@ -93,7 +93,7 @@ func (tests testIDs) List(t backend.Type, done <-chan struct{}) <-chan string {
} }
func TestFilesInParallel(t *testing.T) { func TestFilesInParallel(t *testing.T) {
f := func(id backend.ID) error { f := func(id string, done <-chan struct{}) error {
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
return nil return nil
} }
@ -108,7 +108,7 @@ var errTest = errors.New("test error")
func TestFilesInParallelWithError(t *testing.T) { func TestFilesInParallelWithError(t *testing.T) {
f := func(id backend.ID) error { f := func(id string, done <-chan struct{}) error {
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
if rand.Float32() < 0.01 { if rand.Float32() < 0.01 {