gcs: low level retry all operations if necessary

Google cloud storage doesn't normally need retries, however certain
things (eg bucket creation and removal) are rate limited and do
generate 429 errors.

Before this change the integration tests would regularly blow up with
errors from GCS rate limiting bucket creation and removal.

After this change we low level retry all operations using the same
exponential backoff strategy as used in the google drive backend.
This commit is contained in:
Nick Craig-Wood 2018-05-09 14:27:21 +01:00
parent 5eecbd83ee
commit 9698a2babb
2 changed files with 97 additions and 18 deletions

View file

@ -31,10 +31,12 @@ import (
"github.com/ncw/rclone/fs/config"
"github.com/ncw/rclone/fs/config/flags"
"github.com/ncw/rclone/fs/config/obscure"
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fshttp"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fs/walk"
"github.com/ncw/rclone/lib/oauthutil"
"github.com/ncw/rclone/lib/pacer"
"github.com/pkg/errors"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@ -49,6 +51,7 @@ const (
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
metaMtime = "mtime" // key to store mtime under in metadata
listChunks = 1000 // chunk size to read directory listings
minSleep = 10 * time.Millisecond
)
var (
@ -219,6 +222,7 @@ type Fs struct {
bucketACL string // used when creating new buckets
location string // location of new buckets
storageClass string // storage class of new buckets
pacer *pacer.Pacer // To pace the API calls
}
// Object describes a storage object
@ -262,6 +266,30 @@ func (f *Fs) Features() *fs.Features {
return f.features
}
// shouldRetry determines whehter a given err rates being retried
func shouldRetry(err error) (again bool, errOut error) {
again = false
if err != nil {
if fserrors.ShouldRetry(err) {
again = true
} else {
switch gerr := err.(type) {
case *googleapi.Error:
if gerr.Code >= 500 && gerr.Code < 600 {
// All 5xx errors should be retried
again = true
} else if len(gerr.Errors) > 0 {
reason := gerr.Errors[0].Reason
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
again = true
}
}
}
}
}
return again, err
}
// Pattern to match a storage path
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)
@ -327,6 +355,7 @@ func NewFs(name, root string) (fs.Fs, error) {
bucketACL: config.FileGet(name, "bucket_acl"),
location: config.FileGet(name, "location"),
storageClass: config.FileGet(name, "storage_class"),
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer),
}
f.features = (&fs.Features{
ReadMimeType: true,
@ -356,7 +385,10 @@ func NewFs(name, root string) (fs.Fs, error) {
if f.root != "" {
f.root += "/"
// Check to see if the object exists
_, err = f.svc.Objects.Get(bucket, directory).Do()
err = f.pacer.Call(func() (bool, error) {
_, err = f.svc.Objects.Get(bucket, directory).Do()
return shouldRetry(err)
})
if err == nil {
f.root = path.Dir(directory)
if f.root == "." {
@ -404,7 +436,7 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error
// dir is the starting directory, "" for root
//
// Set recurse to read sub directories
func (f *Fs) list(dir string, recurse bool, fn listFn) error {
func (f *Fs) list(dir string, recurse bool, fn listFn) (err error) {
root := f.root
rootLength := len(root)
if dir != "" {
@ -415,7 +447,11 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error {
list = list.Delimiter("/")
}
for {
objects, err := list.Do()
var objects *storage.Objects
err = f.pacer.Call(func() (bool, error) {
objects, err = list.Do()
return shouldRetry(err)
})
if err != nil {
if gErr, ok := err.(*googleapi.Error); ok {
if gErr.Code == http.StatusNotFound {
@ -519,7 +555,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
}
listBuckets := f.svc.Buckets.List(f.projectNumber).MaxResults(listChunks)
for {
buckets, err := listBuckets.Do()
var buckets *storage.Buckets
err = f.pacer.Call(func() (bool, error) {
buckets, err = listBuckets.Do()
return shouldRetry(err)
})
if err != nil {
return nil, err
}
@ -607,7 +647,7 @@ func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption
}
// Mkdir creates the bucket if it doesn't exist
func (f *Fs) Mkdir(dir string) error {
func (f *Fs) Mkdir(dir string) (err error) {
f.bucketOKMu.Lock()
defer f.bucketOKMu.Unlock()
if f.bucketOK {
@ -615,7 +655,11 @@ func (f *Fs) Mkdir(dir string) error {
}
// List something from the bucket to see if it exists. Doing it like this enables the use of a
// service account that only has the "Storage Object Admin" role. See #2193 for details.
_, err := f.svc.Objects.List(f.bucket).MaxResults(1).Do()
err = f.pacer.Call(func() (bool, error) {
_, err = f.svc.Objects.List(f.bucket).MaxResults(1).Do()
return shouldRetry(err)
})
if err == nil {
// Bucket already exists
f.bucketOK = true
@ -637,7 +681,10 @@ func (f *Fs) Mkdir(dir string) error {
Location: f.location,
StorageClass: f.storageClass,
}
_, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do()
err = f.pacer.Call(func() (bool, error) {
_, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do()
return shouldRetry(err)
})
if err == nil {
f.bucketOK = true
}
@ -648,13 +695,16 @@ func (f *Fs) Mkdir(dir string) error {
//
// Returns an error if it isn't empty: Error 409: The bucket you tried
// to delete was not empty.
func (f *Fs) Rmdir(dir string) error {
func (f *Fs) Rmdir(dir string) (err error) {
f.bucketOKMu.Lock()
defer f.bucketOKMu.Unlock()
if f.root != "" || dir != "" {
return nil
}
err := f.svc.Buckets.Delete(f.bucket).Do()
err = f.pacer.Call(func() (bool, error) {
err = f.svc.Buckets.Delete(f.bucket).Do()
return shouldRetry(err)
})
if err == nil {
f.bucketOK = false
}
@ -696,7 +746,11 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
srcObject := srcObj.fs.root + srcObj.remote
dstBucket := f.bucket
dstObject := f.root + remote
newObject, err := f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do()
var newObject *storage.Object
err = f.pacer.Call(func() (bool, error) {
newObject, err = f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do()
return shouldRetry(err)
})
if err != nil {
return nil, err
}
@ -784,7 +838,11 @@ func (o *Object) readMetaData() (err error) {
if !o.modTime.IsZero() {
return nil
}
object, err := o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do()
var object *storage.Object
err = o.fs.pacer.Call(func() (bool, error) {
object, err = o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do()
return shouldRetry(err)
})
if err != nil {
if gErr, ok := err.(*googleapi.Error); ok {
if gErr.Code == http.StatusNotFound {
@ -818,14 +876,18 @@ func metadataFromModTime(modTime time.Time) map[string]string {
}
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(modTime time.Time) error {
func (o *Object) SetModTime(modTime time.Time) (err error) {
// This only adds metadata so will perserve other metadata
object := storage.Object{
Bucket: o.fs.bucket,
Name: o.fs.root + o.remote,
Metadata: metadataFromModTime(modTime),
}
newObject, err := o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do()
var newObject *storage.Object
err = o.fs.pacer.Call(func() (bool, error) {
newObject, err = o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do()
return shouldRetry(err)
})
if err != nil {
return err
}
@ -845,7 +907,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
return nil, err
}
fs.OpenOptionAddHTTPHeaders(req.Header, options)
res, err := o.fs.client.Do(req)
var res *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
res, err = o.fs.client.Do(req)
if err == nil {
err = googleapi.CheckResponse(res)
if err != nil {
_ = res.Body.Close() // ignore error
}
}
return shouldRetry(err)
})
if err != nil {
return nil, err
}
@ -874,7 +946,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
Updated: modTime.Format(timeFormatOut), // Doesn't get set
Metadata: metadataFromModTime(modTime),
}
newObject, err := o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do()
var newObject *storage.Object
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
newObject, err = o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do()
return shouldRetry(err)
})
if err != nil {
return err
}
@ -884,8 +960,12 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
}
// Remove an object
func (o *Object) Remove() error {
return o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do()
func (o *Object) Remove() (err error) {
err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do()
return shouldRetry(err)
})
return err
}
// MimeType of an Object if known, "" otherwise

View file

@ -277,7 +277,6 @@ func (t *test) cleanFs() error {
remote := dir.Remote()
if fstest.MatchTestRemote.MatchString(remote) {
log.Printf("Purging %s%s", t.remote, remote)
time.Sleep(2500 * time.Millisecond) // sleep to rate limit bucket deletes for gcs
dir, err := fs.NewFs(t.remote + remote)
if err != nil {
return err