diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index ecfc1331c..0d012c34e 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -19,6 +19,7 @@ import ( "net/http" "regexp" "strings" + "sync/atomic" "time" "github.com/ncw/go-acd" @@ -93,6 +94,7 @@ type Fs struct { dirCache *dircache.DirCache // Map of directory path to directory id pacer *pacer.Pacer // pacer for API calls ts *oauthutil.TokenSource // token source for oauth + uploads int32 // number of uploads in progress - atomic access required } // Object describes a acd object @@ -191,15 +193,14 @@ func NewFs(name, root string) (fs.Fs, error) { } // Get rootID - var rootInfo *acd.Folder - err = f.pacer.Call(func() (bool, error) { - rootInfo, resp, err = f.c.Nodes.GetRoot() - return f.shouldRetry(resp, err) - }) + rootInfo, err := f.getRootInfo() if err != nil || rootInfo.Id == nil { return nil, errors.Wrap(err, "failed to get root") } + // Renew the token in the background + go f.renewToken() + f.dirCache = dircache.New(root, *rootInfo.Id, f) // Find the current root @@ -230,6 +231,49 @@ func NewFs(name, root string) (fs.Fs, error) { return f, nil } +// getRootInfo gets the root folder info +func (f *Fs) getRootInfo() (rootInfo *acd.Folder, err error) { + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + rootInfo, resp, err = f.c.Nodes.GetRoot() + return f.shouldRetry(resp, err) + }) + return rootInfo, err +} + +// Renew the token - runs in the background +// +// Renews the token whenever it expires. Useful when there are lots +// of uploads in progress and the token doesn't get renewed. Amazon +// seem to cancel your uploads if you don't renew your token for 2hrs. +func (f *Fs) renewToken() { + expiry := f.ts.OnExpiry() + for { + <-expiry + uploads := atomic.LoadInt32(&f.uploads) + if uploads != 0 { + fs.Debug(f, "Token expired - %d uploads in progress - refreshing", uploads) + // Do a transaction + _, err := f.getRootInfo() + if err == nil { + fs.Debug(f, "Token refresh successful") + } else { + fs.ErrorLog(f, "Token refresh failed: %v", err) + } + } else { + fs.Debug(f, "Token expired but no uploads in progress - doing nothing") + } + } +} + +func (f *Fs) startUpload() { + atomic.AddInt32(&f.uploads, 1) +} + +func (f *Fs) stopUpload() { + atomic.AddInt32(&f.uploads, -1) +} + // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. @@ -510,11 +554,13 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { var info *acd.File var resp *http.Response err = f.pacer.CallNoRetry(func() (bool, error) { + f.startUpload() if src.Size() != 0 { info, resp, err = folder.Put(in, leaf) } else { info, resp, err = folder.PutSized(in, size, leaf) } + f.stopUpload() var ok bool ok, info, err = f.checkUpload(in, src, info, err) if ok { @@ -765,11 +811,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { var resp *http.Response var err error err = o.fs.pacer.CallNoRetry(func() (bool, error) { + o.fs.startUpload() if size != 0 { info, resp, err = file.OverwriteSized(in, size) } else { info, resp, err = file.Overwrite(in) } + o.fs.stopUpload() var ok bool ok, info, err = o.fs.checkUpload(in, src, info, err) if ok {