Add code to handle pagination of parts. Fixes the 10 GB max layer size bug

This is a back-port of https://github.com/distribution/distribution/pull/2815

Signed-off-by: Flavian Missi <fmissi@redhat.com>
Flavian Missi 2023-02-14 12:45:53 +01:00
parent 212b38ed22
commit 4c0001ed53

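Context for the fix: S3's ListParts call returns at most 1,000 parts per response (the default MaxParts), and the old code only ever read the first page when resuming a multipart upload. With the driver's default 10 MB chunk size, 1,000 parts works out to roughly the 10 GB ceiling the title refers to; parts beyond the first page were silently dropped. Below is a minimal standalone sketch of the pagination pattern the commit introduces, assuming aws-sdk-go v1; the bucket, key, and upload ID are placeholders, and listAllParts is a hypothetical helper, not part of this diff.

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

// listAllParts follows NextPartNumberMarker until IsTruncated reports
// false, mirroring the loop this commit adds to the driver's Writer.
func listAllParts(svc *s3.S3, bucket, key, uploadID string) ([]*s3.Part, error) {
    input := &s3.ListPartsInput{
        Bucket:   aws.String(bucket),
        Key:      aws.String(key),
        UploadId: aws.String(uploadID),
    }
    var allParts []*s3.Part
    for {
        resp, err := svc.ListParts(input)
        if err != nil {
            return nil, err
        }
        allParts = append(allParts, resp.Parts...)
        if resp.IsTruncated == nil || !*resp.IsTruncated {
            return allParts, nil
        }
        // Ask the next call to start after the last part we received.
        input.PartNumberMarker = resp.NextPartNumberMarker
    }
}

func main() {
    svc := s3.New(session.Must(session.NewSession()))
    parts, err := listAllParts(svc, "example-bucket", "example/key", "example-upload-id")
    if err != nil {
        panic(err)
    }
    fmt.Printf("multipart upload has %d parts\n", len(parts))
}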

@@ -82,7 +82,7 @@ var validRegions = map[string]struct{}{}
 // validObjectACLs contains known s3 object Acls
 var validObjectACLs = map[string]struct{}{}
 
-//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
+// DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
 type DriverParameters struct {
     AccessKey string
     SecretKey string
@@ -536,7 +536,6 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
         Key:    aws.String(d.s3Path(path)),
         Range:  aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
     })
-
     if err != nil {
         if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "InvalidRange" {
             return ioutil.NopCloser(bytes.NewReader(nil)), nil
@@ -549,9 +548,9 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
 
 // Writer returns a FileWriter which will store the content written to it
 // at the location designated by "path" after the call to Commit.
-func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
+func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (storagedriver.FileWriter, error) {
     key := d.s3Path(path)
-    if !append {
+    if !appendParam {
         // TODO (brianbland): cancel other uploads at this path
         resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
             Bucket: aws.String(d.Bucket),
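A side effect worth noting in the hunk above: inside Writer, a parameter named append shadows Go's builtin append function for the whole function body, so the part-aggregation lines added below (allParts = append(allParts, resp.Parts...)) would not compile without first renaming the parameter to appendParam.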
@@ -575,6 +574,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
         return nil, parseError(path, err)
     }
 
+    var allParts []*s3.Part
     for _, multi := range resp.Uploads {
         if key != *multi.Key {
             continue
@@ -587,11 +587,20 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
         if err != nil {
             return nil, parseError(path, err)
         }
-        var multiSize int64
-        for _, part := range resp.Parts {
-            multiSize += *part.Size
+        allParts = append(allParts, resp.Parts...)
+        for *resp.IsTruncated {
+            resp, err = d.S3.ListParts(&s3.ListPartsInput{
+                Bucket:           aws.String(d.Bucket),
+                Key:              aws.String(key),
+                UploadId:         multi.UploadId,
+                PartNumberMarker: resp.NextPartNumberMarker,
+            })
+            if err != nil {
+                return nil, parseError(path, err)
+            }
+            allParts = append(allParts, resp.Parts...)
         }
-        return d.newWriter(key, *multi.UploadId, resp.Parts), nil
+        return d.newWriter(key, *multi.UploadId, allParts), nil
     }
     return nil, storagedriver.PathNotFoundError{Path: path}
 }
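The backport keeps the manual IsTruncated loop, staying close to the upstream patch. For comparison, aws-sdk-go v1 also ships a callback paginator, ListPartsPages, which handles the marker bookkeeping itself and avoids dereferencing resp.IsTruncated (a *bool that an S3-compatible backend could in principle leave unset). A sketch of the equivalent aggregation, reusing the names from the hunk above:

// Equivalent aggregation via the SDK's built-in paginator; d, key, and
// multi are as in the Writer hunk above.
var allParts []*s3.Part
err := d.S3.ListPartsPages(&s3.ListPartsInput{
    Bucket:   aws.String(d.Bucket),
    Key:      aws.String(key),
    UploadId: multi.UploadId,
}, func(page *s3.ListPartsOutput, lastPage bool) bool {
    allParts = append(allParts, page.Parts...)
    return true // keep paging until the SDK runs out of parts
})
if err != nil {
    return nil, parseError(path, err)
}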
@@ -969,7 +978,6 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
     ctx, done := dcontext.WithTrace(parentCtx)
     defer done("s3aws.ListObjectsV2Pages(%s)", path)
     listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
-        var count int64
         // KeyCount was introduced with version 2 of the GET Bucket operation in S3.
         // Some S3 implementations don't support V2 now, so we fall back to manual
         // calculation of the key count if required
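The comment in that last hunk describes the fallback for S3 implementations that predate version 2 of the GET Bucket (ListObjects) API and therefore never populate KeyCount. A minimal sketch of that fallback; pageKeyCount is a hypothetical helper, not part of this diff, and counting CommonPrefixes alongside Contents is an assumption about how the walk tallies entries:

package main

import "github.com/aws/aws-sdk-go/service/s3"

// pageKeyCount returns the number of entries in one ListObjectsV2 page.
// KeyCount is only set by servers that implement version 2 of the GET
// Bucket operation, so count the returned entries when it is missing.
func pageKeyCount(page *s3.ListObjectsV2Output) int64 {
    if page.KeyCount != nil {
        return *page.KeyCount
    }
    return int64(len(page.Contents) + len(page.CommonPrefixes))
}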