Use the New Driver Walk method for catalog enumeration
This changes the Walk Method used for catalog enumeration. Just to show how much an effect this has on our s3 storage: Original: List calls: 6839 real 3m16.636s user 0m0.000s sys 0m0.016s New: ListObjectsV2 Calls: 1805 real 0m49.970s user 0m0.008s sys 0m0.000s This is because it no longer performs a list and stat per item, and instead is able to use the metadata gained from the list as a replacement to stat. Signed-off-by: Sargun Dhillon <sargun@sargun.me>
This commit is contained in:
parent
32ac467992
commit
35b29a609e
2 changed files with 136 additions and 12 deletions
|
@ -18,6 +18,7 @@ var errFinishedWalk = errors.New("finished walk")
|
||||||
// Because it's a quite expensive operation, it should only be used when building up
|
// Because it's a quite expensive operation, it should only be used when building up
|
||||||
// an initial set of repositories.
|
// an initial set of repositories.
|
||||||
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
|
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
|
||||||
|
var finishedWalk bool
|
||||||
var foundRepos []string
|
var foundRepos []string
|
||||||
|
|
||||||
if len(repos) == 0 {
|
if len(repos) == 0 {
|
||||||
|
@ -29,7 +30,7 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
|
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
|
||||||
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
|
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
|
||||||
foundRepos = append(foundRepos, repoPath)
|
foundRepos = append(foundRepos, repoPath)
|
||||||
return nil
|
return nil
|
||||||
|
@ -40,7 +41,8 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
|
||||||
|
|
||||||
// if we've filled our array, no need to walk any further
|
// if we've filled our array, no need to walk any further
|
||||||
if len(foundRepos) == len(repos) {
|
if len(foundRepos) == len(repos) {
|
||||||
return errFinishedWalk
|
finishedWalk = true
|
||||||
|
return driver.ErrSkipDir
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -48,14 +50,11 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
|
||||||
|
|
||||||
n = copy(repos, foundRepos)
|
n = copy(repos, foundRepos)
|
||||||
|
|
||||||
switch err {
|
if err != nil {
|
||||||
case nil:
|
return n, err
|
||||||
// nil means that we completed walk and didn't fill buffer. No more
|
} else if !finishedWalk {
|
||||||
// records are available.
|
// We didn't fill buffer. No more records are available.
|
||||||
err = io.EOF
|
return n, io.EOF
|
||||||
case errFinishedWalk:
|
|
||||||
// more records are available.
|
|
||||||
err = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return n, err
|
return n, err
|
||||||
|
|
|
@ -34,6 +34,7 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
|
||||||
|
dcontext "github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/registry/client/transport"
|
"github.com/docker/distribution/registry/client/transport"
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
"github.com/docker/distribution/registry/storage/driver/base"
|
"github.com/docker/distribution/registry/storage/driver/base"
|
||||||
|
@ -876,8 +877,132 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file
|
// from the given path, calling f on each file
|
||||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
|
||||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
path := from
|
||||||
|
if !strings.HasSuffix(path, "/") {
|
||||||
|
path = path + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
prefix := ""
|
||||||
|
if d.s3Path("") == "" {
|
||||||
|
prefix = "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
var objectCount int64
|
||||||
|
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
|
||||||
|
if objectCount == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{Path: from}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type walkInfoContainer struct {
|
||||||
|
storagedriver.FileInfoFields
|
||||||
|
prefix *string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Path provides the full path of the target of this file info.
|
||||||
|
func (wi walkInfoContainer) Path() string {
|
||||||
|
return wi.FileInfoFields.Path
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size returns current length in bytes of the file. The return value can
|
||||||
|
// be used to write to the end of the file at path. The value is
|
||||||
|
// meaningless if IsDir returns true.
|
||||||
|
func (wi walkInfoContainer) Size() int64 {
|
||||||
|
return wi.FileInfoFields.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime returns the modification time for the file. For backends that
|
||||||
|
// don't have a modification time, the creation time should be returned.
|
||||||
|
func (wi walkInfoContainer) ModTime() time.Time {
|
||||||
|
return wi.FileInfoFields.ModTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsDir returns true if the path is a directory.
|
||||||
|
func (wi walkInfoContainer) IsDir() bool {
|
||||||
|
return wi.FileInfoFields.IsDir
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
|
||||||
|
var retError error
|
||||||
|
|
||||||
|
listObjectsInput := &s3.ListObjectsV2Input{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(path),
|
||||||
|
Delimiter: aws.String("/"),
|
||||||
|
MaxKeys: aws.Int64(listMax),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, done := dcontext.WithTrace(parentCtx)
|
||||||
|
defer done("s3aws.ListObjectsV2Pages(%s)", path)
|
||||||
|
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
|
||||||
|
|
||||||
|
*objectCount += *objects.KeyCount
|
||||||
|
walkInfos := make([]walkInfoContainer, 0, *objects.KeyCount)
|
||||||
|
|
||||||
|
for _, dir := range objects.CommonPrefixes {
|
||||||
|
commonPrefix := *dir.Prefix
|
||||||
|
walkInfos = append(walkInfos, walkInfoContainer{
|
||||||
|
prefix: dir.Prefix,
|
||||||
|
FileInfoFields: storagedriver.FileInfoFields{
|
||||||
|
IsDir: true,
|
||||||
|
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range objects.Contents {
|
||||||
|
walkInfos = append(walkInfos, walkInfoContainer{
|
||||||
|
FileInfoFields: storagedriver.FileInfoFields{
|
||||||
|
IsDir: false,
|
||||||
|
Size: *file.Size,
|
||||||
|
ModTime: *file.LastModified,
|
||||||
|
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })
|
||||||
|
|
||||||
|
for _, walkInfo := range walkInfos {
|
||||||
|
err := f(walkInfo)
|
||||||
|
|
||||||
|
if err == storagedriver.ErrSkipDir {
|
||||||
|
if walkInfo.IsDir() {
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
retError = err
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if walkInfo.IsDir() {
|
||||||
|
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
|
||||||
|
retError = err
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
if retError != nil {
|
||||||
|
return retError
|
||||||
|
}
|
||||||
|
|
||||||
|
if listObjectErr != nil {
|
||||||
|
return listObjectErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) s3Path(path string) string {
|
func (d *driver) s3Path(path string) string {
|
||||||
|
|
Loading…
Reference in a new issue