diff --git a/docs/configuration.md b/docs/configuration.md index 1f82e854d..01e231346 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -99,6 +99,9 @@ information about each option that appears later in this page. secure: true v4auth: true chunksize: 5242880 + multipartcopychunksize: 33554432 + multipartcopymaxconcurrency: 100 + multipartcopythresholdsize: 33554432 rootdirectory: /s3/object/name/prefix swift: username: username @@ -382,6 +385,9 @@ Permitted values are `error`, `warn`, `info` and `debug`. The default is secure: true v4auth: true chunksize: 5242880 + multipartcopychunksize: 33554432 + multipartcopymaxconcurrency: 100 + multipartcopythresholdsize: 33554432 rootdirectory: /s3/object/name/prefix swift: username: username diff --git a/docs/storage-drivers/s3.md b/docs/storage-drivers/s3.md index 32f199fc0..6bcb8a20d 100644 --- a/docs/storage-drivers/s3.md +++ b/docs/storage-drivers/s3.md @@ -138,6 +138,42 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Amaz should be a number that is larger than 5*1024*1024. + + + multipartcopychunksize + + + no + + + Chunk size for all but the last Upload Part - Copy + operation of a copy that uses the multipart upload API. + + + + + multipartcopymaxconcurrency + + + no + + + Maximum number of concurrent Upload Part - Copy operations for a + copy that uses the multipart upload API. + + + + + multipartcopythresholdsize + + + no + + + Objects above this size will be copied using the multipart upload API. + PUT Object - Copy is used for objects at or below this size. + + rootdirectory diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 6c1c3370a..9d2cac684 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -16,6 +16,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "net/http" "reflect" "sort" @@ -45,8 +46,27 @@ const driverName = "s3aws" // S3 API requires multipart upload chunks to be at least 5MB const minChunkSize = 5 << 20 +// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +const maxChunkSize = 5 << 30 + const defaultChunkSize = 2 * minChunkSize +const ( + // defaultMultipartCopyChunkSize defines the default chunk size for all + // but the last Upload Part - Copy operation of a multipart copy. + // Empirically, 32 MB is optimal. + defaultMultipartCopyChunkSize = 32 << 20 + + // defaultMultipartCopyMaxConcurrency defines the default maximum number + // of concurrent Upload Part - Copy operations for a multipart copy. + defaultMultipartCopyMaxConcurrency = 100 + + // defaultMultipartCopyThresholdSize defines the default object size + // above which multipart copy will be used. (PUT Object - Copy is used + // for objects at or below this size.) Empirically, 32 MB is optimal. + defaultMultipartCopyThresholdSize = 32 << 20 +) + // listMax is the largest amount of objects you can request from S3 in a list call const listMax = 1000 @@ -58,19 +78,22 @@ var validObjectAcls = map[string]struct{}{} //DriverParameters A struct that encapsulates all of the driver parameters after all values have been set type DriverParameters struct { - AccessKey string - SecretKey string - Bucket string - Region string - RegionEndpoint string - Encrypt bool - KeyID string - Secure bool - ChunkSize int64 - RootDirectory string - StorageClass string - UserAgent string - ObjectAcl string + AccessKey string + SecretKey string + Bucket string + Region string + RegionEndpoint string + Encrypt bool + KeyID string + Secure bool + ChunkSize int64 + MultipartCopyChunkSize int64 + MultipartCopyMaxConcurrency int64 + MultipartCopyThresholdSize int64 + RootDirectory string + StorageClass string + UserAgent string + ObjectAcl string } func init() { @@ -116,14 +139,17 @@ func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (stora } type driver struct { - S3 *s3.S3 - Bucket string - ChunkSize int64 - Encrypt bool - KeyID string - RootDirectory string - StorageClass string - ObjectAcl string + S3 *s3.S3 + Bucket string + ChunkSize int64 + Encrypt bool + KeyID string + MultipartCopyChunkSize int64 + MultipartCopyMaxConcurrency int64 + MultipartCopyThresholdSize int64 + RootDirectory string + StorageClass string + ObjectAcl string } type baseEmbed struct { @@ -217,27 +243,24 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { keyID = "" } - chunkSize := int64(defaultChunkSize) - chunkSizeParam := parameters["chunksize"] - switch v := chunkSizeParam.(type) { - case string: - vv, err := strconv.ParseInt(v, 0, 64) - if err != nil { - return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam) - } - chunkSize = vv - case int64: - chunkSize = v - case int, uint, int32, uint32, uint64: - chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int() - case nil: - // do nothing - default: - return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam) + chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) + if err != nil { + return nil, err } - if chunkSize < minChunkSize { - return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) + multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) + if err != nil { + return nil, err + } + + multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) + if err != nil { + return nil, err + } + + multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) + if err != nil { + return nil, err } rootDirectory := parameters["rootdirectory"] @@ -289,6 +312,9 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { fmt.Sprint(keyID), secureBool, chunkSize, + multipartCopyChunkSize, + multipartCopyMaxConcurrency, + multipartCopyThresholdSize, fmt.Sprint(rootDirectory), storageClass, fmt.Sprint(userAgent), @@ -298,6 +324,35 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { return New(params) } +// getParameterAsInt64 converts paramaters[name] to an int64 value (using +// defaultt if nil), verifies it is no smaller than min, and returns it. +func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) { + rv := defaultt + param := parameters[name] + switch v := param.(type) { + case string: + vv, err := strconv.ParseInt(v, 0, 64) + if err != nil { + return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param) + } + rv = vv + case int64: + rv = v + case int, uint, int32, uint32, uint64: + rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int() + case nil: + // do nothing + default: + return 0, fmt.Errorf("invalid value for %s: %#v", name, param) + } + + if rv < min || rv > max { + return 0, fmt.Errorf("The %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max) + } + + return rv, nil +} + // New constructs a new Driver with the given AWS credentials, region, encryption flag, and // bucketName func New(params DriverParameters) (*Driver, error) { @@ -346,14 +401,17 @@ func New(params DriverParameters) (*Driver, error) { // } d := &driver{ - S3: s3obj, - Bucket: params.Bucket, - ChunkSize: params.ChunkSize, - Encrypt: params.Encrypt, - KeyID: params.KeyID, - RootDirectory: params.RootDirectory, - StorageClass: params.StorageClass, - ObjectAcl: params.ObjectAcl, + S3: s3obj, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + Encrypt: params.Encrypt, + KeyID: params.KeyID, + MultipartCopyChunkSize: params.MultipartCopyChunkSize, + MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency, + MultipartCopyThresholdSize: params.MultipartCopyThresholdSize, + RootDirectory: params.RootDirectory, + StorageClass: params.StorageClass, + ObjectAcl: params.ObjectAcl, } return &Driver{ @@ -565,21 +623,106 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { // object. func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { /* This is terrible, but aws doesn't have an actual move. */ - _, err := d.S3.CopyObject(&s3.CopyObjectInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(destPath)), - ContentType: d.getContentType(), - ACL: d.getACL(), - ServerSideEncryption: d.getEncryptionMode(), - SSEKMSKeyId: d.getSSEKMSKeyID(), - StorageClass: d.getStorageClass(), - CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), - }) + if err := d.copy(ctx, sourcePath, destPath); err != nil { + return err + } + return d.Delete(ctx, sourcePath) +} + +// copy copies an object stored at sourcePath to destPath. +func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error { + // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy + // operation. For larger objects, the multipart upload API must be used. + // + // Empirically, multipart copy is fastest with 32 MB parts and is faster + // than PUT Object - Copy for objects larger than 32 MB. + + fileInfo, err := d.Stat(ctx, sourcePath) if err != nil { return parseError(sourcePath, err) } - return d.Delete(ctx, sourcePath) + if fileInfo.Size() <= d.MultipartCopyThresholdSize { + _, err := d.S3.CopyObject(&s3.CopyObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + ContentType: d.getContentType(), + ACL: d.getACL(), + ServerSideEncryption: d.getEncryptionMode(), + SSEKMSKeyId: d.getSSEKMSKeyID(), + StorageClass: d.getStorageClass(), + CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), + }) + if err != nil { + return parseError(sourcePath, err) + } + return nil + } + + // Even in the worst case, a multipart copy should take no more + // than a few minutes, so 30 minutes is very conservative. + expires := time.Now().Add(time.Duration(30) * time.Minute) + createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + ContentType: d.getContentType(), + ACL: d.getACL(), + Expires: aws.Time(expires), + SSEKMSKeyId: d.getSSEKMSKeyID(), + ServerSideEncryption: d.getEncryptionMode(), + StorageClass: d.getStorageClass(), + }) + if err != nil { + return err + } + + numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize + completedParts := make([]*s3.CompletedPart, numParts) + errChan := make(chan error, numParts) + limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency) + + for i := range completedParts { + i := int64(i) + go func() { + limiter <- struct{}{} + firstByte := i * d.MultipartCopyChunkSize + lastByte := firstByte + d.MultipartCopyChunkSize - 1 + if lastByte >= fileInfo.Size() { + lastByte = fileInfo.Size() - 1 + } + uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ + Bucket: aws.String(d.Bucket), + CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), + Key: aws.String(d.s3Path(destPath)), + PartNumber: aws.Int64(i + 1), + UploadId: createResp.UploadId, + CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)), + }) + if err == nil { + completedParts[i] = &s3.CompletedPart{ + ETag: uploadResp.CopyPartResult.ETag, + PartNumber: aws.Int64(i + 1), + } + } + errChan <- err + <-limiter + }() + } + + for range completedParts { + err := <-errChan + if err != nil { + return err + } + } + + _, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + UploadId: createResp.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts}, + }) + return err } func min(a, b int) int { diff --git a/registry/storage/driver/s3-aws/s3_test.go b/registry/storage/driver/s3-aws/s3_test.go index e1df500fa..a49c21c78 100644 --- a/registry/storage/driver/s3-aws/s3_test.go +++ b/registry/storage/driver/s3-aws/s3_test.go @@ -1,19 +1,21 @@ package s3 import ( + "bytes" "io/ioutil" + "math/rand" "os" "strconv" "testing" + "gopkg.in/check.v1" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/testsuites" - - "gopkg.in/check.v1" ) // Hook up gocheck into the "go test" runner. @@ -65,6 +67,9 @@ func init() { keyID, secureBool, minChunkSize, + defaultMultipartCopyChunkSize, + defaultMultipartCopyMaxConcurrency, + defaultMultipartCopyThresholdSize, rootDirectory, storageClass, driverName + "-test", @@ -238,3 +243,57 @@ func TestOverThousandBlobs(t *testing.T) { t.Fatalf("unexpected error deleting thousand files: %v", err) } } + +func TestMoveWithMultipartCopy(t *testing.T) { + if skipS3() != "" { + t.Skip(skipS3()) + } + + rootDir, err := ioutil.TempDir("", "driver-") + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } + defer os.Remove(rootDir) + + d, err := s3DriverConstructor(rootDir, s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating driver: %v", err) + } + + ctx := context.Background() + sourcePath := "/source" + destPath := "/dest" + + defer d.Delete(ctx, sourcePath) + defer d.Delete(ctx, destPath) + + // An object larger than d's MultipartCopyThresholdSize will cause d.Move() to perform a multipart copy. + multipartCopyThresholdSize := d.baseEmbed.Base.StorageDriver.(*driver).MultipartCopyThresholdSize + contents := make([]byte, 2*multipartCopyThresholdSize) + rand.Read(contents) + + err = d.PutContent(ctx, sourcePath, contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + + err = d.Move(ctx, sourcePath, destPath) + if err != nil { + t.Fatalf("unexpected error moving file: %v", err) + } + + received, err := d.GetContent(ctx, destPath) + if err != nil { + t.Fatalf("unexpected error getting content: %v", err) + } + if !bytes.Equal(contents, received) { + t.Fatal("content differs") + } + + _, err = d.GetContent(ctx, sourcePath) + switch err.(type) { + case storagedriver.PathNotFoundError: + default: + t.Fatalf("unexpected error getting content: %v", err) + } +}