swift: obey Retry-After to fix OVH restore from cold storage
In as many methods as possible we attempt to obey the Retry-After header where it is provided. This means that when objects are being requested from OVH cold storage, rclone will sleep for the correct amount of time before retrying. If the requested sleep is short (60 seconds or less) it is performed immediately; if it is longer, an ErrorRetryAfter is returned, which causes the outer retry loop to sleep before retrying. Fixes #3041
This commit is contained in:
parent
b0380aad95
commit
9ee9fe3885
2 changed files with 90 additions and 17 deletions
|
@ -286,6 +286,31 @@ func shouldRetry(err error) (bool, error) {
|
||||||
return fserrors.ShouldRetry(err), err
|
return fserrors.ShouldRetry(err), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shouldRetryHeaders returns a boolean as to whether this err
|
||||||
|
// deserves to be retried. It reads the headers passed in looking for
|
||||||
|
// `Retry-After`. It returns the err as a convenience
|
||||||
|
func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) {
|
||||||
|
if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 {
|
||||||
|
if value := headers["Retry-After"]; value != "" {
|
||||||
|
retryAfter, parseErr := strconv.Atoi(value)
|
||||||
|
if parseErr != nil {
|
||||||
|
fs.Errorf(nil, "Failed to parse Retry-After: %q: %v", value, parseErr)
|
||||||
|
} else {
|
||||||
|
duration := time.Second * time.Duration(retryAfter)
|
||||||
|
if duration <= 60*time.Second {
|
||||||
|
// Do a short sleep immediately
|
||||||
|
fs.Debugf(nil, "Sleeping for %v to obey Retry-After", duration)
|
||||||
|
time.Sleep(duration)
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
// Delay a long sleep for a retry
|
||||||
|
return false, fserrors.NewErrorRetryAfter(duration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return shouldRetry(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Pattern to match a swift path
|
// Pattern to match a swift path
|
||||||
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
||||||
|
|
||||||
|
@ -413,8 +438,9 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
|
||||||
// Check to see if the object exists - ignoring directory markers
|
// Check to see if the object exists - ignoring directory markers
|
||||||
var info swift.Object
|
var info swift.Object
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
info, _, err = f.c.Object(container, directory)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
info, rxHeaders, err = f.c.Object(container, directory)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err == nil && info.ContentType != directoryMarkerContentType {
|
if err == nil && info.ContentType != directoryMarkerContentType {
|
||||||
f.root = path.Dir(directory)
|
f.root = path.Dir(directory)
|
||||||
|
@ -723,8 +749,9 @@ func (f *Fs) Mkdir(dir string) error {
|
||||||
var err error = swift.ContainerNotFound
|
var err error = swift.ContainerNotFound
|
||||||
if !f.noCheckContainer {
|
if !f.noCheckContainer {
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, _, err = f.c.Container(f.container)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
_, rxHeaders, err = f.c.Container(f.container)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if err == swift.ContainerNotFound {
|
if err == swift.ContainerNotFound {
|
||||||
|
@ -816,8 +843,9 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
||||||
}
|
}
|
||||||
srcFs := srcObj.fs
|
srcFs := srcObj.fs
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
rxHeaders, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -927,7 +955,7 @@ func (o *Object) readMetaData() (err error) {
|
||||||
var h swift.Headers
|
var h swift.Headers
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
|
info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
|
||||||
return shouldRetry(err)
|
return shouldRetryHeaders(h, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == swift.ObjectNotFound {
|
if err == swift.ObjectNotFound {
|
||||||
|
@ -1002,8 +1030,9 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
headers := fs.OpenOptionHeaders(options)
|
headers := fs.OpenOptionHeaders(options)
|
||||||
_, isRanging := headers["Range"]
|
_, isRanging := headers["Range"]
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
in, rxHeaders, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1074,8 +1103,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
// Create the segmentsContainer if it doesn't exist
|
// Create the segmentsContainer if it doesn't exist
|
||||||
var err error
|
var err error
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
_, _, err = o.fs.c.Container(o.fs.segmentsContainer)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
_, rxHeaders, err = o.fs.c.Container(o.fs.segmentsContainer)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err == swift.ContainerNotFound {
|
if err == swift.ContainerNotFound {
|
||||||
headers := swift.Headers{}
|
headers := swift.Headers{}
|
||||||
|
@ -1115,8 +1145,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
||||||
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
|
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
_, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -1129,8 +1160,9 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
emptyReader := bytes.NewReader(nil)
|
emptyReader := bytes.NewReader(nil)
|
||||||
manifestName := o.fs.root + o.remote
|
manifestName := o.fs.root + o.remote
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
_, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
|
var rxHeaders swift.Headers
|
||||||
return shouldRetry(err)
|
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
|
||||||
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
return uniquePrefix + "/", err
|
return uniquePrefix + "/", err
|
||||||
}
|
}
|
||||||
|
@ -1174,7 +1206,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
|
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
|
||||||
return shouldRetry(err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
package swift
|
package swift
|
||||||
|
|
||||||
import "testing"
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs/fserrors"
|
||||||
|
"github.com/ncw/swift"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
func TestInternalUrlEncode(t *testing.T) {
|
func TestInternalUrlEncode(t *testing.T) {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
|
@ -23,3 +30,37 @@ func TestInternalUrlEncode(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInternalShouldRetryHeaders(t *testing.T) {
|
||||||
|
headers := swift.Headers{
|
||||||
|
"Content-Length": "64",
|
||||||
|
"Content-Type": "text/html; charset=UTF-8",
|
||||||
|
"Date": "Mon: 18 Mar 2019 12:11:23 GMT",
|
||||||
|
"Retry-After": "1",
|
||||||
|
}
|
||||||
|
err := &swift.Error{
|
||||||
|
StatusCode: 429,
|
||||||
|
Text: "Too Many Requests",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Short sleep should just do the sleep
|
||||||
|
start := time.Now()
|
||||||
|
retry, gotErr := shouldRetryHeaders(headers, err)
|
||||||
|
dt := time.Since(start)
|
||||||
|
assert.True(t, retry)
|
||||||
|
assert.Equal(t, err, gotErr)
|
||||||
|
assert.True(t, dt > time.Second/2)
|
||||||
|
|
||||||
|
// Long sleep should return RetryError
|
||||||
|
headers["Retry-After"] = "3600"
|
||||||
|
start = time.Now()
|
||||||
|
retry, gotErr = shouldRetryHeaders(headers, err)
|
||||||
|
dt = time.Since(start)
|
||||||
|
assert.True(t, dt < time.Second)
|
||||||
|
assert.False(t, retry)
|
||||||
|
assert.Equal(t, true, fserrors.IsRetryAfterError(gotErr))
|
||||||
|
after := gotErr.(fserrors.RetryAfter).RetryAfter()
|
||||||
|
dt = after.Sub(start)
|
||||||
|
assert.True(t, dt >= time.Hour-time.Second && dt <= time.Hour+time.Second)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue