From ef4ab7a8850affa0033a387fb0c773975eb452c4 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Fri, 30 Oct 2015 17:08:56 +0100 Subject: [PATCH] Use bulk delete to remove segments in Swift driver Signed-off-by: Sylvain Baubeau --- registry/storage/driver/swift/swift.go | 49 ++++++++++----------- registry/storage/driver/swift/swift_test.go | 21 ++++++++- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/registry/storage/driver/swift/swift.go b/registry/storage/driver/swift/swift.go index e0dada31..6d021ea4 100644 --- a/registry/storage/driver/swift/swift.go +++ b/registry/storage/driver/swift/swift.go @@ -629,19 +629,6 @@ func (d *driver) Delete(ctx context.Context, path string) error { return err } - if len(objects) > 0 && d.BulkDeleteSupport { - filenames := make([]string, len(objects)) - for i, obj := range objects { - filenames[i] = obj.Name - } - if _, err := d.Conn.BulkDelete(d.Container, filenames); err != swift.Forbidden { - if err == swift.ContainerNotFound { - return storagedriver.PathNotFoundError{Path: path} - } - return err - } - } - for _, obj := range objects { if obj.PseudoDirectory { continue @@ -649,20 +636,12 @@ func (d *driver) Delete(ctx context.Context, path string) error { if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil { manifest, ok := headers["X-Object-Manifest"] if ok { - segContainer, prefix := parseManifest(manifest) + _, prefix := parseManifest(manifest) segments, err := d.getAllSegments(prefix) if err != nil { return err } - - for _, s := range segments { - if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil { - if err == swift.ObjectNotFound { - return storagedriver.PathNotFoundError{Path: s.Name} - } - return err - } - } + objects = append(objects, segments...) } } else { if err == swift.ObjectNotFound { @@ -670,13 +649,31 @@ func (d *driver) Delete(ctx context.Context, path string) error { } return err } + } - if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil { - if err == swift.ObjectNotFound { - return storagedriver.PathNotFoundError{Path: obj.Name} + if d.BulkDeleteSupport && len(objects) > 0 { + filenames := make([]string, len(objects)) + for i, obj := range objects { + filenames[i] = obj.Name + } + _, err = d.Conn.BulkDelete(d.Container, filenames) + // Don't fail on ObjectNotFound because eventual consistency + // makes this situation normal. + if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound { + if err == swift.ContainerNotFound { + return storagedriver.PathNotFoundError{Path: path} } return err } + } else { + for _, obj := range objects { + if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil { + if err == swift.ObjectNotFound { + return storagedriver.PathNotFoundError{Path: obj.Name} + } + return err + } + } } _, _, err = d.Conn.Object(d.Container, d.swiftPath(path)) diff --git a/registry/storage/driver/swift/swift_test.go b/registry/storage/driver/swift/swift_test.go index c4c3333c..b2ff6001 100644 --- a/registry/storage/driver/swift/swift_test.go +++ b/registry/storage/driver/swift/swift_test.go @@ -134,7 +134,6 @@ func TestEmptyRootList(t *testing.T) { if err != nil { t.Fatalf("unexpected error creating content: %v", err) } - defer rootedDriver.Delete(ctx, filename) keys, err := emptyRootDriver.List(ctx, "/") for _, path := range keys { @@ -149,4 +148,24 @@ func TestEmptyRootList(t *testing.T) { t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) } } + + // Create an object with a path nested under the existing object + err = rootedDriver.PutContent(ctx, filename+"/file1", contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + + err = rootedDriver.Delete(ctx, filename) + if err != nil { + t.Fatalf("failed to delete: %v", err) + } + + keys, err = rootedDriver.List(ctx, "/") + if err != nil { + t.Fatalf("failed to list objects after deletion: %v", err) + } + + if len(keys) != 0 { + t.Fatal("delete did not remove nested objects") + } }