forked from TrueCloudLab/distribution
Introduce Walk Method Per Storage Driver
Move the Walk types into registry/storage/driver, and add a Walk method to each storage driver. Although this is yet another API to implement, there is a fall back implementation that relies on List and Stat. For some filesystems this is very slow. Also, this WalkDir Method conforms better do a traditional WalkDir (a la filepath). This change is in preparation for refactoring. Signed-off-by: Sargun Dhillon <sargun@sargun.me>
This commit is contained in:
parent
9f664468ea
commit
32ac467992
15 changed files with 129 additions and 18 deletions
|
@ -144,9 +144,9 @@ func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoP
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ErrSkipDir
|
return driver.ErrSkipDir
|
||||||
} else if strings.HasPrefix(file, "_") {
|
} else if strings.HasPrefix(file, "_") {
|
||||||
return ErrSkipDir
|
return driver.ErrSkipDir
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -336,6 +336,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
|
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
// directDescendants will find direct descendants (blobs or virtual containers)
|
// directDescendants will find direct descendants (blobs or virtual containers)
|
||||||
// of from list of blob paths and will return their full paths. Elements in blobs
|
// of from list of blob paths and will return their full paths. Elements in blobs
|
||||||
// list must be prefixed with a "/" and
|
// list must be prefixed with a "/" and
|
||||||
|
|
|
@ -197,3 +197,15 @@ func (base *Base) URLFor(ctx context.Context, path string, options map[string]in
|
||||||
str, e := base.StorageDriver.URLFor(ctx, path, options)
|
str, e := base.StorageDriver.URLFor(ctx, path, options)
|
||||||
return str, base.setDriverName(e)
|
return str, base.setDriverName(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk wraps Walk of underlying storage driver.
|
||||||
|
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
ctx, done := dcontext.WithTrace(ctx)
|
||||||
|
defer done("%s.Walk(%q)", base.Name(), path)
|
||||||
|
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||||
|
}
|
||||||
|
|
||||||
|
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
|
||||||
|
}
|
||||||
|
|
|
@ -315,6 +315,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return "", storagedriver.ErrUnsupportedMethod{}
|
return "", storagedriver.ErrUnsupportedMethod{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
// fullPath returns the absolute path of a key within the Driver's storage.
|
// fullPath returns the absolute path of a key within the Driver's storage.
|
||||||
func (d *driver) fullPath(subPath string) string {
|
func (d *driver) fullPath(subPath string) string {
|
||||||
return path.Join(d.rootDirectory, subPath)
|
return path.Join(d.rootDirectory, subPath)
|
||||||
|
|
|
@ -779,6 +779,12 @@ func (d *driver) URLFor(context context.Context, path string, options map[string
|
||||||
return storage.SignedURL(d.bucket, name, opts)
|
return storage.SignedURL(d.bucket, name, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
|
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
|
||||||
u := &url.URL{
|
u := &url.URL{
|
||||||
Scheme: "https",
|
Scheme: "https",
|
||||||
|
|
|
@ -240,6 +240,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return "", storagedriver.ErrUnsupportedMethod{}
|
return "", storagedriver.ErrUnsupportedMethod{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
type writer struct {
|
type writer struct {
|
||||||
d *driver
|
d *driver
|
||||||
f *file
|
f *file
|
||||||
|
|
|
@ -479,6 +479,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return signedURL, nil
|
return signedURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) ossPath(path string) string {
|
func (d *driver) ossPath(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||||
}
|
}
|
||||||
|
|
|
@ -874,6 +874,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return req.Presign(expiresIn)
|
return req.Presign(expiresIn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) s3Path(path string) string {
|
func (d *driver) s3Path(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||||
}
|
}
|
||||||
|
|
|
@ -546,6 +546,12 @@ func (d *Driver) S3BucketKey(path string) string {
|
||||||
return d.StorageDriver.(*driver).s3Path(path)
|
return d.StorageDriver.(*driver).s3Path(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
func parseError(path string, err error) error {
|
func parseError(path string, err error) error {
|
||||||
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
|
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
|
|
@ -83,6 +83,13 @@ type StorageDriver interface {
|
||||||
// May return an ErrUnsupportedMethod in certain StorageDriver
|
// May return an ErrUnsupportedMethod in certain StorageDriver
|
||||||
// implementations.
|
// implementations.
|
||||||
URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
|
URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file.
|
||||||
|
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
||||||
|
// to a directory, the directory will not be entered and Walk
|
||||||
|
// will continue the traversal. If fileInfo refers to a normal file, processing stops
|
||||||
|
Walk(ctx context.Context, path string, f WalkFn) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileWriter provides an abstraction for an opened writable file-like object in
|
// FileWriter provides an abstraction for an opened writable file-like object in
|
||||||
|
|
|
@ -644,6 +644,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
return tempURL, nil
|
return tempURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file
|
||||||
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||||
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) swiftPath(path string) string {
|
func (d *driver) swiftPath(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
||||||
}
|
}
|
||||||
|
|
52
registry/storage/driver/walk.go
Normal file
52
registry/storage/driver/walk.go
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
package driver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrSkipDir is used as a return value from onFileFunc to indicate that
|
||||||
|
// the directory named in the call is to be skipped. It is not returned
|
||||||
|
// as an error by any function.
|
||||||
|
var ErrSkipDir = errors.New("skip this directory")
|
||||||
|
|
||||||
|
// WalkFn is called once per file by Walk
|
||||||
|
type WalkFn func(fileInfo FileInfo) error
|
||||||
|
|
||||||
|
// WalkFallback traverses a filesystem defined within driver, starting
|
||||||
|
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
|
||||||
|
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
||||||
|
// to a directory, the directory will not be entered and Walk
|
||||||
|
// will continue the traversal. If fileInfo refers to a normal file, processing stops
|
||||||
|
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
|
||||||
|
children, err := driver.List(ctx, from)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sort.Stable(sort.StringSlice(children))
|
||||||
|
for _, child := range children {
|
||||||
|
// TODO(stevvooe): Calling driver.Stat for every entry is quite
|
||||||
|
// expensive when running against backends with a slow Stat
|
||||||
|
// implementation, such as s3. This is very likely a serious
|
||||||
|
// performance bottleneck.
|
||||||
|
fileInfo, err := driver.Stat(ctx, child)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = f(fileInfo)
|
||||||
|
if err == nil && fileInfo.IsDir() {
|
||||||
|
if err := WalkFallback(ctx, driver, child, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if err == ErrSkipDir {
|
||||||
|
// Stop iteration if it's a file, otherwise noop if it's a directory
|
||||||
|
if !fileInfo.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -75,7 +75,7 @@ func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriv
|
||||||
inUploadDir = (file == "_uploads")
|
inUploadDir = (file == "_uploads")
|
||||||
|
|
||||||
if fileInfo.IsDir() && !inUploadDir {
|
if fileInfo.IsDir() && !inUploadDir {
|
||||||
return ErrSkipDir
|
return storageDriver.ErrSkipDir
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,27 +2,19 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
storageDriver "github.com/docker/distribution/registry/storage/driver"
|
storageDriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrSkipDir is used as a return value from onFileFunc to indicate that
|
|
||||||
// the directory named in the call is to be skipped. It is not returned
|
|
||||||
// as an error by any function.
|
|
||||||
var ErrSkipDir = errors.New("skip this directory")
|
|
||||||
|
|
||||||
// WalkFn is called once per file by Walk
|
|
||||||
// If the returned error is ErrSkipDir and fileInfo refers
|
|
||||||
// to a directory, the directory will not be entered and Walk
|
|
||||||
// will continue the traversal. Otherwise Walk will return
|
|
||||||
type WalkFn func(fileInfo storageDriver.FileInfo) error
|
|
||||||
|
|
||||||
// Walk traverses a filesystem defined within driver, starting
|
// Walk traverses a filesystem defined within driver, starting
|
||||||
// from the given path, calling f on each file
|
// from the given path, calling f on each file
|
||||||
func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f WalkFn) error {
|
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
|
||||||
|
// to a directory, the directory will not be entered and Walk
|
||||||
|
// will continue the traversal. Otherwise Walk will return
|
||||||
|
// the error
|
||||||
|
func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f storageDriver.WalkFn) error {
|
||||||
children, err := driver.List(ctx, from)
|
children, err := driver.List(ctx, from)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -38,7 +30,7 @@ func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = f(fileInfo)
|
err = f(fileInfo)
|
||||||
skipDir := (err == ErrSkipDir)
|
skipDir := (err == storageDriver.ErrSkipDir)
|
||||||
if err != nil && !skipDir {
|
if err != nil && !skipDir {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,7 +131,7 @@ func TestWalkSkipDir(t *testing.T) {
|
||||||
filePath := fileInfo.Path()
|
filePath := fileInfo.Path()
|
||||||
if filePath == "/a/b" {
|
if filePath == "/a/b" {
|
||||||
// skip processing /a/b/c and /a/b/c/d
|
// skip processing /a/b/c and /a/b/c/d
|
||||||
return ErrSkipDir
|
return driver.ErrSkipDir
|
||||||
}
|
}
|
||||||
delete(expected, filePath)
|
delete(expected, filePath)
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue