Use multipart upload API in S3 Move method

This change to the S3 Move method uses S3's multipart upload API to copy
objects whose size exceeds a threshold.  Parts are copied concurrently.
The level of concurrency, part size, and threshold are all configurable
with reasonable defaults.

Using the multipart upload API has two benefits.

* The S3 Move method can now handle objects over 5 GB, fixing #886.

* Moving most objects, and especially large ones, is faster.  For
  example, moving a 1 GB object previously averaged 30 seconds and now
  averages 10.

Signed-off-by: Noah Treuhaft <noah.treuhaft@docker.com>
Noah Treuhaft 2016-08-15 17:12:24 -07:00
parent c810308d1b
commit 63468ef4a8
4 changed files with 305 additions and 61 deletions


@@ -99,6 +99,9 @@ information about each option that appears later in this page.
     secure: true
     v4auth: true
     chunksize: 5242880
+    multipartcopychunksize: 33554432
+    multipartcopymaxconcurrency: 100
+    multipartcopythresholdsize: 33554432
     rootdirectory: /s3/object/name/prefix
   swift:
     username: username

@@ -380,6 +383,9 @@ Permitted values are `error`, `warn`, `info` and `debug`. The default is
     secure: true
     v4auth: true
     chunksize: 5242880
+    multipartcopychunksize: 33554432
+    multipartcopymaxconcurrency: 100
+    multipartcopythresholdsize: 33554432
     rootdirectory: /s3/object/name/prefix
   swift:
     username: username
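
For reference, here is a sketch of how the new options would look in a registry configuration file, shown at their default values. Only the three `multipartcopy*` keys come from this change; the other fields are placeholder values for illustration.

```yaml
storage:
  s3:
    accesskey: awsaccesskey               # placeholder credentials
    secretkey: awssecretkey
    region: us-east-1
    bucket: bucketname
    chunksize: 5242880
    multipartcopychunksize: 33554432      # 32 MB per Upload Part - Copy operation
    multipartcopymaxconcurrency: 100      # at most 100 concurrent part copies
    multipartcopythresholdsize: 33554432  # objects above 32 MB use the multipart path
    rootdirectory: /s3/object/name/prefix
```

Omitting the three keys entirely yields the same defaults, so no configuration change is required to pick up the new behavior.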


@@ -138,6 +138,42 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Amaz
       should be a number that is larger than 5*1024*1024.
     </td>
   </tr>
+  <tr>
+    <td>
+      <code>multipartcopychunksize</code>
+    </td>
+    <td>
+      no
+    </td>
+    <td>
+      Chunk size for all but the last Upload Part - Copy
+      operation of a copy that uses the multipart upload API.
+    </td>
+  </tr>
+  <tr>
+    <td>
+      <code>multipartcopymaxconcurrency</code>
+    </td>
+    <td>
+      no
+    </td>
+    <td>
+      Maximum number of concurrent Upload Part - Copy operations for a
+      copy that uses the multipart upload API.
+    </td>
+  </tr>
+  <tr>
+    <td>
+      <code>multipartcopythresholdsize</code>
+    </td>
+    <td>
+      no
+    </td>
+    <td>
+      Objects above this size will be copied using the multipart upload API.
+      PUT Object - Copy is used for objects at or below this size.
+    </td>
+  </tr>
   <tr>
     <td>
       <code>rootdirectory</code>
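
To make the chunk-size and threshold numbers concrete, the short program below (an illustration, not part of the change) applies the driver's rounding to the 1 GB object mentioned in the commit message: with the 32 MB defaults it yields 32 parts, each exactly 32 MB.

```go
package main

import "fmt"

func main() {
    const (
        objectSize = int64(1 << 30)  // a 1 GB object, as in the commit message
        chunkSize  = int64(32 << 20) // the 32 MB default multipartcopychunksize
    )

    // Same ceiling division the driver uses: every part except possibly
    // the last is exactly chunkSize bytes.
    numParts := (objectSize + chunkSize - 1) / chunkSize
    lastPartSize := objectSize - (numParts-1)*chunkSize

    fmt.Printf("%d parts, last part %d bytes\n", numParts, lastPartSize)
    // Output: 32 parts, last part 33554432 bytes
}
```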


@@ -16,6 +16,7 @@ import (
     "fmt"
     "io"
     "io/ioutil"
+    "math"
     "net/http"
     "reflect"
     "sort"
@@ -45,8 +46,27 @@ const driverName = "s3aws"
 // S3 API requires multipart upload chunks to be at least 5MB
 const minChunkSize = 5 << 20

+// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
+const maxChunkSize = 5 << 30
+
 const defaultChunkSize = 2 * minChunkSize

+const (
+    // defaultMultipartCopyChunkSize defines the default chunk size for all
+    // but the last Upload Part - Copy operation of a multipart copy.
+    // Empirically, 32 MB is optimal.
+    defaultMultipartCopyChunkSize = 32 << 20
+
+    // defaultMultipartCopyMaxConcurrency defines the default maximum number
+    // of concurrent Upload Part - Copy operations for a multipart copy.
+    defaultMultipartCopyMaxConcurrency = 100
+
+    // defaultMultipartCopyThresholdSize defines the default object size
+    // above which multipart copy will be used. (PUT Object - Copy is used
+    // for objects at or below this size.) Empirically, 32 MB is optimal.
+    defaultMultipartCopyThresholdSize = 32 << 20
+)
+
 // listMax is the largest amount of objects you can request from S3 in a list call
 const listMax = 1000
@@ -67,6 +87,9 @@ type DriverParameters struct {
     KeyID                       string
     Secure                      bool
     ChunkSize                   int64
+    MultipartCopyChunkSize      int64
+    MultipartCopyMaxConcurrency int64
+    MultipartCopyThresholdSize  int64
     RootDirectory               string
     StorageClass                string
     UserAgent                   string
@@ -121,6 +144,9 @@ type driver struct {
     ChunkSize                   int64
     Encrypt                     bool
     KeyID                       string
+    MultipartCopyChunkSize      int64
+    MultipartCopyMaxConcurrency int64
+    MultipartCopyThresholdSize  int64
     RootDirectory               string
     StorageClass                string
     ObjectAcl                   string
@@ -217,27 +243,24 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
         keyID = ""
     }

-    chunkSize := int64(defaultChunkSize)
-    chunkSizeParam := parameters["chunksize"]
-    switch v := chunkSizeParam.(type) {
-    case string:
-        vv, err := strconv.ParseInt(v, 0, 64)
-        if err != nil {
-            return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
-        }
-        chunkSize = vv
-    case int64:
-        chunkSize = v
-    case int, uint, int32, uint32, uint64:
-        chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
-    case nil:
-        // do nothing
-    default:
-        return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
+    chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
+    if err != nil {
+        return nil, err
     }

-    if chunkSize < minChunkSize {
-        return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
+    multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
+    if err != nil {
+        return nil, err
+    }
+
+    multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
+    if err != nil {
+        return nil, err
+    }
+
+    multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
+    if err != nil {
+        return nil, err
     }

     rootDirectory := parameters["rootdirectory"]
@@ -289,6 +312,9 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
         fmt.Sprint(keyID),
         secureBool,
         chunkSize,
+        multipartCopyChunkSize,
+        multipartCopyMaxConcurrency,
+        multipartCopyThresholdSize,
         fmt.Sprint(rootDirectory),
         storageClass,
         fmt.Sprint(userAgent),
@@ -298,6 +324,35 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
     return New(params)
 }

+// getParameterAsInt64 converts parameters[name] to an int64 value (using
+// defaultt if nil), verifies it lies between min and max inclusive, and
+// returns it.
+func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) {
+    rv := defaultt
+    param := parameters[name]
+    switch v := param.(type) {
+    case string:
+        vv, err := strconv.ParseInt(v, 0, 64)
+        if err != nil {
+            return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param)
+        }
+        rv = vv
+    case int64:
+        rv = v
+    case int, uint, int32, uint32, uint64:
+        rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int()
+    case nil:
+        // do nothing
+    default:
+        return 0, fmt.Errorf("invalid value for %s: %#v", name, param)
+    }
+
+    if rv < min || rv > max {
+        return 0, fmt.Errorf("The %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max)
+    }
+
+    return rv, nil
+}
+
 // New constructs a new Driver with the given AWS credentials, region, encryption flag, and
 // bucketName
 func New(params DriverParameters) (*Driver, error) {
@@ -351,6 +406,9 @@ func New(params DriverParameters) (*Driver, error) {
         ChunkSize:                   params.ChunkSize,
         Encrypt:                     params.Encrypt,
         KeyID:                       params.KeyID,
+        MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
+        MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
+        MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
         RootDirectory:               params.RootDirectory,
         StorageClass:                params.StorageClass,
         ObjectAcl:                   params.ObjectAcl,
@@ -565,6 +623,26 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
 // object.
 func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
     /* This is terrible, but aws doesn't have an actual move. */
+    if err := d.copy(ctx, sourcePath, destPath); err != nil {
+        return err
+    }
+    return d.Delete(ctx, sourcePath)
+}
+
+// copy copies an object stored at sourcePath to destPath.
+func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error {
+    // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
+    // operation. For larger objects, the multipart upload API must be used.
+    //
+    // Empirically, multipart copy is fastest with 32 MB parts and is faster
+    // than PUT Object - Copy for objects larger than 32 MB.
+    fileInfo, err := d.Stat(ctx, sourcePath)
+    if err != nil {
+        return parseError(sourcePath, err)
+    }
+
+    if fileInfo.Size() <= d.MultipartCopyThresholdSize {
         _, err := d.S3.CopyObject(&s3.CopyObjectInput{
             Bucket: aws.String(d.Bucket),
             Key:    aws.String(d.s3Path(destPath)),
@@ -578,8 +656,73 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
         if err != nil {
             return parseError(sourcePath, err)
         }
+        return nil
+    }

-    return d.Delete(ctx, sourcePath)
+    // Even in the worst case, a multipart copy should take no more
+    // than a few minutes, so 30 minutes is very conservative.
+    expires := time.Now().Add(time.Duration(30) * time.Minute)
+    createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
+        Bucket:               aws.String(d.Bucket),
+        Key:                  aws.String(d.s3Path(destPath)),
+        ContentType:          d.getContentType(),
+        ACL:                  d.getACL(),
+        Expires:              aws.Time(expires),
+        SSEKMSKeyId:          d.getSSEKMSKeyID(),
+        ServerSideEncryption: d.getEncryptionMode(),
+        StorageClass:         d.getStorageClass(),
+    })
+    if err != nil {
+        return err
+    }
+
+    numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
+    completedParts := make([]*s3.CompletedPart, numParts)
+    errChan := make(chan error, numParts)
+    limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency)
+
+    for i := range completedParts {
+        i := int64(i)
+        go func() {
+            limiter <- struct{}{}
+            firstByte := i * d.MultipartCopyChunkSize
+            lastByte := firstByte + d.MultipartCopyChunkSize - 1
+            if lastByte >= fileInfo.Size() {
+                lastByte = fileInfo.Size() - 1
+            }
+            uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
+                Bucket:          aws.String(d.Bucket),
+                CopySource:      aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
+                Key:             aws.String(d.s3Path(destPath)),
+                PartNumber:      aws.Int64(i + 1),
+                UploadId:        createResp.UploadId,
+                CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
+            })
+            if err == nil {
+                completedParts[i] = &s3.CompletedPart{
+                    ETag:       uploadResp.CopyPartResult.ETag,
+                    PartNumber: aws.Int64(i + 1),
+                }
+            }
+            errChan <- err
+            <-limiter
+        }()
+    }
+
+    for range completedParts {
+        err := <-errChan
+        if err != nil {
+            return err
+        }
+    }
+
+    _, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
+        Bucket:          aws.String(d.Bucket),
+        Key:             aws.String(d.s3Path(destPath)),
+        UploadId:        createResp.UploadId,
+        MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
+    })
+    return err
 }

 func min(a, b int) int {
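
The core of the new `copy` method is a bounded fan-out: one goroutine per part, an error channel with one slot per part, and a `limiter` channel whose capacity enforces `multipartcopymaxconcurrency`. The sketch below isolates that pattern; `copyParts` and its stubbed part-copy callback are hypothetical names for illustration, not code from this commit.

```go
package main

import "fmt"

// copyParts runs copyPart once per part, with at most maxConcurrency calls
// in flight at a time, and returns the first error encountered.
func copyParts(numParts, maxConcurrency int, copyPart func(partNumber int) error) error {
    errChan := make(chan error, numParts) // buffered so no goroutine blocks forever
    limiter := make(chan struct{}, maxConcurrency)

    for i := 0; i < numParts; i++ {
        i := i
        go func() {
            limiter <- struct{}{} // acquire a concurrency slot
            errChan <- copyPart(i + 1)
            <-limiter // release the slot
        }()
    }

    // Wait for one result per part; fail fast on the first error.
    for i := 0; i < numParts; i++ {
        if err := <-errChan; err != nil {
            return err
        }
    }
    return nil
}

func main() {
    err := copyParts(32, 100, func(partNumber int) error {
        // The driver would issue an Upload Part - Copy request for
        // partNumber here; this stub always succeeds.
        return nil
    })
    fmt.Println(err) // <nil>
}
```

Buffering the error channel with one slot per part matters: even if the caller returns early on the first failure, the remaining goroutines can still send their results and exit rather than leak.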


@@ -1,19 +1,21 @@
 package s3

 import (
+    "bytes"
     "io/ioutil"
+    "math/rand"
     "os"
     "strconv"
     "testing"

-    "gopkg.in/check.v1"
-
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/service/s3"
     "github.com/docker/distribution/context"
     storagedriver "github.com/docker/distribution/registry/storage/driver"
     "github.com/docker/distribution/registry/storage/driver/testsuites"
+    "gopkg.in/check.v1"
 )
// Hook up gocheck into the "go test" runner. // Hook up gocheck into the "go test" runner.
@@ -65,6 +67,9 @@ func init() {
             keyID,
             secureBool,
             minChunkSize,
+            defaultMultipartCopyChunkSize,
+            defaultMultipartCopyMaxConcurrency,
+            defaultMultipartCopyThresholdSize,
             rootDirectory,
             storageClass,
             driverName + "-test",
@@ -238,3 +243,57 @@ func TestOverThousandBlobs(t *testing.T) {
         t.Fatalf("unexpected error deleting thousand files: %v", err)
     }
 }
+
+func TestMoveWithMultipartCopy(t *testing.T) {
+    if skipS3() != "" {
+        t.Skip(skipS3())
+    }
+
+    rootDir, err := ioutil.TempDir("", "driver-")
+    if err != nil {
+        t.Fatalf("unexpected error creating temporary directory: %v", err)
+    }
+    defer os.Remove(rootDir)
+
+    d, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
+    if err != nil {
+        t.Fatalf("unexpected error creating driver: %v", err)
+    }
+
+    ctx := context.Background()
+    sourcePath := "/source"
+    destPath := "/dest"
+
+    defer d.Delete(ctx, sourcePath)
+    defer d.Delete(ctx, destPath)
+
+    // An object larger than d's MultipartCopyThresholdSize will cause d.Move() to perform a multipart copy.
+    multipartCopyThresholdSize := d.baseEmbed.Base.StorageDriver.(*driver).MultipartCopyThresholdSize
+    contents := make([]byte, 2*multipartCopyThresholdSize)
+    rand.Read(contents)
+
+    err = d.PutContent(ctx, sourcePath, contents)
+    if err != nil {
+        t.Fatalf("unexpected error creating content: %v", err)
+    }
+
+    err = d.Move(ctx, sourcePath, destPath)
+    if err != nil {
+        t.Fatalf("unexpected error moving file: %v", err)
+    }
+
+    received, err := d.GetContent(ctx, destPath)
+    if err != nil {
+        t.Fatalf("unexpected error getting content: %v", err)
+    }
+    if !bytes.Equal(contents, received) {
+        t.Fatal("content differs")
+    }
+
+    _, err = d.GetContent(ctx, sourcePath)
+    switch err.(type) {
+    case storagedriver.PathNotFoundError:
+    default:
+        t.Fatalf("unexpected error getting content: %v", err)
+    }
+}