Attempt to use golang.org/x/sync/errgroup
Signed-off-by: Andrey Voronkov <voronkovaa@gmail.com>
This commit is contained in:
parent
a555c9cbbe
commit
4ff0c0e340
8 changed files with 213 additions and 127 deletions
1
go.mod
1
go.mod
|
@ -28,6 +28,7 @@ require (
|
|||
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50
|
||||
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88
|
||||
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c
|
||||
golang.org/x/sync v0.1.0
|
||||
google.golang.org/api v0.30.0
|
||||
google.golang.org/cloud v0.0.0-20151119220103-975617b05ea8
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
|
||||
|
|
2
go.sum
2
go.sum
|
@ -373,6 +373,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
|
@ -6,12 +6,12 @@ import (
|
|||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/distribution/distribution/v3"
|
||||
dcontext "github.com/distribution/distribution/v3/context"
|
||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var _ distribution.TagService = &TagStore{}
|
||||
|
@ -165,40 +165,15 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
|
|||
return nil, err
|
||||
}
|
||||
|
||||
lookupErr := &atomicError{}
|
||||
outputChan := make(chan string)
|
||||
|
||||
allTagsCount := len(allTags)
|
||||
inputChan := make(chan string)
|
||||
outputChan := make(chan string, allTagsCount)
|
||||
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
limiter := make(chan struct{}, ts.lookupConcurrencyFactor)
|
||||
acquire := func() {
|
||||
limiter <- struct{}{}
|
||||
waitGroup.Add(1)
|
||||
}
|
||||
release := func() {
|
||||
<-limiter
|
||||
waitGroup.Done()
|
||||
}
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
group.SetLimit(ts.lookupConcurrencyFactor)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
waitGroup.Wait()
|
||||
close(outputChan)
|
||||
close(limiter)
|
||||
}()
|
||||
for tag := range inputChan {
|
||||
acquire()
|
||||
go func(tag string) {
|
||||
defer release()
|
||||
|
||||
// No need to lookup further on lookupErr
|
||||
if lookupErr.Load() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tag := range allTags {
|
||||
tag := tag // https://go.dev/doc/faq#closures_and_goroutines
|
||||
group.Go(func() error {
|
||||
tagLinkPathSpec := manifestTagCurrentPathSpec{
|
||||
name: ts.repository.Named().Name(),
|
||||
tag: tag,
|
||||
|
@ -211,59 +186,33 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
|
|||
switch err.(type) {
|
||||
// PathNotFoundError shouldn't count as an error
|
||||
case storagedriver.PathNotFoundError:
|
||||
default:
|
||||
lookupErr.Store(err)
|
||||
return nil
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if tagDigest == desc.Digest {
|
||||
outputChan <- tag
|
||||
}
|
||||
}(tag)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
group.Wait()
|
||||
close(outputChan)
|
||||
}()
|
||||
|
||||
for _, tag := range allTags {
|
||||
if lookupErr.Load() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if err := pushTag(ctx, inputChan, tag); err != nil {
|
||||
lookupErr.Store(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
close(inputChan)
|
||||
|
||||
if err := lookupErr.Load(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tags := make([]string, 0, len(outputChan))
|
||||
var tags []string
|
||||
for tag := range outputChan {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
|
||||
if err := group.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func pushTag(ctx context.Context, ch chan string, tag string) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case ch <- tag:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) {
|
||||
tagLinkPath := func(name string, dgst digest.Digest) (string, error) {
|
||||
return pathFor(manifestTagIndexEntryLinkPathSpec{
|
||||
|
@ -297,20 +246,3 @@ func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.D
|
|||
}
|
||||
return dgsts, nil
|
||||
}
|
||||
|
||||
type atomicError struct {
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *atomicError) Store(err error) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.err = err
|
||||
}
|
||||
|
||||
func (e *atomicError) Load() error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
return e.err
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package storage
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
@ -13,7 +12,6 @@ import (
|
|||
"github.com/distribution/distribution/v3/manifest/schema2"
|
||||
"github.com/distribution/distribution/v3/reference"
|
||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/base"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
@ -28,51 +26,20 @@ type tagsTestEnv struct {
|
|||
}
|
||||
|
||||
type mockInMemory struct {
|
||||
base.Base
|
||||
driver *inmemory.Driver
|
||||
*inmemory.Driver
|
||||
GetContentError error
|
||||
}
|
||||
|
||||
var _ storagedriver.StorageDriver = &mockInMemory{}
|
||||
|
||||
func (m *mockInMemory) Name() string {
|
||||
return m.driver.Name()
|
||||
}
|
||||
func (m *mockInMemory) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||
if m.GetContentError != nil {
|
||||
return nil, m.GetContentError
|
||||
}
|
||||
return m.driver.GetContent(ctx, path)
|
||||
}
|
||||
func (m *mockInMemory) PutContent(ctx context.Context, path string, content []byte) error {
|
||||
return m.driver.PutContent(ctx, path, content)
|
||||
}
|
||||
func (m *mockInMemory) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
return m.driver.Reader(ctx, path, offset)
|
||||
}
|
||||
func (m *mockInMemory) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||
return m.driver.Writer(ctx, path, append)
|
||||
}
|
||||
func (m *mockInMemory) List(ctx context.Context, path string) ([]string, error) {
|
||||
return m.driver.List(ctx, path)
|
||||
}
|
||||
func (m *mockInMemory) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
return m.driver.Move(ctx, sourcePath, destPath)
|
||||
}
|
||||
func (m *mockInMemory) Delete(ctx context.Context, path string) error {
|
||||
return m.driver.Delete(ctx, path)
|
||||
}
|
||||
func (m *mockInMemory) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
||||
return m.driver.URLFor(ctx, path, options)
|
||||
}
|
||||
func (m *mockInMemory) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||
return m.driver.Walk(ctx, path, f)
|
||||
return m.Driver.GetContent(ctx, path)
|
||||
}
|
||||
|
||||
func testTagStore(t *testing.T) *tagsTestEnv {
|
||||
ctx := context.Background()
|
||||
d := inmemory.New()
|
||||
mockDriver := mockInMemory{driver: d, Base: d.Base}
|
||||
mockDriver := mockInMemory{inmemory.New(), nil}
|
||||
reg, err := NewRegistry(ctx, &mockDriver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -171,20 +138,20 @@ func TestTagStoreUnTag(t *testing.T) {
|
|||
|
||||
func TestTagStoreAll(t *testing.T) {
|
||||
env := testTagStore(t)
|
||||
TagStore := env.ts
|
||||
tagStore := env.ts
|
||||
ctx := env.ctx
|
||||
|
||||
alpha := "abcdefghijklmnopqrstuvwxyz"
|
||||
for i := 0; i < len(alpha); i++ {
|
||||
tag := alpha[i]
|
||||
desc := distribution.Descriptor{Digest: "sha256:eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"}
|
||||
err := TagStore.Tag(ctx, string(tag), desc)
|
||||
err := tagStore.Tag(ctx, string(tag), desc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
all, err := TagStore.All(ctx)
|
||||
all, err := tagStore.All(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -199,12 +166,12 @@ func TestTagStoreAll(t *testing.T) {
|
|||
}
|
||||
|
||||
removed := "a"
|
||||
err = TagStore.Untag(ctx, removed)
|
||||
err = tagStore.Untag(ctx, removed)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
all, err = TagStore.All(ctx)
|
||||
all, err = tagStore.All(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
27
vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
27
vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
Copyright (c) 2009 The Go Authors. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google Inc. nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
22
vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
22
vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
|
@ -0,0 +1,22 @@
|
|||
Additional IP Rights Grant (Patents)
|
||||
|
||||
"This implementation" means the copyrightable works distributed by
|
||||
Google as part of the Go project.
|
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive,
|
||||
no-charge, royalty-free, irrevocable (except as stated in this section)
|
||||
patent license to make, have made, use, offer to sell, sell, import,
|
||||
transfer and otherwise run, modify and propagate the contents of this
|
||||
implementation of Go, where such license applies only to those patent
|
||||
claims, both currently owned or controlled by Google and acquired in
|
||||
the future, licensable by Google that are necessarily infringed by this
|
||||
implementation of Go. This grant does not include claims that would be
|
||||
infringed only as a consequence of further modification of this
|
||||
implementation. If you or your agent or exclusive licensee institute or
|
||||
order or agree to the institution of patent litigation against any
|
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging
|
||||
that this implementation of Go or any code incorporated within this
|
||||
implementation of Go constitutes direct or contributory patent
|
||||
infringement, or inducement of patent infringement, then any patent
|
||||
rights granted to you under this License for this implementation of Go
|
||||
shall terminate as of the date such litigation is filed.
|
132
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
132
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
|
@ -0,0 +1,132 @@
|
|||
// Copyright 2016 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package errgroup provides synchronization, error propagation, and Context
|
||||
// cancelation for groups of goroutines working on subtasks of a common task.
|
||||
package errgroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type token struct{}
|
||||
|
||||
// A Group is a collection of goroutines working on subtasks that are part of
|
||||
// the same overall task.
|
||||
//
|
||||
// A zero Group is valid, has no limit on the number of active goroutines,
|
||||
// and does not cancel on error.
|
||||
type Group struct {
|
||||
cancel func()
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
sem chan token
|
||||
|
||||
errOnce sync.Once
|
||||
err error
|
||||
}
|
||||
|
||||
func (g *Group) done() {
|
||||
if g.sem != nil {
|
||||
<-g.sem
|
||||
}
|
||||
g.wg.Done()
|
||||
}
|
||||
|
||||
// WithContext returns a new Group and an associated Context derived from ctx.
|
||||
//
|
||||
// The derived Context is canceled the first time a function passed to Go
|
||||
// returns a non-nil error or the first time Wait returns, whichever occurs
|
||||
// first.
|
||||
func WithContext(ctx context.Context) (*Group, context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Group{cancel: cancel}, ctx
|
||||
}
|
||||
|
||||
// Wait blocks until all function calls from the Go method have returned, then
|
||||
// returns the first non-nil error (if any) from them.
|
||||
func (g *Group) Wait() error {
|
||||
g.wg.Wait()
|
||||
if g.cancel != nil {
|
||||
g.cancel()
|
||||
}
|
||||
return g.err
|
||||
}
|
||||
|
||||
// Go calls the given function in a new goroutine.
|
||||
// It blocks until the new goroutine can be added without the number of
|
||||
// active goroutines in the group exceeding the configured limit.
|
||||
//
|
||||
// The first call to return a non-nil error cancels the group's context, if the
|
||||
// group was created by calling WithContext. The error will be returned by Wait.
|
||||
func (g *Group) Go(f func() error) {
|
||||
if g.sem != nil {
|
||||
g.sem <- token{}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel()
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// TryGo calls the given function in a new goroutine only if the number of
|
||||
// active goroutines in the group is currently below the configured limit.
|
||||
//
|
||||
// The return value reports whether the goroutine was started.
|
||||
func (g *Group) TryGo(f func() error) bool {
|
||||
if g.sem != nil {
|
||||
select {
|
||||
case g.sem <- token{}:
|
||||
// Note: this allows barging iff channels in general allow barging.
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel()
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
return true
|
||||
}
|
||||
|
||||
// SetLimit limits the number of active goroutines in this group to at most n.
|
||||
// A negative value indicates no limit.
|
||||
//
|
||||
// Any subsequent call to the Go method will block until it can add an active
|
||||
// goroutine without exceeding the configured limit.
|
||||
//
|
||||
// The limit must not be modified while any goroutines in the group are active.
|
||||
func (g *Group) SetLimit(n int) {
|
||||
if n < 0 {
|
||||
g.sem = nil
|
||||
return
|
||||
}
|
||||
if len(g.sem) != 0 {
|
||||
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
|
||||
}
|
||||
g.sem = make(chan token, n)
|
||||
}
|
3
vendor/modules.txt
vendored
3
vendor/modules.txt
vendored
|
@ -313,6 +313,9 @@ golang.org/x/oauth2/google/internal/externalaccount
|
|||
golang.org/x/oauth2/internal
|
||||
golang.org/x/oauth2/jws
|
||||
golang.org/x/oauth2/jwt
|
||||
# golang.org/x/sync v0.1.0
|
||||
## explicit
|
||||
golang.org/x/sync/errgroup
|
||||
# golang.org/x/sys v0.5.0
|
||||
## explicit; go 1.17
|
||||
golang.org/x/sys/internal/unsafeheader
|
||||
|
|
Loading…
Reference in a new issue