forked from TrueCloudLab/rclone
Factor pacer module from Drive and use it in Amazon Cloud Drive for
smooth API pacing and retry logic.
This commit is contained in:
parent
14f814b806
commit
073d112204
4 changed files with 296 additions and 188 deletions
|
@ -9,24 +9,22 @@ FIXME make searching for directory in id and file in id more efficient
|
|||
|
||||
FIXME make the default for no files and no dirs be (FILE & FOLDER) so
|
||||
we ignore assets completely!
|
||||
|
||||
*/
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/go-acd"
|
||||
"github.com/ncw/rclone/dircache"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/oauthutil"
|
||||
"github.com/ncw/rclone/pacer"
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
|
@ -40,8 +38,9 @@ const (
|
|||
assetKind = "ASSET"
|
||||
statusAvailable = "AVAILABLE"
|
||||
timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z
|
||||
minBackoff = 1 * time.Second
|
||||
maxBackoff = 256 * time.Second
|
||||
minSleep = 100 * time.Millisecond
|
||||
maxSleep = 256 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
)
|
||||
|
||||
// Globals
|
||||
|
@ -57,7 +56,6 @@ var (
|
|||
ClientSecret: fs.Reveal(rcloneClientSecret),
|
||||
RedirectURL: redirectURL,
|
||||
}
|
||||
FIXME = fmt.Errorf("FIXME not implemented")
|
||||
)
|
||||
|
||||
// Register with Fs
|
||||
|
@ -87,9 +85,7 @@ type FsAcd struct {
|
|||
c *acd.Client // the connection to the acd server
|
||||
root string // the path we are working on
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
|
||||
backoffLock sync.Mutex
|
||||
backoff time.Duration // current backoff
|
||||
pacer *pacer.Pacer // pacer for API calls
|
||||
}
|
||||
|
||||
// FsObjectAcd describes a acd object
|
||||
|
@ -127,6 +123,17 @@ func parsePath(path string) (root string) {
|
|||
return
|
||||
}
|
||||
|
||||
// shouldRetry returns a boolean as to whether this resp and err
|
||||
// deserve to be retried. It returns the err as a convenience
|
||||
func shouldRetry(resp *http.Response, err error) (bool, error) {
|
||||
// FIXME retry other http codes?
|
||||
// 409 conflict ?
|
||||
if err != nil && resp != nil && resp.StatusCode == 429 {
|
||||
return true, err
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
// NewFs contstructs an FsAcd from the path, container:path
|
||||
func NewFs(name, root string) (fs.Fs, error) {
|
||||
root = parsePath(root)
|
||||
|
@ -138,19 +145,28 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||
c := acd.NewClient(oAuthClient)
|
||||
c.UserAgent = fs.UserAgent
|
||||
f := &FsAcd{
|
||||
name: name,
|
||||
root: root,
|
||||
c: c,
|
||||
name: name,
|
||||
root: root,
|
||||
c: c,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
}
|
||||
|
||||
// Update endpoints
|
||||
_, _, err = f.c.Account.GetEndpoints()
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, resp, err = f.c.Account.GetEndpoints()
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get endpoints: %v", err)
|
||||
}
|
||||
|
||||
// Get rootID
|
||||
rootInfo, _, err := f.c.Nodes.GetRoot()
|
||||
var rootInfo *acd.Folder
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
rootInfo, resp, err = f.c.Nodes.GetRoot()
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil || rootInfo.Id == nil {
|
||||
return nil, fmt.Errorf("Failed to get root: %v", err)
|
||||
}
|
||||
|
@ -210,46 +226,16 @@ func (f *FsAcd) NewFsObject(remote string) fs.Object {
|
|||
return f.newFsObjectWithInfo(remote, nil)
|
||||
}
|
||||
|
||||
// errChk checks the response and if it is a 429 error returns a
|
||||
// RetryError, otherwise it returns just a plain error
|
||||
//
|
||||
// It also implements the backoff strategy
|
||||
func (f *FsAcd) errChk(resp *http.Response, err error) error {
|
||||
if err != nil && resp != nil && resp.StatusCode == 429 {
|
||||
// Update backoff
|
||||
f.backoffLock.Lock()
|
||||
backoff := f.backoff
|
||||
if backoff == 0 {
|
||||
backoff = minBackoff
|
||||
} else {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
f.backoff = backoff
|
||||
f.backoffLock.Unlock()
|
||||
// Sleep for the backoff time
|
||||
sleepTime := time.Duration(rand.Int63n(int64(backoff)))
|
||||
fs.Debug(f, "Retry error: backoff is now %v, sleeping for %v", backoff, sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
return fs.RetryError(err)
|
||||
}
|
||||
// Reset backoff on success
|
||||
if err == nil {
|
||||
f.backoffLock.Lock()
|
||||
f.backoff = 0
|
||||
f.backoffLock.Unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// FindLeaf finds a directory of name leaf in the folder with ID pathId
|
||||
func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) {
|
||||
//fs.Debug(f, "FindLeaf(%q, %q)", pathId, leaf)
|
||||
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
||||
subFolder, _, err := folder.GetFolder(leaf)
|
||||
// err = f.errChk(resp, err)
|
||||
var resp *http.Response
|
||||
var subFolder *acd.Folder
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
subFolder, resp, err = folder.GetFolder(leaf)
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
if err == acd.ErrorNodeNotFound {
|
||||
//fs.Debug(f, "...Not found")
|
||||
|
@ -269,15 +255,19 @@ func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err
|
|||
|
||||
// CreateDir makes a directory with pathId as parent and name leaf
|
||||
func (f *FsAcd) CreateDir(pathId, leaf string) (newId string, err error) {
|
||||
//fs.Debug(f, "CreateDir(%q, %q)", pathId, leaf)
|
||||
//fmt.Printf("CreateDir(%q, %q)\n", pathId, leaf)
|
||||
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
||||
info, _, err := folder.CreateFolder(leaf)
|
||||
// err = f.errChk(resp, err)
|
||||
var resp *http.Response
|
||||
var info *acd.Folder
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
info, resp, err = folder.CreateFolder(leaf)
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
fs.Debug(f, "...Error %v", err)
|
||||
//fmt.Printf("...Error %v\n", err)
|
||||
return "", err
|
||||
}
|
||||
//fs.Debug(f, "...Id %q", *info.Id)
|
||||
//fmt.Printf("...Id %q\n", *info.Id)
|
||||
return *info.Id, nil
|
||||
}
|
||||
|
||||
|
@ -310,8 +300,11 @@ func (f *FsAcd) listAll(dirId string, title string, directoriesOnly bool, filesO
|
|||
//var resp *http.Response
|
||||
OUTER:
|
||||
for {
|
||||
nodes, _, err = f.c.Nodes.GetNodes(&opts)
|
||||
// err = f.errChk(resp, err)
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
nodes, resp, err = f.c.Nodes.GetNodes(&opts)
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.ErrorLog(f, "Couldn't list files: %v", err)
|
||||
|
@ -440,12 +433,14 @@ func (f *FsAcd) Put(in io.Reader, remote string, modTime time.Time, size int64)
|
|||
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
||||
var info *acd.File
|
||||
var resp *http.Response
|
||||
if size != 0 {
|
||||
info, resp, err = folder.Put(in, leaf)
|
||||
} else {
|
||||
info, resp, err = folder.PutSized(in, size, leaf)
|
||||
}
|
||||
err = f.errChk(resp, err)
|
||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||
if size != 0 {
|
||||
info, resp, err = folder.Put(in, leaf)
|
||||
} else {
|
||||
info, resp, err = folder.PutSized(in, size, leaf)
|
||||
}
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -497,8 +492,10 @@ func (f *FsAcd) purgeCheck(check bool) error {
|
|||
|
||||
node := acd.NodeFromId(rootID, f.c.Nodes)
|
||||
var resp *http.Response
|
||||
resp, err = node.Trash()
|
||||
err = f.errChk(resp, err)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = node.Trash()
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -599,8 +596,12 @@ func (o *FsObjectAcd) readMetaData() (err error) {
|
|||
return err
|
||||
}
|
||||
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
||||
info, _, err := folder.GetFile(leaf)
|
||||
// err = o.acd.errChk(resp, err)
|
||||
var resp *http.Response
|
||||
var info *acd.File
|
||||
err = o.acd.pacer.Call(func() (bool, error) {
|
||||
info, resp, err = folder.GetFile(leaf)
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
fs.Debug(o, "Failed to read info: %s", err)
|
||||
return err
|
||||
|
@ -643,8 +644,10 @@ func (o *FsObjectAcd) Storable() bool {
|
|||
func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) {
|
||||
file := acd.File{Node: o.info}
|
||||
var resp *http.Response
|
||||
in, resp, err = file.Open()
|
||||
err = o.acd.errChk(resp, err)
|
||||
err = o.acd.pacer.Call(func() (bool, error) {
|
||||
in, resp, err = file.Open()
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
return in, err
|
||||
}
|
||||
|
||||
|
@ -656,12 +659,14 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error
|
|||
var info *acd.File
|
||||
var resp *http.Response
|
||||
var err error
|
||||
if size != 0 {
|
||||
info, resp, err = file.OverwriteSized(in, size)
|
||||
} else {
|
||||
info, resp, err = file.Overwrite(in)
|
||||
}
|
||||
err = o.acd.errChk(resp, err)
|
||||
err = o.acd.pacer.CallNoRetry(func() (bool, error) {
|
||||
if size != 0 {
|
||||
info, resp, err = file.OverwriteSized(in, size)
|
||||
} else {
|
||||
info, resp, err = file.Overwrite(in)
|
||||
}
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -671,8 +676,12 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error
|
|||
|
||||
// Remove an object
|
||||
func (o *FsObjectAcd) Remove() error {
|
||||
resp, err := o.info.Trash()
|
||||
err = o.acd.errChk(resp, err)
|
||||
var resp *http.Response
|
||||
var err error
|
||||
err = o.acd.pacer.Call(func() (bool, error) {
|
||||
resp, err = o.info.Trash()
|
||||
return shouldRetry(resp, err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
148
drive/drive.go
148
drive/drive.go
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/ncw/rclone/dircache"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/oauthutil"
|
||||
"github.com/ncw/rclone/pacer"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
|
@ -82,14 +83,13 @@ func init() {
|
|||
|
||||
// FsDrive represents a remote drive server
|
||||
type FsDrive struct {
|
||||
name string // name of this remote
|
||||
svc *drive.Service // the connection to the drive server
|
||||
root string // the path we are working on
|
||||
client *http.Client // authorized client
|
||||
about *drive.About // information about the drive, including the root
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer chan struct{} // To pace the operations
|
||||
sleepTime time.Duration // Time to sleep for each transaction
|
||||
name string // name of this remote
|
||||
svc *drive.Service // the connection to the drive server
|
||||
root string // the path we are working on
|
||||
client *http.Client // authorized client
|
||||
about *drive.About // information about the drive, including the root
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *pacer.Pacer // To pace the API calls
|
||||
}
|
||||
|
||||
// FsObjectDrive describes a drive object
|
||||
|
@ -120,46 +120,11 @@ func (f *FsDrive) String() string {
|
|||
return fmt.Sprintf("Google drive root '%s'", f.root)
|
||||
}
|
||||
|
||||
// Start a call to the drive API
|
||||
//
|
||||
// This must be called as a pair with endCall
|
||||
//
|
||||
// This waits for the pacer token
|
||||
func (f *FsDrive) beginCall() {
|
||||
// pacer starts with a token in and whenever we take one out
|
||||
// XXX ms later we put another in. We could do this with a
|
||||
// Ticker more accurately, but then we'd have to work out how
|
||||
// not to run it when it wasn't needed
|
||||
<-f.pacer
|
||||
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
// fs.Debug(f, "New sleep for %v at %v", t, time.Now())
|
||||
time.Sleep(t)
|
||||
f.pacer <- struct{}{}
|
||||
}(f.sleepTime)
|
||||
}
|
||||
|
||||
// End a call to the drive API
|
||||
//
|
||||
// Refresh the pace given an error that was returned. It returns a
|
||||
// boolean as to whether the operation should be retried.
|
||||
//
|
||||
// See https://developers.google.com/drive/web/handle-errors
|
||||
// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second
|
||||
func (f *FsDrive) endCall(err error) bool {
|
||||
again := false
|
||||
oldSleepTime := f.sleepTime
|
||||
if err == nil {
|
||||
f.sleepTime = (f.sleepTime<<decayConstant - f.sleepTime) >> decayConstant
|
||||
if f.sleepTime < minSleep {
|
||||
f.sleepTime = minSleep
|
||||
}
|
||||
if f.sleepTime != oldSleepTime {
|
||||
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
|
||||
}
|
||||
} else {
|
||||
fs.Debug(f, "Error recived: %T %#v", err, err)
|
||||
// shouldRetry determines whehter a given err rates being retried
|
||||
func shouldRetry(err error) (again bool, errOut error) {
|
||||
again = false
|
||||
errOut = err
|
||||
if err != nil {
|
||||
// Check for net error Timeout()
|
||||
if x, ok := err.(interface {
|
||||
Timeout() bool
|
||||
|
@ -185,30 +150,7 @@ func (f *FsDrive) endCall(err error) bool {
|
|||
}
|
||||
}
|
||||
}
|
||||
if again {
|
||||
f.sleepTime *= 2
|
||||
if f.sleepTime > maxSleep {
|
||||
f.sleepTime = maxSleep
|
||||
}
|
||||
if f.sleepTime != oldSleepTime {
|
||||
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
|
||||
}
|
||||
}
|
||||
return again
|
||||
}
|
||||
|
||||
// Pace the remote operations to not exceed Google's limits and retry
|
||||
// on 403 rate limit exceeded
|
||||
//
|
||||
// This calls fn, expecting it to place its error in perr
|
||||
func (f *FsDrive) call(perr *error, fn func()) {
|
||||
for {
|
||||
f.beginCall()
|
||||
fn()
|
||||
if !f.endCall(*perr) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return again, err
|
||||
}
|
||||
|
||||
// parseParse parses a drive 'url'
|
||||
|
@ -249,8 +191,9 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file
|
|||
OUTER:
|
||||
for {
|
||||
var files *drive.FileList
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
files, err = list.Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Couldn't list directory: %s", err)
|
||||
|
@ -301,15 +244,11 @@ func NewFs(name, path string) (fs.Fs, error) {
|
|||
}
|
||||
|
||||
f := &FsDrive{
|
||||
name: name,
|
||||
root: root,
|
||||
pacer: make(chan struct{}, 1),
|
||||
sleepTime: minSleep,
|
||||
name: name,
|
||||
root: root,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
}
|
||||
|
||||
// Put the first pacing token in
|
||||
f.pacer <- struct{}{}
|
||||
|
||||
// Create a new authorized Drive client.
|
||||
f.client = oAuthClient
|
||||
f.svc, err = drive.New(f.client)
|
||||
|
@ -318,8 +257,9 @@ func NewFs(name, path string) (fs.Fs, error) {
|
|||
}
|
||||
|
||||
// Read About so we know the root path
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
f.about, err = f.svc.About.Get().Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Couldn't read info about Drive: %s", err)
|
||||
|
@ -411,8 +351,9 @@ func (f *FsDrive) CreateDir(pathId, leaf string) (newId string, err error) {
|
|||
Parents: []*drive.ParentReference{{Id: pathId}},
|
||||
}
|
||||
var info *drive.File
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
info, err = f.svc.Files.Insert(createInfo).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -625,13 +566,12 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
|
|||
if size == 0 || size < int64(driveUploadCutoff) {
|
||||
// Make the API request to upload metadata and file data.
|
||||
// Don't retry, return a retry error instead
|
||||
f.beginCall()
|
||||
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
||||
if f.endCall(err) {
|
||||
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
|
||||
}
|
||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return o, fmt.Errorf("Upload failed: %s", err)
|
||||
return o, err
|
||||
}
|
||||
} else {
|
||||
// Upload the file in chunks
|
||||
|
@ -658,8 +598,9 @@ func (f *FsDrive) Rmdir() error {
|
|||
return err
|
||||
}
|
||||
var children *drive.ChildList
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
children, err = f.svc.Children.List(f.dirCache.RootID()).MaxResults(10).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -669,12 +610,13 @@ func (f *FsDrive) Rmdir() error {
|
|||
}
|
||||
// Delete the directory if it isn't the root
|
||||
if f.root != "" {
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
if *driveUseTrash {
|
||||
_, err = f.svc.Files.Trash(f.dirCache.RootID()).Do()
|
||||
} else {
|
||||
err = f.svc.Files.Delete(f.dirCache.RootID()).Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -711,8 +653,9 @@ func (f *FsDrive) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|||
}
|
||||
|
||||
var info *drive.File
|
||||
o.drive.call(&err, func() {
|
||||
err = o.drive.pacer.Call(func() (bool, error) {
|
||||
info, err = o.drive.svc.Files.Copy(srcObj.id, createInfo).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -735,12 +678,13 @@ func (f *FsDrive) Purge() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
if *driveUseTrash {
|
||||
_, err = f.svc.Files.Trash(f.dirCache.RootID()).Do()
|
||||
} else {
|
||||
err = f.svc.Files.Delete(f.dirCache.RootID()).Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
})
|
||||
f.dirCache.ResetRoot()
|
||||
if err != nil {
|
||||
|
@ -921,8 +865,9 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) {
|
|||
}
|
||||
// Set modified date
|
||||
var info *drive.File
|
||||
o.drive.call(&err, func() {
|
||||
err = o.drive.pacer.Call(func() (bool, error) {
|
||||
info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
|
@ -949,8 +894,9 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
|
|||
}
|
||||
req.Header.Set("User-Agent", fs.UserAgent)
|
||||
var res *http.Response
|
||||
o.drive.call(&err, func() {
|
||||
err = o.drive.pacer.Call(func() (bool, error) {
|
||||
res, err = o.drive.client.Do(req)
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -978,13 +924,12 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
|
|||
var info *drive.File
|
||||
if size == 0 || size < int64(driveUploadCutoff) {
|
||||
// Don't retry, return a retry error instead
|
||||
o.drive.beginCall()
|
||||
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
||||
if o.drive.endCall(err) {
|
||||
return fs.RetryErrorf("Update failed - retry: %s", err)
|
||||
}
|
||||
err = o.drive.pacer.CallNoRetry(func() (bool, error) {
|
||||
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Update failed: %s", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Upload the file in chunks
|
||||
|
@ -1000,12 +945,13 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
|
|||
// Remove an object
|
||||
func (o *FsObjectDrive) Remove() error {
|
||||
var err error
|
||||
o.drive.call(&err, func() {
|
||||
err = o.drive.pacer.Call(func() (bool, error) {
|
||||
if *driveUseTrash {
|
||||
_, err = o.drive.svc.Files.Trash(o.id).Do()
|
||||
} else {
|
||||
err = o.drive.svc.Files.Delete(o.id).Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -78,12 +78,13 @@ func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *dri
|
|||
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
|
||||
req.Header.Set("User-Agent", fs.UserAgent)
|
||||
var res *http.Response
|
||||
f.call(&err, func() {
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
res, err = f.client.Do(req)
|
||||
if err == nil {
|
||||
defer googleapi.CloseBody(res)
|
||||
err = googleapi.CheckResponse(res)
|
||||
}
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -203,19 +204,19 @@ func (rx *resumableUpload) Upload() (*drive.File, error) {
|
|||
}
|
||||
|
||||
// Transfer the chunk
|
||||
for try := 1; try <= maxTries; try++ {
|
||||
fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries)
|
||||
rx.f.beginCall()
|
||||
err = rx.f.pacer.Call(func() (bool, error) {
|
||||
fs.Debug(rx.remote, "Sending chunk %d length %d", start, reqSize)
|
||||
StatusCode, err = rx.transferChunk(start, buf)
|
||||
rx.f.endCall(err)
|
||||
again, err := shouldRetry(err)
|
||||
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
|
||||
goto success
|
||||
again = false
|
||||
err = nil
|
||||
}
|
||||
fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err)
|
||||
return again, err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.Debug(rx.remote, "Failed to send chunk")
|
||||
return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err)
|
||||
success:
|
||||
|
||||
start += reqSize
|
||||
}
|
||||
|
|
152
pacer/pacer.go
Normal file
152
pacer/pacer.go
Normal file
|
@ -0,0 +1,152 @@
|
|||
// pacer is a utility package to make pacing and retrying API calls easy
|
||||
package pacer
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
)
|
||||
|
||||
type Pacer struct {
|
||||
minSleep time.Duration // minimum sleep time
|
||||
maxSleep time.Duration // maximum sleep time
|
||||
decayConstant uint // decay constant
|
||||
pacer chan struct{} // To pace the operations
|
||||
sleepTime time.Duration // Time to sleep for each transaction
|
||||
retries int // Max number of retries
|
||||
}
|
||||
|
||||
// Paced is a function which is called by the Call and CallNoRetry
|
||||
// methods. It should return a boolean, true if it would like to be
|
||||
// retried, and an error. This error may be returned or returned
|
||||
// wrapped in a RetryError.
|
||||
type Paced func() (bool, error)
|
||||
|
||||
// New returns a Pacer with sensible defaults
|
||||
func New() *Pacer {
|
||||
p := &Pacer{
|
||||
minSleep: 10 * time.Millisecond,
|
||||
maxSleep: 2 * time.Second,
|
||||
decayConstant: 2,
|
||||
retries: 10,
|
||||
pacer: make(chan struct{}, 1),
|
||||
}
|
||||
p.sleepTime = p.minSleep
|
||||
|
||||
// Put the first pacing token in
|
||||
p.pacer <- struct{}{}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// SetMinSleep sets the minimum sleep time for the pacer
|
||||
func (p *Pacer) SetMinSleep(t time.Duration) *Pacer {
|
||||
p.minSleep = t
|
||||
p.sleepTime = p.minSleep
|
||||
return p
|
||||
}
|
||||
|
||||
// SetMaxSleep sets the maximum sleep time for the pacer
|
||||
func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer {
|
||||
p.maxSleep = t
|
||||
p.sleepTime = p.minSleep
|
||||
return p
|
||||
}
|
||||
|
||||
// SetDecayConstant sets the decay constant for the pacer
|
||||
//
|
||||
// This is the speed the time falls back to the minimum after errors
|
||||
// have occurred.
|
||||
//
|
||||
// bigger for slower decay, exponential
|
||||
func (p *Pacer) SetDecayConstant(decay uint) *Pacer {
|
||||
p.decayConstant = decay
|
||||
return p
|
||||
}
|
||||
|
||||
// SetRetries sets the max number of tries for Call
|
||||
func (p *Pacer) SetRetries(retries int) *Pacer {
|
||||
p.retries = retries
|
||||
return p
|
||||
}
|
||||
|
||||
// Start a call to the API
|
||||
//
|
||||
// This must be called as a pair with endCall
|
||||
//
|
||||
// This waits for the pacer token
|
||||
func (p *Pacer) beginCall() {
|
||||
// pacer starts with a token in and whenever we take one out
|
||||
// XXX ms later we put another in. We could do this with a
|
||||
// Ticker more accurately, but then we'd have to work out how
|
||||
// not to run it when it wasn't needed
|
||||
<-p.pacer
|
||||
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
// fs.Debug(f, "New sleep for %v at %v", t, time.Now())
|
||||
time.Sleep(t)
|
||||
p.pacer <- struct{}{}
|
||||
}(p.sleepTime)
|
||||
}
|
||||
|
||||
// End a call to the API
|
||||
//
|
||||
// Refresh the pace given an error that was returned. It returns a
|
||||
// boolean as to whether the operation should be retried.
|
||||
func (p *Pacer) endCall(again bool) {
|
||||
oldSleepTime := p.sleepTime
|
||||
if again {
|
||||
p.sleepTime *= 2
|
||||
if p.sleepTime > p.maxSleep {
|
||||
p.sleepTime = p.maxSleep
|
||||
}
|
||||
if p.sleepTime != oldSleepTime {
|
||||
fs.Debug("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
|
||||
}
|
||||
} else {
|
||||
p.sleepTime = (p.sleepTime<<p.decayConstant - p.sleepTime) >> p.decayConstant
|
||||
if p.sleepTime < p.minSleep {
|
||||
p.sleepTime = p.minSleep
|
||||
}
|
||||
if p.sleepTime != oldSleepTime {
|
||||
fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// call implements Call but with settable retries
|
||||
func (p *Pacer) call(fn Paced, retries int) (err error) {
|
||||
var again bool
|
||||
for i := 0; i < retries; i++ {
|
||||
p.beginCall()
|
||||
again, err = fn()
|
||||
p.endCall(again)
|
||||
if !again {
|
||||
break
|
||||
}
|
||||
}
|
||||
if again {
|
||||
err = fs.RetryError(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Call paces the remote operations to not exceed the limits and retry
|
||||
// on rate limit exceeded
|
||||
//
|
||||
// This calls fn, expecting it to return a retry flag and an
|
||||
// error. This error may be returned wrapped in a RetryError if the
|
||||
// number of retries is exceeded.
|
||||
func (p *Pacer) Call(fn Paced) (err error) {
|
||||
return p.call(fn, p.retries)
|
||||
}
|
||||
|
||||
// Pace the remote operations to not exceed Amazon's limits and return
|
||||
// a retry error on rate limit exceeded
|
||||
//
|
||||
// This calls fn and wraps the output in a RetryError if it would like
|
||||
// it to be retried
|
||||
func (p *Pacer) CallNoRetry(fn Paced) error {
|
||||
return p.call(fn, 1)
|
||||
}
|
Loading…
Add table
Reference in a new issue