fixed s3 Delete bug due to read-after-delete inconsistency

Signed-off-by: Josh Chorlton <josh.chorlton@docker.com>
This commit is contained in:
Josh Chorlton 2016-06-27 17:39:25 -07:00
parent d5441ca506
commit 1c5cb12745
2 changed files with 66 additions and 16 deletions

View file

@ -561,45 +561,62 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
return d.Delete(ctx, sourcePath) return d.Delete(ctx, sourcePath)
} }
func min(a, b int) int {
if a < b {
return a
}
return b
}
// Delete recursively deletes all objects stored at "path" and its subpaths. // Delete recursively deletes all objects stored at "path" and its subpaths.
// We must be careful since S3 does not guarantee read after delete consistency
func (d *driver) Delete(ctx context.Context, path string) error { func (d *driver) Delete(ctx context.Context, path string) error {
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{ s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
listObjectsInput := &s3.ListObjectsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)), Prefix: aws.String(d.s3Path(path)),
})
if err != nil || len(resp.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
} }
for {
// list all the objects
resp, err := d.S3.ListObjects(listObjectsInput)
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) // resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would be exited without recalling ListObjects
if err != nil || len(resp.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
}
for len(resp.Contents) > 0 {
for _, key := range resp.Contents { for _, key := range resp.Contents {
s3Objects = append(s3Objects, &s3.ObjectIdentifier{ s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: key.Key, Key: key.Key,
}) })
} }
// resp.Contents must have at least one element or we would have returned not found
listObjectsInput.Marker = resp.Contents[len(resp.Contents)-1].Key
// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
// if everything has been returned, break
if resp.IsTruncated == nil || !*resp.IsTruncated {
break
}
}
// need to chunk objects into groups of 1000 per s3 restrictions
total := len(s3Objects)
for i := 0; i < total; i += 1000 {
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ _, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{ Delete: &s3.Delete{
Objects: s3Objects, Objects: s3Objects[i:min(i+1000, total)],
Quiet: aws.Bool(false), Quiet: aws.Bool(false),
}, },
}) })
if err != nil {
return nil
}
resp, err = d.S3.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
})
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
} }

View file

@ -203,3 +203,36 @@ func TestStorageClass(t *testing.T) {
} }
} }
func TestOverThousandBlobs(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}
rootDir, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(rootDir)
standardDriver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
if err != nil {
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
}
ctx := context.Background()
for i := 0; i < 1005; i++ {
filename := "/thousandfiletest/file" + strconv.Itoa(i)
contents := []byte("contents")
err = standardDriver.PutContent(ctx, filename, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
}
// cant actually verify deletion because read-after-delete is inconsistent, but can ensure no errors
err = standardDriver.Delete(ctx, "/thousandfiletest")
if err != nil {
t.Fatalf("unexpected error deleting thousand files: %v", err)
}
}