Merge pull request #3685 from Jamstah/aws-paging
Work with the storage driver to minimise work when paging
This commit is contained in:
commit
17552d864d
13 changed files with 390 additions and 63 deletions
|
@ -27,6 +27,14 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startAfter := ""
|
||||||
|
if last != "" {
|
||||||
|
startAfter, err = pathFor(manifestsPathSpec{name: last})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
|
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
|
||||||
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
|
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
|
||||||
repos[foundRepos] = repoPath
|
repos[foundRepos] = repoPath
|
||||||
|
@ -40,11 +48,11 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
|
||||||
// if we've filled our slice, no need to walk any further
|
// if we've filled our slice, no need to walk any further
|
||||||
if foundRepos == len(repos) {
|
if foundRepos == len(repos) {
|
||||||
filledBuffer = true
|
filledBuffer = true
|
||||||
return driver.ErrSkipDir
|
return driver.ErrFilledBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}, driver.WithStartAfterHint(startAfter))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return foundRepos, err
|
return foundRepos, err
|
||||||
|
@ -136,8 +144,9 @@ func compareReplaceInline(s1, s2 string, old, new byte) int {
|
||||||
|
|
||||||
// handleRepository calls function fn with a repository path if fileInfo
|
// handleRepository calls function fn with a repository path if fileInfo
|
||||||
// has a path of a repository under root and that it is lexographically
|
// has a path of a repository under root and that it is lexographically
|
||||||
// after last. Otherwise, it will return ErrSkipDir. This should be used
|
// after last. Otherwise, it will return ErrSkipDir or ErrFilledBuffer.
|
||||||
// with Walk to do handling with repositories in a storage.
|
// These should be used with Walk to do handling with repositories in a
|
||||||
|
// storage.
|
||||||
func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoPath string) error) error {
|
func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoPath string) error) error {
|
||||||
filePath := fileInfo.Path()
|
filePath := fileInfo.Path()
|
||||||
|
|
||||||
|
|
|
@ -151,7 +151,11 @@ func TestCatalogInParts(t *testing.T) {
|
||||||
lastRepo = p[len(p)-1]
|
lastRepo = p[len(p)-1]
|
||||||
numFilled, err = env.registry.Repositories(env.ctx, p, lastRepo)
|
numFilled, err = env.registry.Repositories(env.ctx, p, lastRepo)
|
||||||
|
|
||||||
if err != io.EOF || numFilled != len(p) {
|
if numFilled != len(p) {
|
||||||
|
t.Errorf("Expected more values in catalog")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != io.EOF {
|
||||||
t.Errorf("Expected end of catalog")
|
t.Errorf("Expected end of catalog")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -385,8 +385,8 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file and directory
|
// from the given path, calling f on each file and directory
|
||||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
return storagedriver.WalkFallback(ctx, d, path, f, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// directDescendants will find direct descendants (blobs or virtual containers)
|
// directDescendants will find direct descendants (blobs or virtual containers)
|
||||||
|
|
|
@ -226,7 +226,7 @@ func (base *Base) URLFor(ctx context.Context, path string, options map[string]in
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk wraps Walk of underlying storage driver.
|
// Walk wraps Walk of underlying storage driver.
|
||||||
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
ctx, done := dcontext.WithTrace(ctx)
|
ctx, done := dcontext.WithTrace(ctx)
|
||||||
defer done("%s.Walk(%q)", base.Name(), path)
|
defer done("%s.Walk(%q)", base.Name(), path)
|
||||||
|
|
||||||
|
@ -234,5 +234,5 @@ func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
|
||||||
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||||
}
|
}
|
||||||
|
|
||||||
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
|
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f, options...))
|
||||||
}
|
}
|
||||||
|
|
|
@ -290,8 +290,8 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file and directory
|
// from the given path, calling f on each file and directory
|
||||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
return storagedriver.WalkFallback(ctx, d, path, f, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// fullPath returns the absolute path of a key within the Driver's storage.
|
// fullPath returns the absolute path of a key within the Driver's storage.
|
||||||
|
|
|
@ -849,8 +849,8 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file
|
// from the given path, calling f on each file
|
||||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
return storagedriver.WalkFallback(ctx, d, path, f, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
|
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
|
||||||
|
|
|
@ -244,8 +244,8 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file and directory
|
// from the given path, calling f on each file and directory
|
||||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
return storagedriver.WalkFallback(ctx, d, path, f, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
type writer struct {
|
type writer struct {
|
||||||
|
|
|
@ -1040,21 +1040,21 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file
|
// from the given path, calling f on each file
|
||||||
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
|
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||||
var objectCount int64
|
walkOptions := &storagedriver.WalkOptions{}
|
||||||
if err := d.doWalk(ctx, &objectCount, from, f); err != nil {
|
for _, o := range options {
|
||||||
return err
|
o(walkOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
|
var objectCount int64
|
||||||
if objectCount == 0 {
|
if err := d.doWalk(ctx, &objectCount, from, walkOptions.StartAfterHint, f); err != nil {
|
||||||
return storagedriver.PathNotFoundError{Path: from}
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, f storagedriver.WalkFn) error {
|
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, startAfter string, f storagedriver.WalkFn) error {
|
||||||
var (
|
var (
|
||||||
retError error
|
retError error
|
||||||
// the most recent directory walked for de-duping
|
// the most recent directory walked for de-duping
|
||||||
|
@ -1075,13 +1075,14 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from stri
|
||||||
}
|
}
|
||||||
|
|
||||||
listObjectsInput := &s3.ListObjectsV2Input{
|
listObjectsInput := &s3.ListObjectsV2Input{
|
||||||
Bucket: aws.String(d.Bucket),
|
Bucket: aws.String(d.Bucket),
|
||||||
Prefix: aws.String(d.s3Path(path)),
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
MaxKeys: aws.Int64(listMax),
|
MaxKeys: aws.Int64(listMax),
|
||||||
|
StartAfter: aws.String(d.s3Path(startAfter)),
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, done := dcontext.WithTrace(parentCtx)
|
ctx, done := dcontext.WithTrace(parentCtx)
|
||||||
defer done("s3aws.ListObjectsV2Pages(%s)", path)
|
defer done("s3aws.ListObjectsV2PagesWithContext(%s)", listObjectsInput)
|
||||||
|
|
||||||
// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
|
// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
|
||||||
// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
|
// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
|
||||||
|
@ -1133,11 +1134,10 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from stri
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == storagedriver.ErrSkipDir {
|
if err == storagedriver.ErrSkipDir {
|
||||||
if walkInfo.IsDir() {
|
prevSkipDir = walkInfo.Path()
|
||||||
prevSkipDir = walkInfo.Path()
|
continue
|
||||||
continue
|
}
|
||||||
}
|
if err == storagedriver.ErrFilledBuffer {
|
||||||
// is file, stop gracefully
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
retError = err
|
retError = err
|
||||||
|
@ -1187,7 +1187,7 @@ func directoryDiff(prev, current string) []string {
|
||||||
parent := current
|
parent := current
|
||||||
for {
|
for {
|
||||||
parent = filepath.Dir(parent)
|
parent = filepath.Dir(parent)
|
||||||
if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) {
|
if parent == "/" || parent == prev || strings.HasPrefix(prev+"/", parent+"/") {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
paths = append(paths, parent)
|
paths = append(paths, parent)
|
||||||
|
|
|
@ -491,6 +491,7 @@ func TestWalk(t *testing.T) {
|
||||||
|
|
||||||
fileset := []string{
|
fileset := []string{
|
||||||
"/file1",
|
"/file1",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
"/folder2/file1",
|
"/folder2/file1",
|
||||||
"/folder3/subfolder1/subfolder1/file1",
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
@ -524,18 +525,23 @@ func TestWalk(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
noopFn := func(fileInfo storagedriver.FileInfo) error { return nil }
|
||||||
|
|
||||||
tcs := []struct {
|
tcs := []struct {
|
||||||
name string
|
name string
|
||||||
fn storagedriver.WalkFn
|
fn storagedriver.WalkFn
|
||||||
from string
|
from string
|
||||||
|
options []func(*storagedriver.WalkOptions)
|
||||||
expected []string
|
expected []string
|
||||||
err bool
|
err bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "walk all",
|
name: "walk all",
|
||||||
fn: func(fileInfo storagedriver.FileInfo) error { return nil },
|
fn: noopFn,
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"/file1",
|
"/file1",
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder1",
|
"/folder1",
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
"/folder2",
|
"/folder2",
|
||||||
|
@ -564,6 +570,8 @@ func TestWalk(t *testing.T) {
|
||||||
},
|
},
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"/file1",
|
"/file1",
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder1",
|
"/folder1",
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
"/folder2",
|
"/folder2",
|
||||||
|
@ -574,22 +582,101 @@ func TestWalk(t *testing.T) {
|
||||||
"/folder4/file1",
|
"/folder4/file1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "start late without from",
|
||||||
|
fn: noopFn,
|
||||||
|
options: []func(*storagedriver.WalkOptions){
|
||||||
|
storagedriver.WithStartAfterHint("/folder3/subfolder1/subfolder1/file1"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
// start late
|
||||||
|
"/folder3",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4",
|
||||||
|
"/folder4/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start late with from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*storagedriver.WalkOptions){
|
||||||
|
storagedriver.WithStartAfterHint("/folder3/subfolder1/subfolder1/file1"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
// start late
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start after from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder1",
|
||||||
|
options: []func(*storagedriver.WalkOptions){
|
||||||
|
storagedriver.WithStartAfterHint("/folder2"),
|
||||||
|
},
|
||||||
|
expected: []string{},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start matches from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*storagedriver.WalkOptions){
|
||||||
|
storagedriver.WithStartAfterHint("/folder3"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start doesn't exist",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*storagedriver.WalkOptions){
|
||||||
|
storagedriver.WithStartAfterHint("/folder3/notafolder/notafile"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "stop early",
|
name: "stop early",
|
||||||
fn: func(fileInfo storagedriver.FileInfo) error {
|
fn: func(fileInfo storagedriver.FileInfo) error {
|
||||||
if fileInfo.Path() == "/folder1/file1" {
|
if fileInfo.Path() == "/folder1/file1" {
|
||||||
return storagedriver.ErrSkipDir
|
return storagedriver.ErrFilledBuffer
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"/file1",
|
"/file1",
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder1",
|
"/folder1",
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
// stop early
|
// stop early
|
||||||
},
|
},
|
||||||
err: false,
|
err: false,
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
name: "error",
|
name: "error",
|
||||||
fn: func(fileInfo storagedriver.FileInfo) error {
|
fn: func(fileInfo storagedriver.FileInfo) error {
|
||||||
|
@ -602,7 +689,7 @@ func TestWalk(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "from folder",
|
name: "from folder",
|
||||||
fn: func(fileInfo storagedriver.FileInfo) error { return nil },
|
fn: noopFn,
|
||||||
expected: []string{
|
expected: []string{
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
},
|
},
|
||||||
|
@ -619,7 +706,7 @@ func TestWalk(t *testing.T) {
|
||||||
err := drvr.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error {
|
err := drvr.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error {
|
||||||
walked = append(walked, fileInfo.Path())
|
walked = append(walked, fileInfo.Path())
|
||||||
return tc.fn(fileInfo)
|
return tc.fn(fileInfo)
|
||||||
})
|
}, tc.options...)
|
||||||
if tc.err && err == nil {
|
if tc.err && err == nil {
|
||||||
t.Fatalf("expected err")
|
t.Fatalf("expected err")
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,19 @@ func (version Version) Minor() uint {
|
||||||
// CurrentVersion is the current storage driver Version.
|
// CurrentVersion is the current storage driver Version.
|
||||||
const CurrentVersion Version = "0.1"
|
const CurrentVersion Version = "0.1"
|
||||||
|
|
||||||
|
// WalkOptions provides options to the walk function that may adjust its behaviour
|
||||||
|
type WalkOptions struct {
|
||||||
|
// If StartAfterHint is set, the walk may start with the first item lexographically
|
||||||
|
// after the hint, but it is not guaranteed and drivers may start the walk from the path.
|
||||||
|
StartAfterHint string
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithStartAfterHint(startAfterHint string) func(*WalkOptions) {
|
||||||
|
return func(s *WalkOptions) {
|
||||||
|
s.StartAfterHint = startAfterHint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// StorageDriver defines methods that a Storage Driver must implement for a
|
// StorageDriver defines methods that a Storage Driver must implement for a
|
||||||
// filesystem-like key/value object storage. Storage Drivers are automatically
|
// filesystem-like key/value object storage. Storage Drivers are automatically
|
||||||
// registered via an internal registration mechanism, and generally created
|
// registered via an internal registration mechanism, and generally created
|
||||||
|
@ -88,8 +101,9 @@ type StorageDriver interface {
|
||||||
// from the given path, calling f on each file.
|
// from the given path, calling f on each file.
|
||||||
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
||||||
// to a directory, the directory will not be entered and Walk
|
// to a directory, the directory will not be entered and Walk
|
||||||
// will continue the traversal. If fileInfo refers to a normal file, processing stops
|
// will continue the traversal.
|
||||||
Walk(ctx context.Context, path string, f WalkFn) error
|
// If the returned error from the WalkFn is ErrFilledBuffer, processing stops.
|
||||||
|
Walk(ctx context.Context, path string, f WalkFn, options ...func(*WalkOptions)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileWriter provides an abstraction for an opened writable file-like object in
|
// FileWriter provides an abstraction for an opened writable file-like object in
|
||||||
|
|
|
@ -3,7 +3,9 @@ package driver
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -13,30 +15,88 @@ import (
|
||||||
// as an error by any function.
|
// as an error by any function.
|
||||||
var ErrSkipDir = errors.New("skip this directory")
|
var ErrSkipDir = errors.New("skip this directory")
|
||||||
|
|
||||||
|
// ErrFilledBuffer is used as a return value from onFileFunc to indicate
|
||||||
|
// that the requested number of entries has been reached and the walk can
|
||||||
|
// stop.
|
||||||
|
var ErrFilledBuffer = errors.New("we have enough entries")
|
||||||
|
|
||||||
// WalkFn is called once per file by Walk
|
// WalkFn is called once per file by Walk
|
||||||
type WalkFn func(fileInfo FileInfo) error
|
type WalkFn func(fileInfo FileInfo) error
|
||||||
|
|
||||||
// WalkFallback traverses a filesystem defined within driver, starting
|
// WalkFallback traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
|
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
|
||||||
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
// If the returned error from the WalkFn is ErrSkipDir the directory will not be entered and Walk
|
||||||
// to a directory, the directory will not be entered and Walk
|
// will continue the traversal. If the returned error from the WalkFn is ErrFilledBuffer, the walk
|
||||||
// will continue the traversal. If fileInfo refers to a normal file, processing stops
|
// stops.
|
||||||
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
|
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn, options ...func(*WalkOptions)) error {
|
||||||
_, err := doWalkFallback(ctx, driver, from, f)
|
walkOptions := &WalkOptions{}
|
||||||
return err
|
for _, o := range options {
|
||||||
|
o(walkOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
startAfterHint := walkOptions.StartAfterHint
|
||||||
|
// Ensure that we are checking the hint is contained within from by adding a "/".
|
||||||
|
// Add to both in case the hint and form are the same, which would still count.
|
||||||
|
rel, err := filepath.Rel(from, startAfterHint)
|
||||||
|
if err != nil || strings.HasPrefix(rel, "..") {
|
||||||
|
// The startAfterHint is outside from, so check if we even need to walk anything
|
||||||
|
// Replace any path separators with \x00 so that the sort works in a depth-first way
|
||||||
|
if strings.ReplaceAll(startAfterHint, "/", "\x00") < strings.ReplaceAll(from, "/", "\x00") {
|
||||||
|
_, err := doWalkFallback(ctx, driver, from, "", f)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// The startAfterHint is within from.
|
||||||
|
// Walk up the tree until we hit from - we know it is contained.
|
||||||
|
// Ensure startAfterHint is never deeper than a child of the base
|
||||||
|
// directory so that doWalkFallback doesn't have to worry about
|
||||||
|
// depth-first comparisons
|
||||||
|
base := startAfterHint
|
||||||
|
for strings.HasPrefix(base, from) {
|
||||||
|
_, err = doWalkFallback(ctx, driver, base, startAfterHint, f)
|
||||||
|
switch err.(type) {
|
||||||
|
case nil:
|
||||||
|
// No error
|
||||||
|
case PathNotFoundError:
|
||||||
|
// dir doesn't exist, so nothing to walk
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if base == from {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
startAfterHint = base
|
||||||
|
base, _ = filepath.Split(startAfterHint)
|
||||||
|
if len(base) > 1 {
|
||||||
|
base = strings.TrimSuffix(base, "/")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func doWalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) (bool, error) {
|
// doWalkFallback performs a depth first walk using recursion.
|
||||||
|
// from is the directory that this iteration of the function should walk.
|
||||||
|
// startAfterHint is the child within from to start the walk after. It should only ever be a child of from, or the empty string.
|
||||||
|
func doWalkFallback(ctx context.Context, driver StorageDriver, from string, startAfterHint string, f WalkFn) (bool, error) {
|
||||||
children, err := driver.List(ctx, from)
|
children, err := driver.List(ctx, from)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
sort.Stable(sort.StringSlice(children))
|
sort.Strings(children)
|
||||||
for _, child := range children {
|
for _, child := range children {
|
||||||
|
// The startAfterHint has been sanitised in WalkFallback and will either be
|
||||||
|
// empty, or be suitable for an <= check for this _from_.
|
||||||
|
if child <= startAfterHint {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(stevvooe): Calling driver.Stat for every entry is quite
|
// TODO(stevvooe): Calling driver.Stat for every entry is quite
|
||||||
// expensive when running against backends with a slow Stat
|
// expensive when running against backends with a slow Stat
|
||||||
// implementation, such as s3. This is very likely a serious
|
// implementation, such as GCS. This is very likely a serious
|
||||||
// performance bottleneck.
|
// performance bottleneck.
|
||||||
|
// Those backends should have custom walk functions. See S3.
|
||||||
fileInfo, err := driver.Stat(ctx, child)
|
fileInfo, err := driver.Stat(ctx, child)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
|
@ -50,14 +110,13 @@ func doWalkFallback(ctx context.Context, driver StorageDriver, from string, f Wa
|
||||||
}
|
}
|
||||||
err = f(fileInfo)
|
err = f(fileInfo)
|
||||||
if err == nil && fileInfo.IsDir() {
|
if err == nil && fileInfo.IsDir() {
|
||||||
if ok, err := doWalkFallback(ctx, driver, child, f); err != nil || !ok {
|
if ok, err := doWalkFallback(ctx, driver, child, startAfterHint, f); err != nil || !ok {
|
||||||
return ok, err
|
return ok, err
|
||||||
}
|
}
|
||||||
} else if err == ErrSkipDir {
|
} else if err == ErrSkipDir {
|
||||||
// noop for folders, will just skip
|
// don't traverse into this directory
|
||||||
if !fileInfo.IsDir() {
|
} else if err == ErrFilledBuffer {
|
||||||
return false, nil // no error but stop iteration
|
return false, nil // no error but stop iteration
|
||||||
}
|
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@ package driver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,7 +36,12 @@ type fileSystem struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfs *fileSystem) List(_ context.Context, path string) ([]string, error) {
|
func (cfs *fileSystem) List(_ context.Context, path string) ([]string, error) {
|
||||||
return cfs.fileset[path], nil
|
children := cfs.fileset[path]
|
||||||
|
if children == nil {
|
||||||
|
return nil, PathNotFoundError{Path: path}
|
||||||
|
} else {
|
||||||
|
return children, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfs *fileSystem) Stat(_ context.Context, path string) (FileInfo, error) {
|
func (cfs *fileSystem) Stat(_ context.Context, path string) (FileInfo, error) {
|
||||||
|
@ -78,9 +83,16 @@ func TestWalkFileRemoved(t *testing.T) {
|
||||||
func TestWalkFallback(t *testing.T) {
|
func TestWalkFallback(t *testing.T) {
|
||||||
d := &fileSystem{
|
d := &fileSystem{
|
||||||
fileset: map[string][]string{
|
fileset: map[string][]string{
|
||||||
"/": {"/file1", "/folder1", "/folder2"},
|
"/": {"/file1", "/folder1", "/folder1-suffix", "/folder2", "/folder3", "/folder4"},
|
||||||
"/folder1": {"/folder1/file1"},
|
"/folder1": {"/folder1/file1"},
|
||||||
"/folder2": {"/folder2/file1"},
|
"/folder1-suffix": {"/folder1-suffix/file1"}, // importantly, - is before / in the ascii table
|
||||||
|
"/folder2": {"/folder2/file1"},
|
||||||
|
"/folder3": {"/folder3/subfolder1", "/folder3/subfolder2"},
|
||||||
|
"/folder3/subfolder1": {"/folder3/subfolder1/subfolder1"},
|
||||||
|
"/folder3/subfolder1/subfolder1": {"/folder3/subfolder1/subfolder1/file1"},
|
||||||
|
"/folder3/subfolder2": {"/folder3/subfolder2/subfolder1"},
|
||||||
|
"/folder3/subfolder2/subfolder1": {"/folder3/subfolder2/subfolder1/file1"},
|
||||||
|
"/folder4": {"/folder4/file1"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
noopFn := func(fileInfo FileInfo) error { return nil }
|
noopFn := func(fileInfo FileInfo) error { return nil }
|
||||||
|
@ -89,6 +101,7 @@ func TestWalkFallback(t *testing.T) {
|
||||||
name string
|
name string
|
||||||
fn WalkFn
|
fn WalkFn
|
||||||
from string
|
from string
|
||||||
|
options []func(*WalkOptions)
|
||||||
expected []string
|
expected []string
|
||||||
err bool
|
err bool
|
||||||
}{
|
}{
|
||||||
|
@ -99,8 +112,19 @@ func TestWalkFallback(t *testing.T) {
|
||||||
"/file1",
|
"/file1",
|
||||||
"/folder1",
|
"/folder1",
|
||||||
"/folder1/file1",
|
"/folder1/file1",
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder2",
|
"/folder2",
|
||||||
"/folder2/file1",
|
"/folder2/file1",
|
||||||
|
"/folder3",
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4",
|
||||||
|
"/folder4/file1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -109,7 +133,7 @@ func TestWalkFallback(t *testing.T) {
|
||||||
if fileInfo.Path() == "/folder1" {
|
if fileInfo.Path() == "/folder1" {
|
||||||
return ErrSkipDir
|
return ErrSkipDir
|
||||||
}
|
}
|
||||||
if strings.Contains(fileInfo.Path(), "/folder1") {
|
if fileInfo.Path() == "/folder1" {
|
||||||
t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path())
|
t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -118,15 +142,124 @@ func TestWalkFallback(t *testing.T) {
|
||||||
"/file1",
|
"/file1",
|
||||||
"/folder1", // return ErrSkipDir, skip anything under /folder1
|
"/folder1", // return ErrSkipDir, skip anything under /folder1
|
||||||
// skip /folder1/file1
|
// skip /folder1/file1
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
"/folder2",
|
"/folder2",
|
||||||
"/folder2/file1",
|
"/folder2/file1",
|
||||||
|
"/folder3",
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4",
|
||||||
|
"/folder4/file1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "start late expecting -suffix",
|
||||||
|
fn: noopFn,
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder1/file1"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/folder1-suffix",
|
||||||
|
"/folder1-suffix/file1",
|
||||||
|
"/folder2",
|
||||||
|
"/folder2/file1",
|
||||||
|
"/folder3",
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4",
|
||||||
|
"/folder4/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start late without from",
|
||||||
|
fn: noopFn,
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder3/subfolder1/subfolder1/file1"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
// start late
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4",
|
||||||
|
"/folder4/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start late with from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder3/subfolder1/subfolder1/file1"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
// start late
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start after from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder1",
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder1-suffix"),
|
||||||
|
},
|
||||||
|
expected: []string{},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start matches from",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder3"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start doesn't exist",
|
||||||
|
fn: noopFn,
|
||||||
|
from: "/folder3",
|
||||||
|
options: []func(*WalkOptions){
|
||||||
|
WithStartAfterHint("/folder3/notafolder/notafile"),
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/folder3/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2",
|
||||||
|
"/folder3/subfolder2/subfolder1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
err: false,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "stop early",
|
name: "stop early",
|
||||||
fn: func(fileInfo FileInfo) error {
|
fn: func(fileInfo FileInfo) error {
|
||||||
if fileInfo.Path() == "/folder1/file1" {
|
if fileInfo.Path() == "/folder1/file1" {
|
||||||
return ErrSkipDir
|
return ErrFilledBuffer
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -137,6 +270,16 @@ func TestWalkFallback(t *testing.T) {
|
||||||
// stop early
|
// stop early
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "error",
|
||||||
|
fn: func(fileInfo FileInfo) error {
|
||||||
|
return errors.New("foo")
|
||||||
|
},
|
||||||
|
expected: []string{
|
||||||
|
"/file1",
|
||||||
|
},
|
||||||
|
err: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "from folder",
|
name: "from folder",
|
||||||
fn: noopFn,
|
fn: noopFn,
|
||||||
|
@ -159,7 +302,7 @@ func TestWalkFallback(t *testing.T) {
|
||||||
t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", d.isDir(fileInfo.Path()), fileInfo.IsDir())
|
t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", d.isDir(fileInfo.Path()), fileInfo.IsDir())
|
||||||
}
|
}
|
||||||
return tc.fn(fileInfo)
|
return tc.fn(fileInfo)
|
||||||
})
|
}, tc.options...)
|
||||||
if tc.err && err == nil {
|
if tc.err && err == nil {
|
||||||
t.Fatalf("expected err")
|
t.Fatalf("expected err")
|
||||||
}
|
}
|
||||||
|
@ -177,7 +320,7 @@ func compareWalked(t *testing.T, expected, walked []string) {
|
||||||
}
|
}
|
||||||
for i := range walked {
|
for i := range walked {
|
||||||
if walked[i] != expected[i] {
|
if walked[i] != expected[i] {
|
||||||
t.Fatalf("expected walked to come in order expected: walked %s", walked)
|
t.Fatalf("expected walked to come in order expected: walked %s; expected %s;", walked, expected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,8 @@ const (
|
||||||
//
|
//
|
||||||
// Manifests:
|
// Manifests:
|
||||||
//
|
//
|
||||||
// manifestRevisionsPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/
|
// manifestsPathSpec: <root>/v2/repositories/<name>/_manifests
|
||||||
|
// manifestRevisionsPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/
|
||||||
// manifestRevisionPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/
|
// manifestRevisionPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/
|
||||||
// manifestRevisionLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/link
|
// manifestRevisionLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/link
|
||||||
//
|
//
|
||||||
|
@ -130,6 +131,9 @@ func pathFor(spec pathSpec) (string, error) {
|
||||||
|
|
||||||
switch v := spec.(type) {
|
switch v := spec.(type) {
|
||||||
|
|
||||||
|
case manifestsPathSpec:
|
||||||
|
return path.Join(append(repoPrefix, v.name, "_manifests")...), nil
|
||||||
|
|
||||||
case manifestRevisionsPathSpec:
|
case manifestRevisionsPathSpec:
|
||||||
return path.Join(append(repoPrefix, v.name, "_manifests", "revisions")...), nil
|
return path.Join(append(repoPrefix, v.name, "_manifests", "revisions")...), nil
|
||||||
|
|
||||||
|
@ -256,6 +260,13 @@ type pathSpec interface {
|
||||||
pathSpec()
|
pathSpec()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// manifestPathSpec describes the directory path for a manifest.
|
||||||
|
type manifestsPathSpec struct {
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manifestsPathSpec) pathSpec() {}
|
||||||
|
|
||||||
// manifestRevisionsPathSpec describes the directory path for
|
// manifestRevisionsPathSpec describes the directory path for
|
||||||
// a manifest revision.
|
// a manifest revision.
|
||||||
type manifestRevisionsPathSpec struct {
|
type manifestRevisionsPathSpec struct {
|
||||||
|
|
Loading…
Reference in a new issue