diff --git a/backend/qingstor/qingstor.go b/backend/qingstor/qingstor.go index 1438cf616..edd4b3a0b 100644 --- a/backend/qingstor/qingstor.go +++ b/backend/qingstor/qingstor.go @@ -72,14 +72,51 @@ func init() { Help: "Number of connection retries.", Default: 3, Advanced: true, + }, { + Name: "upload_cutoff", + Help: `Cutoff for switching to chunked upload + +Any files larger than this will be uploaded in chunks of chunk_size. +The minimum is 0 and the maximum is 5GB.`, + Default: defaultUploadCutoff, + Advanced: true, + }, { + Name: "chunk_size", + Help: `Chunk size to use for uploading. + +When uploading files larger than upload_cutoff they will be uploaded +as multipart uploads using this chunk size. + +Note that "--qingstor-upload-concurrency" chunks of this size are buffered +in memory per transfer. + +If you are transferring large files over high speed links and you have +enough memory, then increasing this will speed up the transfers.`, + Default: minChunkSize, + Advanced: true, + }, { + Name: "upload_concurrency", + Help: `Concurrency for multipart uploads. + +This is the number of chunks of the same file that are uploaded +concurrently. 
+ +If you are uploading small numbers of large file over high speed link +and these uploads do not fully utilize your bandwidth, then increasing +this may help to speed up the transfers.`, + Default: 4, + Advanced: true, }}, }) } // Constants const ( - listLimitSize = 1000 // Number of items to read at once - maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY + listLimitSize = 1000 // Number of items to read at once + maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY + minChunkSize = fs.SizeSuffix(minMultiPartSize) + defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) + maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) ) // Globals @@ -92,12 +129,15 @@ func timestampToTime(tp int64) time.Time { // Options defines the configuration for this backend type Options struct { - EnvAuth bool `config:"env_auth"` - AccessKeyID string `config:"access_key_id"` - SecretAccessKey string `config:"secret_access_key"` - Endpoint string `config:"endpoint"` - Zone string `config:"zone"` - ConnectionRetries int `config:"connection_retries"` + EnvAuth bool `config:"env_auth"` + AccessKeyID string `config:"access_key_id"` + SecretAccessKey string `config:"secret_access_key"` + Endpoint string `config:"endpoint"` + Zone string `config:"zone"` + ConnectionRetries int `config:"connection_retries"` + UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` + ChunkSize fs.SizeSuffix `config:"chunk_size"` + UploadConcurrency int `config:"upload_concurrency"` } // Fs represents a remote qingstor server @@ -227,6 +267,36 @@ func qsServiceConnection(opt *Options) (*qs.Service, error) { return qs.Init(cf) } +func checkUploadChunkSize(cs fs.SizeSuffix) error { + if cs < minChunkSize { + return errors.Errorf("%s is less than %s", cs, minChunkSize) + } + return nil +} + +func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + err = checkUploadChunkSize(cs) + if err == nil { + old, f.opt.ChunkSize = 
f.opt.ChunkSize, cs + } + return +} + +func checkUploadCutoff(cs fs.SizeSuffix) error { + if cs > maxUploadCutoff { + return errors.Errorf("%s is greater than %s", cs, maxUploadCutoff) + } + return nil +} + +func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + err = checkUploadCutoff(cs) + if err == nil { + old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs + } + return +} + // NewFs constructs an Fs from the path, bucket:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Parse config into Options struct @@ -235,6 +305,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, err } + err = checkUploadChunkSize(opt.ChunkSize) + if err != nil { + return nil, errors.Wrap(err, "qingstor: chunk size") + } + err = checkUploadCutoff(opt.UploadCutoff) + if err != nil { + return nil, errors.Wrap(err, "qingstor: upload cutoff") + } bucket, key, err := qsParsePath(root) if err != nil { return nil, err @@ -913,16 +991,24 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio mimeType := fs.MimeType(src) req := uploadInput{ - body: in, - qsSvc: o.fs.svc, - bucket: o.fs.bucket, - zone: o.fs.zone, - key: key, - mimeType: mimeType, + body: in, + qsSvc: o.fs.svc, + bucket: o.fs.bucket, + zone: o.fs.zone, + key: key, + mimeType: mimeType, + partSize: int64(o.fs.opt.ChunkSize), + concurrency: o.fs.opt.UploadConcurrency, } uploader := newUploader(&req) - err = uploader.upload() + size := src.Size() + multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) + if multipart { + err = uploader.upload() + } else { + err = uploader.singlePartUpload(in, size) + } if err != nil { return err } diff --git a/backend/qingstor/qingstor_test.go b/backend/qingstor/qingstor_test.go index 585df5196..4b0f36e13 100644 --- a/backend/qingstor/qingstor_test.go +++ b/backend/qingstor/qingstor_test.go @@ -2,12 +2,12 @@ // +build !plan9 -package qingstor_test +package qingstor import 
( "testing" - "github.com/ncw/rclone/backend/qingstor" + "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest/fstests" ) @@ -15,6 +15,19 @@ import ( func TestIntegration(t *testing.T) { fstests.Run(t, &fstests.Opt{ RemoteName: "TestQingStor:", - NilObject: (*qingstor.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + MinChunkSize: minChunkSize, + }, }) } + +func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadChunkSize(cs) +} + +func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadCutoff(cs) +} + +var _ fstests.SetUploadChunkSizer = (*Fs)(nil) diff --git a/backend/qingstor/upload.go b/backend/qingstor/upload.go index 43f635c90..d529beb85 100644 --- a/backend/qingstor/upload.go +++ b/backend/qingstor/upload.go @@ -152,11 +152,11 @@ func (u *uploader) init() { } // singlePartUpload upload a single object that contentLength less than "defaultUploadPartSize" -func (u *uploader) singlePartUpload(buf io.ReadSeeker) error { +func (u *uploader) singlePartUpload(buf io.Reader, size int64) error { bucketInit, _ := u.bucketInit() req := qs.PutObjectInput{ - ContentLength: &u.readerPos, + ContentLength: &size, ContentType: &u.cfg.mimeType, Body: buf, } @@ -180,7 +180,7 @@ func (u *uploader) upload() error { reader, _, err := u.nextReader() if err == io.EOF { // single part fs.Debugf(u, "Uploading as single part object to QingStor") - return u.singlePartUpload(reader) + return u.singlePartUpload(reader, u.readerPos) } else if err != nil { return errors.Errorf("read upload data failed: %s", err) } diff --git a/docs/content/qingstor.md b/docs/content/qingstor.md index b46a4d957..4b740f7f2 100644 --- a/docs/content/qingstor.md +++ b/docs/content/qingstor.md @@ -234,4 +234,50 @@ Number of connection retries. 
- Type: int - Default: 3 +#### --qingstor-upload-cutoff + +Cutoff for switching to chunked upload + +Any files larger than this will be uploaded in chunks of chunk_size. +The minimum is 0 and the maximum is 5GB. + +- Config: upload_cutoff +- Env Var: RCLONE_QINGSTOR_UPLOAD_CUTOFF +- Type: SizeSuffix +- Default: 200M + +#### --qingstor-chunk-size + +Chunk size to use for uploading. + +When uploading files larger than upload_cutoff they will be uploaded +as multipart uploads using this chunk size. + +Note that "--qingstor-upload-concurrency" chunks of this size are buffered +in memory per transfer. + +If you are transferring large files over high speed links and you have +enough memory, then increasing this will speed up the transfers. + +- Config: chunk_size +- Env Var: RCLONE_QINGSTOR_CHUNK_SIZE +- Type: SizeSuffix +- Default: 4M + +#### --qingstor-upload-concurrency + +Concurrency for multipart uploads. + +This is the number of chunks of the same file that are uploaded +concurrently. + +If you are uploading small numbers of large files over high speed links +and these uploads do not fully utilize your bandwidth, then increasing +this may help to speed up the transfers. + +- Config: upload_concurrency +- Env Var: RCLONE_QINGSTOR_UPLOAD_CONCURRENCY +- Type: int +- Default: 4 +