forked from TrueCloudLab/distribution
add concurrency limits for tag lookup and untag
Harbor is using the distribution for it's (harbor-registry) registry component. The harbor GC will call into the registry to delete the manifest, which in turn then does a lookup for all tags that reference the deleted manifest. To find the tag references, the registry will iterate every tag in the repository and read it's link file to check if it matches the deleted manifest (i.e. to see if uses the same sha256 digest). So, the more tags in repository, the worse the performance will be (as there will be more s3 API calls occurring for the tag directory lookups and tag file reads). Therefore, we can use concurrent lookup and untag to optimize performance as described in https://github.com/goharbor/harbor/issues/12948. P.S. This optimization was originally contributed by @Antiarchitect, now I would like to take it over. Thanks @Antiarchitect's efforts with PR https://github.com/distribution/distribution/pull/3890. Signed-off-by: Liang Zheng <zhengliang0901@gmail.com>
This commit is contained in:
parent
a5882d6646
commit
a2afe23f38
10 changed files with 153 additions and 24 deletions
|
@ -12,6 +12,8 @@ storage:
|
||||||
maintenance:
|
maintenance:
|
||||||
uploadpurging:
|
uploadpurging:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 8
|
||||||
http:
|
http:
|
||||||
addr: :5000
|
addr: :5000
|
||||||
secret: asecretforlocaldevelopment
|
secret: asecretforlocaldevelopment
|
||||||
|
|
|
@ -14,6 +14,8 @@ storage:
|
||||||
maintenance:
|
maintenance:
|
||||||
uploadpurging:
|
uploadpurging:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 8
|
||||||
http:
|
http:
|
||||||
addr: :5000
|
addr: :5000
|
||||||
debug:
|
debug:
|
||||||
|
|
|
@ -7,6 +7,8 @@ storage:
|
||||||
blobdescriptor: inmemory
|
blobdescriptor: inmemory
|
||||||
filesystem:
|
filesystem:
|
||||||
rootdirectory: /var/lib/registry
|
rootdirectory: /var/lib/registry
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 8
|
||||||
http:
|
http:
|
||||||
addr: :5000
|
addr: :5000
|
||||||
headers:
|
headers:
|
||||||
|
|
|
@ -441,6 +441,8 @@ func (storage Storage) Type() string {
|
||||||
// allow configuration of delete
|
// allow configuration of delete
|
||||||
case "redirect":
|
case "redirect":
|
||||||
// allow configuration of redirect
|
// allow configuration of redirect
|
||||||
|
case "tag":
|
||||||
|
// allow configuration of tag
|
||||||
default:
|
default:
|
||||||
storageType = append(storageType, k)
|
storageType = append(storageType, k)
|
||||||
}
|
}
|
||||||
|
@ -454,6 +456,19 @@ func (storage Storage) Type() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagParameters returns the Parameters map for a Storage tag configuration
|
||||||
|
func (storage Storage) TagParameters() Parameters {
|
||||||
|
return storage["tag"]
|
||||||
|
}
|
||||||
|
|
||||||
|
// setTagParameter changes the parameter at the provided key to the new value
|
||||||
|
func (storage Storage) setTagParameter(key string, value interface{}) {
|
||||||
|
if _, ok := storage["tag"]; !ok {
|
||||||
|
storage["tag"] = make(Parameters)
|
||||||
|
}
|
||||||
|
storage["tag"][key] = value
|
||||||
|
}
|
||||||
|
|
||||||
// Parameters returns the Parameters map for a Storage configuration
|
// Parameters returns the Parameters map for a Storage configuration
|
||||||
func (storage Storage) Parameters() Parameters {
|
func (storage Storage) Parameters() Parameters {
|
||||||
return storage[storage.Type()]
|
return storage[storage.Type()]
|
||||||
|
@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
// allow configuration of delete
|
// allow configuration of delete
|
||||||
case "redirect":
|
case "redirect":
|
||||||
// allow configuration of redirect
|
// allow configuration of redirect
|
||||||
|
case "tag":
|
||||||
|
// allow configuration of tag
|
||||||
default:
|
default:
|
||||||
types = append(types, k)
|
types = append(types, k)
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,9 @@ var configStruct = Configuration{
|
||||||
"url1": "https://foo.example.com",
|
"url1": "https://foo.example.com",
|
||||||
"path1": "/some-path",
|
"path1": "/some-path",
|
||||||
},
|
},
|
||||||
|
"tag": Parameters{
|
||||||
|
"concurrencylimit": 10,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Auth: Auth{
|
Auth: Auth{
|
||||||
"silly": Parameters{
|
"silly": Parameters{
|
||||||
|
@ -167,6 +170,8 @@ storage:
|
||||||
int1: 42
|
int1: 42
|
||||||
url1: "https://foo.example.com"
|
url1: "https://foo.example.com"
|
||||||
path1: "/some-path"
|
path1: "/some-path"
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 10
|
||||||
auth:
|
auth:
|
||||||
silly:
|
silly:
|
||||||
realm: silly
|
realm: silly
|
||||||
|
@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration {
|
||||||
for k, v := range config.Storage.Parameters() {
|
for k, v := range config.Storage.Parameters() {
|
||||||
configCopy.Storage.setParameter(k, v)
|
configCopy.Storage.setParameter(k, v)
|
||||||
}
|
}
|
||||||
|
for k, v := range config.Storage.TagParameters() {
|
||||||
|
configCopy.Storage.setTagParameter(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
configCopy.Auth = Auth{config.Auth.Type(): Parameters{}}
|
configCopy.Auth = Auth{config.Auth.Type(): Parameters{}}
|
||||||
for k, v := range config.Auth.Parameters() {
|
for k, v := range config.Auth.Parameters() {
|
||||||
|
|
|
@ -141,6 +141,8 @@ storage:
|
||||||
usedualstack: false
|
usedualstack: false
|
||||||
loglevel: debug
|
loglevel: debug
|
||||||
inmemory: # This driver takes no parameters
|
inmemory: # This driver takes no parameters
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 8
|
||||||
delete:
|
delete:
|
||||||
enabled: false
|
enabled: false
|
||||||
redirect:
|
redirect:
|
||||||
|
@ -521,6 +523,26 @@ parameter sets a limit on the number of descriptors to store in the cache.
|
||||||
The default value is 10000. If this parameter is set to 0, the cache is allowed
|
The default value is 10000. If this parameter is set to 0, the cache is allowed
|
||||||
to grow with no size limit.
|
to grow with no size limit.
|
||||||
|
|
||||||
|
### `tag`
|
||||||
|
|
||||||
|
The `tag` subsection provides configuration to set concurrency limit for tag lookup.
|
||||||
|
When user calls into the registry to delete the manifest, which in turn then does a
|
||||||
|
lookup for all tags that reference the deleted manifest. To find the tag references,
|
||||||
|
the registry will iterate every tag in the repository and read it's link file to check
|
||||||
|
if it matches the deleted manifest (i.e. to see if uses the same sha256 digest).
|
||||||
|
So, the more tags in repository, the worse the performance will be (as there will
|
||||||
|
be more S3 API calls occurring for the tag directory lookups and tag file reads if
|
||||||
|
using S3 storage driver).
|
||||||
|
|
||||||
|
Therefore, add a single flag `concurrencylimit` to set concurrency limit to optimize tag
|
||||||
|
lookup performance under the `tag` section. When a value is not provided or equal to 0,
|
||||||
|
`GOMAXPROCS` will be used.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tag:
|
||||||
|
concurrencylimit: 8
|
||||||
|
```
|
||||||
|
|
||||||
### `redirect`
|
### `redirect`
|
||||||
|
|
||||||
The `redirect` subsection provides configuration for managing redirects from
|
The `redirect` subsection provides configuration for managing redirects from
|
||||||
|
|
|
@ -188,6 +188,21 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// configure tag lookup concurrency limit
|
||||||
|
if p := config.Storage.TagParameters(); p != nil {
|
||||||
|
l, ok := p["concurrencylimit"]
|
||||||
|
if ok {
|
||||||
|
limit, ok := l.(int)
|
||||||
|
if !ok {
|
||||||
|
panic("tag lookup concurrency limit config key must have a integer value")
|
||||||
|
}
|
||||||
|
if limit < 0 {
|
||||||
|
panic("tag lookup concurrency limit should be a non-negative integer value")
|
||||||
|
}
|
||||||
|
options = append(options, storage.TagLookupConcurrencyLimit(limit))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// configure redirects
|
// configure redirects
|
||||||
var redirectDisabled bool
|
var redirectDisabled bool
|
||||||
if redirectConfig, ok := config.Storage["redirect"]; ok {
|
if redirectConfig, ok := config.Storage["redirect"]; ok {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"mime"
|
"mime"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/distribution/distribution/v3"
|
"github.com/distribution/distribution/v3"
|
||||||
"github.com/distribution/distribution/v3/internal/dcontext"
|
"github.com/distribution/distribution/v3/internal/dcontext"
|
||||||
|
@ -13,11 +14,13 @@ import (
|
||||||
"github.com/distribution/distribution/v3/manifest/ocischema"
|
"github.com/distribution/distribution/v3/manifest/ocischema"
|
||||||
"github.com/distribution/distribution/v3/manifest/schema2"
|
"github.com/distribution/distribution/v3/manifest/schema2"
|
||||||
"github.com/distribution/distribution/v3/registry/api/errcode"
|
"github.com/distribution/distribution/v3/registry/api/errcode"
|
||||||
|
"github.com/distribution/distribution/v3/registry/storage"
|
||||||
"github.com/distribution/distribution/v3/registry/storage/driver"
|
"github.com/distribution/distribution/v3/registry/storage/driver"
|
||||||
"github.com/distribution/reference"
|
"github.com/distribution/reference"
|
||||||
"github.com/gorilla/handlers"
|
"github.com/gorilla/handlers"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -481,12 +484,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errs []error
|
||||||
|
mu sync.Mutex
|
||||||
|
)
|
||||||
|
g := errgroup.Group{}
|
||||||
|
g.SetLimit(storage.DefaultConcurrencyLimit)
|
||||||
for _, tag := range referencedTags {
|
for _, tag := range referencedTags {
|
||||||
if err := tagService.Untag(imh, tag); err != nil {
|
tag := tag
|
||||||
imh.Errors = append(imh.Errors, err)
|
|
||||||
return
|
g.Go(func() error {
|
||||||
}
|
if err := tagService.Untag(imh, tag); err != nil {
|
||||||
|
mu.Lock()
|
||||||
|
errs = append(errs, err)
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
_ = g.Wait() // imh will record all errors, so ignore the error of Wait()
|
||||||
|
imh.Errors = errs
|
||||||
|
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/distribution/distribution/v3"
|
"github.com/distribution/distribution/v3"
|
||||||
"github.com/distribution/distribution/v3/registry/storage/cache"
|
"github.com/distribution/distribution/v3/registry/storage/cache"
|
||||||
|
@ -10,6 +11,10 @@ import (
|
||||||
"github.com/distribution/reference"
|
"github.com/distribution/reference"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultConcurrencyLimit = runtime.GOMAXPROCS(0)
|
||||||
|
)
|
||||||
|
|
||||||
// registry is the top-level implementation of Registry for use in the storage
|
// registry is the top-level implementation of Registry for use in the storage
|
||||||
// package. All instances should descend from this object.
|
// package. All instances should descend from this object.
|
||||||
type registry struct {
|
type registry struct {
|
||||||
|
@ -18,6 +23,7 @@ type registry struct {
|
||||||
statter *blobStatter // global statter service.
|
statter *blobStatter // global statter service.
|
||||||
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
|
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
|
||||||
deleteEnabled bool
|
deleteEnabled bool
|
||||||
|
tagLookupConcurrencyLimit int
|
||||||
resumableDigestEnabled bool
|
resumableDigestEnabled bool
|
||||||
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
|
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
|
||||||
manifestURLs manifestURLs
|
manifestURLs manifestURLs
|
||||||
|
@ -40,6 +46,13 @@ func EnableRedirect(registry *registry) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption {
|
||||||
|
return func(registry *registry) error {
|
||||||
|
registry.tagLookupConcurrencyLimit = concurrencyLimit
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// EnableDelete is a functional option for NewRegistry. It enables deletion on
|
// EnableDelete is a functional option for NewRegistry. It enables deletion on
|
||||||
// the registry.
|
// the registry.
|
||||||
func EnableDelete(registry *registry) error {
|
func EnableDelete(registry *registry) error {
|
||||||
|
@ -184,9 +197,14 @@ func (repo *repository) Named() reference.Named {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *repository) Tags(ctx context.Context) distribution.TagService {
|
func (repo *repository) Tags(ctx context.Context) distribution.TagService {
|
||||||
|
limit := DefaultConcurrencyLimit
|
||||||
|
if repo.tagLookupConcurrencyLimit > 0 {
|
||||||
|
limit = repo.tagLookupConcurrencyLimit
|
||||||
|
}
|
||||||
tags := &tagStore{
|
tags := &tagStore{
|
||||||
repository: repo,
|
repository: repo,
|
||||||
blobStore: repo.registry.blobStore,
|
blobStore: repo.registry.blobStore,
|
||||||
|
concurrencyLimit: limit,
|
||||||
}
|
}
|
||||||
|
|
||||||
return tags
|
return tags
|
||||||
|
|
|
@ -4,10 +4,13 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"path"
|
"path"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/distribution/distribution/v3"
|
"github.com/distribution/distribution/v3"
|
||||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||||
"github.com/opencontainers/go-digest"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ distribution.TagService = &tagStore{}
|
var _ distribution.TagService = &tagStore{}
|
||||||
|
@ -18,8 +21,9 @@ var _ distribution.TagService = &tagStore{}
|
||||||
// which only makes use of the Digest field of the returned distribution.Descriptor
|
// which only makes use of the Digest field of the returned distribution.Descriptor
|
||||||
// but does not enable full roundtripping of Descriptor objects
|
// but does not enable full roundtripping of Descriptor objects
|
||||||
type tagStore struct {
|
type tagStore struct {
|
||||||
repository *repository
|
repository *repository
|
||||||
blobStore *blobStore
|
blobStore *blobStore
|
||||||
|
concurrencyLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
// All returns all tags
|
// All returns all tags
|
||||||
|
@ -145,26 +149,48 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var tags []string
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
g.SetLimit(ts.concurrencyLimit)
|
||||||
|
|
||||||
|
var (
|
||||||
|
tags []string
|
||||||
|
mu sync.Mutex
|
||||||
|
)
|
||||||
for _, tag := range allTags {
|
for _, tag := range allTags {
|
||||||
tagLinkPathSpec := manifestTagCurrentPathSpec{
|
if ctx.Err() != nil {
|
||||||
name: ts.repository.Named().Name(),
|
break
|
||||||
tag: tag,
|
|
||||||
}
|
}
|
||||||
|
tag := tag
|
||||||
|
|
||||||
tagLinkPath, _ := pathFor(tagLinkPathSpec)
|
g.Go(func() error {
|
||||||
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
|
tagLinkPathSpec := manifestTagCurrentPathSpec{
|
||||||
if err != nil {
|
name: ts.repository.Named().Name(),
|
||||||
switch err.(type) {
|
tag: tag,
|
||||||
case storagedriver.PathNotFoundError:
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if tagDigest == desc.Digest {
|
tagLinkPath, _ := pathFor(tagLinkPathSpec)
|
||||||
tags = append(tags, tag)
|
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
|
||||||
}
|
if err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case storagedriver.PathNotFoundError:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagDigest == desc.Digest {
|
||||||
|
mu.Lock()
|
||||||
|
tags = append(tags, tag)
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return tags, nil
|
return tags, nil
|
||||||
|
|
Loading…
Reference in a new issue