s3: implement --s3-upload-cutoff for single part uploads below this - fixes #2772

Before this change rclone used multipart uploads for files of any
size.  However multipart uploads are less efficient for smaller files
and don't carry MD5 checksums, so it is advantageous to use single
part uploads where possible.

This change implements single part uploads for all files smaller than
the upload_cutoff size.  Streamed files (whose size isn't known in
advance) must still be uploaded as multipart uploads.
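
The change boils down to a single size check. Below is a minimal
standalone sketch of that decision, distilled from the Update change in
the diff further down; the helper name is mine, not rclone's.

package main

import "fmt"

// useMultipart mirrors the check added to Update: streamed files
// (unknown size, reported as -1) and files at or above the cutoff go
// via multipart upload; everything else is a single part PUT, which
// can carry a Content-MD5 header.
func useMultipart(size, uploadCutoff int64) bool {
	return size < 0 || size >= uploadCutoff
}

func main() {
	cutoff := int64(200 * 1024 * 1024) // defaultUploadCutoff from the diff (200M)

	fmt.Println(useMultipart(5*1024*1024, cutoff))    // false - 5M file, single part
	fmt.Println(useMultipart(-1, cutoff))             // true  - streamed file of unknown size
	fmt.Println(useMultipart(1024*1024*1024, cutoff)) // true  - 1GB file, multipart
}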
Author: Nick Craig-Wood
Date:   2018-11-26 21:09:23 +00:00
Commit: 198c34ce21
Parent: 0eba88bbfe

3 changed files with 208 additions and 59 deletions

backend/s3/s3.go

@@ -544,12 +544,20 @@ doesn't copy the ACL from the source but rather writes a fresh one.`,
 			Value: "ONEZONE_IA",
 			Help:  "One Zone Infrequent Access storage class",
 		}},
+	}, {
+		Name: "upload_cutoff",
+		Help: `Cutoff for switching to chunked upload
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5GB.`,
+		Default:  defaultUploadCutoff,
+		Advanced: true,
 	}, {
 		Name: "chunk_size",
 		Help: `Chunk size to use for uploading.
-Any files larger than this will be uploaded in chunks of this
-size. The default is 5MB. The minimum is 5MB.
+When uploading files larger than upload_cutoff they will be uploaded
+as multipart uploads using this chunk size.
 
 Note that "--s3-upload-concurrency" chunks of this size are buffered
 in memory per transfer.
@@ -607,14 +615,16 @@ Use this only if v4 signatures don't work, eg pre Jewel/v10 CEPH.`,
 
 // Constants
 const (
 	metaMtime           = "Mtime"                       // the meta key to store mtime in - eg X-Amz-Meta-Mtime
 	metaMD5Hash         = "Md5chksum"                   // the meta key to store md5hash in
 	listChunkSize       = 1000                          // number of items to read at once
 	maxRetries          = 10                            // number of retries to make of operations
 	maxSizeForCopy      = 5 * 1024 * 1024 * 1024        // The maximum size of object we can COPY
 	maxFileSize         = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size
 	minChunkSize        = fs.SizeSuffix(s3manager.MinUploadPartSize)
-	minSleep            = 10 * time.Millisecond // In case of error, start at 10ms sleep.
+	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
+	maxUploadCutoff     = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
+	minSleep            = 10 * time.Millisecond // In case of error, start at 10ms sleep.
 )
 
 // Options defines the configuration for this backend
@@ -630,6 +640,7 @@ type Options struct {
 	ServerSideEncryption string        `config:"server_side_encryption"`
 	SSEKMSKeyID          string        `config:"sse_kms_key_id"`
 	StorageClass         string        `config:"storage_class"`
+	UploadCutoff         fs.SizeSuffix `config:"upload_cutoff"`
 	ChunkSize            fs.SizeSuffix `config:"chunk_size"`
 	DisableChecksum      bool          `config:"disable_checksum"`
 	SessionToken         string        `config:"session_token"`
@@ -651,6 +662,7 @@ type Fs struct {
 	bucketOK      bool         // true if we have created the bucket
 	bucketDeleted bool         // true if we have deleted the bucket
 	pacer         *pacer.Pacer // To pace the API calls
+	srv           *http.Client // a plain http client
 }
 
 // Object describes a s3 object
@@ -854,6 +866,21 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }
 
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+	if cs > maxUploadCutoff {
+		return errors.Errorf("%s is greater than %s", cs, maxUploadCutoff)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // NewFs constructs an Fs from the path, bucket:path
 func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	// Parse config into Options struct
@@ -866,6 +893,10 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "s3: chunk size")
 	}
+	err = checkUploadCutoff(opt.UploadCutoff)
+	if err != nil {
+		return nil, errors.Wrap(err, "s3: upload cutoff")
+	}
 	bucket, directory, err := s3ParsePath(root)
 	if err != nil {
 		return nil, err
@@ -882,6 +913,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		bucket: bucket,
 		ses:    ses,
 		pacer:  pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
+		srv:    fshttp.NewClient(fs.Config),
 	}
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -1556,38 +1588,46 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	modTime := src.ModTime()
 	size := src.Size()
 
-	uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
-		u.Concurrency = o.fs.opt.UploadConcurrency
-		u.LeavePartsOnError = false
-		u.S3 = o.fs.c
-		u.PartSize = int64(o.fs.opt.ChunkSize)
-
-		if size == -1 {
-			// Make parts as small as possible while still being able to upload to the
-			// S3 file size limit. Rounded up to nearest MB.
-			u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
-			return
-		}
-		// Adjust PartSize until the number of parts is small enough.
-		if size/u.PartSize >= s3manager.MaxUploadParts {
-			// Calculate partition size rounded up to the nearest MB
-			u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
-		}
-	})
+	multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
+	var uploader *s3manager.Uploader
+	if multipart {
+		uploader = s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
+			u.Concurrency = o.fs.opt.UploadConcurrency
+			u.LeavePartsOnError = false
+			u.S3 = o.fs.c
+			u.PartSize = int64(o.fs.opt.ChunkSize)
+
+			if size == -1 {
+				// Make parts as small as possible while still being able to upload to the
+				// S3 file size limit. Rounded up to nearest MB.
+				u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
+				return
+			}
+			// Adjust PartSize until the number of parts is small enough.
+			if size/u.PartSize >= s3manager.MaxUploadParts {
+				// Calculate partition size rounded up to the nearest MB
+				u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
+			}
+		})
+	}
 
 	// Set the mtime in the meta data
 	metadata := map[string]*string{
 		metaMtime: aws.String(swift.TimeToFloatString(modTime)),
 	}
 
-	if !o.fs.opt.DisableChecksum && size > uploader.PartSize {
+	// read the md5sum if available for non multipart and if
+	// disable checksum isn't present.
+	var md5sum string
+	if !multipart || !o.fs.opt.DisableChecksum {
 		hash, err := src.Hash(hash.MD5)
 		if err == nil && matchMd5.MatchString(hash) {
 			hashBytes, err := hex.DecodeString(hash)
 			if err == nil {
-				metadata[metaMD5Hash] = aws.String(base64.StdEncoding.EncodeToString(hashBytes))
+				md5sum = base64.StdEncoding.EncodeToString(hashBytes)
+				if multipart {
+					metadata[metaMD5Hash] = &md5sum
+				}
 			}
 		}
 	}
@@ -1596,30 +1636,98 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	mimeType := fs.MimeType(src)
 	key := o.fs.root + o.remote
-	req := s3manager.UploadInput{
-		Bucket:      &o.fs.bucket,
-		ACL:         &o.fs.opt.ACL,
-		Key:         &key,
-		Body:        in,
-		ContentType: &mimeType,
-		Metadata:    metadata,
-		//ContentLength: &size,
-	}
-	if o.fs.opt.ServerSideEncryption != "" {
-		req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
-	}
-	if o.fs.opt.SSEKMSKeyID != "" {
-		req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
-	}
-	if o.fs.opt.StorageClass != "" {
-		req.StorageClass = &o.fs.opt.StorageClass
-	}
-	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-		_, err = uploader.Upload(&req)
-		return shouldRetry(err)
-	})
-	if err != nil {
-		return err
+	if multipart {
+		req := s3manager.UploadInput{
+			Bucket:      &o.fs.bucket,
+			ACL:         &o.fs.opt.ACL,
+			Key:         &key,
+			Body:        in,
+			ContentType: &mimeType,
+			Metadata:    metadata,
+			//ContentLength: &size,
+		}
+		if o.fs.opt.ServerSideEncryption != "" {
+			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
+		}
+		if o.fs.opt.SSEKMSKeyID != "" {
+			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
+		}
+		if o.fs.opt.StorageClass != "" {
+			req.StorageClass = &o.fs.opt.StorageClass
+		}
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			_, err = uploader.Upload(&req)
+			return shouldRetry(err)
+		})
+		if err != nil {
+			return err
+		}
+	} else {
+		req := s3.PutObjectInput{
+			Bucket:      &o.fs.bucket,
+			ACL:         &o.fs.opt.ACL,
+			Key:         &key,
+			ContentType: &mimeType,
+			Metadata:    metadata,
+		}
+		if md5sum != "" {
+			req.ContentMD5 = &md5sum
+		}
+		if o.fs.opt.ServerSideEncryption != "" {
+			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
+		}
+		if o.fs.opt.SSEKMSKeyID != "" {
+			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
+		}
+		if o.fs.opt.StorageClass != "" {
+			req.StorageClass = &o.fs.opt.StorageClass
+		}
+
+		// Create the request
+		putObj, _ := o.fs.c.PutObjectRequest(&req)
+
+		// Sign it so we can upload using a presigned request.
+		//
+		// Note the SDK doesn't currently support streaming to
+		// PutObject so we'll use this work-around.
+		url, headers, err := putObj.PresignRequest(15 * time.Minute)
+		if err != nil {
+			return errors.Wrap(err, "s3 upload: sign request")
+		}
+
+		// Set request to nil if empty so as not to make chunked encoding
+		if size == 0 {
+			in = nil
+		}
+
+		// create the vanilla http request
+		httpReq, err := http.NewRequest("PUT", url, in)
+		if err != nil {
+			return errors.Wrap(err, "s3 upload: new request")
+		}
+
+		// set the headers we signed and the length
+		httpReq.Header = headers
+		httpReq.ContentLength = size
+
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			resp, err := o.fs.srv.Do(httpReq)
+			if err != nil {
+				return shouldRetry(err)
+			}
+			body, err := rest.ReadBody(resp)
+			if err != nil {
+				return shouldRetry(err)
+			}
+			if resp.StatusCode >= 200 && resp.StatusCode < 299 {
+				return false, nil
+			}
+			err = errors.Errorf("s3 upload: %s: %s", resp.Status, body)
+			return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
+		})
+		if err != nil {
+			return err
+		}
 	}
 
 	// Read the metadata from the newly created object
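
The single part path above relies on a work-around: the SDK doesn't
support streaming to PutObject, so the request is presigned and the
body is sent with a plain HTTP client. Here is a self-contained sketch
of that pattern using aws-sdk-go outside of rclone; the bucket, key,
region and body are placeholder values.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Placeholder values - not taken from the commit.
	bucket, key := "example-bucket", "example-key"
	body := []byte("hello world")

	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
	svc := s3.New(sess)

	// Build the PutObject request but don't send it through the SDK.
	putObj, _ := svc.PutObjectRequest(&s3.PutObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})

	// Presign it so the body can be sent with any HTTP client.
	url, headers, err := putObj.PresignRequest(15 * time.Minute)
	if err != nil {
		panic(err)
	}

	// Do the upload with net/http, reusing the signed headers and
	// setting the length explicitly to avoid chunked encoding.
	httpReq, err := http.NewRequest("PUT", url, bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	httpReq.Header = headers
	httpReq.ContentLength = int64(len(body))

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	respBody, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(respBody))
}

rclone wraps these same steps in its pacer and retry handling, as shown
in the hunk above.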

backend/s3/s3_internal_test.go

@@ -23,4 +23,8 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
 var _ fstests.SetUploadChunkSizer = (*Fs)(nil)

docs/content/s3.md

@@ -266,8 +266,33 @@ The modified time is stored as metadata on the object as
 ### Multipart uploads ###
 
 rclone supports multipart uploads with S3 which means that it can
-upload files bigger than 5GB. Note that files uploaded *both* with
-multipart upload *and* through crypt remotes do not have MD5 sums.
+upload files bigger than 5GB.
+
+Note that files uploaded *both* with multipart upload *and* through
+crypt remotes do not have MD5 sums.
+
+Rclone switches from single part uploads to multipart uploads at the
+point specified by `--s3-upload-cutoff`.  This can be a maximum of 5GB
+and a minimum of 0 (ie always upload multipart files).
+
+The chunk sizes used in the multipart upload are specified by
+`--s3-chunk-size` and the number of chunks uploaded concurrently is
+specified by `--s3-upload-concurrency`.
+
+Multipart uploads will use `--transfers` * `--s3-upload-concurrency` *
+`--s3-chunk-size` extra memory.  Single part uploads do not use extra
+memory.
+
+Single part transfers can be faster than multipart transfers or slower
+depending on your latency from S3 - the more latency, the more likely
+single part transfers will be faster.
+
+Increasing `--s3-upload-concurrency` will increase throughput (8 would
+be a sensible value) and increasing `--s3-chunk-size` also increases
+throughput (16M would be sensible).  Increasing either of these will
+use more memory.  The default values are high enough to gain most of
+the possible performance without using too much memory.
 
 ### Buckets and Regions ###
@@ -832,12 +857,24 @@ The storage class to use when storing new objects in S3.
 
 Here are the advanced options specific to s3 (Amazon S3 Compliant Storage Providers (AWS, Ceph, Dreamhost, IBM COS, Minio)).
 
+#### --s3-upload-cutoff
+
+Cutoff for switching to chunked upload
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5GB.
+
+- Config:  upload_cutoff
+- Env Var: RCLONE_S3_UPLOAD_CUTOFF
+- Type:    SizeSuffix
+- Default: 200M
+
 #### --s3-chunk-size
 
 Chunk size to use for uploading.
 
-Any files larger than this will be uploaded in chunks of this
-size. The default is 5MB. The minimum is 5MB.
+When uploading files larger than upload_cutoff they will be uploaded
+as multipart uploads using this chunk size.
 
 Note that "--s3-upload-concurrency" chunks of this size are buffered
 in memory per transfer.
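
As a concrete illustration of the memory note in the documentation
above, here is a small sketch of the multipart buffer arithmetic; the
flag values are illustrative choices, not rclone's defaults.

package main

import "fmt"

func main() {
	// Illustrative flag settings, not rclone defaults.
	transfers := int64(4)       // --transfers
	concurrency := int64(4)     // --s3-upload-concurrency
	chunkSize := int64(5 << 20) // --s3-chunk-size of 5M

	// Multipart uploads buffer transfers * concurrency * chunk_size bytes.
	extra := transfers * concurrency * chunkSize
	fmt.Printf("extra buffer memory: %d MiB\n", extra>>20) // 80 MiB
}

Single part uploads (anything below `--s3-upload-cutoff`) skip this
buffering entirely, while `--s3-upload-cutoff 0` forces every file
through the multipart path.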