Merge pull request #1388 from aibaars/gcs-simplify-move

StorageDriver: GCS: remove support for directory Moves
This commit is contained in:
Richard Scothern 2016-03-03 10:20:51 -08:00
commit becdd83131
2 changed files with 49 additions and 39 deletions

View file

@ -38,6 +38,8 @@ import (
"google.golang.org/cloud" "google.golang.org/cloud"
"google.golang.org/cloud/storage" "google.golang.org/cloud/storage"
"github.com/Sirupsen/logrus"
ctx "github.com/docker/distribution/context" ctx "github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/base"
@ -469,43 +471,8 @@ func (d *driver) List(context ctx.Context, path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the // Move moves an object stored at sourcePath to destPath, removing the
// original object. // original object.
func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error { func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error {
prefix := d.pathToDirKey(sourcePath)
gcsContext := d.context(context) gcsContext := d.context(context)
keys, err := d.listAll(gcsContext, prefix) _, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
if err != nil {
return err
}
if len(keys) > 0 {
destPrefix := d.pathToDirKey(destPath)
copies := make([]string, 0, len(keys))
sort.Strings(keys)
var err error
for _, key := range keys {
dest := destPrefix + key[len(prefix):]
_, err = storageCopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil)
if err == nil {
copies = append(copies, dest)
} else {
break
}
}
// if an error occurred, attempt to cleanup the copies made
if err != nil {
for i := len(copies) - 1; i >= 0; i-- {
_ = storageDeleteObject(gcsContext, d.bucket, copies[i])
}
return err
}
// delete originals
for i := len(keys) - 1; i >= 0; i-- {
err2 := storageDeleteObject(gcsContext, d.bucket, keys[i])
if err2 != nil {
err = err2
}
}
return err
}
_, err = storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
if err != nil { if err != nil {
if status := err.(*googleapi.Error); status != nil { if status := err.(*googleapi.Error); status != nil {
if status.Code == http.StatusNotFound { if status.Code == http.StatusNotFound {
@ -514,7 +481,13 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e
} }
return err return err
} }
return storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
// if deleting the file fails, log the error, but do not fail; the file was succesfully copied,
// and the original should eventually be cleaned when purging the uploads folder.
if err != nil {
logrus.Infof("error deleting file: %v due to %v", sourcePath, err)
}
return nil
} }
// listAll recursively lists all names of objects stored at "prefix" and its subpaths. // listAll recursively lists all names of objects stored at "prefix" and its subpaths.
@ -530,8 +503,8 @@ func (d *driver) listAll(context context.Context, prefix string) ([]string, erro
} }
for _, obj := range objects.Results { for _, obj := range objects.Results {
// GCS does not guarantee strong consistency between // GCS does not guarantee strong consistency between
// DELETE and LIST operationsCheck that the object is not deleted, // DELETE and LIST operations. Check that the object is not deleted,
// so filter out any objects with a non-zero time-deleted // and filter out any objects with a non-zero time-deleted
if obj.Deleted.IsZero() { if obj.Deleted.IsZero() {
list = append(list, obj.Name) list = append(list, obj.Name)
} }

View file

@ -175,3 +175,40 @@ func TestEmptyRootList(t *testing.T) {
} }
} }
} }
// TestMoveDirectory checks that moving a directory returns an error.
func TestMoveDirectory(t *testing.T) {
if skipGCS() != "" {
t.Skip(skipGCS())
}
validRoot, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(validRoot)
driver, err := gcsDriverConstructor(validRoot)
if err != nil {
t.Fatalf("unexpected error creating rooted driver: %v", err)
}
ctx := ctx.Background()
contents := []byte("contents")
// Create a regular file.
err = driver.PutContent(ctx, "/parent/dir/foo", contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
defer func() {
err := driver.Delete(ctx, "/parent")
if err != nil {
t.Fatalf("failed to remove /parent due to %v\n", err)
}
}()
err = driver.Move(ctx, "/parent/dir", "/parent/other")
if err == nil {
t.Fatalf("Moving directory /parent/dir /parent/other should have return a non-nil error\n")
}
}