From bd29015022ab627a644c26b3b61a3a44f3268210 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 29 Jan 2017 20:35:57 +0000 Subject: [PATCH] Factor token renewer from amazonclouddrive to oauthutil --- amazonclouddrive/amazonclouddrive.go | 50 ++++------------------ oauthutil/renew.go | 64 ++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 41 deletions(-) create mode 100644 oauthutil/renew.go diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index 099eab877..273b6326e 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -21,7 +21,6 @@ import ( "path" "regexp" "strings" - "sync/atomic" "time" "github.com/ncw/go-acd" @@ -97,8 +96,8 @@ type Fs struct { dirCache *dircache.DirCache // Map of directory path to directory id pacer *pacer.Pacer // pacer for API calls ts *oauthutil.TokenSource // token source for oauth - uploads int32 // number of uploads in progress - atomic access required trueRootID string // ID of true root directory + tokenRenewer *oauthutil.Renew // renew the token on expiry } // Object describes a acd object @@ -188,7 +187,6 @@ func NewFs(name, root string) (fs.Fs, error) { c: c, pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer), noAuthClient: fs.Config.Client(), - ts: ts, } f.features = (&fs.Features{CaseInsensitive: true, ReadMimeType: true}).Fill(f) @@ -210,7 +208,10 @@ func NewFs(name, root string) (fs.Fs, error) { f.trueRootID = *rootInfo.Id // Renew the token in the background - go f.renewToken() + f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + _, err := f.getRootInfo() + return err + }) f.dirCache = dircache.New(root, f.trueRootID, f) @@ -252,39 +253,6 @@ func (f *Fs) getRootInfo() (rootInfo *acd.Folder, err error) { return rootInfo, err } -// Renew the token - runs in the background -// -// Renews the token whenever it expires. Useful when there are lots -// of uploads in progress and the token doesn't get renewed. Amazon -// seem to cancel your uploads if you don't renew your token for 2hrs. -func (f *Fs) renewToken() { - expiry := f.ts.OnExpiry() - for { - <-expiry - uploads := atomic.LoadInt32(&f.uploads) - if uploads != 0 { - fs.Debug(f, "Token expired - %d uploads in progress - refreshing", uploads) - // Do a transaction - _, err := f.getRootInfo() - if err == nil { - fs.Debug(f, "Token refresh successful") - } else { - fs.ErrorLog(f, "Token refresh failed: %v", err) - } - } else { - fs.Debug(f, "Token expired but no uploads in progress - doing nothing") - } - } -} - -func (f *Fs) startUpload() { - atomic.AddInt32(&f.uploads, 1) -} - -func (f *Fs) stopUpload() { - atomic.AddInt32(&f.uploads, -1) -} - // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. @@ -598,9 +566,9 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { var resp *http.Response err = f.pacer.CallNoRetry(func() (bool, error) { start := time.Now() - f.startUpload() + f.tokenRenewer.Start() info, resp, err = folder.Put(in, leaf) - f.stopUpload() + f.tokenRenewer.Stop() var ok bool ok, info, err = f.checkUpload(resp, in, src, info, err, time.Since(start)) if ok { @@ -996,9 +964,9 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { var err error err = o.fs.pacer.CallNoRetry(func() (bool, error) { start := time.Now() - o.fs.startUpload() + o.fs.tokenRenewer.Start() info, resp, err = file.Overwrite(in) - o.fs.stopUpload() + o.fs.tokenRenewer.Stop() var ok bool ok, info, err = o.fs.checkUpload(resp, in, src, info, err, time.Since(start)) if ok { diff --git a/oauthutil/renew.go b/oauthutil/renew.go new file mode 100644 index 000000000..5604d7ff7 --- /dev/null +++ b/oauthutil/renew.go @@ -0,0 +1,64 @@ +package oauthutil + +import ( + "sync/atomic" + + "github.com/ncw/rclone/fs" +) + +// Renew allows tokens to be renewed on expiry if uploads are in progress. +type Renew struct { + name string // name to use in logs + ts *TokenSource // token source that needs renewing + uploads int32 // number of uploads in progress - atomic access required + run func() error // a transaction to run to renew the token on +} + +// NewRenew creates a new Renew struct and starts a background process +// which renews the token whenever it expires. It uses the run() call +// to run a transaction to do this. +// +// It will only renew the token if the number of uploads > 0 +func NewRenew(name string, ts *TokenSource, run func() error) *Renew { + r := &Renew{ + name: name, + ts: ts, + run: run, + } + go r.renewOnExpiry() + return r +} + +// renewOnExpiry renews the token whenever it expires. Useful when there +// are lots of uploads in progress and the token doesn't get renewed. +// Amazon seem to cancel your uploads if you don't renew your token +// for 2hrs. +func (r *Renew) renewOnExpiry() { + expiry := r.ts.OnExpiry() + for { + <-expiry + uploads := atomic.LoadInt32(&r.uploads) + if uploads != 0 { + fs.Debug(r.name, "Token expired - %d uploads in progress - refreshing", uploads) + // Do a transaction + err := r.run() + if err == nil { + fs.Debug(r.name, "Token refresh successful") + } else { + fs.ErrorLog(r.name, "Token refresh failed: %v", err) + } + } else { + fs.Debug(r.name, "Token expired but no uploads in progress - doing nothing") + } + } +} + +// Start should be called before starting an upload +func (r *Renew) Start() { + atomic.AddInt32(&r.uploads, 1) +} + +// Stop should be called after finishing an upload +func (r *Renew) Stop() { + atomic.AddInt32(&r.uploads, -1) +}