Use bulk delete to remove segments in Swift driver

Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>
This commit is contained in:
Sylvain Baubeau 2015-10-30 17:08:56 +01:00
parent 329c353411
commit ef4ab7a885
2 changed files with 43 additions and 27 deletions

View file

@ -629,19 +629,6 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return err return err
} }
if len(objects) > 0 && d.BulkDeleteSupport {
filenames := make([]string, len(objects))
for i, obj := range objects {
filenames[i] = obj.Name
}
if _, err := d.Conn.BulkDelete(d.Container, filenames); err != swift.Forbidden {
if err == swift.ContainerNotFound {
return storagedriver.PathNotFoundError{Path: path}
}
return err
}
}
for _, obj := range objects { for _, obj := range objects {
if obj.PseudoDirectory { if obj.PseudoDirectory {
continue continue
@ -649,20 +636,12 @@ func (d *driver) Delete(ctx context.Context, path string) error {
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil { if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
manifest, ok := headers["X-Object-Manifest"] manifest, ok := headers["X-Object-Manifest"]
if ok { if ok {
segContainer, prefix := parseManifest(manifest) _, prefix := parseManifest(manifest)
segments, err := d.getAllSegments(prefix) segments, err := d.getAllSegments(prefix)
if err != nil { if err != nil {
return err return err
} }
objects = append(objects, segments...)
for _, s := range segments {
if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
if err == swift.ObjectNotFound {
return storagedriver.PathNotFoundError{Path: s.Name}
}
return err
}
}
} }
} else { } else {
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
@ -670,13 +649,31 @@ func (d *driver) Delete(ctx context.Context, path string) error {
} }
return err return err
} }
}
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil { if d.BulkDeleteSupport && len(objects) > 0 {
if err == swift.ObjectNotFound { filenames := make([]string, len(objects))
return storagedriver.PathNotFoundError{Path: obj.Name} for i, obj := range objects {
filenames[i] = obj.Name
}
_, err = d.Conn.BulkDelete(d.Container, filenames)
// Don't fail on ObjectNotFound because eventual consistency
// makes this situation normal.
if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
if err == swift.ContainerNotFound {
return storagedriver.PathNotFoundError{Path: path}
} }
return err return err
} }
} else {
for _, obj := range objects {
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
if err == swift.ObjectNotFound {
return storagedriver.PathNotFoundError{Path: obj.Name}
}
return err
}
}
} }
_, _, err = d.Conn.Object(d.Container, d.swiftPath(path)) _, _, err = d.Conn.Object(d.Container, d.swiftPath(path))

View file

@ -134,7 +134,6 @@ func TestEmptyRootList(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error creating content: %v", err) t.Fatalf("unexpected error creating content: %v", err)
} }
defer rootedDriver.Delete(ctx, filename)
keys, err := emptyRootDriver.List(ctx, "/") keys, err := emptyRootDriver.List(ctx, "/")
for _, path := range keys { for _, path := range keys {
@ -149,4 +148,24 @@ func TestEmptyRootList(t *testing.T) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
} }
} }
// Create an object with a path nested under the existing object
err = rootedDriver.PutContent(ctx, filename+"/file1", contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
err = rootedDriver.Delete(ctx, filename)
if err != nil {
t.Fatalf("failed to delete: %v", err)
}
keys, err = rootedDriver.List(ctx, "/")
if err != nil {
t.Fatalf("failed to list objects after deletion: %v", err)
}
if len(keys) != 0 {
t.Fatal("delete did not remove nested objects")
}
} }