From 4ff0c0e34029918c38cf086c9675316b5dbe2b0d Mon Sep 17 00:00:00 2001 From: Andrey Voronkov Date: Tue, 2 May 2023 10:26:30 +0300 Subject: [PATCH] Attempt to use golang.org/x/sync/errgroup Signed-off-by: Andrey Voronkov --- go.mod | 1 + go.sum | 2 + registry/storage/tagstore.go | 104 +++----------- registry/storage/tagstore_test.go | 49 ++----- vendor/golang.org/x/sync/LICENSE | 27 ++++ vendor/golang.org/x/sync/PATENTS | 22 +++ vendor/golang.org/x/sync/errgroup/errgroup.go | 132 ++++++++++++++++++ vendor/modules.txt | 3 + 8 files changed, 213 insertions(+), 127 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go diff --git a/go.mod b/go.mod index 9e2b8c4f6..cea0b3998 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50 golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c + golang.org/x/sync v0.1.0 google.golang.org/api v0.30.0 google.golang.org/cloud v0.0.0-20151119220103-975617b05ea8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 diff --git a/go.sum b/go.sum index b642def30..0c7f8a5eb 100644 --- a/go.sum +++ b/go.sum @@ -373,6 +373,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 7c44f54e5..c561a19d4 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -6,12 +6,12 @@ import ( "path" "sort" "strconv" - "sync" "github.com/distribution/distribution/v3" dcontext "github.com/distribution/distribution/v3/context" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" ) var _ distribution.TagService = &TagStore{} @@ -165,40 +165,15 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([ return nil, err } - lookupErr := &atomicError{} + outputChan := make(chan string) - allTagsCount := len(allTags) - inputChan := make(chan string) - outputChan := make(chan string, allTagsCount) - - waitGroup := sync.WaitGroup{} - - limiter := make(chan struct{}, ts.lookupConcurrencyFactor) - acquire := func() { - limiter <- struct{}{} - waitGroup.Add(1) - } - release := func() { - <-limiter - waitGroup.Done() - } + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(ts.lookupConcurrencyFactor) go func() { - defer func() { - waitGroup.Wait() - close(outputChan) - close(limiter) - }() - for tag := range inputChan { - acquire() - go func(tag string) { - defer release() - - // No need to lookup further on lookupErr - if lookupErr.Load() != nil { - return - } - + for _, tag := range allTags { + tag := tag // https://go.dev/doc/faq#closures_and_goroutines + group.Go(func() error { tagLinkPathSpec := manifestTagCurrentPathSpec{ name: ts.repository.Named().Name(), tag: tag, @@ -211,59 +186,33 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([ switch err.(type) { // PathNotFoundError shouldn't count as an error case storagedriver.PathNotFoundError: - default: - lookupErr.Store(err) + return nil } - return + return err } if tagDigest == desc.Digest { outputChan <- tag } - }(tag) + return nil + }) } + group.Wait() + close(outputChan) }() - for _, tag := range allTags { - if lookupErr.Load() != nil { - break - } - - if err := pushTag(ctx, inputChan, tag); err != nil { - lookupErr.Store(err) - break - } - } - close(inputChan) - - if err := lookupErr.Load(); err != nil { - return nil, err - } - - tags := make([]string, 0, len(outputChan)) + var tags []string for tag := range outputChan { tags = append(tags, tag) } + if err := group.Wait(); err != nil { + return nil, err + } + return tags, nil } -func pushTag(ctx context.Context, ch chan string, tag string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- tag: - } - - return nil -} - func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) { tagLinkPath := func(name string, dgst digest.Digest) (string, error) { return pathFor(manifestTagIndexEntryLinkPathSpec{ @@ -297,20 +246,3 @@ func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.D } return dgsts, nil } - -type atomicError struct { - mu sync.Mutex - err error -} - -func (e *atomicError) Store(err error) { - e.mu.Lock() - defer e.mu.Unlock() - e.err = err -} - -func (e *atomicError) Load() error { - e.mu.Lock() - defer e.mu.Unlock() - return e.err -} diff --git a/registry/storage/tagstore_test.go b/registry/storage/tagstore_test.go index c3adece9f..3081ec8d8 100644 --- a/registry/storage/tagstore_test.go +++ b/registry/storage/tagstore_test.go @@ -3,7 +3,6 @@ package storage import ( "context" "errors" - "io" "reflect" "strconv" "testing" @@ -13,7 +12,6 @@ import ( "github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/reference" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" - "github.com/distribution/distribution/v3/registry/storage/driver/base" "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" digest "github.com/opencontainers/go-digest" ) @@ -28,51 +26,20 @@ type tagsTestEnv struct { } type mockInMemory struct { - base.Base - driver *inmemory.Driver + *inmemory.Driver GetContentError error } -var _ storagedriver.StorageDriver = &mockInMemory{} - -func (m *mockInMemory) Name() string { - return m.driver.Name() -} func (m *mockInMemory) GetContent(ctx context.Context, path string) ([]byte, error) { if m.GetContentError != nil { return nil, m.GetContentError } - return m.driver.GetContent(ctx, path) -} -func (m *mockInMemory) PutContent(ctx context.Context, path string, content []byte) error { - return m.driver.PutContent(ctx, path, content) -} -func (m *mockInMemory) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - return m.driver.Reader(ctx, path, offset) -} -func (m *mockInMemory) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { - return m.driver.Writer(ctx, path, append) -} -func (m *mockInMemory) List(ctx context.Context, path string) ([]string, error) { - return m.driver.List(ctx, path) -} -func (m *mockInMemory) Move(ctx context.Context, sourcePath string, destPath string) error { - return m.driver.Move(ctx, sourcePath, destPath) -} -func (m *mockInMemory) Delete(ctx context.Context, path string) error { - return m.driver.Delete(ctx, path) -} -func (m *mockInMemory) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { - return m.driver.URLFor(ctx, path, options) -} -func (m *mockInMemory) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { - return m.driver.Walk(ctx, path, f) + return m.Driver.GetContent(ctx, path) } func testTagStore(t *testing.T) *tagsTestEnv { ctx := context.Background() - d := inmemory.New() - mockDriver := mockInMemory{driver: d, Base: d.Base} + mockDriver := mockInMemory{inmemory.New(), nil} reg, err := NewRegistry(ctx, &mockDriver) if err != nil { t.Fatal(err) @@ -171,20 +138,20 @@ func TestTagStoreUnTag(t *testing.T) { func TestTagStoreAll(t *testing.T) { env := testTagStore(t) - TagStore := env.ts + tagStore := env.ts ctx := env.ctx alpha := "abcdefghijklmnopqrstuvwxyz" for i := 0; i < len(alpha); i++ { tag := alpha[i] desc := distribution.Descriptor{Digest: "sha256:eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"} - err := TagStore.Tag(ctx, string(tag), desc) + err := tagStore.Tag(ctx, string(tag), desc) if err != nil { t.Error(err) } } - all, err := TagStore.All(ctx) + all, err := tagStore.All(ctx) if err != nil { t.Error(err) } @@ -199,12 +166,12 @@ func TestTagStoreAll(t *testing.T) { } removed := "a" - err = TagStore.Untag(ctx, removed) + err = tagStore.Untag(ctx, removed) if err != nil { t.Error(err) } - all, err = TagStore.All(ctx) + all, err = tagStore.All(ctx) if err != nil { t.Error(err) } diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000..cbee7a4e2 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,132 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9086d87a3..c35aac124 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -313,6 +313,9 @@ golang.org/x/oauth2/google/internal/externalaccount golang.org/x/oauth2/internal golang.org/x/oauth2/jws golang.org/x/oauth2/jwt +# golang.org/x/sync v0.1.0 +## explicit +golang.org/x/sync/errgroup # golang.org/x/sys v0.5.0 ## explicit; go 1.17 golang.org/x/sys/internal/unsafeheader