diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go
index ad5c95189..0bf5c4567 100755
--- a/backend/dropbox/dropbox.go
+++ b/backend/dropbox/dropbox.go
@@ -22,6 +22,7 @@ of path_display and all will be well.
 */
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -29,6 +30,7 @@ import (
 	"path"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
@@ -47,6 +49,7 @@ import (
 	"github.com/rclone/rclone/fs/config/obscure"
 	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/oauthutil"
 	"github.com/rclone/rclone/lib/pacer"
@@ -61,6 +64,7 @@ const (
 	minSleep      = 10 * time.Millisecond
 	maxSleep      = 2 * time.Second
 	decayConstant = 2 // bigger for slower decay, exponential
+	maxBatchSize  = 1000
 	// Upload chunk size - setting too small makes uploads slow.
 	// Chunks are buffered into memory for retries.
 	//
@@ -142,6 +146,23 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
 			Help:     "Impersonate this user when using a business account.",
 			Default:  "",
 			Advanced: true,
+		}, {
+			Name: "batch",
+			Help: `Enable batching of files if non-zero.
+
+This sets the number of files to upload in a single batch. It must be between
+0 and 1000 - a sensible setting is probably 1000 if you are using this feature.
+
+Rclone will close any outstanding batches when it exits.
+
+Batching is a good idea if you are uploading lots of small files as it will
+make the uploads much faster. You can use --transfers 32 to maximise throughput.
+
+It has the downside that rclone can't check the hash of the file after upload,
+so running "rclone check" after the transfer completes is recommended.
+`,
+			Default:  0,
+			Advanced: true,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -163,6 +184,7 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
 type Options struct {
 	ChunkSize   fs.SizeSuffix        `config:"chunk_size"`
 	Impersonate string               `config:"impersonate"`
+	Batch       int                  `config:"batch"`
 	Enc         encoder.MultiEncoder `config:"encoding"`
 }
 
@@ -180,6 +202,7 @@ type Fs struct {
 	slashRootSlash string    // root with "/" prefix and postfix, lowercase
 	pacer          *fs.Pacer // To pace the API calls
 	ns             string    // The namespace we are using or "" for none
+	batcher        *batcher  // batch builder
 }
 
 // Object describes a dropbox object
@@ -195,6 +218,118 @@ type Object struct {
 // ------------------------------------------------------------
 
+// batcher holds info about the current items waiting for upload
+type batcher struct {
+	f        *Fs                             // Fs this batch is part of
+	mu       sync.Mutex                      // lock for vars below
+	maxBatch int                             // maximum size for batch
+	active   int                             // number of batches being sent
+	items    []*files.UploadSessionFinishArg // current uncommitted files
+	atexit   atexit.FnHandle                 // atexit handle
+}
+
+// newBatcher creates a new batcher structure
+func newBatcher(f *Fs, maxBatch int) *batcher {
+	return &batcher{
+		f:        f,
+		maxBatch: maxBatch,
+	}
+}
+
+// Start starts adding an item to a batch, returning true if it was
+// successfully started
+//
+// This should be paired with End
+func (b *batcher) Start() bool {
+	if b.maxBatch <= 0 {
+		return false
+	}
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.active++
+	// FIXME set a timer or something
+	return true
+}
+
+// End ends adding an item to a batch, committing the batch if it is full
+func (b *batcher) End(started bool) error {
+	if !started {
+		return nil
+	}
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.active--
+	if len(b.items) < b.maxBatch {
+		return nil
+	}
+	return b._commit(false)
+}
+
+// commit a batch - call with mu held
+//
+// if finalizing is true then it doesn't unregister Finalize as this
+// causes a deadlock during finalization.
+func (b *batcher) _commit(finalizing bool) (err error) {
+	batch := "batch"
+	if finalizing {
+		batch = "last batch"
+	}
+	fs.Debugf(b.f, "committing %s length %d", batch, len(b.items))
+	// FIXME this ignores the objects returned
+	var arg = &files.UploadSessionFinishBatchArg{
+		Entries: b.items,
+	}
+	//var res *files.UploadSessionFinishBatchLaunch
+	err = b.f.pacer.Call(func() (bool, error) {
+		_, err = b.f.srv.UploadSessionFinishBatch(arg)
+		// If error is insufficient space then don't retry
+		if e, ok := err.(files.UploadSessionFinishAPIError); ok {
+			if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
+				err = fserrors.NoRetryError(err)
+				return false, err
+			}
+		}
+		// retry all other errors
+		return err != nil, err
+	})
+	if err != nil {
+		return err
+	}
+	// Mark the batch as empty
+	b.items = nil
+	if !finalizing {
+		atexit.Unregister(b.atexit)
+		b.atexit = nil
+	}
+	return nil
+}
+
+// Add adds a finished item to the batch
+func (b *batcher) Add(commitInfo *files.UploadSessionFinishArg) {
+	fs.Debugf(b.f, "adding %q to batch", commitInfo.Commit.Path)
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.items = append(b.items, commitInfo)
+	if b.atexit == nil {
+		b.atexit = atexit.Register(b.Finalize)
+	}
+}
+
+// Finalize finishes any pending batches
+func (b *batcher) Finalize() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if len(b.items) == 0 {
+		return
+	}
+	err := b._commit(true)
+	if err != nil {
+		fs.Errorf(b.f, "Failed to finalize last batch: %v", err)
+	}
+}
+
+// ------------------------------------------------------------
+
 // Name of the remote (as passed into NewFs)
 func (f *Fs) Name() string {
 	return f.name
 }
@@ -273,6 +408,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "dropbox: chunk size")
 	}
+	if opt.Batch > maxBatchSize || opt.Batch < 0 {
+		return nil, errors.Errorf("dropbox: batch must be >= 0 and <= %d - it is currently %d", maxBatchSize, opt.Batch)
+	}
 
 	// Convert the old token if it exists. The old token was just
 	// just a string, the new one is a JSON blob
@@ -297,6 +435,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		opt:   *opt,
 		pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 	}
+	f.batcher = newBatcher(f, f.opt.Batch)
 	config := dropbox.Config{
 		LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
 		Client:   oAuthClient,    // maybe???
@@ -1044,6 +1183,13 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 // unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
 // avoidable request to the Dropbox API that does not carry payload.
 func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
+	batching := o.fs.batcher.Start()
+	defer func() {
+		batchErr := o.fs.batcher.End(batching)
+		if err == nil {
+			err = batchErr
+		}
+	}()
 	chunkSize := int64(o.fs.opt.ChunkSize)
 	chunks := 0
 	if size != -1 {
@@ -1062,6 +1208,10 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		}
 	}
 
+	appendArg := files.UploadSessionAppendArg{
+		Close: chunks == 1,
+	}
+
 	// write the first chunk
 	fmtChunk(1, false)
 	var res *files.UploadSessionStartResult
@@ -1071,7 +1221,10 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		if _, err = chunk.Seek(0, io.SeekStart); err != nil {
 			return false, nil
 		}
-		res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
+		arg := files.UploadSessionStartArg{
+			Close: appendArg.Close,
+		}
+		res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
 		return shouldRetry(err)
 	})
 	if err != nil {
@@ -1082,22 +1235,34 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		SessionId: res.SessionId,
 		Offset:    0,
 	}
-	appendArg := files.UploadSessionAppendArg{
-		Cursor: &cursor,
-		Close:  false,
-	}
+	appendArg.Cursor = &cursor
 
-	// write more whole chunks (if any)
+	// write more whole chunks (if any). If batching, the last chunk is
+	// also written here.
 	currentChunk := 2
 	for {
-		if chunks > 0 && currentChunk >= chunks {
-			// if the size is known, only upload full chunks. Remaining bytes are uploaded with
-			// the UploadSessionFinish request.
-			break
-		} else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
-			// if the size is unknown, upload as long as we can read full chunks from the reader.
-			// The UploadSessionFinish request will not contain any payload.
-			break
-		}
+		if chunks > 0 {
+			// Size known
+			if currentChunk == chunks {
+				// Last chunk
+				if !batching {
+					// if the size is known, only upload full chunks. Remaining bytes are uploaded with
+					// the UploadSessionFinish request.
+					break
+				}
+				appendArg.Close = true
+			} else if currentChunk > chunks {
+				break
+			}
+		} else {
+			// Size unknown
+			lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
+			if lastReadWasShort {
+				// if the size is unknown, upload as long as we can read full chunks from the reader.
+				// The UploadSessionFinish request will not contain any payload.
+				// This is also what we want if batching
+				break
+			}
+		}
 		cursor.Offset = in.BytesRead()
 		fmtChunk(currentChunk, false)
@@ -1123,6 +1288,26 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		Cursor: &cursor,
 		Commit: commitInfo,
 	}
+	// If we are batching then we should have written all the data now, so
+	// store the commit info for a batch commit
+	if batching {
+		// If we haven't closed the session then we need to
+		if !appendArg.Close {
+			fs.Debugf(o, "Closing session")
+			var empty bytes.Buffer
+			err = o.fs.pacer.Call(func() (bool, error) {
+				err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
+				// after the first chunk is uploaded, we retry everything
+				return err != nil, err
+			})
+			if err != nil {
+				return nil, err
+			}
+		}
+		o.fs.batcher.Add(args)
+		return nil, nil
+	}
+
 	fmtChunk(currentChunk, true)
 	chunk = readers.NewRepeatableReaderBuffer(in, buf)
 	err = o.fs.pacer.Call(func() (bool, error) {
@@ -1165,7 +1350,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	size := src.Size()
 	var err error
 	var entry *files.FileMetadata
-	if size > int64(o.fs.opt.ChunkSize) || size == -1 {
+	if size > int64(o.fs.opt.ChunkSize) || size == -1 || o.fs.opt.Batch > 0 {
 		entry, err = o.uploadChunked(in, commitInfo, size)
 	} else {
 		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@@ -1176,6 +1361,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	if err != nil {
 		return errors.Wrap(err, "upload failed")
 	}
+	// If we haven't received data back from batch upload then fake it
+	if entry == nil {
+		o.bytes = size
+		o.modTime = commitInfo.ClientModified
+		o.hash = "" // we don't have this
+		return nil
+	}
 	return o.setMetadataFromEntry(entry)
 }
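
To make the intended call pattern easier to review, here is a minimal, self-contained sketch (not part of the patch) of the Start/End/Add lifecycle that uploadChunked relies on, with a toy in-memory commit standing in for UploadSessionFinishBatch. The batcher below is a deliberately simplified stand-in, and upload, commit and main are invented names for illustration only.

package main

import (
	"fmt"
	"sync"
)

// toy batcher mirroring the lifecycle in the patch: Start/End bracket each
// upload, Add queues a finished item, and a full batch is committed in End.
type batcher struct {
	mu       sync.Mutex
	maxBatch int
	items    []string
}

// Start reports whether batching is enabled for this upload.
func (b *batcher) Start() bool {
	return b.maxBatch > 0
}

// End finishes an upload and commits the batch if it has reached maxBatch.
func (b *batcher) End(started bool) error {
	if !started {
		return nil
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) < b.maxBatch {
		return nil
	}
	return b.commit()
}

// Add queues a finished item for the next batch commit.
func (b *batcher) Add(item string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.items = append(b.items, item)
}

// commit stands in for UploadSessionFinishBatch - call with mu held.
func (b *batcher) commit() error {
	fmt.Printf("committing batch of %d items\n", len(b.items))
	b.items = nil
	return nil
}

// upload shows the defer pattern from uploadChunked: a batch commit error is
// only reported if the upload itself succeeded.
func upload(b *batcher, name string) (err error) {
	batching := b.Start()
	defer func() {
		batchErr := b.End(batching)
		if err == nil {
			err = batchErr
		}
	}()
	// ... upload session chunks would go here ...
	if batching {
		b.Add(name) // queue the commit instead of finishing the session now
	}
	return nil
}

func main() {
	b := &batcher{maxBatch: 2}
	for _, name := range []string{"a.txt", "b.txt", "c.txt"} {
		if err := upload(b, name); err != nil {
			fmt.Println("upload failed:", err)
		}
	}
	// a real implementation would finalize the remaining partial batch at exit
	fmt.Printf("%d item(s) left for the final batch\n", len(b.items))
}

Note that the error handling in the deferred End call is why the defer in uploadChunked checks err == nil before overwriting it: an error from the upload itself must not be masked by a nil (or different) error from the batch commit.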