diff --git a/registry/storage/driver/swift/swift.go b/registry/storage/driver/swift/swift.go index 68013e21..242f1310 100644 --- a/registry/storage/driver/swift/swift.go +++ b/registry/storage/driver/swift/swift.go @@ -91,6 +91,9 @@ type swiftInfo struct { Tempurl struct { Methods []string `mapstructure:"methods"` } + BulkDelete struct { + MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"` + } `mapstructure:"bulk_delete"` } func init() { @@ -105,15 +108,16 @@ func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (st } type driver struct { - Conn swift.Connection - Container string - Prefix string - BulkDeleteSupport bool - ChunkSize int - SecretKey string - AccessKey string - TempURLContainerKey bool - TempURLMethods []string + Conn swift.Connection + Container string + Prefix string + BulkDeleteSupport bool + BulkDeleteMaxDeletes int + ChunkSize int + SecretKey string + AccessKey string + TempURLContainerKey bool + TempURLMethods []string } type baseEmbed struct { @@ -221,6 +225,9 @@ func New(params Parameters) (*Driver, error) { if err := mapstructure.Decode(config, &info); err == nil { d.TempURLContainerKey = info.Swift.Version >= "2.3.0" d.TempURLMethods = info.Tempurl.Methods + if d.BulkDeleteSupport { + d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest + } } } else { d.TempURLContainerKey = params.TempURLContainerKey @@ -532,20 +539,27 @@ func (d *driver) Delete(ctx context.Context, path string) error { } } - if d.BulkDeleteSupport && len(objects) > 0 { + if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 { filenames := make([]string, len(objects)) for i, obj := range objects { filenames[i] = obj.Name } - _, err = d.Conn.BulkDelete(d.Container, filenames) - // Don't fail on ObjectNotFound because eventual consistency - // makes this situation normal. - if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound { - if err == swift.ContainerNotFound { - return storagedriver.PathNotFoundError{Path: path} - } + + chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes) + if err != nil { return err } + for _, chunk := range chunks { + _, err := d.Conn.BulkDelete(d.Container, chunk) + // Don't fail on ObjectNotFound because eventual consistency + // makes this situation normal. + if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound { + if err == swift.ContainerNotFound { + return storagedriver.PathNotFoundError{Path: path} + } + return err + } + } } else { for _, obj := range objects { if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil { @@ -712,6 +726,21 @@ func (d *driver) createManifest(path string, segments string) error { return nil } +func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) { + if maxSize > 0 { + for offset := 0; offset < len(slice); offset += maxSize { + chunkSize := maxSize + if offset+chunkSize > len(slice) { + chunkSize = len(slice) - offset + } + chunks = append(chunks, slice[offset:offset+chunkSize]) + } + } else { + return nil, fmt.Errorf("Max chunk size must be > 0") + } + return +} + func parseManifest(manifest string) (container string, prefix string) { components := strings.SplitN(manifest, "/", 2) container = components[0] diff --git a/registry/storage/driver/swift/swift_test.go b/registry/storage/driver/swift/swift_test.go index b8c97fb4..dcd5e4ff 100644 --- a/registry/storage/driver/swift/swift_test.go +++ b/registry/storage/driver/swift/swift_test.go @@ -3,6 +3,7 @@ package swift import ( "io/ioutil" "os" + "reflect" "strconv" "strings" "testing" @@ -181,3 +182,64 @@ func TestEmptyRootList(t *testing.T) { t.Fatal("delete did not remove nested objects") } } + +func TestFilenameChunking(t *testing.T) { + // Test valid input and sizes + input := []string{"a", "b", "c", "d", "e"} + expecteds := [][][]string{ + { + {"a"}, + {"b"}, + {"c"}, + {"d"}, + {"e"}, + }, + { + {"a", "b"}, + {"c", "d"}, + {"e"}, + }, + { + {"a", "b", "c"}, + {"d", "e"}, + }, + { + {"a", "b", "c", "d"}, + {"e"}, + }, + { + {"a", "b", "c", "d", "e"}, + }, + { + {"a", "b", "c", "d", "e"}, + }, + } + for i, expected := range expecteds { + actual, err := chunkFilenames(input, i+1) + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("chunk %v didn't match expected value %v", actual, expected) + } + if err != nil { + t.Fatalf("unexpected error chunking filenames: %v", err) + } + } + + // Test nil input + actual, err := chunkFilenames(nil, 5) + if len(actual) != 0 { + t.Fatal("chunks were returned when passed nil") + } + if err != nil { + t.Fatalf("unexpected error chunking filenames: %v", err) + } + + // Test 0 and < 0 sizes + actual, err = chunkFilenames(nil, 0) + if err == nil { + t.Fatal("expected error for size = 0") + } + actual, err = chunkFilenames(nil, -1) + if err == nil { + t.Fatal("expected error for size = -1") + } +}