From 9ee9fe38857d5f5cb62a7f2a2f507d44266dacfc Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Sun, 17 Mar 2019 11:38:25 +0000
Subject: [PATCH] swift: obey Retry-After to fix OVH restore from cold storage

In as many methods as possible we attempt to obey the Retry-After
header where it is provided. This means that when objects are being
requested from OVH cold storage rclone will sleep the correct amount
of time before retrying.

If the sleeps are short it does them immediately, if long then it
returns an ErrorRetryAfter which will cause the outer retry to sleep
before retrying.

Fixes #3041
---
 backend/swift/swift.go               | 64 +++++++++++++++++++++-------
 backend/swift/swift_internal_test.go | 43 ++++++++++++++++++-
 2 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/backend/swift/swift.go b/backend/swift/swift.go
index 1708db02b..d9de011a3 100644
--- a/backend/swift/swift.go
+++ b/backend/swift/swift.go
@@ -286,6 +286,31 @@ func shouldRetry(err error) (bool, error) {
 	return fserrors.ShouldRetry(err), err
 }
 
+// shouldRetryHeaders returns a boolean as to whether this err
+// deserves to be retried. It reads the headers passed in looking for
+// `Retry-After`. It returns the err as a convenience
+func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) {
+	if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 {
+		if value := headers["Retry-After"]; value != "" {
+			retryAfter, parseErr := strconv.Atoi(value)
+			if parseErr != nil {
+				fs.Errorf(nil, "Failed to parse Retry-After: %q: %v", value, parseErr)
+			} else {
+				duration := time.Second * time.Duration(retryAfter)
+				if duration <= 60*time.Second {
+					// Do a short sleep immediately
+					fs.Debugf(nil, "Sleeping for %v to obey Retry-After", duration)
+					time.Sleep(duration)
+					return true, err
+				}
+				// Delay a long sleep for a retry
+				return false, fserrors.NewErrorRetryAfter(duration)
+			}
+		}
+	}
+	return shouldRetry(err)
+}
+
 // Pattern to match a swift path
 var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
 
