Merge pull request #3519 from jtherin/mpu-paginate

fix: paginate through s3 multipart uploads
This commit is contained in:
João Pereira 2022-03-11 16:06:46 +00:00 committed by GitHub
commit 514cbd71be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -601,19 +601,31 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
} }
return d.newWriter(key, *resp.UploadId, nil), nil return d.newWriter(key, *resp.UploadId, nil), nil
} }
resp, err := d.S3.ListMultipartUploads(&s3.ListMultipartUploadsInput{
listMultipartUploadsInput := &s3.ListMultipartUploadsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(key), Prefix: aws.String(key),
}) }
for {
resp, err := d.S3.ListMultipartUploads(listMultipartUploadsInput)
if err != nil { if err != nil {
return nil, parseError(path, err) return nil, parseError(path, err)
} }
// resp.Uploads can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would be exited without recalling ListMultipartUploads
if len(resp.Uploads) == 0 {
break
}
var allParts []*s3.Part var allParts []*s3.Part
for _, multi := range resp.Uploads { for _, multi := range resp.Uploads {
if key != *multi.Key { if key != *multi.Key {
continue continue
} }
resp, err := d.S3.ListParts(&s3.ListPartsInput{
partsList, err := d.S3.ListParts(&s3.ListPartsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(key), Key: aws.String(key),
UploadId: multi.UploadId, UploadId: multi.UploadId,
@ -621,21 +633,31 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
if err != nil { if err != nil {
return nil, parseError(path, err) return nil, parseError(path, err)
} }
allParts = append(allParts, resp.Parts...) allParts = append(allParts, partsList.Parts...)
for *resp.IsTruncated { for *resp.IsTruncated {
resp, err = d.S3.ListParts(&s3.ListPartsInput{ partsList, err = d.S3.ListParts(&s3.ListPartsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(key), Key: aws.String(key),
UploadId: multi.UploadId, UploadId: multi.UploadId,
PartNumberMarker: resp.NextPartNumberMarker, PartNumberMarker: partsList.NextPartNumberMarker,
}) })
if err != nil { if err != nil {
return nil, parseError(path, err) return nil, parseError(path, err)
} }
allParts = append(allParts, resp.Parts...) allParts = append(allParts, partsList.Parts...)
} }
return d.newWriter(key, *multi.UploadId, allParts), nil return d.newWriter(key, *multi.UploadId, allParts), nil
} }
// resp.NextUploadIdMarker must have at least one element or we would have returned not found
listMultipartUploadsInput.UploadIdMarker = resp.NextUploadIdMarker
// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
// if everything has been returned, break
if resp.IsTruncated == nil || !*resp.IsTruncated {
break
}
}
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }