forked from TrueCloudLab/rclone
swift: add pacer for retries to make swift more reliable #2740
This commit is contained in:
parent 369a8ee17b
commit e84790ef79
1 changed file with 116 additions and 19 deletions
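The change follows a single pattern throughout: every call into the swift library is wrapped in a closure handed to the new pacer, and the closure reports via shouldRetry whether the error is worth another attempt. Below is a minimal, self-contained sketch of that pattern, assuming the pacer construction and shouldRetry helper exactly as added in the diff that follows; doSwiftOp is a hypothetical stand-in for any swift.Connection method.

// Sketch of the retry pattern this commit applies to every swift API call.
// The pacer, retryErrorCodes and shouldRetry mirror what the diff below adds;
// doSwiftOp is a hypothetical placeholder for a github.com/ncw/swift call.
package swiftretry

import (
	"time"

	"github.com/ncw/rclone/fs/fserrors"
	"github.com/ncw/rclone/lib/pacer"
	"github.com/ncw/swift"
)

const minSleep = 10 * time.Millisecond // initial sleep after an error

// HTTP status codes worth retrying (as listed in the diff below).
var retryErrorCodes = []int{401, 408, 409, 429, 500, 503, 504}

// shouldRetry reports whether err deserves a retry and returns err unchanged.
func shouldRetry(err error) (bool, error) {
	if swiftError, ok := err.(*swift.Error); ok {
		for _, code := range retryErrorCodes {
			if swiftError.StatusCode == code {
				return true, err
			}
		}
	}
	// Fall back to rclone's generic retryable-error check.
	return fserrors.ShouldRetry(err), err
}

// callWithRetries wraps an arbitrary swift operation in the pacer so that
// retryable failures are retried with increasing sleeps, starting at minSleep.
func callWithRetries(doSwiftOp func() error) error {
	p := pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer)
	return p.Call(func() (bool, error) {
		err := doSwiftOp()
		return shouldRetry(err)
	})
}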
@@ -21,6 +21,7 @@ import (
 	"github.com/ncw/rclone/fs/hash"
 	"github.com/ncw/rclone/fs/operations"
 	"github.com/ncw/rclone/fs/walk"
+	"github.com/ncw/rclone/lib/pacer"
 	"github.com/ncw/swift"
 	"github.com/pkg/errors"
 )
@@ -30,6 +31,7 @@ const (
 	directoryMarkerContentType = "application/directory" // content type of directory marker objects
 	listChunks                 = 1000                     // chunk size to read directory listings
 	defaultChunkSize           = 5 * fs.GibiByte
+	minSleep                   = 10 * time.Millisecond // In case of error, start at 10ms sleep.
 )

 // SharedOptions are shared between swift and hubic
@@ -187,6 +189,7 @@ type Fs struct {
 	containerOK       bool         // true if we have created the container
 	segmentsContainer string       // container to store the segments (if any) in
 	noCheckContainer  bool         // don't check the container before creating it
+	pacer             *pacer.Pacer // To pace the API calls
 }

 // Object describes a swift object
@@ -227,6 +230,32 @@ func (f *Fs) Features() *fs.Features {
 	return f.features
 }

+// retryErrorCodes is a slice of error codes that we will retry
+var retryErrorCodes = []int{
+	401, // Unauthorized (eg "Token has expired")
+	408, // Request Timeout
+	409, // Conflict - various states that could be resolved on a retry
+	429, // Rate exceeded.
+	500, // Get occasional 500 Internal Server Error
+	503, // Service Unavailable/Slow Down - "Reduce your request rate"
+	504, // Gateway Time-out
+}
+
+// shouldRetry returns a boolean as to whether this err deserves to be
+// retried. It returns the err as a convenience
+func shouldRetry(err error) (bool, error) {
+	// If this is a swift.Error object extract the HTTP error code
+	if swiftError, ok := err.(*swift.Error); ok {
+		for _, e := range retryErrorCodes {
+			if swiftError.StatusCode == e {
+				return true, err
+			}
+		}
+	}
+	// Check for generic failure conditions
+	return fserrors.ShouldRetry(err), err
+}
+
 // Pattern to match a swift path
 var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)

@@ -337,6 +366,7 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
 		segmentsContainer: container + "_segments",
 		root:              directory,
 		noCheckContainer:  noCheckContainer,
+		pacer:             pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
 	}
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -346,7 +376,11 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
 	if f.root != "" {
 		f.root += "/"
 		// Check to see if the object exists - ignoring directory markers
-		info, _, err := f.c.Object(container, directory)
+		var info swift.Object
+		err = f.pacer.Call(func() (bool, error) {
+			info, _, err = f.c.Object(container, directory)
+			return shouldRetry(err)
+		})
 		if err == nil && info.ContentType != directoryMarkerContentType {
 			f.root = path.Dir(directory)
 			if f.root == "." {
@@ -436,7 +470,12 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool,
 	}
 	rootLength := len(root)
 	return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
-		objects, err := f.c.Objects(container, opts)
+		var objects []swift.Object
+		var err error
+		err = f.pacer.Call(func() (bool, error) {
+			objects, err = f.c.Objects(container, opts)
+			return shouldRetry(err)
+		})
 		if err == nil {
 			for i := range objects {
 				object := &objects[i]
@@ -525,7 +564,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
 	if dir != "" {
 		return nil, fs.ErrorListBucketRequired
 	}
-	containers, err := f.c.ContainersAll(nil)
+	var containers []swift.Container
+	err = f.pacer.Call(func() (bool, error) {
+		containers, err = f.c.ContainersAll(nil)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return nil, errors.Wrap(err, "container listing failed")
 	}
@@ -586,7 +629,12 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {

 // About gets quota information
 func (f *Fs) About() (*fs.Usage, error) {
-	containers, err := f.c.ContainersAll(nil)
+	var containers []swift.Container
+	var err error
+	err = f.pacer.Call(func() (bool, error) {
+		containers, err = f.c.ContainersAll(nil)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return nil, errors.Wrap(err, "container listing failed")
 	}
@@ -636,14 +684,20 @@ func (f *Fs) Mkdir(dir string) error {
 	// Check to see if container exists first
 	var err error = swift.ContainerNotFound
 	if !f.noCheckContainer {
-		_, _, err = f.c.Container(f.container)
+		err = f.pacer.Call(func() (bool, error) {
+			_, _, err = f.c.Container(f.container)
+			return shouldRetry(err)
+		})
 	}
 	if err == swift.ContainerNotFound {
 		headers := swift.Headers{}
 		if f.opt.StoragePolicy != "" {
 			headers["X-Storage-Policy"] = f.opt.StoragePolicy
 		}
-		err = f.c.ContainerCreate(f.container, headers)
+		err = f.pacer.Call(func() (bool, error) {
+			err = f.c.ContainerCreate(f.container, headers)
+			return shouldRetry(err)
+		})
 	}
 	if err == nil {
 		f.containerOK = true
@@ -660,7 +714,11 @@ func (f *Fs) Rmdir(dir string) error {
 	if f.root != "" || dir != "" {
 		return nil
 	}
-	err := f.c.ContainerDelete(f.container)
+	var err error
+	err = f.pacer.Call(func() (bool, error) {
+		err = f.c.ContainerDelete(f.container)
+		return shouldRetry(err)
+	})
 	if err == nil {
 		f.containerOK = false
 	}
@@ -719,7 +777,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 		return nil, fs.ErrorCantCopy
 	}
 	srcFs := srcObj.fs
-	_, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
+	err = f.pacer.Call(func() (bool, error) {
+		_, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -809,7 +870,12 @@ func (o *Object) readMetaData() (err error) {
 	if o.headers != nil {
 		return nil
 	}
-	info, h, err := o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
+	var info swift.Object
+	var h swift.Headers
+	err = o.fs.pacer.Call(func() (bool, error) {
+		info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		if err == swift.ObjectNotFound {
 			return fs.ErrorObjectNotFound
@@ -861,7 +927,10 @@ func (o *Object) SetModTime(modTime time.Time) error {
 			newHeaders[k] = v
 		}
 	}
-	return o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders)
+	return o.fs.pacer.Call(func() (bool, error) {
+		err = o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders)
+		return shouldRetry(err)
+	})
 }

 // Storable returns if this object is storable
@@ -876,7 +945,10 @@ func (o *Object) Storable() bool {
 func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 	headers := fs.OpenOptionHeaders(options)
 	_, isRanging := headers["Range"]
-	in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
+		return shouldRetry(err)
+	})
 	return
 }

@@ -903,13 +975,20 @@ func (o *Object) removeSegments(except string) error {
 		}
 		segmentPath := segmentsRoot + remote
 		fs.Debugf(o, "Removing segment file %q in container %q", segmentPath, o.fs.segmentsContainer)
-		return o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath)
+		var err error
+		return o.fs.pacer.Call(func() (bool, error) {
+			err = o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath)
+			return shouldRetry(err)
+		})
 	})
 	if err != nil {
 		return err
 	}
 	// remove the segments container if empty, ignore errors
-	err = o.fs.c.ContainerDelete(o.fs.segmentsContainer)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		err = o.fs.c.ContainerDelete(o.fs.segmentsContainer)
+		return shouldRetry(err)
+	})
 	if err == nil {
 		fs.Debugf(o, "Removed empty container %q", o.fs.segmentsContainer)
 	}
@@ -938,13 +1017,19 @@ func urlEncode(str string) string {
 func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
 	// Create the segmentsContainer if it doesn't exist
 	var err error
-	_, _, err = o.fs.c.Container(o.fs.segmentsContainer)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		_, _, err = o.fs.c.Container(o.fs.segmentsContainer)
+		return shouldRetry(err)
+	})
 	if err == swift.ContainerNotFound {
 		headers := swift.Headers{}
 		if o.fs.opt.StoragePolicy != "" {
 			headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy
 		}
-		err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers)
+		err = o.fs.pacer.Call(func() (bool, error) {
+			err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers)
+			return shouldRetry(err)
+		})
 	}
 	if err != nil {
 		return "", err
@@ -973,7 +1058,10 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
 		segmentReader := io.LimitReader(in, n)
 		segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
 		fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
-		_, err := o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			_, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
+			return shouldRetry(err)
+		})
 		if err != nil {
 			return "", err
 		}
@@ -984,7 +1072,10 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
 	headers["Content-Length"] = "0" // set Content-Length as we know it
 	emptyReader := bytes.NewReader(nil)
 	manifestName := o.fs.root + o.remote
-	_, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		_, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
+		return shouldRetry(err)
+	})
 	return uniquePrefix + "/", err
 }

@@ -1021,7 +1112,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 		}
 	} else {
 		headers["Content-Length"] = strconv.FormatInt(size, 10) // set Content-Length as we know it
-		_, err := o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
+		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+			_, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
+			return shouldRetry(err)
+		})
 		if err != nil {
 			return err
 		}
@@ -1047,7 +1141,10 @@ func (o *Object) Remove() error {
 		return err
 	}
 	// Remove file/manifest first
-	err = o.fs.pacer.Call(func() (bool, error) {
+	err = o.fs.pacer.Call(func() (bool, error) {
		err = o.fs.c.ObjectDelete(o.fs.container, o.fs.root+o.remote)
		return shouldRetry(err)
	})
 	if err != nil {
 		return err
 	}
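A note on the two wrappers used above: most operations go through pacer.Call, which sleeps and re-invokes the closure while shouldRetry keeps returning true, but the streaming uploads in updateChunks and Update (the ObjectPut calls that consume an io.Reader) use pacer.CallNoRetry, presumably because the request body has already been partially read after a failure and cannot simply be replayed at this level. Those uploads are still paced; retrying them is left to rclone's higher-level retry machinery.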