backends: Add context checking to remaining backends #4504
This is a follow up to 4013bc4a4c
which missed some backends.
It adds a ctx parameter to shouldRetry and checks it.
This commit is contained in:
parent
f76c6cc893
commit
f2c0f82fc6
9 changed files with 241 additions and 217 deletions
|
@ -1399,7 +1399,10 @@ var retryErrorCodes = []int{
|
|||
//S3 is pretty resilient, and the built in retry handling is probably sufficient
|
||||
// as it should notice closed connections and timeouts which are the most likely
|
||||
// sort of failure modes
|
||||
func (f *Fs) shouldRetry(err error) (bool, error) {
|
||||
func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) {
|
||||
if fserrors.ContextError(ctx, &err) {
|
||||
return false, err
|
||||
}
|
||||
// If this is an awserr object, try and extract more useful information to determine if we should retry
|
||||
if awsError, ok := err.(awserr.Error); ok {
|
||||
// Simple case, check the original embedded error in case it's generically retryable
|
||||
|
@ -1411,7 +1414,7 @@ func (f *Fs) shouldRetry(err error) (bool, error) {
|
|||
// 301 if wrong region for bucket - can only update if running from a bucket
|
||||
if f.rootBucket != "" {
|
||||
if reqErr.StatusCode() == http.StatusMovedPermanently {
|
||||
urfbErr := f.updateRegionForBucket(f.rootBucket)
|
||||
urfbErr := f.updateRegionForBucket(ctx, f.rootBucket)
|
||||
if urfbErr != nil {
|
||||
fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
|
||||
return false, err
|
||||
|
@ -1741,7 +1744,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|||
}
|
||||
|
||||
// Gets the bucket location
|
||||
func (f *Fs) getBucketLocation(bucket string) (string, error) {
|
||||
func (f *Fs) getBucketLocation(ctx context.Context, bucket string) (string, error) {
|
||||
req := s3.GetBucketLocationInput{
|
||||
Bucket: &bucket,
|
||||
}
|
||||
|
@ -1749,7 +1752,7 @@ func (f *Fs) getBucketLocation(bucket string) (string, error) {
|
|||
var err error
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.c.GetBucketLocation(&req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -1759,8 +1762,8 @@ func (f *Fs) getBucketLocation(bucket string) (string, error) {
|
|||
|
||||
// Updates the region for the bucket by reading the region from the
|
||||
// bucket then updating the session.
|
||||
func (f *Fs) updateRegionForBucket(bucket string) error {
|
||||
region, err := f.getBucketLocation(bucket)
|
||||
func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
|
||||
region, err := f.getBucketLocation(ctx, bucket)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "reading bucket location failed")
|
||||
}
|
||||
|
@ -1854,7 +1857,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
}
|
||||
}
|
||||
}
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
||||
|
@ -2001,7 +2004,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error)
|
|||
var resp *s3.ListBucketsOutput
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.c.ListBucketsWithContext(ctx, &req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2116,7 +2119,7 @@ func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) {
|
|||
}
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
_, err := f.c.HeadBucketWithContext(ctx, &req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err == nil {
|
||||
return true, nil
|
||||
|
@ -2152,7 +2155,7 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) error {
|
|||
}
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
_, err := f.c.CreateBucketWithContext(ctx, &req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err == nil {
|
||||
fs.Infof(f, "Bucket %q created with ACL %q", bucket, f.opt.BucketACL)
|
||||
|
@ -2182,7 +2185,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|||
}
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
_, err := f.c.DeleteBucketWithContext(ctx, &req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err == nil {
|
||||
fs.Infof(f, "Bucket %q deleted", bucket)
|
||||
|
@ -2242,7 +2245,7 @@ func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPa
|
|||
}
|
||||
return f.pacer.Call(func() (bool, error) {
|
||||
_, err := f.c.CopyObjectWithContext(ctx, req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2286,7 +2289,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
|
|||
if err := f.pacer.Call(func() (bool, error) {
|
||||
var err error
|
||||
cout, err = f.c.CreateMultipartUploadWithContext(ctx, req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2302,7 +2305,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
|
|||
UploadId: uid,
|
||||
RequestPayer: req.RequestPayer,
|
||||
})
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
})()
|
||||
|
||||
|
@ -2325,7 +2328,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
|
|||
uploadPartReq.CopySourceRange = aws.String(calculateRange(partSize, partNum-1, numParts, srcSize))
|
||||
uout, err := f.c.UploadPartCopyWithContext(ctx, uploadPartReq)
|
||||
if err != nil {
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
}
|
||||
parts = append(parts, &s3.CompletedPart{
|
||||
PartNumber: &partNum,
|
||||
|
@ -2347,7 +2350,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
|
|||
RequestPayer: req.RequestPayer,
|
||||
UploadId: uid,
|
||||
})
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2578,7 +2581,7 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str
|
|||
reqCopy.Key = &bucketPath
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.c.RestoreObject(&reqCopy)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
st.Status = err.Error()
|
||||
|
@ -2626,7 +2629,7 @@ func (f *Fs) listMultipartUploads(ctx context.Context, bucket, key string) (uplo
|
|||
var resp *s3.ListMultipartUploadsOutput
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.c.ListMultipartUploads(&req)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "list multipart uploads bucket %q key %q", bucket, key)
|
||||
|
@ -2801,7 +2804,7 @@ func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err
|
|||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
var err error
|
||||
resp, err = o.fs.c.HeadObjectWithContext(ctx, &req)
|
||||
return o.fs.shouldRetry(err)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
||||
|
@ -2957,7 +2960,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
var err error
|
||||
httpReq.HTTPRequest = httpReq.HTTPRequest.WithContext(ctx)
|
||||
err = httpReq.Send()
|
||||
return o.fs.shouldRetry(err)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
if err, ok := err.(awserr.RequestFailure); ok {
|
||||
if err.Code() == "InvalidObjectState" {
|
||||
|
@ -3016,7 +3019,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
err = f.pacer.Call(func() (bool, error) {
|
||||
var err error
|
||||
cout, err = f.c.CreateMultipartUploadWithContext(ctx, &mReq)
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "multipart upload failed to initialise")
|
||||
|
@ -3035,7 +3038,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
UploadId: uid,
|
||||
RequestPayer: req.RequestPayer,
|
||||
})
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if errCancel != nil {
|
||||
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
|
||||
|
@ -3111,7 +3114,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
|
||||
if err != nil {
|
||||
if partNum <= int64(concurrency) {
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
}
|
||||
// retry all chunks once have done the first batch
|
||||
return true, err
|
||||
|
@ -3151,7 +3154,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
RequestPayer: req.RequestPayer,
|
||||
UploadId: uid,
|
||||
})
|
||||
return f.shouldRetry(err)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "multipart upload failed to finalise")
|
||||
|
@ -3306,11 +3309,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
var err error
|
||||
resp, err = o.fs.srv.Do(httpReq)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(err)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
body, err := rest.ReadBody(resp)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(err)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 299 {
|
||||
return false, nil
|
||||
|
@ -3361,7 +3364,7 @@ func (o *Object) Remove(ctx context.Context) error {
|
|||
}
|
||||
err := o.fs.pacer.Call(func() (bool, error) {
|
||||
_, err := o.fs.c.DeleteObjectWithContext(ctx, &req)
|
||||
return o.fs.shouldRetry(err)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue