S3: Use (custom) pacer to retry operations when reasonable - fixes #2503

Craig Miskell, 2018-09-03 16:41:04 +12:00; committed by Nick Craig-Wood
parent 19cf3bb9e7
commit 2543278c3f
3 changed files with 165 additions and 12 deletions

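The pattern this commit applies throughout the backend is: construct one pacer per Fs, wrap every AWS SDK call in a closure passed to pacer.Call, and let the closure report via shouldRetry whether the failure is worth another attempt. Below is a minimal, illustrative sketch using only the lib/pacer API as it appears in the diff (pacer.New, SetMinSleep, SetPacer, Call); the failing operation is a stand-in, not rclone code:

    package main

    import (
        "fmt"
        "time"

        "github.com/ncw/rclone/lib/pacer"
        "github.com/pkg/errors"
    )

    func main() {
        // Same construction as in NewFs below: retries start at a 10ms sleep
        // and use the S3-specific pacing algorithm.
        p := pacer.New().SetMinSleep(10 * time.Millisecond).SetPacer(pacer.S3Pacer)

        attempts := 0
        // Call re-invokes the closure while it returns (true, err), sleeping
        // between attempts, until it succeeds or the pacer gives up.
        err := p.Call(func() (bool, error) {
            attempts++
            if attempts < 3 {
                // Pretend the first two attempts hit a retriable error.
                return true, errors.New("simulated 503 Slow Down")
            }
            return false, nil
        })
        fmt.Printf("attempts=%d err=%v\n", attempts, err)
    }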

@@ -39,9 +39,11 @@ import (
     "github.com/ncw/rclone/fs"
     "github.com/ncw/rclone/fs/config/configmap"
     "github.com/ncw/rclone/fs/config/configstruct"
+    "github.com/ncw/rclone/fs/fserrors"
     "github.com/ncw/rclone/fs/fshttp"
     "github.com/ncw/rclone/fs/hash"
     "github.com/ncw/rclone/fs/walk"
+    "github.com/ncw/rclone/lib/pacer"
     "github.com/ncw/rclone/lib/rest"
     "github.com/ncw/swift"
     "github.com/pkg/errors"
@@ -570,6 +572,7 @@ const (
     maxRetries = 10 // number of retries to make of operations
     maxSizeForCopy = 5 * 1024 * 1024 * 1024 // The maximum size of object we can COPY
     maxFileSize = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size
+    minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
 )
 
 // Options defines the configuration for this backend
@@ -604,6 +607,7 @@ type Fs struct {
     bucketOKMu sync.Mutex // mutex to protect bucket OK
     bucketOK bool // true if we have created the bucket
     bucketDeleted bool // true if we have deleted the bucket
+    pacer *pacer.Pacer // To pace the API calls
 }
 
 // Object describes a s3 object
@@ -649,6 +653,37 @@ func (f *Fs) Features() *fs.Features {
     return f.features
 }
 
+// retryErrorCodes is a slice of error codes that we will retry
+// See: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+var retryErrorCodes = []int{
+    409, // Conflict - various states that could be resolved on a retry
+    503, // Service Unavailable/Slow Down - "Reduce your request rate"
+}
+
+// S3 is pretty resilient, and the built-in retry handling is probably sufficient,
+// as it should notice closed connections and timeouts, which are the most likely
+// sorts of failure modes
+func shouldRetry(err error) (bool, error) {
+    // If this is an awserr object, try to extract more useful information to determine if we should retry
+    if awsError, ok := err.(awserr.Error); ok {
+        // Simple case: check the original embedded error in case it's generically retriable
+        if fserrors.ShouldRetry(awsError.OrigErr()) {
+            return true, err
+        }
+        // Failing that, if it's a RequestFailure it probably has an HTTP status code we can check
+        if reqErr, ok := err.(awserr.RequestFailure); ok {
+            for _, e := range retryErrorCodes {
+                if reqErr.StatusCode() == e {
+                    return true, err
+                }
+            }
+        }
+    }
+    // OK, not a retriable awserr; check for generic failure conditions
+    return fserrors.ShouldRetry(err), err
+}
+
 // Pattern to match a s3 path
 var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
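As a hypothetical usage example for shouldRetry above (not part of the commit): an AWS request failure carrying a 503 status should be reported as retriable, while a plain 404 should not. awserr.New and awserr.NewRequestFailure are the standard aws-sdk-go helpers for building such errors; imports of fmt and github.com/aws/aws-sdk-go/aws/awserr are assumed.

    // Build a fake "Slow Down" failure and a fake 404 for illustration.
    slowDown := awserr.NewRequestFailure(
        awserr.New("SlowDown", "Please reduce your request rate", nil), 503, "req-1")
    notFound := awserr.NewRequestFailure(
        awserr.New("NoSuchKey", "The specified key does not exist", nil), 404, "req-2")

    retry, _ := shouldRetry(slowDown)   // true: 503 is in retryErrorCodes
    retry2, _ := shouldRetry(notFound)  // false: 404 is not listed and not generically retriable
    fmt.Println(retry, retry2)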
@@ -774,6 +809,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
         c: c,
         bucket: bucket,
         ses: ses,
+        pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
     }
     f.features = (&fs.Features{
         ReadMimeType: true,
@@ -787,7 +823,10 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
             Bucket: &f.bucket,
             Key: &directory,
         }
-        _, err = f.c.HeadObject(&req)
+        err = f.pacer.Call(func() (bool, error) {
+            _, err = f.c.HeadObject(&req)
+            return shouldRetry(err)
+        })
         if err == nil {
             f.root = path.Dir(directory)
             if f.root == "." {
@@ -864,7 +903,12 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error {
             MaxKeys: &maxKeys,
             Marker: marker,
         }
-        resp, err := f.c.ListObjects(&req)
+        var resp *s3.ListObjectsOutput
+        var err error
+        err = f.pacer.Call(func() (bool, error) {
+            resp, err = f.c.ListObjects(&req)
+            return shouldRetry(err)
+        })
         if err != nil {
             if awsErr, ok := err.(awserr.RequestFailure); ok {
                 if awsErr.StatusCode() == http.StatusNotFound {
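A detail worth noting in conversions like the one above: the SDK response is now produced inside the retry closure, so it has to be declared before pacer.Call and assigned with = rather than :=; otherwise each attempt would only fill a shadowed local and the value would be lost when Call returns. A small sketch of the two shapes, reusing the same identifiers as the hunk above, illustrative only:

    // Correct: resp and err are declared outside the closure, so the values
    // assigned on the final attempt are still visible after Call returns.
    var resp *s3.ListObjectsOutput
    var err error
    err = f.pacer.Call(func() (bool, error) {
        resp, err = f.c.ListObjects(&req) // "=" assigns the outer variables
        return shouldRetry(err)
    })

    // Buggy variant: ":=" inside the closure would declare new locals and the
    // outer resp would stay nil after Call returns.
    //   resp, err := f.c.ListObjects(&req)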
@@ -989,7 +1033,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
         return nil, fs.ErrorListBucketRequired
     }
     req := s3.ListBucketsInput{}
-    resp, err := f.c.ListBuckets(&req)
+    var resp *s3.ListBucketsOutput
+    err = f.pacer.Call(func() (bool, error) {
+        resp, err = f.c.ListBuckets(&req)
+        return shouldRetry(err)
+    })
     if err != nil {
         return nil, err
     }
@@ -1074,7 +1122,10 @@ func (f *Fs) dirExists() (bool, error) {
     req := s3.HeadBucketInput{
         Bucket: &f.bucket,
     }
-    _, err := f.c.HeadBucket(&req)
+    err := f.pacer.Call(func() (bool, error) {
+        _, err := f.c.HeadBucket(&req)
+        return shouldRetry(err)
+    })
     if err == nil {
         return true, nil
     }
@@ -1111,7 +1162,10 @@ func (f *Fs) Mkdir(dir string) error {
             LocationConstraint: &f.opt.LocationConstraint,
         }
     }
-    _, err := f.c.CreateBucket(&req)
+    err := f.pacer.Call(func() (bool, error) {
+        _, err := f.c.CreateBucket(&req)
+        return shouldRetry(err)
+    })
     if err, ok := err.(awserr.Error); ok {
         if err.Code() == "BucketAlreadyOwnedByYou" {
             err = nil
@@ -1136,7 +1190,10 @@ func (f *Fs) Rmdir(dir string) error {
     req := s3.DeleteBucketInput{
         Bucket: &f.bucket,
     }
-    _, err := f.c.DeleteBucket(&req)
+    err := f.pacer.Call(func() (bool, error) {
+        _, err := f.c.DeleteBucket(&req)
+        return shouldRetry(err)
+    })
     if err == nil {
         f.bucketOK = false
         f.bucketDeleted = true
@@ -1183,7 +1240,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
         CopySource: &source,
         MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
     }
-    _, err = f.c.CopyObject(&req)
+    err = f.pacer.Call(func() (bool, error) {
+        _, err = f.c.CopyObject(&req)
+        return shouldRetry(err)
+    })
     if err != nil {
         return nil, err
     }
@@ -1260,7 +1320,12 @@ func (o *Object) readMetaData() (err error) {
         Bucket: &o.fs.bucket,
         Key: &key,
     }
-    resp, err := o.fs.c.HeadObject(&req)
+    var resp *s3.HeadObjectOutput
+    err = o.fs.pacer.Call(func() (bool, error) {
+        var err error
+        resp, err = o.fs.c.HeadObject(&req)
+        return shouldRetry(err)
+    })
     if err != nil {
         if awsErr, ok := err.(awserr.RequestFailure); ok {
             if awsErr.StatusCode() == http.StatusNotFound {
@@ -1344,7 +1409,10 @@ func (o *Object) SetModTime(modTime time.Time) error {
         Metadata: o.meta,
         MetadataDirective: &directive,
     }
-    _, err = o.fs.c.CopyObject(&req)
+    err = o.fs.pacer.Call(func() (bool, error) {
+        _, err := o.fs.c.CopyObject(&req)
+        return shouldRetry(err)
+    })
     return err
 }
@@ -1371,7 +1439,12 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
             }
         }
     }
-    resp, err := o.fs.c.GetObject(&req)
+    var resp *s3.GetObjectOutput
+    err = o.fs.pacer.Call(func() (bool, error) {
+        var err error
+        resp, err = o.fs.c.GetObject(&req)
+        return shouldRetry(err)
+    })
     if err, ok := err.(awserr.RequestFailure); ok {
         if err.Code() == "InvalidObjectState" {
             return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
@@ -1450,7 +1523,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
     if o.fs.opt.StorageClass != "" {
         req.StorageClass = &o.fs.opt.StorageClass
     }
-    _, err = uploader.Upload(&req)
+    err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+        _, err = uploader.Upload(&req)
+        return shouldRetry(err)
+    })
     if err != nil {
         return err
     }
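The upload path above uses CallNoRetry rather than Call: the call is still paced, and its result still feeds back into the pacer through shouldRetry, but a failure does not re-run the closure here, presumably because the s3manager uploader consumes the input io.Reader (and drives its own multipart retries), so the body could not simply be replayed. A hedged sketch of the distinction, assuming the semantics implied by the method names:

    // Call: re-run the closure while it reports a retriable error.
    err := o.fs.pacer.Call(func() (bool, error) {
        _, err := o.fs.c.DeleteObject(&req)
        return shouldRetry(err)
    })

    // CallNoRetry: run the closure once; the (bool, error) result only informs
    // the pacer's timing and is otherwise returned as-is.
    err = o.fs.pacer.CallNoRetry(func() (bool, error) {
        _, err = uploader.Upload(&req)
        return shouldRetry(err)
    })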
@@ -1468,7 +1544,10 @@ func (o *Object) Remove() error {
         Bucket: &o.fs.bucket,
         Key: &key,
     }
-    _, err := o.fs.c.DeleteObject(&req)
+    err := o.fs.pacer.Call(func() (bool, error) {
+        _, err := o.fs.c.DeleteObject(&req)
+        return shouldRetry(err)
+    })
     return err
 }
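Finally, the pacer.S3Pacer referenced in NewFs is presumably defined in one of the other files changed by this commit (lib/pacer); its exact behaviour is not shown here. The general shape of such a pacer is exponential backoff: on a retriable failure the sleep grows from minSleep, and on success it shrinks again - for S3, ideally all the way back to no delay between successful requests. A rough, illustrative approximation only, not the lib/pacer implementation:

    // Illustrative backoff; names and constants are made up.
    type backoff struct {
        sleep, minSleep, maxSleep time.Duration
    }

    func (b *backoff) onFailure() {
        if b.sleep == 0 {
            b.sleep = b.minSleep // first failure: start at the minimum sleep
        } else {
            b.sleep *= 2 // subsequent failures: double, up to a ceiling
        }
        if b.sleep > b.maxSleep {
            b.sleep = b.maxSleep
        }
        time.Sleep(b.sleep)
    }

    func (b *backoff) onSuccess() {
        // An S3-style pacer can drop straight back to zero delay, since S3
        // normally needs no pacing between successful requests.
        b.sleep = 0
    }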