add bounded concurrency for tag lookup and untag (#4329)

This commit is contained in:
Milos Gajdos 2024-04-26 19:59:59 +01:00 committed by GitHub
commit e0795fcfe3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 316 additions and 25 deletions

View file

@ -12,6 +12,8 @@ storage:
maintenance: maintenance:
uploadpurging: uploadpurging:
enabled: false enabled: false
tag:
concurrencylimit: 8
http: http:
addr: :5000 addr: :5000
secret: asecretforlocaldevelopment secret: asecretforlocaldevelopment

View file

@ -14,6 +14,8 @@ storage:
maintenance: maintenance:
uploadpurging: uploadpurging:
enabled: false enabled: false
tag:
concurrencylimit: 8
http: http:
addr: :5000 addr: :5000
debug: debug:

View file

@ -7,6 +7,8 @@ storage:
blobdescriptor: inmemory blobdescriptor: inmemory
filesystem: filesystem:
rootdirectory: /var/lib/registry rootdirectory: /var/lib/registry
tag:
concurrencylimit: 8
http: http:
addr: :5000 addr: :5000
headers: headers:

View file

@ -441,6 +441,8 @@ func (storage Storage) Type() string {
// allow configuration of delete // allow configuration of delete
case "redirect": case "redirect":
// allow configuration of redirect // allow configuration of redirect
case "tag":
// allow configuration of tag
default: default:
storageType = append(storageType, k) storageType = append(storageType, k)
} }
@ -454,6 +456,19 @@ func (storage Storage) Type() string {
return "" return ""
} }
// TagParameters returns the Parameters map for a Storage tag configuration
func (storage Storage) TagParameters() Parameters {
return storage["tag"]
}
// setTagParameter changes the parameter at the provided key to the new value
func (storage Storage) setTagParameter(key string, value interface{}) {
if _, ok := storage["tag"]; !ok {
storage["tag"] = make(Parameters)
}
storage["tag"][key] = value
}
// Parameters returns the Parameters map for a Storage configuration // Parameters returns the Parameters map for a Storage configuration
func (storage Storage) Parameters() Parameters { func (storage Storage) Parameters() Parameters {
return storage[storage.Type()] return storage[storage.Type()]
@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
// allow configuration of delete // allow configuration of delete
case "redirect": case "redirect":
// allow configuration of redirect // allow configuration of redirect
case "tag":
// allow configuration of tag
default: default:
types = append(types, k) types = append(types, k)
} }

View file

