forked from TrueCloudLab/rclone
acd: Fix token expiry during large uploads
When rclone is busy doing lots of very long uploads it doesn't refresh the token. Amazon will fail uploads if they finish when the token is more than 1 Hour past expiry. Fix this by keeping track of the number of uploads and refreshing the token when the token expires if there is an upload in progress.
This commit is contained in:
parent
23d8ba41d5
commit
2ebeed6753
1 changed files with 53 additions and 5 deletions
|
@ -19,6 +19,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/go-acd"
|
"github.com/ncw/go-acd"
|
||||||
|
@ -93,6 +94,7 @@ type Fs struct {
|
||||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||||
pacer *pacer.Pacer // pacer for API calls
|
pacer *pacer.Pacer // pacer for API calls
|
||||||
ts *oauthutil.TokenSource // token source for oauth
|
ts *oauthutil.TokenSource // token source for oauth
|
||||||
|
uploads int32 // number of uploads in progress - atomic access required
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a acd object
|
// Object describes a acd object
|
||||||
|
@ -191,15 +193,14 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get rootID
|
// Get rootID
|
||||||
var rootInfo *acd.Folder
|
rootInfo, err := f.getRootInfo()
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
|
||||||
rootInfo, resp, err = f.c.Nodes.GetRoot()
|
|
||||||
return f.shouldRetry(resp, err)
|
|
||||||
})
|
|
||||||
if err != nil || rootInfo.Id == nil {
|
if err != nil || rootInfo.Id == nil {
|
||||||
return nil, errors.Wrap(err, "failed to get root")
|
return nil, errors.Wrap(err, "failed to get root")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Renew the token in the background
|
||||||
|
go f.renewToken()
|
||||||
|
|
||||||
f.dirCache = dircache.New(root, *rootInfo.Id, f)
|
f.dirCache = dircache.New(root, *rootInfo.Id, f)
|
||||||
|
|
||||||
// Find the current root
|
// Find the current root
|
||||||
|
@ -230,6 +231,49 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getRootInfo gets the root folder info
|
||||||
|
func (f *Fs) getRootInfo() (rootInfo *acd.Folder, err error) {
|
||||||
|
var resp *http.Response
|
||||||
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
|
rootInfo, resp, err = f.c.Nodes.GetRoot()
|
||||||
|
return f.shouldRetry(resp, err)
|
||||||
|
})
|
||||||
|
return rootInfo, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Renew the token - runs in the background
|
||||||
|
//
|
||||||
|
// Renews the token whenever it expires. Useful when there are lots
|
||||||
|
// of uploads in progress and the token doesn't get renewed. Amazon
|
||||||
|
// seem to cancel your uploads if you don't renew your token for 2hrs.
|
||||||
|
func (f *Fs) renewToken() {
|
||||||
|
expiry := f.ts.OnExpiry()
|
||||||
|
for {
|
||||||
|
<-expiry
|
||||||
|
uploads := atomic.LoadInt32(&f.uploads)
|
||||||
|
if uploads != 0 {
|
||||||
|
fs.Debug(f, "Token expired - %d uploads in progress - refreshing", uploads)
|
||||||
|
// Do a transaction
|
||||||
|
_, err := f.getRootInfo()
|
||||||
|
if err == nil {
|
||||||
|
fs.Debug(f, "Token refresh successful")
|
||||||
|
} else {
|
||||||
|
fs.ErrorLog(f, "Token refresh failed: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fs.Debug(f, "Token expired but no uploads in progress - doing nothing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fs) startUpload() {
|
||||||
|
atomic.AddInt32(&f.uploads, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fs) stopUpload() {
|
||||||
|
atomic.AddInt32(&f.uploads, -1)
|
||||||
|
}
|
||||||
|
|
||||||
// Return an Object from a path
|
// Return an Object from a path
|
||||||
//
|
//
|
||||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||||
|
@ -510,11 +554,13 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
var info *acd.File
|
var info *acd.File
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||||
|
f.startUpload()
|
||||||
if src.Size() != 0 {
|
if src.Size() != 0 {
|
||||||
info, resp, err = folder.Put(in, leaf)
|
info, resp, err = folder.Put(in, leaf)
|
||||||
} else {
|
} else {
|
||||||
info, resp, err = folder.PutSized(in, size, leaf)
|
info, resp, err = folder.PutSized(in, size, leaf)
|
||||||
}
|
}
|
||||||
|
f.stopUpload()
|
||||||
var ok bool
|
var ok bool
|
||||||
ok, info, err = f.checkUpload(in, src, info, err)
|
ok, info, err = f.checkUpload(in, src, info, err)
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -765,11 +811,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
var err error
|
var err error
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
|
o.fs.startUpload()
|
||||||
if size != 0 {
|
if size != 0 {
|
||||||
info, resp, err = file.OverwriteSized(in, size)
|
info, resp, err = file.OverwriteSized(in, size)
|
||||||
} else {
|
} else {
|
||||||
info, resp, err = file.Overwrite(in)
|
info, resp, err = file.Overwrite(in)
|
||||||
}
|
}
|
||||||
|
o.fs.stopUpload()
|
||||||
var ok bool
|
var ok bool
|
||||||
ok, info, err = o.fs.checkUpload(in, src, info, err)
|
ok, info, err = o.fs.checkUpload(in, src, info, err)
|
||||||
if ok {
|
if ok {
|
||||||
|
|
Loading…
Reference in a new issue