dropbox: add --dropbox-batch flag to speed up uploading of lots of small files

Nick Craig-Wood 2020-09-07 11:08:46 +01:00
parent e74f5b8906
commit 56e8d75cab
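A typical invocation this change is aimed at (illustrative only, not taken from the commit itself): rclone copy --transfers 32 --dropbox-batch 1000 /path/with/many/small/files dropbox:dest, followed by rclone check, since hashes can't be verified at upload time when batching.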


@@ -22,6 +22,7 @@ of path_display and all will be well.
*/
import (
"bytes"
"context"
"fmt"
"io"
@@ -29,6 +30,7 @@ import (
"path"
"regexp"
"strings"
"sync"
"time"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
@@ -47,6 +49,7 @@ import (
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
@@ -61,6 +64,7 @@ const (
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
maxBatchSize = 1000
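// maxBatchSize matches the upper limit Dropbox accepts for entries in a
// single upload_session/finish_batch call (an assumption from the API docs).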
// Upload chunk size - setting too small makes uploads slow.
// Chunks are buffered into memory for retries.
//
@@ -142,6 +146,23 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
Help: "Impersonate this user when using a business account.",
Default: "",
Advanced: true,
}, {
Name: "batch",
Help: `Enable batching of files if non-zero.
This sets the batch size of files to upload. It has to be at most 1000. A
sensible setting is probably 1000 if you are using this feature.
Rclone will commit any outstanding batches when it exits.
Setting this is a great idea if you are uploading lots of small files as it
will make them upload a lot quicker. You can use --transfers 32 to maximise
throughput.
It has the downside that rclone can't check the hash of the file after
upload, so running "rclone check" after the transfer completes is recommended.
`,
Default: 0,
Advanced: true,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@@ -163,6 +184,7 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
Impersonate string `config:"impersonate"`
Batch int `config:"batch"`
Enc encoder.MultiEncoder `config:"encoding"`
}
@@ -180,6 +202,7 @@ type Fs struct {
slashRootSlash string // root with "/" prefix and postfix, lowercase
pacer *fs.Pacer // To pace the API calls
ns string // The namespace we are using or "" for none
batcher *batcher // batch builder
}
// Object describes a dropbox object
@@ -195,6 +218,118 @@ type Object struct {
// ------------------------------------------------------------
// batcher holds info about the current items waiting for upload
type batcher struct {
f *Fs // Fs this batch is part of
mu sync.Mutex // lock for vars below
maxBatch int // maximum size for batch
active int // number of batches being sent
items []*files.UploadSessionFinishArg // current uncommitted files
atexit atexit.FnHandle // atexit handle
}
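// How the batcher is meant to be driven (a sketch pieced together from the
// code below; the caller-side variable names are illustrative only):
//
//	batching := f.batcher.Start() // false when --dropbox-batch is 0
//	defer func() {
//		if batchErr := f.batcher.End(batching); err == nil {
//			err = batchErr // End commits the batch once it is full
//		}
//	}()
//	// ... upload the chunks, closing but not finishing the session ...
//	f.batcher.Add(finishArg) // queue the UploadSessionFinishArg for a batch commit
//	// Finalize is registered with atexit and flushes any partial batch on exit.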
// newBatcher creates a new batcher structure
func newBatcher(f *Fs, maxBatch int) *batcher {
return &batcher{
f: f,
maxBatch: maxBatch,
}
}
// Start starts adding an item to a batch returning true if it was
// successfully started
//
// This should be paired with End
func (b *batcher) Start() bool {
if b.maxBatch <= 0 {
return false
}
b.mu.Lock()
defer b.mu.Unlock()
b.active++
// FIXME set a timer or something
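// (without a timer, a batch smaller than maxBatch is only committed when End
// sees a full batch or when the atexit Finalize handler runs at exit)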
return true
}
// End ends adding an item
func (b *batcher) End(started bool) error {
if !started {
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
b.active--
if len(b.items) < b.maxBatch {
return nil
}
return b._commit(false)
}
// _commit commits a batch - call with b.mu held
//
// If finalizing is true it doesn't unregister the atexit handler, as doing
// that from within the handler causes a deadlock during finalization.
func (b *batcher) _commit(finalizing bool) (err error) {
batch := "batch"
if finalizing {
batch = "last batch"
}
fs.Debugf(b.f, "committing %s length %d", batch, len(b.items))
// FIXME this ignores the objects returned
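// (Dropbox's upload_session/finish_batch endpoint is asynchronous - it can
// return an async job id rather than the finished entries, which would need
// polling via finish_batch/check to surface per-file failures; that handling
// is assumed to be follow-up work here)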
var arg = &files.UploadSessionFinishBatchArg{
Entries: b.items,
}
// var res *files.UploadSessionFinishBatchLaunch
err = b.f.pacer.Call(func() (bool, error) {
_, err = b.f.srv.UploadSessionFinishBatch(arg)
// If error is insufficient space then don't retry
if e, ok := err.(files.UploadSessionFinishAPIError); ok {
if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
err = fserrors.NoRetryError(err)
return false, err
}
}
// retry all other errors
return err != nil, err
})
if err != nil {
return err
}
// Show the batch is now empty
b.items = nil
if !finalizing {
atexit.Unregister(b.atexit)
b.atexit = nil
}
return nil
}
// Add adds a finished item to the batch
func (b *batcher) Add(commitInfo *files.UploadSessionFinishArg) {
fs.Debugf(b.f, "adding %q to batch", commitInfo.Commit.Path)
b.mu.Lock()
defer b.mu.Unlock()
b.items = append(b.items, commitInfo)
if b.atexit == nil {
b.atexit = atexit.Register(b.Finalize)
}
}
// Finalize finishes any pending batches
func (b *batcher) Finalize() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.items) == 0 {
return
}
err := b._commit(true)
if err != nil {
fs.Errorf(b.f, "Failed to finalize last batch: %v", err)
}
}
// ------------------------------------------------------------
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
@@ -273,6 +408,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil {
return nil, errors.Wrap(err, "dropbox: chunk size")
}
if opt.Batch > maxBatchSize || opt.Batch < 0 {
return nil, errors.Errorf("dropbox: batch must be >= 0 and <= %d - it is currently %d", maxBatchSize, opt.Batch)
}
// Convert the old token if it exists. The old token was just
// a string, the new one is a JSON blob
@@ -297,6 +435,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.batcher = newBatcher(f, f.opt.Batch)
config := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
Client: oAuthClient, // maybe???
@@ -1044,6 +1183,13 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
// avoidable request to the Dropbox API that does not carry payload.
func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
batching := o.fs.batcher.Start()
defer func() {
batchErr := o.fs.batcher.End(batching)
if err == nil {
err = batchErr
}
}()
chunkSize := int64(o.fs.opt.ChunkSize)
chunks := 0
if size != -1 {
@@ -1062,6 +1208,10 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
}
}
appendArg := files.UploadSessionAppendArg{
Close: chunks == 1,
}
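// Close tells Dropbox no more data will be appended to this upload session,
// which is what allows the finish call to be deferred into a batch later
// (behaviour as described by the Dropbox upload_session API docs).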
// write the first chunk
fmtChunk(1, false)
var res *files.UploadSessionStartResult
@@ -1071,7 +1221,10 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
return false, nil
}
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
arg := files.UploadSessionStartArg{
Close: appendArg.Close,
}
res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
return shouldRetry(err)
})
if err != nil {
@@ -1082,22 +1235,34 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
SessionId: res.SessionId,
Offset: 0,
}
appendArg := files.UploadSessionAppendArg{
Cursor: &cursor,
Close: false,
}
appendArg.Cursor = &cursor
// write more whole chunks (if any)
// write more whole chunks (if any, and if !batching); if batching, write
// the last chunk here as well.
currentChunk := 2
for {
if chunks > 0 && currentChunk >= chunks {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
} else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
break
if chunks > 0 {
// Size known
if currentChunk == chunks {
// Last chunk
if !batching {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
}
appendArg.Close = true
} else if currentChunk > chunks {
break
}
} else {
// Size unknown
lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
if lastReadWasShort {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
// This is also what we want if batching
break
}
}
cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false)
@@ -1123,6 +1288,26 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
Cursor: &cursor,
Commit: commitInfo,
}
// If we are batching then we should have written all the data by now, so
// store the commit info for a later batch commit instead of finishing here
if batching {
// If we haven't closed the session then we need to close it now
if !appendArg.Close {
fs.Debugf(o, "Closing session")
var empty bytes.Buffer
err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
// after the first chunk is uploaded, we retry everything
return err != nil, err
})
if err != nil {
return nil, err
}
}
o.fs.batcher.Add(args)
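// Returning nil metadata here is deliberate - the real FileMetadata only
// exists once the batch is committed, so Update fakes size and modtime
// from the commit info instead (see below).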
return nil, nil
}
fmtChunk(currentChunk, true)
chunk = readers.NewRepeatableReaderBuffer(in, buf)
err = o.fs.pacer.Call(func() (bool, error) {
@@ -1165,7 +1350,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
size := src.Size()
var err error
var entry *files.FileMetadata
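// When batching, even files smaller than chunk_size go via uploadChunked so
// that their commit can be queued for the batch instead of being uploaded
// and finished directly.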
if size > int64(o.fs.opt.ChunkSize) || size == -1 {
if size > int64(o.fs.opt.ChunkSize) || size == -1 || o.fs.opt.Batch > 0 {
entry, err = o.uploadChunked(in, commitInfo, size)
} else {
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@@ -1176,6 +1361,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return errors.Wrap(err, "upload failed")
}
// If we haven't received metadata back from a batched upload then fake it
if entry == nil {
o.bytes = size
o.modTime = commitInfo.ClientModified
o.hash = "" // we don't have this
return nil
}
return o.setMetadataFromEntry(entry)
}