@ -39,6 +39,9 @@ var configStruct = Configuration{
"url1": "https://foo.example.com", "url1": "https://foo.example.com",
"path1": "/some-path", "path1": "/some-path",
}, },
"tag": Parameters{
"concurrencylimit": 10,
},
}, },
Auth: Auth{ Auth: Auth{
"silly": Parameters{ "silly": Parameters{
@ -167,6 +170,8 @@ storage:
int1: 42 int1: 42
url1: "https://foo.example.com" url1: "https://foo.example.com"
path1: "/some-path" path1: "/some-path"
tag:
concurrencylimit: 10
auth: auth:
silly: silly:
realm: silly realm: silly
@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration {
for k, v := range config.Storage.Parameters() { for k, v := range config.Storage.Parameters() {
configCopy.Storage.setParameter(k, v) configCopy.Storage.setParameter(k, v)
} }
for k, v := range config.Storage.TagParameters() {
configCopy.Storage.setTagParameter(k, v)
}
configCopy.Auth = Auth{config.Auth.Type(): Parameters{}} configCopy.Auth = Auth{config.Auth.Type(): Parameters{}}
for k, v := range config.Auth.Parameters() { for k, v := range config.Auth.Parameters() {

View file

@ -141,6 +141,8 @@ storage:
usedualstack: false usedualstack: false
loglevel: debug loglevel: debug
inmemory: # This driver takes no parameters inmemory: # This driver takes no parameters
tag:
concurrencylimit: 8
delete: delete:
enabled: false enabled: false
redirect: redirect:
@ -521,6 +523,26 @@ parameter sets a limit on the number of descriptors to store in the cache.
The default value is 10000. If this parameter is set to 0, the cache is allowed The default value is 10000. If this parameter is set to 0, the cache is allowed
to grow with no size limit. to grow with no size limit.
### `tag`
The `tag` subsection provides configuration to set concurrency limit for tag lookup.
When user calls into the registry to delete the manifest, which in turn then does a
lookup for all tags that reference the deleted manifest. To find the tag references,
the registry will iterate every tag in the repository and read it's link file to check
if it matches the deleted manifest (i.e. to see if uses the same sha256 digest).
So, the more tags in repository, the worse the performance will be (as there will
be more S3 API calls occurring for the tag directory lookups and tag file reads if
using S3 storage driver).
Therefore, add a single flag `concurrencylimit` to set concurrency limit to optimize tag
lookup performance under the `tag` section. When a value is not provided or equal to 0,
`GOMAXPROCS` will be used.
```yaml
tag:
concurrencylimit: 8
```
### `redirect` ### `redirect`
The `redirect` subsection provides configuration for managing redirects from The `redirect` subsection provides configuration for managing redirects from

2
go.mod
View file

@ -88,7 +88,7 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/sync v0.3.0 // indirect golang.org/x/sync v0.3.0
golang.org/x/sys v0.18.0 // indirect golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect

View file

@ -188,6 +188,21 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
} }
} }
// configure tag lookup concurrency limit
if p := config.Storage.TagParameters(); p != nil {
l, ok := p["concurrencylimit"]
if ok {
limit, ok := l.(int)
if !ok {
panic("tag lookup concurrency limit config key must have a integer value")
}
if limit < 0 {
panic("tag lookup concurrency limit should be a non-negative integer value")
}
options = append(options, storage.TagLookupConcurrencyLimit(limit))
}
}
// configure redirects // configure redirects
var redirectDisabled bool var redirectDisabled bool
if redirectConfig, ok := config.Storage["redirect"]; ok { if redirectConfig, ok := config.Storage["redirect"]; ok {

View file

@ -6,6 +6,7 @@ import (
"mime" "mime"
"net/http" "net/http"
"strings" "strings"
"sync"
"github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext" "github.com/distribution/distribution/v3/internal/dcontext"
@ -13,11 +14,13 @@ import (
"github.com/distribution/distribution/v3/manifest/ocischema" "github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/manifest/schema2"
"github.com/distribution/distribution/v3/registry/api/errcode" "github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference" "github.com/distribution/reference"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1" v1 "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
) )
const ( const (
@ -481,12 +484,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques
return return
} }
var (
errs []error
mu sync.Mutex
)
g := errgroup.Group{}
g.SetLimit(storage.DefaultConcurrencyLimit)
for _, tag := range referencedTags { for _, tag := range referencedTags {
if err := tagService.Untag(imh, tag); err != nil { tag := tag
imh.Errors = append(imh.Errors, err)
return g.Go(func() error {
} if err := tagService.Untag(imh, tag); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
} }
_ = g.Wait() // imh will record all errors, so ignore the error of Wait()
imh.Errors = errs
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }

View file

@ -3,6 +3,7 @@ package storage
import ( import (
"context" "context"
"regexp" "regexp"
"runtime"
"github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage/cache" "github.com/distribution/distribution/v3/registry/storage/cache"
@ -10,6 +11,10 @@ import (
"github.com/distribution/reference" "github.com/distribution/reference"
) )
var (
DefaultConcurrencyLimit = runtime.GOMAXPROCS(0)
)
// registry is the top-level implementation of Registry for use in the storage // registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object. // package. All instances should descend from this object.
type registry struct { type registry struct {
@ -18,6 +23,7 @@ type registry struct {
statter *blobStatter // global statter service. statter *blobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool deleteEnabled bool
tagLookupConcurrencyLimit int
resumableDigestEnabled bool resumableDigestEnabled bool
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
manifestURLs manifestURLs manifestURLs manifestURLs
@ -40,6 +46,13 @@ func EnableRedirect(registry *registry) error {
return nil return nil
} }
func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption {
return func(registry *registry) error {
registry.tagLookupConcurrencyLimit = concurrencyLimit
return nil
}
}
// EnableDelete is a functional option for NewRegistry. It enables deletion on // EnableDelete is a functional option for NewRegistry. It enables deletion on
// the registry. // the registry.
func EnableDelete(registry *registry) error { func EnableDelete(registry *registry) error {
@ -184,9 +197,14 @@ func (repo *repository) Named() reference.Named {
} }
func (repo *repository) Tags(ctx context.Context) distribution.TagService { func (repo *repository) Tags(ctx context.Context) distribution.TagService {
limit := DefaultConcurrencyLimit
if repo.tagLookupConcurrencyLimit > 0 {
limit = repo.tagLookupConcurrencyLimit
}
tags := &tagStore{ tags := &tagStore{
repository: repo, repository: repo,
blobStore: repo.registry.blobStore, blobStore: repo.registry.blobStore,
concurrencyLimit: limit,
} }
return tags return tags

View file

@ -4,10 +4,13 @@ import (
"context" "context"
"path" "path"
"sort" "sort"
"sync"
"github.com/opencontainers/go-digest"
"golang.org/x/sync/errgroup"
"github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
) )
var _ distribution.TagService = &tagStore{} var _ distribution.TagService = &tagStore{}
@ -18,8 +21,9 @@ var _ distribution.TagService = &tagStore{}
// which only makes use of the Digest field of the returned distribution.Descriptor // which only makes use of the Digest field of the returned distribution.Descriptor
// but does not enable full roundtripping of Descriptor objects // but does not enable full roundtripping of Descriptor objects
type tagStore struct { type tagStore struct {
repository *repository repository *repository
blobStore *blobStore blobStore *blobStore
concurrencyLimit int
} }
// All returns all tags // All returns all tags
@ -145,26 +149,48 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
return nil, err return nil, err
} }
var tags []string g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ts.concurrencyLimit)
var (
tags []string
mu sync.Mutex
)
for _, tag := range allTags { for _, tag := range allTags {
tagLinkPathSpec := manifestTagCurrentPathSpec{ if ctx.Err() != nil {
name: ts.repository.Named().Name(), break
tag: tag,
} }
tag := tag
tagLinkPath, _ := pathFor(tagLinkPathSpec) g.Go(func() error {
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) tagLinkPathSpec := manifestTagCurrentPathSpec{
if err != nil { name: ts.repository.Named().Name(),
switch err.(type) { tag: tag,
case storagedriver.PathNotFoundError:
continue
} }
return nil, err
}
if tagDigest == desc.Digest { tagLinkPath, _ := pathFor(tagLinkPathSpec)
tags = append(tags, tag) tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
} if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
return nil
}
return err
}
if tagDigest == desc.Digest {
mu.Lock()
tags = append(tags, tag)
mu.Unlock()
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
} }
return tags, nil return tags, nil

132
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View file

@ -0,0 +1,132 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}

14
vendor/golang.org/x/sync/errgroup/go120.go generated vendored Normal file
View file

@ -0,0 +1,14 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
// +build go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}

15
vendor/golang.org/x/sync/errgroup/pre_go120.go generated vendored Normal file
View file

@ -0,0 +1,15 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
// +build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}

1
vendor/modules.txt vendored
View file

@ -504,6 +504,7 @@ golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt golang.org/x/oauth2/jwt
# golang.org/x/sync v0.3.0 # golang.org/x/sync v0.3.0
## explicit; go 1.17 ## explicit; go 1.17
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore golang.org/x/sync/semaphore
# golang.org/x/sys v0.18.0 # golang.org/x/sys v0.18.0
## explicit; go 1.18 ## explicit; go 1.18