From e31578e03cbed52eadd61f06f65681ac88a67e0a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 16 Jan 2019 13:35:19 +0000 Subject: [PATCH] s3: Auto detect region for buckets on operation failure - fixes #2915 If an incorrect region error is returned while using a bucket then the region is updated, the session is remade and the operation is retried. --- backend/s3/s3.go | 93 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 17 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index e3c623a37..65d99da48 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -833,7 +833,7 @@ var retryErrorCodes = []int{ //S3 is pretty resilient, and the built in retry handling is probably sufficient // as it should notice closed connections and timeouts which are the most likely // sort of failure modes -func shouldRetry(err error) (bool, error) { +func (f *Fs) shouldRetry(err error) (bool, error) { // If this is an awserr object, try and extract more useful information to determine if we should retry if awsError, ok := err.(awserr.Error); ok { // Simple case, check the original embedded error in case it's generically retriable @@ -842,6 +842,15 @@ func shouldRetry(err error) (bool, error) { } // Failing that, if it's a RequestFailure it's probably got an http status code we can check if reqErr, ok := err.(awserr.RequestFailure); ok { + // 301 if wrong region for bucket + if reqErr.StatusCode() == http.StatusMovedPermanently { + urfbErr := f.updateRegionForBucket() + if urfbErr != nil { + fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr) + return false, err + } + return true, err + } for _, e := range retryErrorCodes { if reqErr.StatusCode() == e { return true, err @@ -930,12 +939,17 @@ func s3Connection(opt *Options) (*s3.S3, *session.Session, error) { opt.ForcePathStyle = false } awsConfig := aws.NewConfig(). - WithRegion(opt.Region). WithMaxRetries(maxRetries). WithCredentials(cred). - WithEndpoint(opt.Endpoint). WithHTTPClient(fshttp.NewClient(fs.Config)). WithS3ForcePathStyle(opt.ForcePathStyle) + if opt.Region != "" { + awsConfig.WithRegion(opt.Region) + } + if opt.Endpoint != "" { + awsConfig.WithEndpoint(opt.Endpoint) + } + // awsConfig.WithLogLevel(aws.LogDebugWithSigning) awsSessionOpts := session.Options{ Config: *awsConfig, @@ -1052,7 +1066,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } err = f.pacer.Call(func() (bool, error) { _, err = f.c.HeadObject(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err == nil { f.root = path.Dir(directory) @@ -1102,6 +1116,51 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) { return f.newObjectWithInfo(remote, nil) } +// Gets the bucket location +func (f *Fs) getBucketLocation() (string, error) { + req := s3.GetBucketLocationInput{ + Bucket: &f.bucket, + } + var resp *s3.GetBucketLocationOutput + var err error + err = f.pacer.Call(func() (bool, error) { + resp, err = f.c.GetBucketLocation(&req) + return f.shouldRetry(err) + }) + if err != nil { + return "", err + } + return s3.NormalizeBucketLocation(aws.StringValue(resp.LocationConstraint)), nil +} + +// Updates the region for the bucket by reading the region from the +// bucket then updating the session. +func (f *Fs) updateRegionForBucket() error { + region, err := f.getBucketLocation() + if err != nil { + return errors.Wrap(err, "reading bucket location failed") + } + if aws.StringValue(f.c.Config.Endpoint) != "" { + return errors.Errorf("can't set region to %q as endpoint is set", region) + } + if aws.StringValue(f.c.Config.Region) == region { + return errors.Errorf("region is already %q - not updating", region) + } + + // Make a new session with the new region + oldRegion := f.opt.Region + f.opt.Region = region + c, ses, err := s3Connection(&f.opt) + if err != nil { + return errors.Wrap(err, "creating new session failed") + } + f.c = c + f.ses = ses + + fs.Logf(f, "Switched region to %q from %q", region, oldRegion) + return nil +} + // listFn is called from list to handle an object. type listFn func(remote string, object *s3.Object, isDirectory bool) error @@ -1134,7 +1193,7 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error { var err error err = f.pacer.Call(func() (bool, error) { resp, err = f.c.ListObjects(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err != nil { if awsErr, ok := err.(awserr.RequestFailure); ok { @@ -1263,7 +1322,7 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { var resp *s3.ListBucketsOutput err = f.pacer.Call(func() (bool, error) { resp, err = f.c.ListBuckets(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err != nil { return nil, err @@ -1351,7 +1410,7 @@ func (f *Fs) dirExists() (bool, error) { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.HeadBucket(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err == nil { return true, nil @@ -1391,7 +1450,7 @@ func (f *Fs) Mkdir(dir string) error { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.CreateBucket(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err, ok := err.(awserr.Error); ok { if err.Code() == "BucketAlreadyOwnedByYou" { @@ -1420,7 +1479,7 @@ func (f *Fs) Rmdir(dir string) error { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.DeleteBucket(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err == nil { f.bucketOK = false @@ -1481,7 +1540,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } err = f.pacer.Call(func() (bool, error) { _, err = f.c.CopyObject(&req) - return shouldRetry(err) + return f.shouldRetry(err) }) if err != nil { return nil, err @@ -1563,7 +1622,7 @@ func (o *Object) readMetaData() (err error) { err = o.fs.pacer.Call(func() (bool, error) { var err error resp, err = o.fs.c.HeadObject(&req) - return shouldRetry(err) + return o.fs.shouldRetry(err) }) if err != nil { if awsErr, ok := err.(awserr.RequestFailure); ok { @@ -1659,7 +1718,7 @@ func (o *Object) SetModTime(modTime time.Time) error { } err = o.fs.pacer.Call(func() (bool, error) { _, err := o.fs.c.CopyObject(&req) - return shouldRetry(err) + return o.fs.shouldRetry(err) }) return err } @@ -1691,7 +1750,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { err = o.fs.pacer.Call(func() (bool, error) { var err error resp, err = o.fs.c.GetObject(&req) - return shouldRetry(err) + return o.fs.shouldRetry(err) }) if err, ok := err.(awserr.RequestFailure); ok { if err.Code() == "InvalidObjectState" { @@ -1782,7 +1841,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } err = o.fs.pacer.CallNoRetry(func() (bool, error) { _, err = uploader.Upload(&req) - return shouldRetry(err) + return o.fs.shouldRetry(err) }) if err != nil { return err @@ -1838,11 +1897,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio err = o.fs.pacer.CallNoRetry(func() (bool, error) { resp, err := o.fs.srv.Do(httpReq) if err != nil { - return shouldRetry(err) + return o.fs.shouldRetry(err) } body, err := rest.ReadBody(resp) if err != nil { - return shouldRetry(err) + return o.fs.shouldRetry(err) } if resp.StatusCode >= 200 && resp.StatusCode < 299 { return false, nil @@ -1870,7 +1929,7 @@ func (o *Object) Remove() error { } err := o.fs.pacer.Call(func() (bool, error) { _, err := o.fs.c.DeleteObject(&req) - return shouldRetry(err) + return o.fs.shouldRetry(err) }) return err }