drive: fix rate limit exceeded errors - fixes #20

This is done by pacing the requests to drive and backing them off
using an exponential increase.  Put and Modify operations can now be
retried also.
This commit is contained in:
Nick Craig-Wood 2015-02-02 17:29:08 +00:00
parent 20535348db
commit 5fb6f94579
5 changed files with 176 additions and 33 deletions

View file

@ -254,9 +254,6 @@ COPYING file included in this package).
Bugs Bugs
---- ----
* Drive: Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded
* quota is 100.0 requests/second/user
* just retry the command if this happens
* Empty directories left behind with Local and Drive * Empty directories left behind with Local and Drive
* eg purging a local directory with subdirectories doesn't work * eg purging a local directory with subdirectories doesn't work

View file

@ -132,8 +132,6 @@ COPYING file included in this package).
Bugs Bugs
---- ----
* Drive: Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded
* quota is 100.0 requests/second/user
* Empty directories left behind with Local and Drive * Empty directories left behind with Local and Drive
* eg purging a local directory with subdirectories doesn't work * eg purging a local directory with subdirectories doesn't work

View file

@ -1,14 +1,6 @@
// Drive interface // Drive interface
package drive package drive
// Gets this quite often
// Failed to set mtime: googleapi: Error 403: Rate Limit Exceeded
// FIXME list containers equivalent should list directories?
// FIXME list directory should list to channel for concurrency not
// append to array
// FIXME need to deal with some corner cases // FIXME need to deal with some corner cases
// * multiple files with the same name // * multiple files with the same name
// * files can be in multiple directories // * files can be in multiple directories
@ -27,6 +19,7 @@ import (
"time" "time"
"google.golang.org/api/drive/v2" "google.golang.org/api/drive/v2"
"google.golang.org/api/googleapi"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/googleauth" "github.com/ncw/rclone/googleauth"
@ -40,6 +33,9 @@ const (
driveFolderType = "application/vnd.google-apps.folder" driveFolderType = "application/vnd.google-apps.folder"
timeFormatIn = time.RFC3339 timeFormatIn = time.RFC3339
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00" timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
) )
// Globals // Globals
@ -83,6 +79,8 @@ type FsDrive struct {
findRootLock sync.Mutex // Protect findRoot from concurrent use findRootLock sync.Mutex // Protect findRoot from concurrent use
dirCache dirCache // Map of directory path to directory id dirCache dirCache // Map of directory path to directory id
findDirLock sync.Mutex // Protect findDir from concurrent use findDirLock sync.Mutex // Protect findDir from concurrent use
pacer chan struct{} // To pace the operations
sleepTime time.Duration // Time to sleep for each transaction
} }
// FsObjectDrive describes a drive object // FsObjectDrive describes a drive object
@ -149,6 +147,74 @@ func (f *FsDrive) String() string {
return fmt.Sprintf("Google drive root '%s'", f.root) return fmt.Sprintf("Google drive root '%s'", f.root)
} }
// Wait for the pace
func (f *FsDrive) paceWait() {
// pacer starts with a token in and whenever we take one out
// XXX ms later we put another in. We could do this with a
// Ticker more accurately, but then we'd have to work out how
// not to run it when it wasn't needed
<-f.pacer
// Restart the timer
go func(t time.Duration) {
// fs.Debug(f, "New sleep for %v at %v", t, time.Now())
time.Sleep(t)
f.pacer <- struct{}{}
}(f.sleepTime)
}
// Refresh the pace given an error that was returned. It returns a
// boolean as to whether the operation should be retried.
//
// See https://developers.google.com/drive/web/handle-errors
// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second
func (f *FsDrive) paceRefresh(err error) bool {
again := false
oldSleepTime := f.sleepTime
if err == nil {
f.sleepTime = (f.sleepTime<<decayConstant - f.sleepTime) >> decayConstant
if f.sleepTime < minSleep {
f.sleepTime = minSleep
}
if f.sleepTime != oldSleepTime {
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
}
} else {
fs.Debug(f, "Error recived: %v", err)
if gerr, ok := err.(*googleapi.Error); ok {
if len(gerr.Errors) > 0 {
reason := gerr.Errors[0].Reason
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
f.sleepTime *= 2
if f.sleepTime > maxSleep {
f.sleepTime = maxSleep
}
if f.sleepTime != oldSleepTime {
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
}
again = true
}
}
}
}
return again
}
// Pace the remote operations to not exceed Google's limits and retry
// on 403 rate limit exceeded
//
// This calls fn, expecting it to place its error in perr
func (f *FsDrive) pace(perr *error, fn func()) {
for {
f.paceWait()
fn()
if !f.paceRefresh(*perr) {
break
}
}
}
// parseParse parses a drive 'url' // parseParse parses a drive 'url'
func parseDrivePath(path string) (root string, err error) { func parseDrivePath(path string) (root string, err error) {
root = strings.Trim(path, "/") root = strings.Trim(path, "/")
@ -186,7 +252,10 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file
list := f.svc.Files.List().Q(query).MaxResults(1000) list := f.svc.Files.List().Q(query).MaxResults(1000)
OUTER: OUTER:
for { for {
files, err := list.Do() var files *drive.FileList
f.pace(&err, func() {
files, err = list.Do()
})
if err != nil { if err != nil {
return false, fmt.Errorf("Couldn't list directory: %s", err) return false, fmt.Errorf("Couldn't list directory: %s", err)
} }
@ -219,8 +288,13 @@ func NewFs(name, path string) (fs.Fs, error) {
f := &FsDrive{ f := &FsDrive{
root: root, root: root,
dirCache: newDirCache(), dirCache: newDirCache(),
pacer: make(chan struct{}, 1),
sleepTime: minSleep,
} }
// Put the first pacing token in
f.pacer <- struct{}{}
// Create a new authorized Drive client. // Create a new authorized Drive client.
f.client = t.Client() f.client = t.Client()
f.svc, err = drive.New(f.client) f.svc, err = drive.New(f.client)
@ -229,7 +303,9 @@ func NewFs(name, path string) (fs.Fs, error) {
} }
// Read About so we know the root path // Read About so we know the root path
f.pace(&err, func() {
f.about, err = f.svc.About.Get().Do() f.about, err = f.svc.About.Get().Do()
})
if err != nil { if err != nil {
return nil, fmt.Errorf("Couldn't read info about Drive: %s", err) return nil, fmt.Errorf("Couldn't read info about Drive: %s", err)
} }
@ -489,13 +565,16 @@ func (f *FsDrive) _findDir(path string, create bool) (pathId string, err error)
if create { if create {
// fmt.Println("Making", path) // fmt.Println("Making", path)
// Define the metadata for the directory we are going to create. // Define the metadata for the directory we are going to create.
info := &drive.File{ createInfo := &drive.File{
Title: leaf, Title: leaf,
Description: leaf, Description: leaf,
MimeType: driveFolderType, MimeType: driveFolderType,
Parents: []*drive.ParentReference{{Id: pathId}}, Parents: []*drive.ParentReference{{Id: pathId}},
} }
info, err := f.svc.Files.Insert(info).Do() var info *drive.File
f.pace(&err, func() {
info, err = f.svc.Files.Insert(createInfo).Do()
})
if err != nil { if err != nil {
return pathId, fmt.Errorf("Failed to make directory: %v", err) return pathId, fmt.Errorf("Failed to make directory: %v", err)
} }
@ -629,7 +708,7 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
modifiedDate := modTime.Format(timeFormatOut) modifiedDate := modTime.Format(timeFormatOut)
// Define the metadata for the file we are going to create. // Define the metadata for the file we are going to create.
info := &drive.File{ createInfo := &drive.File{
Title: leaf, Title: leaf,
Description: leaf, Description: leaf,
Parents: []*drive.ParentReference{{Id: directoryId}}, Parents: []*drive.ParentReference{{Id: directoryId}},
@ -639,7 +718,13 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
// Make the API request to upload metadata and file data. // Make the API request to upload metadata and file data.
in = &fs.SeekWrapper{In: in, Size: size} in = &fs.SeekWrapper{In: in, Size: size}
info, err = f.svc.Files.Insert(info).Media(in).Do() var info *drive.File
// Don't retry, return a retry error instead
f.paceWait()
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
if f.paceRefresh(err) {
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
}
if err != nil { if err != nil {
return o, fmt.Errorf("Upload failed: %s", err) return o, fmt.Errorf("Upload failed: %s", err)
} }
@ -660,7 +745,10 @@ func (f *FsDrive) Rmdir() error {
if err != nil { if err != nil {
return err return err
} }
children, err := f.svc.Children.List(f.rootId).MaxResults(10).Do() var children *drive.ChildList
f.pace(&err, func() {
children, err = f.svc.Children.List(f.rootId).MaxResults(10).Do()
})
if err != nil { if err != nil {
return err return err
} }
@ -669,7 +757,9 @@ func (f *FsDrive) Rmdir() error {
} }
// Delete the directory if it isn't the root // Delete the directory if it isn't the root
if f.root != "" { if f.root != "" {
f.pace(&err, func() {
err = f.svc.Files.Delete(f.rootId).Do() err = f.svc.Files.Delete(f.rootId).Do()
})
if err != nil { if err != nil {
return err return err
} }
@ -696,7 +786,9 @@ func (f *FsDrive) Purge() error {
if err != nil { if err != nil {
return err return err
} }
f.pace(&err, func() {
err = f.svc.Files.Delete(f.rootId).Do() err = f.svc.Files.Delete(f.rootId).Do()
})
f.resetRoot() f.resetRoot()
if err != nil { if err != nil {
return err return err
@ -801,11 +893,14 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) {
return return
} }
// New metadata // New metadata
info := &drive.File{ updateInfo := &drive.File{
ModifiedDate: modTime.Format(timeFormatOut), ModifiedDate: modTime.Format(timeFormatOut),
} }
// Set modified date // Set modified date
info, err = o.drive.svc.Files.Update(o.id, info).SetModifiedDate(true).Do() var info *drive.File
o.drive.pace(&err, func() {
info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do()
})
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.Log(o, "Failed to update remote mtime: %s", err) fs.Log(o, "Failed to update remote mtime: %s", err)
@ -826,7 +921,10 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
return nil, err return nil, err
} }
req.Header.Set("User-Agent", fs.UserAgent) req.Header.Set("User-Agent", fs.UserAgent)
res, err := o.drive.client.Do(req) var res *http.Response
o.drive.pace(&err, func() {
res, err = o.drive.client.Do(req)
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -843,14 +941,21 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) error { func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) error {
info := &drive.File{ updateInfo := &drive.File{
Id: o.id, Id: o.id,
ModifiedDate: modTime.Format(timeFormatOut), ModifiedDate: modTime.Format(timeFormatOut),
} }
// Make the API request to upload metadata and file data. // Make the API request to upload metadata and file data.
in = &fs.SeekWrapper{In: in, Size: size} in = &fs.SeekWrapper{In: in, Size: size}
info, err := o.drive.svc.Files.Update(info.Id, info).SetModifiedDate(true).Media(in).Do() var err error
var info *drive.File
// Don't retry, return a retry error instead
o.drive.paceWait()
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
if o.drive.paceRefresh(err) {
return fs.RetryErrorf("Update failed - retry: %s", err)
}
if err != nil { if err != nil {
return fmt.Errorf("Update failed: %s", err) return fmt.Errorf("Update failed: %s", err)
} }
@ -860,7 +965,11 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
// Remove an object // Remove an object
func (o *FsObjectDrive) Remove() error { func (o *FsObjectDrive) Remove() error {
return o.drive.svc.Files.Delete(o.id).Do() var err error
o.drive.pace(&err, func() {
err = o.drive.svc.Files.Delete(o.id).Do()
})
return err
} }
// Check the interfaces are satisfied // Check the interfaces are satisfied

View file

@ -142,6 +142,35 @@ type Purger interface {
Purge() error Purge() error
} }
// An optional interface for error as to whether the operation should be retried
//
// This should be returned from Update or Put methods as required
type Retry interface {
error
Retry() bool
}
// A type of error
type retryError string
// Error interface
func (r retryError) Error() string {
return string(r)
}
// Retry interface
func (r retryError) Retry() bool {
return true
}
// Check interface
var _ Retry = retryError("")
// RetryErrorf makes an error which indicates it would like to be retried
func RetryErrorf(format string, a ...interface{}) error {
return retryError(fmt.Sprintf(format, a...))
}
// A channel of Objects // A channel of Objects
type ObjectsChan chan Object type ObjectsChan chan Object

View file

@ -103,8 +103,7 @@ func removeFailedCopy(dst Object) {
Debug(dst, "Removing failed copy") Debug(dst, "Removing failed copy")
removeErr := dst.Remove() removeErr := dst.Remove()
if removeErr != nil { if removeErr != nil {
Stats.Error() Debug(dst, "Failed to remove failed copy: %s", removeErr)
Log(dst, "Failed to remove failed copy: %s", removeErr)
} }
} }
} }
@ -115,6 +114,10 @@ func removeFailedCopy(dst Object) {
// call Copy() with dst nil on a pre-existing file then some filing // call Copy() with dst nil on a pre-existing file then some filing
// systems (eg Drive) may duplicate the file. // systems (eg Drive) may duplicate the file.
func Copy(f Fs, dst, src Object) { func Copy(f Fs, dst, src Object) {
const maxTries = 10
tries := 0
doUpdate := dst != nil
tryAgain:
in0, err := src.Open() in0, err := src.Open()
if err != nil { if err != nil {
Stats.Error() Stats.Error()
@ -124,7 +127,7 @@ func Copy(f Fs, dst, src Object) {
in := NewAccount(in0) // account the transfer in := NewAccount(in0) // account the transfer
var actionTaken string var actionTaken string
if dst != nil { if doUpdate {
actionTaken = "Copied (updated existing)" actionTaken = "Copied (updated existing)"
err = dst.Update(in, src.ModTime(), src.Size()) err = dst.Update(in, src.ModTime(), src.Size())
} else { } else {
@ -132,6 +135,13 @@ func Copy(f Fs, dst, src Object) {
dst, err = f.Put(in, src.Remote(), src.ModTime(), src.Size()) dst, err = f.Put(in, src.Remote(), src.ModTime(), src.Size())
} }
inErr := in.Close() inErr := in.Close()
// Retry if err returned a retry error
if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
tries++
Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
removeFailedCopy(dst)
goto tryAgain
}
if err == nil { if err == nil {
err = inErr err = inErr
} }