From 9d335eb5cbd780b61a8367e629afec920258bb15 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 28 May 2017 17:55:18 +0100 Subject: [PATCH] dropbox: add low level retries --- dropbox/dropbox.go | 121 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 97 insertions(+), 24 deletions(-) diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go index 69492b3cb..68a42dbea 100644 --- a/dropbox/dropbox.go +++ b/dropbox/dropbox.go @@ -1,7 +1,7 @@ // Package dropbox provides an interface to Dropbox object storage package dropbox -// FIXME put low level retries in +// FIXME buffer chunks for retries in upload // FIXME dropbox for business would be quite easy to add /* @@ -28,19 +28,22 @@ import ( "strings" "time" - "golang.org/x/oauth2" - "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/oauthutil" + "github.com/ncw/rclone/pacer" "github.com/pkg/errors" + "golang.org/x/oauth2" ) // Constants const ( rcloneClientID = "5jcck7diasz0rqy" rcloneEncryptedClientSecret = "fRS5vVLr2v6FbyXYnIgjwBuUAt0osq_QZTXAEcmZ7g" + minSleep = 10 * time.Millisecond + maxSleep = 2 * time.Second + decayConstant = 2 // bigger for slower decay, exponential ) var ( @@ -95,6 +98,7 @@ type Fs struct { srv files.Client // the connection to the dropbox server slashRoot string // root with "/" prefix, lowercase slashRootSlash string // root with "/" prefix and postfix, lowercase + pacer *pacer.Pacer // To pace the API calls } // Object describes a dropbox object @@ -130,6 +134,20 @@ func (f *Fs) Features() *fs.Features { return f.features } +// shouldRetry returns a boolean as to whether this err deserves to be +// retried. It returns the err as a convenience +func shouldRetry(err error) (bool, error) { + if err == nil { + return false, err + } + baseErrString := errors.Cause(err).Error() + // FIXME there is probably a better way of doing this! + if strings.Contains(baseErrString, "too_many_write_operations") || strings.Contains(baseErrString, "too_many_requests") { + return true, err + } + return fs.ShouldRetry(err), err +} + // NewFs contstructs an Fs from the path, container:path func NewFs(name, root string) (fs.Fs, error) { if uploadChunkSize > maxUploadChunkSize { @@ -160,8 +178,9 @@ func NewFs(name, root string) (fs.Fs, error) { srv := files.New(config) f := &Fs{ - name: name, - srv: srv, + name: name, + srv: srv, + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), } f.features = (&fs.Features{CaseInsensitive: true, ReadMimeType: true}).Fill(f) f.setRoot(root) @@ -194,7 +213,10 @@ func (f *Fs) setRoot(root string) { // getMetadata gets the metadata for a file or directory func (f *Fs) getMetadata(objPath string) (entry files.IsMetadata, notFound bool, err error) { - entry, err = f.srv.GetMetadata(&files.GetMetadataArg{Path: objPath}) + err = f.pacer.Call(func() (bool, error) { + entry, err = f.srv.GetMetadata(&files.GetMetadataArg{Path: objPath}) + return shouldRetry(err) + }) if err != nil { switch e := err.(type) { case files.GetMetadataAPIError: @@ -308,7 +330,10 @@ func (f *Fs) list(out fs.ListOpts, dir string, recursive bool) { if root == "/" { arg.Path = "" // Specify root folder as empty string } - res, err = f.srv.ListFolder(&arg) + err = f.pacer.Call(func() (bool, error) { + res, err = f.srv.ListFolder(&arg) + return shouldRetry(err) + }) if err != nil { switch e := err.(type) { case files.ListFolderAPIError: @@ -325,7 +350,10 @@ func (f *Fs) list(out fs.ListOpts, dir string, recursive bool) { arg := files.ListFolderContinueArg{ Cursor: res.Cursor, } - res, err = f.srv.ListFolderContinue(&arg) + err = f.pacer.Call(func() (bool, error) { + res, err = f.srv.ListFolderContinue(&arg) + return shouldRetry(err) + }) if err != nil { out.SetError(errors.Wrap(err, "list continue")) return @@ -454,7 +482,10 @@ func (f *Fs) Mkdir(dir string) error { arg2 := files.CreateFolderArg{ Path: root, } - _, err = f.srv.CreateFolder(&arg2) + err = f.pacer.Call(func() (bool, error) { + _, err = f.srv.CreateFolder(&arg2) + return shouldRetry(err) + }) return err } @@ -483,7 +514,11 @@ func (f *Fs) Rmdir(dir string) error { if root == "/" { arg.Path = "" // Specify root folder as empty string } - res, err := f.srv.ListFolder(&arg) + var res *files.ListFolderResult + err = f.pacer.Call(func() (bool, error) { + res, err = f.srv.ListFolder(&arg) + return shouldRetry(err) + }) if err != nil { return errors.Wrap(err, "Rmdir") } @@ -492,7 +527,10 @@ func (f *Fs) Rmdir(dir string) error { } // remove it - _, err = f.srv.Delete(&files.DeleteArg{Path: root}) + err = f.pacer.Call(func() (bool, error) { + _, err = f.srv.Delete(&files.DeleteArg{Path: root}) + return shouldRetry(err) + }) return err } @@ -527,7 +565,12 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { arg := files.RelocationArg{} arg.FromPath = srcObj.remotePath() arg.ToPath = dstObj.remotePath() - entry, err := f.srv.Copy(&arg) + var err error + var entry files.IsMetadata + err = f.pacer.Call(func() (bool, error) { + entry, err = f.srv.Copy(&arg) + return shouldRetry(err) + }) if err != nil { return nil, errors.Wrap(err, "copy failed") } @@ -550,9 +593,12 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { // Optional interface: Only implement this if you have a way of // deleting all the files quicker than just running Remove() on the // result of List() -func (f *Fs) Purge() error { +func (f *Fs) Purge() (err error) { // Let dropbox delete the filesystem tree - _, err := f.srv.Delete(&files.DeleteArg{Path: f.slashRoot}) + err = f.pacer.Call(func() (bool, error) { + _, err = f.srv.Delete(&files.DeleteArg{Path: f.slashRoot}) + return shouldRetry(err) + }) return err } @@ -582,7 +628,12 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { arg := files.RelocationArg{} arg.FromPath = srcObj.remotePath() arg.ToPath = dstObj.remotePath() - entry, err := f.srv.Move(&arg) + var err error + var entry files.IsMetadata + err = f.pacer.Call(func() (bool, error) { + entry, err = f.srv.Move(&arg) + return shouldRetry(err) + }) if err != nil { return nil, errors.Wrap(err, "move failed") } @@ -631,7 +682,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { arg := files.RelocationArg{} arg.FromPath = srcPath arg.ToPath = dstPath - _, err = f.srv.Move(&arg) + err = f.pacer.Call(func() (bool, error) { + _, err = f.srv.Move(&arg) + return shouldRetry(err) + }) if err != nil { return errors.Wrap(err, "MoveDir failed") } @@ -769,7 +823,10 @@ func (o *Object) Storable() bool { func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { headers := fs.OpenOptionHeaders(options) arg := files.DownloadArg{Path: o.remotePath(), ExtraHeaders: headers} - _, in, err = o.fs.srv.Download(&arg) + err = o.fs.pacer.Call(func() (bool, error) { + _, in, err = o.fs.srv.Download(&arg) + return shouldRetry(err) + }) switch e := err.(type) { case files.DownloadAPIError: @@ -786,14 +843,18 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // // Call only if size is >= uploadChunkSize // -// FIXME rework for retries +// FIXME buffer chunks to improve upload retries func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) { chunkSize := int64(uploadChunkSize) chunks := int(size/chunkSize) + 1 // write the first whole chunk fs.Debugf(o, "Uploading chunk 1/%d", chunks) - res, err := o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize}) + var res *files.UploadSessionStartResult + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize}) + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -810,7 +871,10 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size // write more whole chunks (if any) for i := 2; i < chunks; i++ { fs.Debugf(o, "Uploading chunk %d/%d", i, chunks) - err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -823,7 +887,10 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size Commit: commitInfo, } fs.Debugf(o, "Uploading chunk %d/%d", chunks, chunks) - entry, err = o.fs.srv.UploadSessionFinish(args, in) + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + entry, err = o.fs.srv.UploadSessionFinish(args, in) + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -852,7 +919,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio if size > int64(uploadChunkSize) { entry, err = o.uploadChunked(in, commitInfo, size) } else { - entry, err = o.fs.srv.Upload(commitInfo, in) + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + entry, err = o.fs.srv.Upload(commitInfo, in) + return shouldRetry(err) + }) } if err != nil { return errors.Wrap(err, "upload failed") @@ -861,8 +931,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } // Remove an object -func (o *Object) Remove() error { - _, err := o.fs.srv.Delete(&files.DeleteArg{Path: o.remotePath()}) +func (o *Object) Remove() (err error) { + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + _, err = o.fs.srv.Delete(&files.DeleteArg{Path: o.remotePath()}) + return shouldRetry(err) + }) return err }