diff --git a/storagedriver/s3/README.md b/storagedriver/s3/README.md index c53b49479..fb0dd014a 100644 --- a/storagedriver/s3/README.md +++ b/storagedriver/s3/README.md @@ -21,4 +21,6 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Amaz `v4auth`: (optional) Whether you would like to use aws signature version 4 with your requests. This defaults to true if not specified (note that the eu-central-1 region does not work with version 2 signatures, so the driver will error out if initialized with this region and v4auth set to false) +`chunksize`: (optional) The default part size for multipart uploads (performed by WriteStream) to s3. The default is 10 MB. Keep in mind that the minimum part size for s3 is 5MB. You might experience better performance for larger chunk sizes depending on the speed of your connection to s3. + `rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root). diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index f2b4c4172..c63b1d6b9 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -34,7 +34,9 @@ const driverName = "s3" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB -const chunkSize = 5 * 1024 * 1024 +const minChunkSize = 5 << 20 + +const defaultChunkSize = 2 * minChunkSize // listMax is the largest amount of objects you can request from S3 in a list call const listMax = 1000 @@ -48,6 +50,7 @@ type DriverParameters struct { Encrypt bool Secure bool V4Auth bool + ChunkSize int64 RootDirectory string } @@ -67,8 +70,9 @@ func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (stora type Driver struct { S3 *s3.S3 Bucket *s3.Bucket + ChunkSize int64 Encrypt bool - rootDirectory string + RootDirectory string } // FromParameters constructs a new Driver with a given parameters map @@ -132,6 +136,15 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { } } + chunkSize := int64(defaultChunkSize) + chunkSizeParam, ok := parameters["chunksize"] + if ok { + chunkSize, ok = chunkSizeParam.(int64) + if !ok || chunkSize < minChunkSize { + return nil, fmt.Errorf("The chunksize parameter should be a number that is larger than 5*1024*1024") + } + } + rootDirectory, ok := parameters["rootdirectory"] if !ok { rootDirectory = "" @@ -145,6 +158,7 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { encryptBool, secureBool, v4AuthBool, + chunkSize, fmt.Sprint(rootDirectory), } @@ -195,7 +209,12 @@ func New(params DriverParameters) (*Driver, error) { // } // } - return &Driver{s3obj, bucket, params.Encrypt, params.RootDirectory}, nil + return &Driver{ + S3: s3obj, + Bucket: bucket, + ChunkSize: params.ChunkSize, + Encrypt: params.Encrypt, + RootDirectory: params.RootDirectory}, nil } // Implement the storagedriver.StorageDriver interface @@ -273,8 +292,8 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return 0, err } - buf := make([]byte, chunkSize) - zeroBuf := make([]byte, chunkSize) + buf := make([]byte, d.ChunkSize) + zeroBuf := make([]byte, d.ChunkSize) // We never want to leave a dangling multipart upload, our only consistent state is // when there is a whole object at path. This is in order to remain consistent with @@ -323,7 +342,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total // Fills from parameter to chunkSize from reader fromReader := func(from int64) error { bytesRead = 0 - for from+int64(bytesRead) < chunkSize { + for from+int64(bytesRead) < d.ChunkSize { nn, err := reader.Read(buf[from+int64(bytesRead):]) totalRead += int64(nn) bytesRead += nn @@ -364,7 +383,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total } if currentLength >= offset { - if offset < chunkSize { + if offset < d.ChunkSize { // chunkSize > currentLength >= offset if err = fromSmallCurrent(offset); err != nil { return totalRead, err @@ -374,7 +393,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return totalRead, err } - if totalRead+offset < chunkSize { + if totalRead+offset < d.ChunkSize { return totalRead, nil } } else { @@ -407,23 +426,23 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total // Fills between parameters with 0s, making new parts fromZeroFillLarge := func(from, to int64) error { bytesRead64 := int64(0) - for to-(from+bytesRead64) >= chunkSize { + for to-(from+bytesRead64) >= d.ChunkSize { part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) if err != nil { return err } - bytesRead64 += chunkSize + bytesRead64 += d.ChunkSize parts = append(parts, part) partNumber++ } - return fromZeroFillSmall(0, (to-from)%chunkSize) + return fromZeroFillSmall(0, (to-from)%d.ChunkSize) } // currentLength < offset - if currentLength < chunkSize { - if offset < chunkSize { + if currentLength < d.ChunkSize { + if offset < d.ChunkSize { // chunkSize > offset > currentLength if err = fromSmallCurrent(currentLength); err != nil { return totalRead, err @@ -437,7 +456,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return totalRead, err } - if totalRead+offset < chunkSize { + if totalRead+offset < d.ChunkSize { return totalRead, nil } } else { @@ -446,7 +465,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return totalRead, err } - if err = fromZeroFillSmall(currentLength, chunkSize); err != nil { + if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil { return totalRead, err } @@ -459,15 +478,15 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total partNumber++ //Zero fill from chunkSize up to offset, then some reader - if err = fromZeroFillLarge(chunkSize, offset); err != nil { + if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil { return totalRead, err } - if err = fromReader(offset % chunkSize); err != nil { + if err = fromReader(offset % d.ChunkSize); err != nil { return totalRead, err } - if totalRead+(offset%chunkSize) < chunkSize { + if totalRead+(offset%d.ChunkSize) < d.ChunkSize { return totalRead, nil } } @@ -488,11 +507,11 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return totalRead, err } - if err = fromReader((offset - currentLength) % chunkSize); err != nil { + if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil { return totalRead, err } - if totalRead+((offset-currentLength)%chunkSize) < chunkSize { + if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize { return totalRead, nil } } @@ -505,7 +524,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total return totalRead, err } - if int64(bytesRead) < chunkSize { + if int64(bytesRead) < d.ChunkSize { break } } @@ -670,7 +689,7 @@ func (d *Driver) URLFor(path string, options map[string]interface{}) (string, er } func (d *Driver) s3Path(path string) string { - return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/") + return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") } func parseError(path string, err error) error { diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go index f70080f22..830f99342 100644 --- a/storagedriver/s3/s3_test.go +++ b/storagedriver/s3/s3_test.go @@ -63,6 +63,7 @@ func init() { encryptBool, secureBool, v4AuthBool, + minChunkSize, root, }