forked from TrueCloudLab/rclone
acd: Retry on 429 errors with backoff and fix upload of 0 length files
This commit is contained in:
parent
a288c2b3a3
commit
14f814b806
1 changed files with 72 additions and 7 deletions
|
@ -10,16 +10,17 @@ FIXME make searching for directory in id and file in id more efficient
|
||||||
FIXME make the default for no files and no dirs be (FILE & FOLDER) so
|
FIXME make the default for no files and no dirs be (FILE & FOLDER) so
|
||||||
we ignore assets completely!
|
we ignore assets completely!
|
||||||
|
|
||||||
FIXME detect 429 errors and return error with fs.RetryErrorf?
|
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/go-acd"
|
"github.com/ncw/go-acd"
|
||||||
|
@ -39,6 +40,8 @@ const (
|
||||||
assetKind = "ASSET"
|
assetKind = "ASSET"
|
||||||
statusAvailable = "AVAILABLE"
|
statusAvailable = "AVAILABLE"
|
||||||
timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z
|
timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z
|
||||||
|
minBackoff = 1 * time.Second
|
||||||
|
maxBackoff = 256 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
|
@ -84,6 +87,9 @@ type FsAcd struct {
|
||||||
c *acd.Client // the connection to the acd server
|
c *acd.Client // the connection to the acd server
|
||||||
root string // the path we are working on
|
root string // the path we are working on
|
||||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||||
|
|
||||||
|
backoffLock sync.Mutex
|
||||||
|
backoff time.Duration // current backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
// FsObjectAcd describes a acd object
|
// FsObjectAcd describes a acd object
|
||||||
|
@ -204,11 +210,46 @@ func (f *FsAcd) NewFsObject(remote string) fs.Object {
|
||||||
return f.newFsObjectWithInfo(remote, nil)
|
return f.newFsObjectWithInfo(remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// errChk checks the response and if it is a 429 error returns a
|
||||||
|
// RetryError, otherwise it returns just a plain error
|
||||||
|
//
|
||||||
|
// It also implements the backoff strategy
|
||||||
|
func (f *FsAcd) errChk(resp *http.Response, err error) error {
|
||||||
|
if err != nil && resp != nil && resp.StatusCode == 429 {
|
||||||
|
// Update backoff
|
||||||
|
f.backoffLock.Lock()
|
||||||
|
backoff := f.backoff
|
||||||
|
if backoff == 0 {
|
||||||
|
backoff = minBackoff
|
||||||
|
} else {
|
||||||
|
backoff *= 2
|
||||||
|
if backoff > maxBackoff {
|
||||||
|
backoff = maxBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.backoff = backoff
|
||||||
|
f.backoffLock.Unlock()
|
||||||
|
// Sleep for the backoff time
|
||||||
|
sleepTime := time.Duration(rand.Int63n(int64(backoff)))
|
||||||
|
fs.Debug(f, "Retry error: backoff is now %v, sleeping for %v", backoff, sleepTime)
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
return fs.RetryError(err)
|
||||||
|
}
|
||||||
|
// Reset backoff on success
|
||||||
|
if err == nil {
|
||||||
|
f.backoffLock.Lock()
|
||||||
|
f.backoff = 0
|
||||||
|
f.backoffLock.Unlock()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// FindLeaf finds a directory of name leaf in the folder with ID pathId
|
// FindLeaf finds a directory of name leaf in the folder with ID pathId
|
||||||
func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) {
|
func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) {
|
||||||
//fs.Debug(f, "FindLeaf(%q, %q)", pathId, leaf)
|
//fs.Debug(f, "FindLeaf(%q, %q)", pathId, leaf)
|
||||||
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
||||||
subFolder, _, err := folder.GetFolder(leaf)
|
subFolder, _, err := folder.GetFolder(leaf)
|
||||||
|
// err = f.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == acd.ErrorNodeNotFound {
|
if err == acd.ErrorNodeNotFound {
|
||||||
//fs.Debug(f, "...Not found")
|
//fs.Debug(f, "...Not found")
|
||||||
|
@ -231,6 +272,7 @@ func (f *FsAcd) CreateDir(pathId, leaf string) (newId string, err error) {
|
||||||
//fs.Debug(f, "CreateDir(%q, %q)", pathId, leaf)
|
//fs.Debug(f, "CreateDir(%q, %q)", pathId, leaf)
|
||||||
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
folder := acd.FolderFromId(pathId, f.c.Nodes)
|
||||||
info, _, err := folder.CreateFolder(leaf)
|
info, _, err := folder.CreateFolder(leaf)
|
||||||
|
// err = f.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debug(f, "...Error %v", err)
|
fs.Debug(f, "...Error %v", err)
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -265,9 +307,11 @@ func (f *FsAcd) listAll(dirId string, title string, directoriesOnly bool, filesO
|
||||||
Filters: query,
|
Filters: query,
|
||||||
}
|
}
|
||||||
var nodes []*acd.Node
|
var nodes []*acd.Node
|
||||||
|
//var resp *http.Response
|
||||||
OUTER:
|
OUTER:
|
||||||
for {
|
for {
|
||||||
nodes, _, err = f.c.Nodes.GetNodes(&opts)
|
nodes, _, err = f.c.Nodes.GetNodes(&opts)
|
||||||
|
// err = f.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Stats.Error()
|
fs.Stats.Error()
|
||||||
fs.ErrorLog(f, "Couldn't list files: %v", err)
|
fs.ErrorLog(f, "Couldn't list files: %v", err)
|
||||||
|
@ -394,7 +438,14 @@ func (f *FsAcd) Put(in io.Reader, remote string, modTime time.Time, size int64)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
||||||
info, _, err := folder.Put(in, leaf)
|
var info *acd.File
|
||||||
|
var resp *http.Response
|
||||||
|
if size != 0 {
|
||||||
|
info, resp, err = folder.Put(in, leaf)
|
||||||
|
} else {
|
||||||
|
info, resp, err = folder.PutSized(in, size, leaf)
|
||||||
|
}
|
||||||
|
err = f.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -445,7 +496,9 @@ func (f *FsAcd) purgeCheck(check bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
node := acd.NodeFromId(rootID, f.c.Nodes)
|
node := acd.NodeFromId(rootID, f.c.Nodes)
|
||||||
_, err = node.Trash()
|
var resp *http.Response
|
||||||
|
resp, err = node.Trash()
|
||||||
|
err = f.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -547,6 +600,7 @@ func (o *FsObjectAcd) readMetaData() (err error) {
|
||||||
}
|
}
|
||||||
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
folder := acd.FolderFromId(directoryID, o.acd.c.Nodes)
|
||||||
info, _, err := folder.GetFile(leaf)
|
info, _, err := folder.GetFile(leaf)
|
||||||
|
// err = o.acd.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debug(o, "Failed to read info: %s", err)
|
fs.Debug(o, "Failed to read info: %s", err)
|
||||||
return err
|
return err
|
||||||
|
@ -588,7 +642,9 @@ func (o *FsObjectAcd) Storable() bool {
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) {
|
func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) {
|
||||||
file := acd.File{Node: o.info}
|
file := acd.File{Node: o.info}
|
||||||
in, _, err = file.Open()
|
var resp *http.Response
|
||||||
|
in, resp, err = file.Open()
|
||||||
|
err = o.acd.errChk(resp, err)
|
||||||
return in, err
|
return in, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -597,7 +653,15 @@ func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) {
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error {
|
func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error {
|
||||||
file := acd.File{Node: o.info}
|
file := acd.File{Node: o.info}
|
||||||
info, _, err := file.Overwrite(in)
|
var info *acd.File
|
||||||
|
var resp *http.Response
|
||||||
|
var err error
|
||||||
|
if size != 0 {
|
||||||
|
info, resp, err = file.OverwriteSized(in, size)
|
||||||
|
} else {
|
||||||
|
info, resp, err = file.Overwrite(in)
|
||||||
|
}
|
||||||
|
err = o.acd.errChk(resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -607,7 +671,8 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (o *FsObjectAcd) Remove() error {
|
func (o *FsObjectAcd) Remove() error {
|
||||||
_, err := o.info.Trash()
|
resp, err := o.info.Trash()
|
||||||
|
err = o.acd.errChk(resp, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue