diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index c18cab6cb..62ee84e41 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -31,10 +31,12 @@ import ( "github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/config/flags" "github.com/ncw/rclone/fs/config/obscure" + "github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fshttp" "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/walk" "github.com/ncw/rclone/lib/oauthutil" + "github.com/ncw/rclone/lib/pacer" "github.com/pkg/errors" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -49,6 +51,7 @@ const ( timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00" metaMtime = "mtime" // key to store mtime under in metadata listChunks = 1000 // chunk size to read directory listings + minSleep = 10 * time.Millisecond ) var ( @@ -219,6 +222,7 @@ type Fs struct { bucketACL string // used when creating new buckets location string // location of new buckets storageClass string // storage class of new buckets + pacer *pacer.Pacer // To pace the API calls } // Object describes a storage object @@ -262,6 +266,30 @@ func (f *Fs) Features() *fs.Features { return f.features } +// shouldRetry determines whehter a given err rates being retried +func shouldRetry(err error) (again bool, errOut error) { + again = false + if err != nil { + if fserrors.ShouldRetry(err) { + again = true + } else { + switch gerr := err.(type) { + case *googleapi.Error: + if gerr.Code >= 500 && gerr.Code < 600 { + // All 5xx errors should be retried + again = true + } else if len(gerr.Errors) > 0 { + reason := gerr.Errors[0].Reason + if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" { + again = true + } + } + } + } + } + return again, err +} + // Pattern to match a storage path var matcher = regexp.MustCompile(`^([^/]*)(.*)$`) @@ -327,6 +355,7 @@ func NewFs(name, root string) (fs.Fs, error) { bucketACL: config.FileGet(name, "bucket_acl"), location: config.FileGet(name, "location"), storageClass: config.FileGet(name, "storage_class"), + pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer), } f.features = (&fs.Features{ ReadMimeType: true, @@ -356,7 +385,10 @@ func NewFs(name, root string) (fs.Fs, error) { if f.root != "" { f.root += "/" // Check to see if the object exists - _, err = f.svc.Objects.Get(bucket, directory).Do() + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Objects.Get(bucket, directory).Do() + return shouldRetry(err) + }) if err == nil { f.root = path.Dir(directory) if f.root == "." { @@ -404,7 +436,7 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error // dir is the starting directory, "" for root // // Set recurse to read sub directories -func (f *Fs) list(dir string, recurse bool, fn listFn) error { +func (f *Fs) list(dir string, recurse bool, fn listFn) (err error) { root := f.root rootLength := len(root) if dir != "" { @@ -415,7 +447,11 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error { list = list.Delimiter("/") } for { - objects, err := list.Do() + var objects *storage.Objects + err = f.pacer.Call(func() (bool, error) { + objects, err = list.Do() + return shouldRetry(err) + }) if err != nil { if gErr, ok := err.(*googleapi.Error); ok { if gErr.Code == http.StatusNotFound { @@ -519,7 +555,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { } listBuckets := f.svc.Buckets.List(f.projectNumber).MaxResults(listChunks) for { - buckets, err := listBuckets.Do() + var buckets *storage.Buckets + err = f.pacer.Call(func() (bool, error) { + buckets, err = listBuckets.Do() + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -607,7 +647,7 @@ func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption } // Mkdir creates the bucket if it doesn't exist -func (f *Fs) Mkdir(dir string) error { +func (f *Fs) Mkdir(dir string) (err error) { f.bucketOKMu.Lock() defer f.bucketOKMu.Unlock() if f.bucketOK { @@ -615,7 +655,11 @@ func (f *Fs) Mkdir(dir string) error { } // List something from the bucket to see if it exists. Doing it like this enables the use of a // service account that only has the "Storage Object Admin" role. See #2193 for details. - _, err := f.svc.Objects.List(f.bucket).MaxResults(1).Do() + + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Objects.List(f.bucket).MaxResults(1).Do() + return shouldRetry(err) + }) if err == nil { // Bucket already exists f.bucketOK = true @@ -637,7 +681,10 @@ func (f *Fs) Mkdir(dir string) error { Location: f.location, StorageClass: f.storageClass, } - _, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do() + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do() + return shouldRetry(err) + }) if err == nil { f.bucketOK = true } @@ -648,13 +695,16 @@ func (f *Fs) Mkdir(dir string) error { // // Returns an error if it isn't empty: Error 409: The bucket you tried // to delete was not empty. -func (f *Fs) Rmdir(dir string) error { +func (f *Fs) Rmdir(dir string) (err error) { f.bucketOKMu.Lock() defer f.bucketOKMu.Unlock() if f.root != "" || dir != "" { return nil } - err := f.svc.Buckets.Delete(f.bucket).Do() + err = f.pacer.Call(func() (bool, error) { + err = f.svc.Buckets.Delete(f.bucket).Do() + return shouldRetry(err) + }) if err == nil { f.bucketOK = false } @@ -696,7 +746,11 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { srcObject := srcObj.fs.root + srcObj.remote dstBucket := f.bucket dstObject := f.root + remote - newObject, err := f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do() + var newObject *storage.Object + err = f.pacer.Call(func() (bool, error) { + newObject, err = f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do() + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -784,7 +838,11 @@ func (o *Object) readMetaData() (err error) { if !o.modTime.IsZero() { return nil } - object, err := o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do() + var object *storage.Object + err = o.fs.pacer.Call(func() (bool, error) { + object, err = o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do() + return shouldRetry(err) + }) if err != nil { if gErr, ok := err.(*googleapi.Error); ok { if gErr.Code == http.StatusNotFound { @@ -818,14 +876,18 @@ func metadataFromModTime(modTime time.Time) map[string]string { } // SetModTime sets the modification time of the local fs object -func (o *Object) SetModTime(modTime time.Time) error { +func (o *Object) SetModTime(modTime time.Time) (err error) { // This only adds metadata so will perserve other metadata object := storage.Object{ Bucket: o.fs.bucket, Name: o.fs.root + o.remote, Metadata: metadataFromModTime(modTime), } - newObject, err := o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do() + var newObject *storage.Object + err = o.fs.pacer.Call(func() (bool, error) { + newObject, err = o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do() + return shouldRetry(err) + }) if err != nil { return err } @@ -845,7 +907,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { return nil, err } fs.OpenOptionAddHTTPHeaders(req.Header, options) - res, err := o.fs.client.Do(req) + var res *http.Response + err = o.fs.pacer.Call(func() (bool, error) { + res, err = o.fs.client.Do(req) + if err == nil { + err = googleapi.CheckResponse(res) + if err != nil { + _ = res.Body.Close() // ignore error + } + } + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -874,7 +946,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio Updated: modTime.Format(timeFormatOut), // Doesn't get set Metadata: metadataFromModTime(modTime), } - newObject, err := o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do() + var newObject *storage.Object + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + newObject, err = o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do() + return shouldRetry(err) + }) if err != nil { return err } @@ -884,8 +960,12 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } // Remove an object -func (o *Object) Remove() error { - return o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do() +func (o *Object) Remove() (err error) { + err = o.fs.pacer.Call(func() (bool, error) { + err = o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do() + return shouldRetry(err) + }) + return err } // MimeType of an Object if known, "" otherwise diff --git a/fstest/test_all/test_all.go b/fstest/test_all/test_all.go index 161b40ee6..50d841915 100644 --- a/fstest/test_all/test_all.go +++ b/fstest/test_all/test_all.go @@ -277,7 +277,6 @@ func (t *test) cleanFs() error { remote := dir.Remote() if fstest.MatchTestRemote.MatchString(remote) { log.Printf("Purging %s%s", t.remote, remote) - time.Sleep(2500 * time.Millisecond) // sleep to rate limit bucket deletes for gcs dir, err := fs.NewFs(t.remote + remote) if err != nil { return err