From cf81f67a1605ca920802338237331543f29ce850 Mon Sep 17 00:00:00 2001 From: Collin Shoop Date: Thu, 24 Jun 2021 14:42:02 -0400 Subject: [PATCH] storagedriver/s3: Optimized Walk implementation + bugfix Optimized S3 Walk impl by no longer listing files recursively. Overall gives a huge performance increase both in terms of runtime and S3 calls (up to ~500x). Fixed a bug in WalkFallback where ErrSkipDir for was not handled as documented for non-directory. Signed-off-by: Collin Shoop --- registry/storage/driver/azure/azure.go | 2 +- registry/storage/driver/filesystem/driver.go | 2 +- registry/storage/driver/inmemory/driver.go | 2 +- registry/storage/driver/s3-aws/s3.go | 173 ++++++++++--------- registry/storage/driver/s3-aws/s3_test.go | 171 ++++++++++++++++++ registry/storage/driver/swift/swift.go | 2 +- registry/storage/driver/walk.go | 21 ++- registry/storage/driver/walk_test.go | 139 ++++++++++++++- 8 files changed, 418 insertions(+), 94 deletions(-) diff --git a/registry/storage/driver/azure/azure.go b/registry/storage/driver/azure/azure.go index e39897e0..73c58c39 100644 --- a/registry/storage/driver/azure/azure.go +++ b/registry/storage/driver/azure/azure.go @@ -360,7 +360,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int } // Walk traverses a filesystem defined within driver, starting -// from the given path, calling f on each file +// from the given path, calling f on each file and directory func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { return storagedriver.WalkFallback(ctx, d, path, f) } diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 0f7b6660..5bef7e11 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -290,7 +290,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int } // Walk traverses a filesystem defined within driver, starting -// from the given path, calling f on each file +// from the given path, calling f on each file and directory func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { return storagedriver.WalkFallback(ctx, d, path, f) } diff --git a/registry/storage/driver/inmemory/driver.go b/registry/storage/driver/inmemory/driver.go index e1d9ccfe..66ac2df0 100644 --- a/registry/storage/driver/inmemory/driver.go +++ b/registry/storage/driver/inmemory/driver.go @@ -245,7 +245,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int } // Walk traverses a filesystem defined within driver, starting -// from the given path, calling f on each file +// from the given path, calling f on each file and directory func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { return storagedriver.WalkFallback(ctx, d, path, f) } diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 0fb895f6..75f80b7e 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "math" "net/http" + "path/filepath" "reflect" "sort" "strconv" @@ -941,110 +942,84 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) return nil } -type walkInfoContainer struct { - storagedriver.FileInfoFields - prefix *string -} - -// Path provides the full path of the target of this file info. -func (wi walkInfoContainer) Path() string { - return wi.FileInfoFields.Path -} - -// Size returns current length in bytes of the file. The return value can -// be used to write to the end of the file at path. The value is -// meaningless if IsDir returns true. -func (wi walkInfoContainer) Size() int64 { - return wi.FileInfoFields.Size -} - -// ModTime returns the modification time for the file. For backends that -// don't have a modification time, the creation time should be returned. -func (wi walkInfoContainer) ModTime() time.Time { - return wi.FileInfoFields.ModTime -} - -// IsDir returns true if the path is a directory. -func (wi walkInfoContainer) IsDir() bool { - return wi.FileInfoFields.IsDir -} - func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error { - var retError error + var ( + retError error + // the most recent directory walked for de-duping + prevDir string + // the most recent skip directory to avoid walking over undesirable files + prevSkipDir string + ) + prevDir = prefix + path listObjectsInput := &s3.ListObjectsV2Input{ - Bucket: aws.String(d.Bucket), - Prefix: aws.String(path), - Delimiter: aws.String("/"), - MaxKeys: aws.Int64(listMax), + Bucket: aws.String(d.Bucket), + Prefix: aws.String(path), + MaxKeys: aws.Int64(listMax), } ctx, done := dcontext.WithTrace(parentCtx) defer done("s3aws.ListObjectsV2Pages(%s)", path) + + // When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket + // recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we + // can infer all the directories by comparing each object path to the last one we saw. + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html + + // With files returned in sorted depth-first order, directories are inferred in the same order. + // ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal + // for extreme edge cases but for the general use case in a registry, this is orders of magnitude + // faster than a more explicit recursive implementation. listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { - - var count int64 - // KeyCount was introduced with version 2 of the GET Bucket operation in S3. - // Some S3 implementations don't support V2 now, so we fall back to manual - // calculation of the key count if required - if objects.KeyCount != nil { - count = *objects.KeyCount - *objectCount += *objects.KeyCount - } else { - count = int64(len(objects.Contents) + len(objects.CommonPrefixes)) - *objectCount += count - } - - walkInfos := make([]walkInfoContainer, 0, count) - - for _, dir := range objects.CommonPrefixes { - commonPrefix := *dir.Prefix - walkInfos = append(walkInfos, walkInfoContainer{ - prefix: dir.Prefix, - FileInfoFields: storagedriver.FileInfoFields{ - IsDir: true, - Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1), - }, - }) - } + walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents)) for _, file := range objects.Contents { - // empty prefixes are listed as objects inside its own prefix. - // https://docs.aws.amazon.com/AmazonS3/latest/user-guide/using-folders.html - if strings.HasSuffix(*file.Key, "/") { - continue + filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1) + + // get a list of all inferred directories between the previous directory and this file + dirs := directoryDiff(prevDir, filePath) + if len(dirs) > 0 { + for _, dir := range dirs { + walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + IsDir: true, + Path: dir, + }, + }) + prevDir = dir + } } - walkInfos = append(walkInfos, walkInfoContainer{ + + walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ FileInfoFields: storagedriver.FileInfoFields{ IsDir: false, Size: *file.Size, ModTime: *file.LastModified, - Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1), + Path: filePath, }, }) } - sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path }) - for _, walkInfo := range walkInfos { - err := f(walkInfo) - - if err == storagedriver.ErrSkipDir { - if walkInfo.IsDir() { - continue - } else { - break - } - } else if err != nil { - retError = err - return false + // skip any results under the last skip directory + if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) { + continue } - if walkInfo.IsDir() { - if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil { - retError = err + err := f(walkInfo) + *objectCount++ + + if err != nil { + if err == storagedriver.ErrSkipDir { + if walkInfo.IsDir() { + prevSkipDir = walkInfo.Path() + continue + } + // is file, stop gracefully return false } + retError = err + return false } } return true @@ -1061,6 +1036,44 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre return nil } +// directoryDiff finds all directories that are not in common between +// the previous and current paths in sorted order. +// +// Eg 1 directoryDiff("/path/to/folder", "/path/to/folder/folder/file") +// => [ "/path/to/folder/folder" ], +// Eg 2 directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file") +// => [ "/path/to/folder/folder2" ] +// Eg 3 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file") +// => [ "/path/to/folder/folder2" ] +// Eg 4 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file") +// => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ] +// Eg 5 directoryDiff("/", "/path/to/folder/folder/file") +// => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ], +func directoryDiff(prev, current string) []string { + var paths []string + + if prev == "" || current == "" { + return paths + } + + parent := current + for { + parent = filepath.Dir(parent) + if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) { + break + } + paths = append(paths, parent) + } + reverse(paths) + return paths +} + +func reverse(s []string) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +} + func (d *driver) s3Path(path string) string { return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") } diff --git a/registry/storage/driver/s3-aws/s3_test.go b/registry/storage/driver/s3-aws/s3_test.go index f0b257ca..325e2e28 100644 --- a/registry/storage/driver/s3-aws/s3_test.go +++ b/registry/storage/driver/s3-aws/s3_test.go @@ -2,6 +2,7 @@ package s3 import ( "bytes" + "errors" "fmt" "io/ioutil" "math/rand" @@ -496,6 +497,165 @@ func TestDelete(t *testing.T) { } } +func TestWalk(t *testing.T) { + if skipS3() != "" { + t.Skip(skipS3()) + } + + rootDir, err := ioutil.TempDir("", "driver-") + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } + defer os.Remove(rootDir) + + driver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating driver with standard storage: %v", err) + } + + var fileset = []string{ + "/file1", + "/folder1/file1", + "/folder2/file1", + "/folder3/subfolder1/subfolder1/file1", + "/folder3/subfolder2/subfolder1/file1", + "/folder4/file1", + } + + // create file structure matching fileset above + var created []string + for _, path := range fileset { + err := driver.PutContent(context.Background(), path, []byte("content "+path)) + if err != nil { + fmt.Printf("unable to create file %s: %s\n", path, err) + continue + } + created = append(created, path) + } + + // cleanup + defer func() { + var lastErr error + for _, path := range created { + err := driver.Delete(context.Background(), path) + if err != nil { + _ = fmt.Errorf("cleanup failed for path %s: %s", path, err) + lastErr = err + } + } + if lastErr != nil { + t.Fatalf("cleanup failed: %s", err) + } + }() + + tcs := []struct { + name string + fn storagedriver.WalkFn + from string + expected []string + err bool + }{ + { + name: "walk all", + fn: func(fileInfo storagedriver.FileInfo) error { return nil }, + expected: []string{ + "/file1", + "/folder1", + "/folder1/file1", + "/folder2", + "/folder2/file1", + "/folder3", + "/folder3/subfolder1", + "/folder3/subfolder1/subfolder1", + "/folder3/subfolder1/subfolder1/file1", + "/folder3/subfolder2", + "/folder3/subfolder2/subfolder1", + "/folder3/subfolder2/subfolder1/file1", + "/folder4", + "/folder4/file1", + }, + }, + { + name: "skip directory", + fn: func(fileInfo storagedriver.FileInfo) error { + if fileInfo.Path() == "/folder3" { + return storagedriver.ErrSkipDir + } + if strings.Contains(fileInfo.Path(), "/folder3") { + t.Fatalf("skipped dir %s and should not walk %s", "/folder3", fileInfo.Path()) + } + return nil + }, + expected: []string{ + "/file1", + "/folder1", + "/folder1/file1", + "/folder2", + "/folder2/file1", + "/folder3", + // folder 3 contents skipped + "/folder4", + "/folder4/file1", + }, + }, + { + name: "stop early", + fn: func(fileInfo storagedriver.FileInfo) error { + if fileInfo.Path() == "/folder1/file1" { + return storagedriver.ErrSkipDir + } + return nil + }, + expected: []string{ + "/file1", + "/folder1", + "/folder1/file1", + // stop early + }, + err: false, + }, + { + name: "error", + fn: func(fileInfo storagedriver.FileInfo) error { + return errors.New("foo") + }, + expected: []string{ + "/file1", + }, + err: true, + }, + { + name: "from folder", + fn: func(fileInfo storagedriver.FileInfo) error { return nil }, + expected: []string{ + "/folder1", + "/folder1/file1", + }, + from: "/folder1", + }, + } + + for _, tc := range tcs { + var walked []string + if tc.from == "" { + tc.from = "/" + } + t.Run(tc.name, func(t *testing.T) { + err := driver.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error { + walked = append(walked, fileInfo.Path()) + return tc.fn(fileInfo) + }) + if tc.err && err == nil { + t.Fatalf("expected err") + } + if !tc.err && err != nil { + t.Fatalf(err.Error()) + } + compareWalked(t, tc.expected, walked) + }) + } +} + func TestOverThousandBlobs(t *testing.T) { if skipS3() != "" { t.Skip(skipS3()) @@ -582,3 +742,14 @@ func TestMoveWithMultipartCopy(t *testing.T) { t.Fatalf("unexpected error getting content: %v", err) } } + +func compareWalked(t *testing.T, expected, walked []string) { + if len(walked) != len(expected) { + t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected) + } + for i := range walked { + if walked[i] != expected[i] { + t.Fatalf("walked in unexpected order: expected %s; walked %s", expected, walked) + } + } +} diff --git a/registry/storage/driver/swift/swift.go b/registry/storage/driver/swift/swift.go index 65072581..e54e44d6 100644 --- a/registry/storage/driver/swift/swift.go +++ b/registry/storage/driver/swift/swift.go @@ -658,7 +658,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int } // Walk traverses a filesystem defined within driver, starting -// from the given path, calling f on each file +// from the given path, calling f on each file and directory func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { return storagedriver.WalkFallback(ctx, d, path, f) } diff --git a/registry/storage/driver/walk.go b/registry/storage/driver/walk.go index 8ded5b8b..c564d896 100644 --- a/registry/storage/driver/walk.go +++ b/registry/storage/driver/walk.go @@ -22,9 +22,14 @@ type WalkFn func(fileInfo FileInfo) error // to a directory, the directory will not be entered and Walk // will continue the traversal. If fileInfo refers to a normal file, processing stops func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error { + _, err := doWalkFallback(ctx, driver, from, f) + return err +} + +func doWalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) (bool, error) { children, err := driver.List(ctx, from) if err != nil { - return err + return false, err } sort.Stable(sort.StringSlice(children)) for _, child := range children { @@ -40,22 +45,22 @@ func WalkFallback(ctx context.Context, driver StorageDriver, from string, f Walk logrus.WithField("path", child).Infof("ignoring deleted path") continue default: - return err + return false, err } } err = f(fileInfo) if err == nil && fileInfo.IsDir() { - if err := WalkFallback(ctx, driver, child, f); err != nil { - return err + if ok, err := doWalkFallback(ctx, driver, child, f); err != nil || !ok { + return ok, err } } else if err == ErrSkipDir { - // Stop iteration if it's a file, otherwise noop if it's a directory + // noop for folders, will just skip if !fileInfo.IsDir() { - return nil + return false, nil // no error but stop iteration } } else if err != nil { - return err + return false, err } } - return nil + return true, nil } diff --git a/registry/storage/driver/walk_test.go b/registry/storage/driver/walk_test.go index d6e9beb6..2a3a4fb2 100644 --- a/registry/storage/driver/walk_test.go +++ b/registry/storage/driver/walk_test.go @@ -3,6 +3,7 @@ package driver import ( "context" "fmt" + "strings" "testing" ) @@ -12,10 +13,10 @@ type changingFileSystem struct { keptFiles map[string]bool } -func (cfs *changingFileSystem) List(ctx context.Context, path string) ([]string, error) { +func (cfs *changingFileSystem) List(_ context.Context, _ string) ([]string, error) { return cfs.fileset, nil } -func (cfs *changingFileSystem) Stat(ctx context.Context, path string) (FileInfo, error) { +func (cfs *changingFileSystem) Stat(_ context.Context, path string) (FileInfo, error) { kept, ok := cfs.keptFiles[path] if ok && kept { return &FileInfoInternal{ @@ -26,6 +27,32 @@ func (cfs *changingFileSystem) Stat(ctx context.Context, path string) (FileInfo, } return nil, PathNotFoundError{} } + +type fileSystem struct { + StorageDriver + // maps folder to list results + fileset map[string][]string +} + +func (cfs *fileSystem) List(_ context.Context, path string) ([]string, error) { + return cfs.fileset[path], nil +} + +func (cfs *fileSystem) Stat(_ context.Context, path string) (FileInfo, error) { + _, isDir := cfs.fileset[path] + return &FileInfoInternal{ + FileInfoFields: FileInfoFields{ + Path: path, + IsDir: isDir, + Size: int64(len(path)), + }, + }, nil +} +func (cfs *fileSystem) isDir(path string) bool { + _, isDir := cfs.fileset[path] + return isDir +} + func TestWalkFileRemoved(t *testing.T) { d := &changingFileSystem{ fileset: []string{"zoidberg", "bender"}, @@ -45,3 +72,111 @@ func TestWalkFileRemoved(t *testing.T) { t.Fatalf(err.Error()) } } + +func TestWalkFallback(t *testing.T) { + d := &fileSystem{ + fileset: map[string][]string{ + "/": {"/file1", "/folder1", "/folder2"}, + "/folder1": {"/folder1/file1"}, + "/folder2": {"/folder2/file1"}, + }, + } + noopFn := func(fileInfo FileInfo) error { return nil } + + tcs := []struct { + name string + fn WalkFn + from string + expected []string + err bool + }{ + { + name: "walk all", + fn: noopFn, + expected: []string{ + "/file1", + "/folder1", + "/folder1/file1", + "/folder2", + "/folder2/file1", + }, + }, + { + name: "skip directory", + fn: func(fileInfo FileInfo) error { + if fileInfo.Path() == "/folder1" { + return ErrSkipDir + } + if strings.Contains(fileInfo.Path(), "/folder1") { + t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path()) + } + return nil + }, + expected: []string{ + "/file1", + "/folder1", // return ErrSkipDir, skip anything under /folder1 + // skip /folder1/file1 + "/folder2", + "/folder2/file1", + }, + }, + { + name: "stop early", + fn: func(fileInfo FileInfo) error { + if fileInfo.Path() == "/folder1/file1" { + return ErrSkipDir + } + return nil + }, + expected: []string{ + "/file1", + "/folder1", + "/folder1/file1", + // stop early + }, + }, + { + name: "from folder", + fn: noopFn, + expected: []string{ + "/folder1/file1", + }, + from: "/folder1", + }, + } + + for _, tc := range tcs { + var walked []string + if tc.from == "" { + tc.from = "/" + } + t.Run(tc.name, func(t *testing.T) { + err := WalkFallback(context.Background(), d, tc.from, func(fileInfo FileInfo) error { + walked = append(walked, fileInfo.Path()) + if fileInfo.IsDir() != d.isDir(fileInfo.Path()) { + t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", d.isDir(fileInfo.Path()), fileInfo.IsDir()) + } + return tc.fn(fileInfo) + }) + if tc.err && err == nil { + t.Fatalf("expected err") + } + if !tc.err && err != nil { + t.Fatalf(err.Error()) + } + compareWalked(t, tc.expected, walked) + }) + } + +} + +func compareWalked(t *testing.T, expected, walked []string) { + if len(walked) != len(expected) { + t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected) + } + for i := range walked { + if walked[i] != expected[i] { + t.Fatalf("expected walked to come in order expected: walked %s", walked) + } + } +}