forked from TrueCloudLab/rclone
dropbox: add low level retries
This commit is contained in:
parent
20da3e6352
commit
9d335eb5cb
1 changed files with 97 additions and 24 deletions
|
@ -1,7 +1,7 @@
|
|||
// Package dropbox provides an interface to Dropbox object storage
|
||||
package dropbox
|
||||
|
||||
// FIXME put low level retries in
|
||||
// FIXME buffer chunks for retries in upload
|
||||
// FIXME dropbox for business would be quite easy to add
|
||||
|
||||
/*
|
||||
|
@ -28,19 +28,22 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
|
||||
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
|
||||
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/oauthutil"
|
||||
"github.com/ncw/rclone/pacer"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
// Constants
|
||||
const (
|
||||
rcloneClientID = "5jcck7diasz0rqy"
|
||||
rcloneEncryptedClientSecret = "fRS5vVLr2v6FbyXYnIgjwBuUAt0osq_QZTXAEcmZ7g"
|
||||
minSleep = 10 * time.Millisecond
|
||||
maxSleep = 2 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -95,6 +98,7 @@ type Fs struct {
|
|||
srv files.Client // the connection to the dropbox server
|
||||
slashRoot string // root with "/" prefix, lowercase
|
||||
slashRootSlash string // root with "/" prefix and postfix, lowercase
|
||||
pacer *pacer.Pacer // To pace the API calls
|
||||
}
|
||||
|
||||
// Object describes a dropbox object
|
||||
|
@ -130,6 +134,20 @@ func (f *Fs) Features() *fs.Features {
|
|||
return f.features
|
||||
}
|
||||
|
||||
// shouldRetry returns a boolean as to whether this err deserves to be
|
||||
// retried. It returns the err as a convenience
|
||||
func shouldRetry(err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, err
|
||||
}
|
||||
baseErrString := errors.Cause(err).Error()
|
||||
// FIXME there is probably a better way of doing this!
|
||||
if strings.Contains(baseErrString, "too_many_write_operations") || strings.Contains(baseErrString, "too_many_requests") {
|
||||
return true, err
|
||||
}
|
||||
return fs.ShouldRetry(err), err
|
||||
}
|
||||
|
||||
// NewFs contstructs an Fs from the path, container:path
|
||||
func NewFs(name, root string) (fs.Fs, error) {
|
||||
if uploadChunkSize > maxUploadChunkSize {
|
||||
|
@ -160,8 +178,9 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||
srv := files.New(config)
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
srv: srv,
|
||||
name: name,
|
||||
srv: srv,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
}
|
||||
f.features = (&fs.Features{CaseInsensitive: true, ReadMimeType: true}).Fill(f)
|
||||
f.setRoot(root)
|
||||
|
@ -194,7 +213,10 @@ func (f *Fs) setRoot(root string) {
|
|||
|
||||
// getMetadata gets the metadata for a file or directory
|
||||
func (f *Fs) getMetadata(objPath string) (entry files.IsMetadata, notFound bool, err error) {
|
||||
entry, err = f.srv.GetMetadata(&files.GetMetadataArg{Path: objPath})
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
entry, err = f.srv.GetMetadata(&files.GetMetadataArg{Path: objPath})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
switch e := err.(type) {
|
||||
case files.GetMetadataAPIError:
|
||||
|
@ -308,7 +330,10 @@ func (f *Fs) list(out fs.ListOpts, dir string, recursive bool) {
|
|||
if root == "/" {
|
||||
arg.Path = "" // Specify root folder as empty string
|
||||
}
|
||||
res, err = f.srv.ListFolder(&arg)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
res, err = f.srv.ListFolder(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
switch e := err.(type) {
|
||||
case files.ListFolderAPIError:
|
||||
|
@ -325,7 +350,10 @@ func (f *Fs) list(out fs.ListOpts, dir string, recursive bool) {
|
|||
arg := files.ListFolderContinueArg{
|
||||
Cursor: res.Cursor,
|
||||
}
|
||||
res, err = f.srv.ListFolderContinue(&arg)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
res, err = f.srv.ListFolderContinue(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
out.SetError(errors.Wrap(err, "list continue"))
|
||||
return
|
||||
|
@ -454,7 +482,10 @@ func (f *Fs) Mkdir(dir string) error {
|
|||
arg2 := files.CreateFolderArg{
|
||||
Path: root,
|
||||
}
|
||||
_, err = f.srv.CreateFolder(&arg2)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.srv.CreateFolder(&arg2)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -483,7 +514,11 @@ func (f *Fs) Rmdir(dir string) error {
|
|||
if root == "/" {
|
||||
arg.Path = "" // Specify root folder as empty string
|
||||
}
|
||||
res, err := f.srv.ListFolder(&arg)
|
||||
var res *files.ListFolderResult
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
res, err = f.srv.ListFolder(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Rmdir")
|
||||
}
|
||||
|
@ -492,7 +527,10 @@ func (f *Fs) Rmdir(dir string) error {
|
|||
}
|
||||
|
||||
// remove it
|
||||
_, err = f.srv.Delete(&files.DeleteArg{Path: root})
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.srv.Delete(&files.DeleteArg{Path: root})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -527,7 +565,12 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|||
arg := files.RelocationArg{}
|
||||
arg.FromPath = srcObj.remotePath()
|
||||
arg.ToPath = dstObj.remotePath()
|
||||
entry, err := f.srv.Copy(&arg)
|
||||
var err error
|
||||
var entry files.IsMetadata
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
entry, err = f.srv.Copy(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "copy failed")
|
||||
}
|
||||
|
@ -550,9 +593,12 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|||
// Optional interface: Only implement this if you have a way of
|
||||
// deleting all the files quicker than just running Remove() on the
|
||||
// result of List()
|
||||
func (f *Fs) Purge() error {
|
||||
func (f *Fs) Purge() (err error) {
|
||||
// Let dropbox delete the filesystem tree
|
||||
_, err := f.srv.Delete(&files.DeleteArg{Path: f.slashRoot})
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.srv.Delete(&files.DeleteArg{Path: f.slashRoot})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -582,7 +628,12 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
|
|||
arg := files.RelocationArg{}
|
||||
arg.FromPath = srcObj.remotePath()
|
||||
arg.ToPath = dstObj.remotePath()
|
||||
entry, err := f.srv.Move(&arg)
|
||||
var err error
|
||||
var entry files.IsMetadata
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
entry, err = f.srv.Move(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "move failed")
|
||||
}
|
||||
|
@ -631,7 +682,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
|
|||
arg := files.RelocationArg{}
|
||||
arg.FromPath = srcPath
|
||||
arg.ToPath = dstPath
|
||||
_, err = f.srv.Move(&arg)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.srv.Move(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "MoveDir failed")
|
||||
}
|
||||
|
@ -769,7 +823,10 @@ func (o *Object) Storable() bool {
|
|||
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||
headers := fs.OpenOptionHeaders(options)
|
||||
arg := files.DownloadArg{Path: o.remotePath(), ExtraHeaders: headers}
|
||||
_, in, err = o.fs.srv.Download(&arg)
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
_, in, err = o.fs.srv.Download(&arg)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
|
||||
switch e := err.(type) {
|
||||
case files.DownloadAPIError:
|
||||
|
@ -786,14 +843,18 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
|||
//
|
||||
// Call only if size is >= uploadChunkSize
|
||||
//
|
||||
// FIXME rework for retries
|
||||
// FIXME buffer chunks to improve upload retries
|
||||
func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
|
||||
chunkSize := int64(uploadChunkSize)
|
||||
chunks := int(size/chunkSize) + 1
|
||||
|
||||
// write the first whole chunk
|
||||
fs.Debugf(o, "Uploading chunk 1/%d", chunks)
|
||||
res, err := o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize})
|
||||
var res *files.UploadSessionStartResult
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -810,7 +871,10 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
|
|||
// write more whole chunks (if any)
|
||||
for i := 2; i < chunks; i++ {
|
||||
fs.Debugf(o, "Uploading chunk %d/%d", i, chunks)
|
||||
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize})
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -823,7 +887,10 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
|
|||
Commit: commitInfo,
|
||||
}
|
||||
fs.Debugf(o, "Uploading chunk %d/%d", chunks, chunks)
|
||||
entry, err = o.fs.srv.UploadSessionFinish(args, in)
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
entry, err = o.fs.srv.UploadSessionFinish(args, in)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -852,7 +919,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|||
if size > int64(uploadChunkSize) {
|
||||
entry, err = o.uploadChunked(in, commitInfo, size)
|
||||
} else {
|
||||
entry, err = o.fs.srv.Upload(commitInfo, in)
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
entry, err = o.fs.srv.Upload(commitInfo, in)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "upload failed")
|
||||
|
@ -861,8 +931,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|||
}
|
||||
|
||||
// Remove an object
|
||||
func (o *Object) Remove() error {
|
||||
_, err := o.fs.srv.Delete(&files.DeleteArg{Path: o.remotePath()})
|
||||
func (o *Object) Remove() (err error) {
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
_, err = o.fs.srv.Delete(&files.DeleteArg{Path: o.remotePath()})
|
||||
return shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue