forked from TrueCloudLab/rclone
Factor token renewer from amazonclouddrive to oauthutil
This commit is contained in:
parent
2192805360
commit
bd29015022
2 changed files with 73 additions and 41 deletions
|
@ -21,7 +21,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/go-acd"
|
"github.com/ncw/go-acd"
|
||||||
|
@ -97,8 +96,8 @@ type Fs struct {
|
||||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||||
pacer *pacer.Pacer // pacer for API calls
|
pacer *pacer.Pacer // pacer for API calls
|
||||||
ts *oauthutil.TokenSource // token source for oauth
|
ts *oauthutil.TokenSource // token source for oauth
|
||||||
uploads int32 // number of uploads in progress - atomic access required
|
|
||||||
trueRootID string // ID of true root directory
|
trueRootID string // ID of true root directory
|
||||||
|
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a acd object
|
// Object describes a acd object
|
||||||
|
@ -188,7 +187,6 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
c: c,
|
c: c,
|
||||||
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
|
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
|
||||||
noAuthClient: fs.Config.Client(),
|
noAuthClient: fs.Config.Client(),
|
||||||
ts: ts,
|
|
||||||
}
|
}
|
||||||
f.features = (&fs.Features{CaseInsensitive: true, ReadMimeType: true}).Fill(f)
|
f.features = (&fs.Features{CaseInsensitive: true, ReadMimeType: true}).Fill(f)
|
||||||
|
|
||||||
|
@ -210,7 +208,10 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
f.trueRootID = *rootInfo.Id
|
f.trueRootID = *rootInfo.Id
|
||||||
|
|
||||||
// Renew the token in the background
|
// Renew the token in the background
|
||||||
go f.renewToken()
|
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
|
||||||
|
_, err := f.getRootInfo()
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
f.dirCache = dircache.New(root, f.trueRootID, f)
|
f.dirCache = dircache.New(root, f.trueRootID, f)
|
||||||
|
|
||||||
|
@ -252,39 +253,6 @@ func (f *Fs) getRootInfo() (rootInfo *acd.Folder, err error) {
|
||||||
return rootInfo, err
|
return rootInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Renew the token - runs in the background
|
|
||||||
//
|
|
||||||
// Renews the token whenever it expires. Useful when there are lots
|
|
||||||
// of uploads in progress and the token doesn't get renewed. Amazon
|
|
||||||
// seem to cancel your uploads if you don't renew your token for 2hrs.
|
|
||||||
func (f *Fs) renewToken() {
|
|
||||||
expiry := f.ts.OnExpiry()
|
|
||||||
for {
|
|
||||||
<-expiry
|
|
||||||
uploads := atomic.LoadInt32(&f.uploads)
|
|
||||||
if uploads != 0 {
|
|
||||||
fs.Debug(f, "Token expired - %d uploads in progress - refreshing", uploads)
|
|
||||||
// Do a transaction
|
|
||||||
_, err := f.getRootInfo()
|
|
||||||
if err == nil {
|
|
||||||
fs.Debug(f, "Token refresh successful")
|
|
||||||
} else {
|
|
||||||
fs.ErrorLog(f, "Token refresh failed: %v", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fs.Debug(f, "Token expired but no uploads in progress - doing nothing")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *Fs) startUpload() {
|
|
||||||
atomic.AddInt32(&f.uploads, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *Fs) stopUpload() {
|
|
||||||
atomic.AddInt32(&f.uploads, -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return an Object from a path
|
// Return an Object from a path
|
||||||
//
|
//
|
||||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||||
|
@ -598,9 +566,9 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
f.startUpload()
|
f.tokenRenewer.Start()
|
||||||
info, resp, err = folder.Put(in, leaf)
|
info, resp, err = folder.Put(in, leaf)
|
||||||
f.stopUpload()
|
f.tokenRenewer.Stop()
|
||||||
var ok bool
|
var ok bool
|
||||||
ok, info, err = f.checkUpload(resp, in, src, info, err, time.Since(start))
|
ok, info, err = f.checkUpload(resp, in, src, info, err, time.Since(start))
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -996,9 +964,9 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
||||||
var err error
|
var err error
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
o.fs.startUpload()
|
o.fs.tokenRenewer.Start()
|
||||||
info, resp, err = file.Overwrite(in)
|
info, resp, err = file.Overwrite(in)
|
||||||
o.fs.stopUpload()
|
o.fs.tokenRenewer.Stop()
|
||||||
var ok bool
|
var ok bool
|
||||||
ok, info, err = o.fs.checkUpload(resp, in, src, info, err, time.Since(start))
|
ok, info, err = o.fs.checkUpload(resp, in, src, info, err, time.Since(start))
|
||||||
if ok {
|
if ok {
|
||||||
|
|
64
oauthutil/renew.go
Normal file
64
oauthutil/renew.go
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
package oauthutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Renew allows tokens to be renewed on expiry if uploads are in progress.
|
||||||
|
type Renew struct {
|
||||||
|
name string // name to use in logs
|
||||||
|
ts *TokenSource // token source that needs renewing
|
||||||
|
uploads int32 // number of uploads in progress - atomic access required
|
||||||
|
run func() error // a transaction to run to renew the token on
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRenew creates a new Renew struct and starts a background process
|
||||||
|
// which renews the token whenever it expires. It uses the run() call
|
||||||
|
// to run a transaction to do this.
|
||||||
|
//
|
||||||
|
// It will only renew the token if the number of uploads > 0
|
||||||
|
func NewRenew(name string, ts *TokenSource, run func() error) *Renew {
|
||||||
|
r := &Renew{
|
||||||
|
name: name,
|
||||||
|
ts: ts,
|
||||||
|
run: run,
|
||||||
|
}
|
||||||
|
go r.renewOnExpiry()
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
// renewOnExpiry renews the token whenever it expires. Useful when there
|
||||||
|
// are lots of uploads in progress and the token doesn't get renewed.
|
||||||
|
// Amazon seem to cancel your uploads if you don't renew your token
|
||||||
|
// for 2hrs.
|
||||||
|
func (r *Renew) renewOnExpiry() {
|
||||||
|
expiry := r.ts.OnExpiry()
|
||||||
|
for {
|
||||||
|
<-expiry
|
||||||
|
uploads := atomic.LoadInt32(&r.uploads)
|
||||||
|
if uploads != 0 {
|
||||||
|
fs.Debug(r.name, "Token expired - %d uploads in progress - refreshing", uploads)
|
||||||
|
// Do a transaction
|
||||||
|
err := r.run()
|
||||||
|
if err == nil {
|
||||||
|
fs.Debug(r.name, "Token refresh successful")
|
||||||
|
} else {
|
||||||
|
fs.ErrorLog(r.name, "Token refresh failed: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fs.Debug(r.name, "Token expired but no uploads in progress - doing nothing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start should be called before starting an upload
|
||||||
|
func (r *Renew) Start() {
|
||||||
|
atomic.AddInt32(&r.uploads, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop should be called after finishing an upload
|
||||||
|
func (r *Renew) Stop() {
|
||||||
|
atomic.AddInt32(&r.uploads, -1)
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue