Rework from worker pool approach to buffered channel limiter approach

Should be better for relatively small amount of long-running tasks

Signed-off-by: Andrey Voronkov <voronkovaa@gmail.com>
This commit is contained in:
Andrey Voronkov 2023-02-14 22:21:38 +03:00
parent 218d060fad
commit 0ffef9d8f2

View file

@ -166,16 +166,38 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
lookupErr := &atomicError{} lookupErr := &atomicError{}
allTagsCount := len(allTags)
inputChan := make(chan string) inputChan := make(chan string)
outputChan := make(chan string, len(allTags)) outputChan := make(chan string, allTagsCount)
workersWaitGroup := sync.WaitGroup{} waitGroup := sync.WaitGroup{}
workersWaitGroup.Add(ts.lookupConcurrencyFactor) waitGroup.Add(allTagsCount)
limiter := make(chan struct{}, ts.lookupConcurrencyFactor)
acquire := func() {
limiter <- struct{}{}
}
release := func() {
<-limiter
waitGroup.Done()
}
go func() {
defer func() {
waitGroup.Wait()
close(outputChan)
close(limiter)
}()
for tag := range inputChan {
acquire()
go func(tag string) {
defer release()
// No need to lookup further on lookupErr
if lookupErr.Load() != nil {
return
}
for i := 0; i < ts.lookupConcurrencyFactor; i++ {
go func() {
defer workersWaitGroup.Done()
for tag := range inputChan {
tagLinkPathSpec := manifestTagCurrentPathSpec{ tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(), name: ts.repository.Named().Name(),
tag: tag, tag: tag,
@ -186,18 +208,20 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
// PathNotFoundError shouldn't count as an error
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
continue default:
lookupErr.Store(err)
} }
lookupErr.Store(err) return
} }
if tagDigest == desc.Digest { if tagDigest == desc.Digest {
outputChan <- tag outputChan <- tag
} }
} }(tag)
}() }
} }()
for _, tag := range allTags { for _, tag := range allTags {
if lookupErr.Load() != nil { if lookupErr.Load() != nil {
@ -211,10 +235,6 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
} }
close(inputChan) close(inputChan)
workersWaitGroup.Wait()
close(outputChan)
if err := lookupErr.Load(); err != nil { if err := lookupErr.Load(); err != nil {
return nil, err return nil, err
} }