From 2e4b65f888db6bff018e363b35c832fa90349f1a Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Mon, 7 Sep 2020 11:08:46 +0100
Subject: [PATCH] dropbox: add --dropbox-batch-mode flag to speed up uploading
 #5156

This adds 3 upload modes for dropbox: off, sync and async, and makes
sync the default. This should greatly improve uploads, especially of
small files.
---
 backend/dropbox/batcher.go | 332 +++++++++++++++++++++++++++++++++++++
 backend/dropbox/dropbox.go | 160 +++++++++++++++---
 docs/content/dropbox.md    |  61 ++++++-
 3 files changed, 533 insertions(+), 20 deletions(-)
 create mode 100644 backend/dropbox/batcher.go

diff --git a/backend/dropbox/batcher.go b/backend/dropbox/batcher.go
new file mode 100644
index 000000000..695b9c82a
--- /dev/null
+++ b/backend/dropbox/batcher.go
@@ -0,0 +1,332 @@
+// This file contains the implementation of the sync batcher for uploads
+//
+// Dropbox rules say you can start as many batches as you want, but
+// you may only have one batch being committed and must wait for the
+// batch to be finished before committing another.
+
+package dropbox
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async"
+	"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
+	"github.com/pkg/errors"
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/fserrors"
+	"github.com/rclone/rclone/lib/atexit"
+)
+
+const (
+	maxBatchSize          = 1000                   // max size the batch can be
+	defaultTimeoutSync    = 500 * time.Millisecond // kick off the batch if nothing added for this long (sync)
+	defaultTimeoutAsync   = 10 * time.Second       // kick off the batch if nothing added for this long (async)
+	defaultBatchSizeAsync = 100                    // default batch size if async
+)
+
+// batcher holds info about the current items waiting for upload
+type batcher struct {
+	f        *Fs                 // Fs this batch is part of
+	mode     string              // configured batch mode
+	size     int                 // maximum size for batch
+	timeout  time.Duration       // idle timeout for batch
+	async    bool                // whether we are using async batching
+	in       chan batcherRequest // incoming items to batch
+	quit     chan struct{}       // close to quit the loop
+	atexit   atexit.FnHandle     // atexit handle
+	shutOnce sync.Once           // make sure we shutdown once only
+	wg       sync.WaitGroup      // wait for shutdown
+}
+
+// batcherRequest holds an incoming request with a place for a reply
+type batcherRequest struct {
+	commitInfo *files.UploadSessionFinishArg
+	result     chan<- batcherResponse
+}
+
+// batcherResponse holds a response to be delivered to clients waiting
+// for a batch to complete.
+type batcherResponse struct {
+	err   error
+	entry *files.FileMetadata
+}
+
+// newBatcher creates a new batcher structure
+func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.Duration) (*batcher, error) {
+	// fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout)
+	if size > maxBatchSize || size < 0 {
+		return nil, errors.Errorf("dropbox: batch size must be >= 0 and <= %d - it is currently %d", maxBatchSize, size)
+	}
+
+	async := false
+
+	switch mode {
+	case "sync":
+		if size <= 0 {
+			ci := fs.GetConfig(ctx)
+			size = ci.Transfers
+		}
+		if timeout <= 0 {
+			timeout = defaultTimeoutSync
+		}
+	case "async":
+		if size <= 0 {
+			size = defaultBatchSizeAsync
+		}
+		if timeout <= 0 {
+			timeout = defaultTimeoutAsync
+		}
+		async = true
+	case "off":
+		size = 0
+	default:
+		return nil, errors.Errorf("dropbox: batch mode must be sync|async|off not %q", mode)
+	}
+
+	b := &batcher{
+		f:       f,
+		mode:    mode,
+		size:    size,
+		timeout: timeout,
+		async:   async,
+		in:      make(chan batcherRequest, size),
+		quit:    make(chan struct{}),
+	}
+	if b.Batching() {
+		b.atexit = atexit.Register(b.Shutdown)
+		b.wg.Add(1)
+		go b.commitLoop(context.Background())
+	}
+	return b, nil
+}
+
+// Batching returns true if batching is active
+func (b *batcher) Batching() bool {
+	return b.size > 0
+}
+
+// finishBatch commits the batch, returning a batch status to poll or maybe complete
+func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (batchStatus *files.UploadSessionFinishBatchLaunch, err error) {
+	var arg = &files.UploadSessionFinishBatchArg{
+		Entries: items,
+	}
+	err = b.f.pacer.Call(func() (bool, error) {
+		batchStatus, err = b.f.srv.UploadSessionFinishBatch(arg)
+		// If error is insufficient space then don't retry
+		if e, ok := err.(files.UploadSessionFinishAPIError); ok {
+			if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
+				err = fserrors.NoRetryError(err)
+				return false, err
+			}
+		}
+		// retry all other errors
+		return err != nil, err
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "batch commit failed")
+	}
+	return batchStatus, nil
+}
+
+// finishBatchJobStatus waits for the batch to complete returning completed entries
+func (b *batcher) finishBatchJobStatus(ctx context.Context, launchBatchStatus *files.UploadSessionFinishBatchLaunch) (complete *files.UploadSessionFinishBatchResult, err error) {
+	if launchBatchStatus.AsyncJobId == "" {
+		return nil, errors.New("wait for batch completion: empty job ID")
+	}
+	var batchStatus *files.UploadSessionFinishBatchJobStatus
+	sleepTime := 100 * time.Millisecond
+	const maxTries = 120
+	for try := 1; try <= maxTries; try++ {
+		err = b.f.pacer.Call(func() (bool, error) {
+			batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{
+				AsyncJobId: launchBatchStatus.AsyncJobId,
+			})
+			return shouldRetry(ctx, err)
+		})
+		if err != nil {
+			return nil, errors.Wrap(err, "wait for batch completion: check failed")
+		}
+		if batchStatus.Tag == "complete" {
+			return batchStatus.Complete, nil
+		}
+		fs.Debugf(b.f, "Sleeping for %v to wait for batch to complete: %q: try %d/%d", sleepTime, batchStatus.Tag, try, maxTries)
+		time.Sleep(sleepTime)
+		sleepTime *= 2
+		if sleepTime > time.Second {
+			sleepTime = time.Second
+		}
+	}
+	return nil, errors.Errorf("wait for batch completion: timed out after %d tries: last status %q", maxTries, batchStatus.Tag)
+}
+
+// commit a batch
+func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []chan<- batcherResponse) (err error) {
+	// If the commit fails then signal clients, unless they have
+	// already been signalled (async mode)
+	var signalled = b.async
+	defer func() {
+		if err != nil && !signalled {
+			// Signal to clients that there was an error
+			for _, result := range results {
+				result <- batcherResponse{err: err}
+			}
+		}
+	}()
+	fs.Debugf(b.f, "Committing %s batch length %d", b.mode, len(items))
+
+	// finalise the batch getting either a result or a job id to poll
+	batchStatus, err := b.finishBatch(ctx, items)
+	if err != nil {
+		return err
+	}
+
+	// check whether batch is complete
+	var complete *files.UploadSessionFinishBatchResult
+	switch batchStatus.Tag {
+	case "async_job_id":
+		// wait for batch to complete
+		complete, err = b.finishBatchJobStatus(ctx, batchStatus)
+		if err != nil {
+			return err
+		}
+	case "complete":
+		complete = batchStatus.Complete
+	default:
+		return errors.Errorf("batch returned unknown status %q", batchStatus.Tag)
+	}
+
+	// Check we got the right number of entries
+	entries := complete.Entries
+	if len(entries) != len(results) {
+		return errors.Errorf("expecting %d items in batch but got %d", len(results), len(entries))
+	}
+
+	// Report results to clients
+	var (
+		errorTag   = ""
+		errorCount = 0
+	)
+	for i := range results {
+		item := entries[i]
+		resp := batcherResponse{}
+		if item.Tag == "success" {
+			resp.entry = item.Success
+		} else {
+			errorCount++
+			errorTag = item.Tag
+			if item.Failure != nil {
+				errorTag = item.Failure.Tag
+				if item.Failure.LookupFailed != nil {
+					errorTag += "/" + item.Failure.LookupFailed.Tag
+				}
+				if item.Failure.Path != nil {
+					errorTag += "/" + item.Failure.Path.Tag
+				}
+				if item.Failure.PropertiesError != nil {
+					errorTag += "/" + item.Failure.PropertiesError.Tag
+				}
+			}
+			resp.err = errors.Errorf("batch upload failed: %s", errorTag)
+		}
+		if !b.async {
+			results[i] <- resp
+		}
+	}
+	// Clients have now been signalled so no need to report errors to
+	// them from now on
+	signalled = true
+
+	// Report an error if any failed in the batch
+	if errorTag != "" {
+		return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag)
+	}
+
+	return nil
+}
+
+// commitLoop runs the commit engine in the background
+func (b *batcher) commitLoop(ctx context.Context) {
+	var (
+		items     []*files.UploadSessionFinishArg // current batch of uncommitted files
+		results   []chan<- batcherResponse        // current batch of clients awaiting results
+		idleTimer = time.NewTimer(b.timeout)
+		commit    = func() {
+			err := b.commitBatch(ctx, items, results)
+			if err != nil {
+				fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.mode, len(items), err)
+			}
+			items, results = nil, nil
+		}
+	)
+	defer b.wg.Done()
+	defer idleTimer.Stop()
+	idleTimer.Stop()
+
+outer:
+	for {
+		select {
+		case <-b.quit:
+			break outer
+		case req, ok := <-b.in:
+			if !ok {
+				break outer
+			}
+			items = append(items, req.commitInfo)
+			results = append(results, req.result)
+			idleTimer.Stop()
+			if len(items) >= b.size {
+				commit()
+			} else {
+				idleTimer.Reset(b.timeout)
+			}
+		case <-idleTimer.C:
+			if len(items) > 0 {
+				fs.Debugf(b.f, "Batch idle for %v so committing", b.timeout)
+				commit()
+			}
+		}
+	}
+	// commit any remaining items
+	if len(items) > 0 {
+		commit()
+	}
+}
+
+// Shutdown finishes any pending batches then shuts everything down
+//
+// Can be called from atexit handler
+func (b *batcher) Shutdown() {
+	b.shutOnce.Do(func() {
+		atexit.Unregister(b.atexit)
+		// quit the commitLoop. Note that we don't close b.in
+		// because that would cause a write to a closed channel
+		close(b.quit)
+		b.wg.Wait()
+	})
+}
+
+// Commit commits the file using a batch call, first adding it to the
+// batch and then waiting for the batch to complete in a synchronous
+// way if async is not set.
+func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) {
+	select {
+	case <-b.quit:
+		// pause this goroutine as we are quitting
+		select {}
+	default:
+	}
+	fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path)
+	resp := make(chan batcherResponse, 1)
+	b.in <- batcherRequest{
+		commitInfo: commitInfo,
+		result:     resp,
+	}
+	// If running async then don't wait for the result
+	if b.async {
+		return nil, nil
+	}
+	result := <-resp
+	return result.entry, result.err
+}
diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go
index 3527964ad..6ce8c2efd 100755
--- a/backend/dropbox/dropbox.go
+++ b/backend/dropbox/dropbox.go
@@ -22,6 +22,7 @@ of path_display and all will be well.
 */
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -209,6 +210,63 @@ Note that we don't unmount the shared folder afterwards so the
 shared folder.`,
 			Default:  false,
 			Advanced: true,
+		}, {
+			Name: "batch_mode",
+			Help: `Upload file batching sync|async|off.
+
+This sets the batch mode used by rclone.
+
+For full info see [the main docs](https://rclone.org/dropbox/#batch-mode)
+
+This has 3 possible values
+
+- off - no batching
+- sync - batch uploads and check completion (default)
+- async - batch uploads and don't check completion
+
+Rclone will close any outstanding batches when it exits which may
+cause a delay on quit.
+`,
+			Default:  "sync",
+			Advanced: true,
+		}, {
+			Name: "batch_size",
+			Help: `Max number of files in upload batch.
+
+This sets the batch size of files to upload. It must be no more
+than 1000.
+
+By default this is 0 which means rclone will calculate the batch size
+depending on the setting of batch_mode.
+
+- batch_mode: async - default batch_size is 100
+- batch_mode: sync - default batch_size is the same as --transfers
+- batch_mode: off - not in use
+
+Rclone will close any outstanding batches when it exits which may
+cause a delay on quit.
+
+Setting this is a great idea if you are uploading lots of small files
+as it will make them much quicker. You can use --transfers 32 to
+maximise throughput.
+`,
+			Default:  0,
+			Advanced: true,
+		}, {
+			Name: "batch_timeout",
+			Help: `Max time to allow an idle upload batch before uploading.
+
+If an upload batch is idle for more than this long then it will be
+uploaded.
+
+The default for this is 0 which means rclone will choose a sensible
+default based on the batch_mode in use.
+
+- batch_mode: async - default batch_timeout is 10s
+- batch_mode: sync - default batch_timeout is 500ms
+- batch_mode: off - not in use
+`,
+			Default:  fs.Duration(0),
+			Advanced: true,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -232,6 +290,10 @@ type Options struct {
 	Impersonate   string               `config:"impersonate"`
 	SharedFiles   bool                 `config:"shared_files"`
 	SharedFolders bool                 `config:"shared_folders"`
+	BatchMode     string               `config:"batch_mode"`
+	BatchSize     int                  `config:"batch_size"`
+	BatchTimeout  fs.Duration          `config:"batch_timeout"`
+	AsyncBatch    bool                 `config:"async_batch"`
 	Enc           encoder.MultiEncoder `config:"encoding"`
 }
 
@@ -251,6 +313,7 @@ type Fs struct {
 	slashRootSlash string    // root with "/" prefix and postfix, lowercase
 	pacer          *fs.Pacer // To pace the API calls
 	ns             string    // The namespace we are using or "" for none
+	batcher        *batcher  // batch builder
 }
 
 // Object describes a dropbox object
@@ -266,8 +329,6 @@ type Object struct {
 	hash    string    // content_hash of the object
 }
 
-// ------------------------------------------------------------
-
 // Name of the remote (as passed into NewFs)
 func (f *Fs) Name() string {
 	return f.name
@@ -378,6 +439,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		ci:    ci,
 		pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 	}
+	f.batcher, err = newBatcher(ctx, f, f.opt.BatchMode, f.opt.BatchSize, time.Duration(f.opt.BatchTimeout))
+	if err != nil {
+		return nil, err
+	}
 	cfg := dropbox.Config{
 		LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
 		Client:   oAuthClient,    // maybe???
@@ -1377,6 +1442,13 @@ func (f *Fs) Hashes() hash.Set {
 	return hash.Set(DbHashType)
 }
 
+// Shutdown the backend, closing any background tasks and any
+// cached connections.
+func (f *Fs) Shutdown(ctx context.Context) error {
+	f.batcher.Shutdown()
+	return nil
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -1540,9 +1612,10 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 // unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
 // avoidable request to the Dropbox API that does not carry payload.
 func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
+	batching := o.fs.batcher.Batching()
 	chunkSize := int64(o.fs.opt.ChunkSize)
 	chunks := 0
-	if size != -1 {
+	if size >= 0 {
 		chunks = int(size/chunkSize) + 1
 	}
 	in := readers.NewCountingReader(in0)
@@ -1553,11 +1626,15 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
 			fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
 		} else if chunks == 0 {
 			fs.Debugf(o, "Streaming chunk %d/unknown", cur)
-		} else {
+		} else if chunks != 1 {
 			fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
 		}
 	}
 
+	appendArg := files.UploadSessionAppendArg{
+		Close: chunks == 1,
+	}
+
 	// write the first chunk
 	fmtChunk(1, false)
 	var res *files.UploadSessionStartResult
@@ -1567,7 +1644,10 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
 		if _, err = chunk.Seek(0, io.SeekStart); err != nil {
 			return false, nil
 		}
-		res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
+		arg := files.UploadSessionStartArg{
+			Close: appendArg.Close,
+		}
+		res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
 		return shouldRetry(ctx, err)
 	})
 	if err != nil {
@@ -1578,22 +1658,34 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
 		SessionId: res.SessionId,
 		Offset:    0,
 	}
-	appendArg := files.UploadSessionAppendArg{
-		Cursor: &cursor,
-		Close:  false,
-	}
+	appendArg.Cursor = &cursor
 
-	// write more whole chunks (if any)
+	// write more whole chunks (if any). If batching, also write
+	// the last chunk.
 	currentChunk := 2
 	for {
-		if chunks > 0 && currentChunk >= chunks {
-			// if the size is known, only upload full chunks. Remaining bytes are uploaded with
-			// the UploadSessionFinish request.
-			break
-		} else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
-			// if the size is unknown, upload as long as we can read full chunks from the reader.
-			// The UploadSessionFinish request will not contain any payload.
-			break
+		if chunks > 0 {
+			// Size known
+			if currentChunk == chunks {
+				// Last chunk
+				if !batching {
+					// if the size is known, only upload full chunks. Remaining bytes are uploaded with
+					// the UploadSessionFinish request.
+					break
+				}
+				appendArg.Close = true
+			} else if currentChunk > chunks {
+				break
+			}
+		} else {
+			// Size unknown
+			lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
+			if lastReadWasShort {
+				// if the size is unknown, upload as long as we can read full chunks from the reader.
+				// The UploadSessionFinish request will not contain any payload.
+				// This is also what we want when batching
+				break
+			}
+		}
 		cursor.Offset = in.BytesRead()
 		fmtChunk(currentChunk, false)
@@ -1619,6 +1711,26 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
 		Cursor: &cursor,
 		Commit: commitInfo,
 	}
+
+	// If we are batching then we should have written all the data
+	// now, so store the commit info for a batch commit
+	if batching {
+		// If we haven't closed the session then we need to
+		if !appendArg.Close {
+			appendArg.Close = true
+			fs.Debugf(o, "Closing session")
+			var empty bytes.Buffer
+			err = o.fs.pacer.Call(func() (bool, error) {
+				err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
+				// after the first chunk is uploaded, we retry everything
+				return err != nil, err
+			})
+			if err != nil {
+				return nil, err
+			}
+		}
+		return o.fs.batcher.Commit(ctx, args)
+	}
+
 	fmtChunk(currentChunk, true)
 	chunk = readers.NewRepeatableReaderBuffer(in, buf)
 	err = o.fs.pacer.Call(func() (bool, error) {
@@ -1693,7 +1805,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	size := src.Size()
 	var err error
 	var entry *files.FileMetadata
-	if size > int64(o.fs.opt.ChunkSize) || size == -1 {
+	if size > int64(o.fs.opt.ChunkSize) || size < 0 || o.fs.batcher.Batching() {
 		entry, err = o.uploadChunked(ctx, in, commitInfo, size)
 	} else {
 		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@@ -1704,6 +1816,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	if err != nil {
 		return errors.Wrap(err, "upload failed")
 	}
+	// If we haven't received data back from batch upload then fake it
+	//
+	// This will only happen if we are uploading async batches
+	if entry == nil {
+		o.bytes = size
+		o.modTime = commitInfo.ClientModified
+		o.hash = "" // we don't have this
+		return nil
+	}
 	return o.setMetadataFromEntry(entry)
 }
 
@@ -1731,6 +1852,7 @@ var (
 	_ fs.PublicLinker = (*Fs)(nil)
 	_ fs.DirMover     = (*Fs)(nil)
 	_ fs.Abouter      = (*Fs)(nil)
+	_ fs.Shutdowner   = (*Fs)(nil)
 	_ fs.Object       = (*Object)(nil)
 	_ fs.IDer         = (*Object)(nil)
 )
diff --git a/docs/content/dropbox.md b/docs/content/dropbox.md
index 6173e9214..a63affe92 100644
--- a/docs/content/dropbox.md
+++ b/docs/content/dropbox.md
@@ -100,7 +100,7 @@ Dropbox supports [its own hash
 type](https://www.dropbox.com/developers/reference/content-hash)
 which is checked for all transfers.
 
-#### Restricted filename characters
+### Restricted filename characters
 
 | Character | Value | Replacement |
 | --------- |:-----:|:-----------:|
@@ -119,6 +119,65 @@ These only get replaced if they are the last character in the name:
 Invalid UTF-8 bytes will also be [replaced](/overview/#invalid-utf8),
 as they can't be used in JSON strings.
 
+### Batch mode uploads {#batch-mode}
+
+Using batch mode uploads is very important for performance when using
+the Dropbox API. See [the dropbox performance guide](https://developers.dropbox.com/dbx-performance-guide)
+for more info.
+
+There are 3 modes rclone can use for uploads.
+
+#### --dropbox-batch-mode off
+
+In this mode rclone will not use upload batching. This was the default
+before rclone v1.55. It has the disadvantage that rclone is very
+likely to encounter `too_many_requests` errors like this
+
+    NOTICE: too_many_requests/.: Too many requests or write operations. Trying again in 15 seconds.
+
+When rclone receives these it has to wait for 15s or sometimes 300s
+before continuing, which really slows down transfers.
+
+This will happen especially if `--transfers` is large, so this mode
+isn't recommended except for compatibility or investigating problems.
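+
+For example, a copy like the following forces batching off for a
+single run (the remote name and paths here are illustrative):
+
+    rclone copy /path/to/files dropbox:backup --dropbox-batch-mode off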
+
+#### --dropbox-batch-mode sync
+
+In this mode rclone will batch up uploads to the size specified by
+`--dropbox-batch-size` and commit them together.
+
+Using this mode means you can use a much higher `--transfers`
+parameter (32 or 64 works fine) without receiving `too_many_requests`
+errors.
+
+This mode ensures full data integrity.
+
+Note that there may be a pause when quitting rclone while rclone
+finishes up the last batch using this mode.
+
+#### --dropbox-batch-mode async
+
+In this mode rclone will batch up uploads to the size specified by
+`--dropbox-batch-size` and commit them together.
+
+However it will not wait for the status of the batch to be returned
+to the caller. This means rclone can use a much bigger batch size
+(much bigger than `--transfers`), at the cost of not being able to
+check the status of the upload.
+
+This provides the maximum possible upload speed, especially with lots
+of small files; however rclone can't check that the files were
+uploaded properly using this mode.
+
+If you are using this mode then using `rclone check` after the
+transfer completes is recommended. Or you could do an initial transfer
+with `--dropbox-batch-mode async` then do a final transfer with
+`--dropbox-batch-mode sync` (the default).
+
+Note that there may be a pause when quitting rclone while rclone
+finishes up the last batch using this mode.
+
+
 {{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/dropbox/dropbox.go then run make backenddocs" >}}
 
 ### Standard Options