// This file contains the implementation of the sync batcher for uploads // // Dropbox rules say you can start as many batches as you want, but // you may only have one batch being committed and must wait for the // batch to be finished before committing another. package dropbox import ( "context" "sync" "time" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/lib/atexit" ) const ( maxBatchSize = 1000 // max size the batch can be defaultTimeoutSync = 500 * time.Millisecond // kick off the batch if nothing added for this long (sync) defaultTimeoutAsync = 10 * time.Second // kick off the batch if nothing added for this long (ssync) defaultBatchSizeAsync = 100 // default batch size if async ) // batcher holds info about the current items waiting for upload type batcher struct { f *Fs // Fs this batch is part of mode string // configured batch mode size int // maximum size for batch timeout time.Duration // idle timeout for batch async bool // whether we are using async batching in chan batcherRequest // incoming items to batch quit chan struct{} // close to quit the loop atexit atexit.FnHandle // atexit handle shutOnce sync.Once // make sure we shutdown once only wg sync.WaitGroup // wait for shutdown } // batcherRequest holds an incoming request with a place for a reply type batcherRequest struct { commitInfo *files.UploadSessionFinishArg result chan<- batcherResponse } // batcherResponse holds a response to be delivered to clients waiting // for a batch to complete. type batcherResponse struct { err error entry *files.FileMetadata } // newBatcher creates a new batcher structure func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.Duration) (*batcher, error) { // fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout) if size > maxBatchSize || size < 0 { return nil, errors.Errorf("dropbox: batch size must be < %d and >= 0 - it is currently %d", maxBatchSize, size) } async := false switch mode { case "sync": if size <= 0 { ci := fs.GetConfig(ctx) size = ci.Transfers } if timeout <= 0 { timeout = defaultTimeoutSync } case "async": if size <= 0 { size = defaultBatchSizeAsync } if timeout <= 0 { timeout = defaultTimeoutAsync } async = true case "off": size = 0 default: return nil, errors.Errorf("dropbox: batch mode must be sync|async|off not %q", mode) } b := &batcher{ f: f, mode: mode, size: size, timeout: timeout, async: async, in: make(chan batcherRequest, size), quit: make(chan struct{}), } if b.Batching() { b.atexit = atexit.Register(b.Shutdown) b.wg.Add(1) go b.commitLoop(context.Background()) } return b, nil } // Batching returns true if batching is active func (b *batcher) Batching() bool { return b.size > 0 } // finishBatch commits the batch, returning a batch status to poll or maybe complete func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (batchStatus *files.UploadSessionFinishBatchLaunch, err error) { var arg = &files.UploadSessionFinishBatchArg{ Entries: items, } err = b.f.pacer.Call(func() (bool, error) { batchStatus, err = b.f.srv.UploadSessionFinishBatch(arg) // If error is insufficient space then don't retry if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace { err = fserrors.NoRetryError(err) return false, err } } // after the first chunk is uploaded, we retry everything return err != nil, err }) if err != nil { return nil, errors.Wrap(err, "batch commit failed") } return batchStatus, nil } // finishBatchJobStatus waits for the batch to complete returning completed entries func (b *batcher) finishBatchJobStatus(ctx context.Context, launchBatchStatus *files.UploadSessionFinishBatchLaunch) (complete *files.UploadSessionFinishBatchResult, err error) { if launchBatchStatus.AsyncJobId == "" { return nil, errors.New("wait for batch completion: empty job ID") } var batchStatus *files.UploadSessionFinishBatchJobStatus sleepTime := 100 * time.Millisecond const maxTries = 120 for try := 1; try <= maxTries; try++ { err = b.f.pacer.Call(func() (bool, error) { batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{ AsyncJobId: launchBatchStatus.AsyncJobId, }) return shouldRetry(ctx, err) }) if err != nil { fs.Debugf(b.f, "Wait for batch: sleeping for %v after error: %v: try %d/%d", sleepTime, err, try, maxTries) } else { if batchStatus.Tag == "complete" { return batchStatus.Complete, nil } fs.Debugf(b.f, "Wait for batch: sleeping for %v after status: %q: try %d/%d", sleepTime, batchStatus.Tag, try, maxTries) } time.Sleep(sleepTime) sleepTime *= 2 if sleepTime > time.Second { sleepTime = time.Second } } if err == nil { err = errors.New("batch didn't complete") } return nil, errors.Wrapf(err, "wait for batch failed after %d tries", maxTries) } // commit a batch func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []chan<- batcherResponse) (err error) { // If commit fails then signal clients if sync var signalled = b.async defer func() { if err != nil && signalled { // Signal to clients that there was an error for _, result := range results { result <- batcherResponse{err: err} } } }() fs.Debugf(b.f, "Committing %s batch length %d", b.mode, len(items)) // finalise the batch getting either a result or a job id to poll batchStatus, err := b.finishBatch(ctx, items) if err != nil { return err } // check whether batch is complete var complete *files.UploadSessionFinishBatchResult switch batchStatus.Tag { case "async_job_id": // wait for batch to complete complete, err = b.finishBatchJobStatus(ctx, batchStatus) if err != nil { return err } case "complete": complete = batchStatus.Complete default: return errors.Errorf("batch returned unknown status %q", batchStatus.Tag) } // Check we got the right number of entries entries := complete.Entries if len(entries) != len(results) { return errors.Errorf("expecting %d items in batch but got %d", len(results), len(entries)) } // Report results to clients var ( errorTag = "" errorCount = 0 ) for i := range results { item := entries[i] resp := batcherResponse{} if item.Tag == "success" { resp.entry = item.Success } else { errorCount++ errorTag = item.Tag if item.Failure != nil { errorTag = item.Failure.Tag if item.Failure.LookupFailed != nil { errorTag += "/" + item.Failure.LookupFailed.Tag } if item.Failure.Path != nil { errorTag += "/" + item.Failure.Path.Tag } if item.Failure.PropertiesError != nil { errorTag += "/" + item.Failure.PropertiesError.Tag } } resp.err = errors.Errorf("batch upload failed: %s", errorTag) } if !b.async { results[i] <- resp } } // Show signalled so no need to report error to clients from now on signalled = true // Report an error if any failed in the batch if errorTag != "" { return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag) } return nil } // commitLoop runs the commit engine in the background func (b *batcher) commitLoop(ctx context.Context) { var ( items []*files.UploadSessionFinishArg // current batch of uncommitted files results []chan<- batcherResponse // current batch of clients awaiting results idleTimer = time.NewTimer(b.timeout) commit = func() { err := b.commitBatch(ctx, items, results) if err != nil { fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.mode, len(items), err) } items, results = nil, nil } ) defer b.wg.Done() defer idleTimer.Stop() idleTimer.Stop() outer: for { select { case <-b.quit: break outer case req, ok := <-b.in: if !ok { break outer } items = append(items, req.commitInfo) results = append(results, req.result) idleTimer.Stop() if len(items) >= b.size { commit() } else { idleTimer.Reset(b.timeout) } case <-idleTimer.C: if len(items) > 0 { fs.Debugf(b.f, "Batch idle for %v so committing", b.timeout) commit() } } } // commit any remaining items if len(items) > 0 { commit() } } // Shutdown finishes any pending batches then shuts everything down // // Can be called from atexit handler func (b *batcher) Shutdown() { b.shutOnce.Do(func() { atexit.Unregister(b.atexit) // quit the commitLoop. Note that we don't close b.in // because that will cause write to closed channel close(b.quit) b.wg.Wait() }) } // Commit commits the file using a batch call, first adding it to the // batch and then waiting for the batch to complete in a synchronous // way if async is not set. func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) { fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path) resp := make(chan batcherResponse, 1) b.in <- batcherRequest{ commitInfo: commitInfo, result: resp, } // If running async then don't wait for the result if b.async { return nil, nil } result := <-resp return result.entry, result.err }