diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index fdd5ba6fe..ce16580d0 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -544,12 +544,20 @@ doesn't copy the ACL from the source but rather writes a fresh one.`,
 				Value: "ONEZONE_IA",
 				Help:  "One Zone Infrequent Access storage class",
 			}},
+		}, {
+			Name: "upload_cutoff",
+			Help: `Cutoff for switching to chunked upload
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5GB.`,
+			Default:  defaultUploadCutoff,
+			Advanced: true,
 		}, {
 			Name: "chunk_size",
 			Help: `Chunk size to use for uploading.
 
-Any files larger than this will be uploaded in chunks of this
-size. The default is 5MB. The minimum is 5MB.
+When uploading files larger than upload_cutoff they will be uploaded
+as multipart uploads using this chunk size.
 
 Note that "--s3-upload-concurrency" chunks of this size are buffered
 in memory per transfer.
@@ -607,14 +615,16 @@ Use this only if v4 signatures don't work, eg pre Jewel/v10 CEPH.`,
 
 // Constants
 const (
-	metaMtime      = "Mtime"                       // the meta key to store mtime in - eg X-Amz-Meta-Mtime
-	metaMD5Hash    = "Md5chksum"                   // the meta key to store md5hash in
-	listChunkSize  = 1000                          // number of items to read at once
-	maxRetries     = 10                            // number of retries to make of operations
-	maxSizeForCopy = 5 * 1024 * 1024 * 1024        // The maximum size of object we can COPY
-	maxFileSize    = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size
-	minChunkSize   = fs.SizeSuffix(s3manager.MinUploadPartSize)
-	minSleep       = 10 * time.Millisecond // In case of error, start at 10ms sleep.
+	metaMtime           = "Mtime"                       // the meta key to store mtime in - eg X-Amz-Meta-Mtime
+	metaMD5Hash         = "Md5chksum"                   // the meta key to store md5hash in
+	listChunkSize       = 1000                          // number of items to read at once
+	maxRetries          = 10                            // number of retries to make of operations
+	maxSizeForCopy      = 5 * 1024 * 1024 * 1024        // The maximum size of object we can COPY
+	maxFileSize         = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size
+	minChunkSize        = fs.SizeSuffix(s3manager.MinUploadPartSize)
+	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
+	maxUploadCutoff     = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
+	minSleep            = 10 * time.Millisecond // In case of error, start at 10ms sleep.
 )
 
 // Options defines the configuration for this backend
@@ -630,6 +640,7 @@ type Options struct {
 	ServerSideEncryption  string        `config:"server_side_encryption"`
 	SSEKMSKeyID           string        `config:"sse_kms_key_id"`
 	StorageClass          string        `config:"storage_class"`
+	UploadCutoff          fs.SizeSuffix `config:"upload_cutoff"`
 	ChunkSize             fs.SizeSuffix `config:"chunk_size"`
 	DisableChecksum       bool          `config:"disable_checksum"`
 	SessionToken          string        `config:"session_token"`
@@ -651,6 +662,7 @@ type Fs struct {
 	bucketOK      bool         // true if we have created the bucket
 	bucketDeleted bool         // true if we have deleted the bucket
 	pacer         *pacer.Pacer // To pace the API calls
+	srv           *http.Client // a plain http client
 }
 
 // Object describes a s3 object
@@ -854,6 +866,21 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }
 
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+	if cs > maxUploadCutoff {
+		return errors.Errorf("%s is greater than %s", cs, maxUploadCutoff)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // NewFs constructs an Fs from the path, bucket:path
 func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	// Parse config into Options struct
@@ -866,6 +893,10 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "s3: chunk size")
 	}
+	err = checkUploadCutoff(opt.UploadCutoff)
+	if err != nil {
+		return nil, errors.Wrap(err, "s3: upload cutoff")
+	}
 	bucket, directory, err := s3ParsePath(root)
 	if err != nil {
 		return nil, err
@@ -882,6 +913,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		bucket: bucket,
 		ses:    ses,
 		pacer:  pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
+		srv:    fshttp.NewClient(fs.Config),
 	}
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -1556,38 +1588,46 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	modTime := src.ModTime()
 	size := src.Size()
 
-	uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
-		u.Concurrency = o.fs.opt.UploadConcurrency
-		u.LeavePartsOnError = false
-		u.S3 = o.fs.c
-		u.PartSize = int64(o.fs.opt.ChunkSize)
+	multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
+	var uploader *s3manager.Uploader
+	if multipart {
+		uploader = s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
+			u.Concurrency = o.fs.opt.UploadConcurrency
+			u.LeavePartsOnError = false
+			u.S3 = o.fs.c
+			u.PartSize = int64(o.fs.opt.ChunkSize)
 
-		if size == -1 {
-			// Make parts as small as possible while still being able to upload to the
-			// S3 file size limit. Rounded up to nearest MB.
-			u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
-			return
-		}
-		// Adjust PartSize until the number of parts is small enough.
-		if size/u.PartSize >= s3manager.MaxUploadParts {
-			// Calculate partition size rounded up to the nearest MB
-			u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
-		}
-	})
+			if size == -1 {
+				// Make parts as small as possible while still being able to upload to the
+				// S3 file size limit. Rounded up to nearest MB.
+				u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
+				return
+			}
+			// Adjust PartSize until the number of parts is small enough.
+			if size/u.PartSize >= s3manager.MaxUploadParts {
+				// Calculate partition size rounded up to the nearest MB
+				u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
+			}
+		})
+	}
 
 	// Set the mtime in the meta data
 	metadata := map[string]*string{
 		metaMtime: aws.String(swift.TimeToFloatString(modTime)),
 	}
 
-	if !o.fs.opt.DisableChecksum && size > uploader.PartSize {
+	// read the md5sum if available for non multipart uploads or if
+	// the disable_checksum option isn't set.
+	var md5sum string
+	if !multipart || !o.fs.opt.DisableChecksum {
 		hash, err := src.Hash(hash.MD5)
-
 		if err == nil && matchMd5.MatchString(hash) {
 			hashBytes, err := hex.DecodeString(hash)
-
 			if err == nil {
-				metadata[metaMD5Hash] = aws.String(base64.StdEncoding.EncodeToString(hashBytes))
+				md5sum = base64.StdEncoding.EncodeToString(hashBytes)
+				if multipart {
+					metadata[metaMD5Hash] = &md5sum
+				}
 			}
 		}
 	}
@@ -1596,30 +1636,98 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	mimeType := fs.MimeType(src)
 
 	key := o.fs.root + o.remote
-	req := s3manager.UploadInput{
-		Bucket:      &o.fs.bucket,
-		ACL:         &o.fs.opt.ACL,
-		Key:         &key,
-		Body:        in,
-		ContentType: &mimeType,
-		Metadata:    metadata,
-		//ContentLength: &size,
-	}
-	if o.fs.opt.ServerSideEncryption != "" {
-		req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
-	}
-	if o.fs.opt.SSEKMSKeyID != "" {
-		req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
-	}
-	if o.fs.opt.StorageClass != "" {
-		req.StorageClass = &o.fs.opt.StorageClass
-	}
-	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-		_, err = uploader.Upload(&req)
-		return shouldRetry(err)
-	})
-	if err != nil {
-		return err
+	if multipart {
+		req := s3manager.UploadInput{
+			Bucket:      &o.fs.bucket,
+			ACL:         &o.fs.opt.ACL,
+			Key:         &key,
+			Body:        in,
+			ContentType: &mimeType,
+			Metadata:    metadata,
+			//ContentLength: &size,
+		}
+		if o.fs.opt.ServerSideEncryption != "" {
+			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
+		}
+		if o.fs.opt.SSEKMSKeyID != "" {
+			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
+		}
+		if o.fs.opt.StorageClass != "" {
+			req.StorageClass = &o.fs.opt.StorageClass
+		}
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			_, err = uploader.Upload(&req)
+			return shouldRetry(err)
+		})
+		if err != nil {
+			return err
+		}
+	} else {
+		req := s3.PutObjectInput{
+			Bucket:      &o.fs.bucket,
+			ACL:         &o.fs.opt.ACL,
+			Key:         &key,
+			ContentType: &mimeType,
+			Metadata:    metadata,
+		}
+		if md5sum != "" {
+			req.ContentMD5 = &md5sum
+		}
+		if o.fs.opt.ServerSideEncryption != "" {
+			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
+		}
+		if o.fs.opt.SSEKMSKeyID != "" {
+			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
+		}
+		if o.fs.opt.StorageClass != "" {
+			req.StorageClass = &o.fs.opt.StorageClass
+		}
+
+		// Create the request
+		putObj, _ := o.fs.c.PutObjectRequest(&req)
+
+		// Sign it so we can upload using a presigned request.
+		//
+		// Note the SDK doesn't currently support streaming to
+		// PutObject so we'll use this work-around.
+		url, headers, err := putObj.PresignRequest(15 * time.Minute)
+		if err != nil {
+			return errors.Wrap(err, "s3 upload: sign request")
+		}
+
+		// Set the body to nil if empty so the request isn't sent with chunked encoding
+		if size == 0 {
+			in = nil
+		}
+
+		// create the vanilla http request
+		httpReq, err := http.NewRequest("PUT", url, in)
+		if err != nil {
+			return errors.Wrap(err, "s3 upload: new request")
+		}
+
+		// set the headers we signed and the length
+		httpReq.Header = headers
+		httpReq.ContentLength = size
+
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			resp, err := o.fs.srv.Do(httpReq)
+			if err != nil {
+				return shouldRetry(err)
+			}
+			body, err := rest.ReadBody(resp)
+			if err != nil {
+				return shouldRetry(err)
+			}
+			if resp.StatusCode >= 200 && resp.StatusCode < 299 {
+				return false, nil
+			}
+			err = errors.Errorf("s3 upload: %s: %s", resp.Status, body)
+			return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
+		})
+		if err != nil {
+			return err
+		}
 	}
 
 	// Read the metadata from the newly created object
diff --git a/backend/s3/s3_test.go b/backend/s3/s3_test.go
index c544192b4..1b9a4045b 100644
--- a/backend/s3/s3_test.go
+++ b/backend/s3/s3_test.go
@@ -23,4 +23,8 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
 var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
diff --git a/docs/content/s3.md b/docs/content/s3.md
index 4ffec06a3..5adc78ce5 100644
--- a/docs/content/s3.md
+++ b/docs/content/s3.md
@@ -266,8 +266,33 @@ The modified time is stored as metadata on the object as
 ### Multipart uploads ###
 
 rclone supports multipart uploads with S3 which means that it can
-upload files bigger than 5GB. Note that files uploaded *both* with
-multipart upload *and* through crypt remotes do not have MD5 sums.
+upload files bigger than 5GB.
+
+Note that files uploaded *both* with multipart upload *and* through
+crypt remotes do not have MD5 sums.
+
+Rclone switches from single part uploads to multipart uploads at the
+point specified by `--s3-upload-cutoff`. This can be a maximum of 5GB
+and a minimum of 0 (ie always upload multipart files).
+
+The chunk sizes used in the multipart upload are specified by
+`--s3-chunk-size` and the number of chunks uploaded concurrently is
+specified by `--s3-upload-concurrency`.
+
+Multipart uploads will use `--transfers` * `--s3-upload-concurrency` *
+`--s3-chunk-size` extra memory. Single part uploads do not use extra
+memory.
+
+Single part transfers can be faster or slower than multipart transfers
+depending on your latency from S3 - the more latency, the more likely
+single part transfers will be faster.
+
+Increasing `--s3-upload-concurrency` will increase throughput (8 would
+be a sensible value) and increasing `--s3-chunk-size` also increases
+throughput (16M would be sensible). Increasing either of these will
+use more memory. The default values are high enough to gain most of
+the possible performance without using too much memory.
+
 
 ### Buckets and Regions ###
 
@@ -832,12 +857,24 @@ The storage class to use when storing new objects in S3.
 Here are the advanced options specific to s3 (Amazon S3 Compliant
 Storage Providers (AWS, Ceph, Dreamhost, IBM COS, Minio)).
 
+#### --s3-upload-cutoff
+
+Cutoff for switching to chunked upload
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5GB.
+
+- Config: upload_cutoff
+- Env Var: RCLONE_S3_UPLOAD_CUTOFF
+- Type: SizeSuffix
+- Default: 200M
+
 #### --s3-chunk-size
 
 Chunk size to use for uploading.
 
-Any files larger than this will be uploaded in chunks of this
-size. The default is 5MB. The minimum is 5MB.
+When uploading files larger than upload_cutoff they will be uploaded
+as multipart uploads using this chunk size.
 
 Note that "--s3-upload-concurrency" chunks of this size are buffered
 in memory per transfer.
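For readers following the new single part branch in `Object.Update`: it boils down to "presign a PutObject request with the SDK, then send the body with a plain HTTP PUT", which is the work-around the patch uses because the SDK doesn't currently support streaming to PutObject. The sketch below shows that flow in isolation with aws-sdk-go; it is not rclone code - the region, bucket, key and file name are placeholders, and the error handling is reduced to panics where the patch uses the pacer, retries and status checking instead.

```go
// Minimal sketch of a presigned single part PUT to S3, assuming AWS
// credentials are available in the environment. All names below are
// placeholders, not values from the patch.
package main

import (
	"fmt"
	"net/http"
	"os"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Placeholder region - real code takes this from config.
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	}))
	svc := s3.New(sess)

	in, err := os.Open("testfile") // placeholder file
	if err != nil {
		panic(err)
	}
	defer func() { _ = in.Close() }()
	fi, err := in.Stat()
	if err != nil {
		panic(err)
	}

	// Build the PutObject request but don't send it - we only want the
	// presigned URL plus the headers that were signed.
	putObj, _ := svc.PutObjectRequest(&s3.PutObjectInput{
		Bucket: aws.String("example-bucket"), // placeholder bucket
		Key:    aws.String("path/to/object"), // placeholder key
	})
	url, headers, err := putObj.PresignRequest(15 * time.Minute)
	if err != nil {
		panic(err)
	}

	// Stream the file with a vanilla HTTP PUT, reusing the signed headers
	// and setting ContentLength so the body isn't sent chunked.
	httpReq, err := http.NewRequest("PUT", url, in)
	if err != nil {
		panic(err)
	}
	httpReq.Header = headers
	httpReq.ContentLength = fi.Size()

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		panic(err)
	}
	defer func() { _ = resp.Body.Close() }()
	fmt.Println("upload status:", resp.Status)
}
```

In terms of the new option, `--s3-upload-cutoff 0` sends every transfer down the multipart branch instead (what the docs above call "always upload multipart files"), while the 200M default keeps small objects on the single request path and still gives large files chunked, concurrent uploads.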