@@ -413,8 +438,9 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
 		// Check to see if the object exists - ignoring directory markers
 		var info swift.Object
 		err = f.pacer.Call(func() (bool, error) {
-			info, _, err = f.c.Object(container, directory)
-			return shouldRetry(err)
+			var rxHeaders swift.Headers
+			info, rxHeaders, err = f.c.Object(container, directory)
+			return shouldRetryHeaders(rxHeaders, err)
 		})
 		if err == nil && info.ContentType != directoryMarkerContentType {
 			f.root = path.Dir(directory)
@@ -723,8 +749,9 @@ func (f *Fs) Mkdir(dir string) error {
 	var err error = swift.ContainerNotFound
 	if !f.noCheckContainer {
 		err = f.pacer.Call(func() (bool, error) {
-			_, _, err = f.c.Container(f.container)
-			return shouldRetry(err)
+			var rxHeaders swift.Headers
+			_, rxHeaders, err = f.c.Container(f.container)
+			return shouldRetryHeaders(rxHeaders, err)
 		})
 	}
 	if err == swift.ContainerNotFound {
@@ -816,8 +843,9 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 	}
 	srcFs := srcObj.fs
 	err = f.pacer.Call(func() (bool, error) {
-		_, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
-		return shouldRetry(err)
+		var rxHeaders swift.Headers
+		rxHeaders, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
+		return shouldRetryHeaders(rxHeaders, err)
 	})
 	if err != nil {
 		return nil, err
@@ -927,7 +955,7 @@ func (o *Object) readMetaData() (err error) {
 	var h swift.Headers
 	err = o.fs.pacer.Call(func() (bool, error) {
 		info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
-		return shouldRetry(err)
+		return shouldRetryHeaders(h, err)
 	})
 	if err != nil {
 		if err == swift.ObjectNotFound {
@@ -1002,8 +1030,9 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 	headers := fs.OpenOptionHeaders(options)
 	_, isRanging := headers["Range"]
 	err = o.fs.pacer.Call(func() (bool, error) {
-		in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
-		return shouldRetry(err)
+		var rxHeaders swift.Headers
+		in, rxHeaders, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
+		return shouldRetryHeaders(rxHeaders, err)
 	})
 	return
 }
@@ -1074,8 +1103,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
 	// Create the segmentsContainer if it doesn't exist
 	var err error
 	err = o.fs.pacer.Call(func() (bool, error) {
-		_, _, err = o.fs.c.Container(o.fs.segmentsContainer)
-		return shouldRetry(err)
+		var rxHeaders swift.Headers
+		_, rxHeaders, err = o.fs.c.Container(o.fs.segmentsContainer)
+		return shouldRetryHeaders(rxHeaders, err)
 	})
 	if err == swift.ContainerNotFound {
 		headers := swift.Headers{}
@@ -1115,8 +1145,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
 		segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
 		fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
 		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-			_, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
-			return shouldRetry(err)
+			var rxHeaders swift.Headers
+			rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
+			return shouldRetryHeaders(rxHeaders, err)
 		})
 		if err != nil {
 			return "", err
@@ -1129,8 +1160,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
 	emptyReader := bytes.NewReader(nil)
 	manifestName := o.fs.root + o.remote
 	err = o.fs.pacer.Call(func() (bool, error) {
-		_, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
-		return shouldRetry(err)
+		var rxHeaders swift.Headers
+		rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
+		return shouldRetryHeaders(rxHeaders, err)
 	})
 	return uniquePrefix + "/", err
 }
@@ -1174,7 +1206,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	var rxHeaders swift.Headers
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
-		return shouldRetry(err)
+		return shouldRetryHeaders(rxHeaders, err)
 	})
 	if err != nil {
 		return err
diff --git a/backend/swift/swift_internal_test.go b/backend/swift/swift_internal_test.go
index de95a4c67..d9d5b4ca5 100644
--- a/backend/swift/swift_internal_test.go
+++ b/backend/swift/swift_internal_test.go
@@ -1,6 +1,13 @@
 package swift
 
-import "testing"
+import (
+	"testing"
+	"time"
+
+	"github.com/ncw/rclone/fs/fserrors"
+	"github.com/ncw/swift"
+	"github.com/stretchr/testify/assert"
+)
 
 func TestInternalUrlEncode(t *testing.T) {
 	for _, test := range []struct {
@@ -23,3 +30,37 @@ func TestInternalUrlEncode(t *testing.T) {
 		}
 	}
 }
+
+func TestInternalShouldRetryHeaders(t *testing.T) {
+	headers := swift.Headers{
+		"Content-Length": "64",
+		"Content-Type":   "text/html; charset=UTF-8",
+		"Date":           "Mon: 18 Mar 2019 12:11:23 GMT",
+		"Retry-After":    "1",
+	}
+	err := &swift.Error{
+		StatusCode: 429,
+		Text:       "Too Many Requests",
+	}
+
+	// Short sleep should just do the sleep
+	start := time.Now()
+	retry, gotErr := shouldRetryHeaders(headers, err)
+	dt := time.Since(start)
+	assert.True(t, retry)
+	assert.Equal(t, err, gotErr)
+	assert.True(t, dt > time.Second/2)
+
+	// Long sleep should return RetryError
+	headers["Retry-After"] = "3600"
+	start = time.Now()
+	retry, gotErr = shouldRetryHeaders(headers, err)
+	dt = time.Since(start)
+	assert.True(t, dt < time.Second)
+	assert.False(t, retry)
+	assert.Equal(t, true, fserrors.IsRetryAfterError(gotErr))
+	after := gotErr.(fserrors.RetryAfter).RetryAfter()
+	dt = after.Sub(start)
+	assert.True(t, dt >= time.Hour-time.Second && dt <= time.Hour+time.Second)
+
+}