forked from TrueCloudLab/rclone
parent
83c3bb2f1a
commit
0a922ad1dc
6 changed files with 70 additions and 41 deletions
|
@ -90,6 +90,7 @@ type Fs struct {
|
||||||
root string // the path we are working on
|
root string // the path we are working on
|
||||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||||
pacer *pacer.Pacer // pacer for API calls
|
pacer *pacer.Pacer // pacer for API calls
|
||||||
|
ts *oauthutil.TokenSource // token source for oauth
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a acd object
|
// Object describes a acd object
|
||||||
|
@ -140,14 +141,19 @@ var retryErrorCodes = []int{
|
||||||
|
|
||||||
// shouldRetry returns a boolean as to whether this resp and err
|
// shouldRetry returns a boolean as to whether this resp and err
|
||||||
// deserve to be retried. It returns the err as a convenience
|
// deserve to be retried. It returns the err as a convenience
|
||||||
func shouldRetry(resp *http.Response, err error) (bool, error) {
|
func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
|
||||||
|
if resp != nil && resp.StatusCode == 401 {
|
||||||
|
f.ts.Invalidate()
|
||||||
|
fs.Log(f, "401 error received - invalidating token")
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
return fs.ShouldRetry(err) || fs.ShouldRetryHTTP(resp, retryErrorCodes), err
|
return fs.ShouldRetry(err) || fs.ShouldRetryHTTP(resp, retryErrorCodes), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, container:path
|
// NewFs constructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string) (fs.Fs, error) {
|
func NewFs(name, root string) (fs.Fs, error) {
|
||||||
root = parsePath(root)
|
root = parsePath(root)
|
||||||
oAuthClient, err := oauthutil.NewClient(name, acdConfig)
|
oAuthClient, ts, err := oauthutil.NewClient(name, acdConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to configure amazon cloud drive: %v", err)
|
log.Fatalf("Failed to configure amazon cloud drive: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -160,13 +166,14 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
c: c,
|
c: c,
|
||||||
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
|
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
|
||||||
noAuthClient: fs.Config.Client(),
|
noAuthClient: fs.Config.Client(),
|
||||||
|
ts: ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update endpoints
|
// Update endpoints
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, resp, err = f.c.Account.GetEndpoints()
|
_, resp, err = f.c.Account.GetEndpoints()
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to get endpoints: %v", err)
|
return nil, fmt.Errorf("Failed to get endpoints: %v", err)
|
||||||
|
@ -176,7 +183,7 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
var rootInfo *acd.Folder
|
var rootInfo *acd.Folder
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
rootInfo, resp, err = f.c.Nodes.GetRoot()
|
rootInfo, resp, err = f.c.Nodes.GetRoot()
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil || rootInfo.Id == nil {
|
if err != nil || rootInfo.Id == nil {
|
||||||
return nil, fmt.Errorf("Failed to get root: %v", err)
|
return nil, fmt.Errorf("Failed to get root: %v", err)
|
||||||
|
@ -245,7 +252,7 @@ func (f *Fs) FindLeaf(pathID, leaf string) (pathIDOut string, found bool, err er
|
||||||
var subFolder *acd.Folder
|
var subFolder *acd.Folder
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
subFolder, resp, err = folder.GetFolder(leaf)
|
subFolder, resp, err = folder.GetFolder(leaf)
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == acd.ErrorNodeNotFound {
|
if err == acd.ErrorNodeNotFound {
|
||||||
|
@ -272,7 +279,7 @@ func (f *Fs) CreateDir(pathID, leaf string) (newID string, err error) {
|
||||||
var info *acd.Folder
|
var info *acd.Folder
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
info, resp, err = folder.CreateFolder(leaf)
|
info, resp, err = folder.CreateFolder(leaf)
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//fmt.Printf("...Error %v\n", err)
|
//fmt.Printf("...Error %v\n", err)
|
||||||
|
@ -314,7 +321,7 @@ func (f *Fs) listAll(dirID string, title string, directoriesOnly bool, filesOnly
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||||
nodes, resp, err = f.c.Nodes.GetNodes(&opts)
|
nodes, resp, err = f.c.Nodes.GetNodes(&opts)
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
@ -424,7 +431,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
} else {
|
} else {
|
||||||
info, resp, err = folder.PutSized(in, size, leaf)
|
info, resp, err = folder.PutSized(in, size, leaf)
|
||||||
}
|
}
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -479,7 +486,7 @@ func (f *Fs) purgeCheck(check bool) error {
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
resp, err = node.Trash()
|
resp, err = node.Trash()
|
||||||
return shouldRetry(resp, err)
|
return f.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -593,7 +600,7 @@ func (o *Object) readMetaData() (err error) {
|
||||||
var info *acd.File
|
var info *acd.File
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
info, resp, err = folder.GetFile(leaf)
|
info, resp, err = folder.GetFile(leaf)
|
||||||
return shouldRetry(resp, err)
|
return o.fs.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debug(o, "Failed to read info: %s", err)
|
fs.Debug(o, "Failed to read info: %s", err)
|
||||||
|
@ -647,7 +654,7 @@ func (o *Object) Open() (in io.ReadCloser, err error) {
|
||||||
} else {
|
} else {
|
||||||
in, resp, err = file.OpenTempURL(o.fs.noAuthClient)
|
in, resp, err = file.OpenTempURL(o.fs.noAuthClient)
|
||||||
}
|
}
|
||||||
return shouldRetry(resp, err)
|
return o.fs.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
return in, err
|
return in, err
|
||||||
}
|
}
|
||||||
|
@ -667,7 +674,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
||||||
} else {
|
} else {
|
||||||
info, resp, err = file.Overwrite(in)
|
info, resp, err = file.Overwrite(in)
|
||||||
}
|
}
|
||||||
return shouldRetry(resp, err)
|
return o.fs.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -682,7 +689,7 @@ func (o *Object) Remove() error {
|
||||||
var err error
|
var err error
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
resp, err = o.info.Trash()
|
resp, err = o.info.Trash()
|
||||||
return shouldRetry(resp, err)
|
return o.fs.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,7 +278,7 @@ func NewFs(name, path string) (fs.Fs, error) {
|
||||||
return nil, fmt.Errorf("drive: chunk size can't be less than 256k - was %v", chunkSize)
|
return nil, fmt.Errorf("drive: chunk size can't be less than 256k - was %v", chunkSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
oAuthClient, err := oauthutil.NewClient(name, driveConfig)
|
oAuthClient, _, err := oauthutil.NewClient(name, driveConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to configure drive: %v", err)
|
log.Fatalf("Failed to configure drive: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,7 +215,7 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
log.Fatalf("Failed configuring Google Cloud Storage Service Account: %v", err)
|
log.Fatalf("Failed configuring Google Cloud Storage Service Account: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
oAuthClient, err = oauthutil.NewClient(name, storageConfig)
|
oAuthClient, _, err = oauthutil.NewClient(name, storageConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to configure Google Cloud Storage: %v", err)
|
log.Fatalf("Failed to configure Google Cloud Storage: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ func (f *Fs) getCredentials() (err error) {
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, container:path
|
// NewFs constructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string) (fs.Fs, error) {
|
func NewFs(name, root string) (fs.Fs, error) {
|
||||||
client, err := oauthutil.NewClient(name, oauthConfig)
|
client, _, err := oauthutil.NewClient(name, oauthConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to configure Hubic: %v", err)
|
return nil, fmt.Errorf("Failed to configure Hubic: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
|
@ -104,11 +105,14 @@ func putToken(name string, token *oauth2.Token) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// tokenSource stores updated tokens in the config file
|
// TokenSource stores updated tokens in the config file
|
||||||
type tokenSource struct {
|
type TokenSource struct {
|
||||||
Name string
|
mu sync.Mutex
|
||||||
TokenSource oauth2.TokenSource
|
name string
|
||||||
OldToken oauth2.Token
|
tokenSource oauth2.TokenSource
|
||||||
|
token *oauth2.Token
|
||||||
|
config *oauth2.Config
|
||||||
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// Token returns a token or an error.
|
// Token returns a token or an error.
|
||||||
|
@ -116,13 +120,23 @@ type tokenSource struct {
|
||||||
// The returned Token must not be modified.
|
// The returned Token must not be modified.
|
||||||
//
|
//
|
||||||
// This saves the token in the config file if it has changed
|
// This saves the token in the config file if it has changed
|
||||||
func (ts *tokenSource) Token() (*oauth2.Token, error) {
|
func (ts *TokenSource) Token() (*oauth2.Token, error) {
|
||||||
token, err := ts.TokenSource.Token()
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
|
||||||
|
// Make a new token source if required
|
||||||
|
if ts.tokenSource == nil {
|
||||||
|
ts.tokenSource = ts.config.TokenSource(ts.ctx, ts.token)
|
||||||
|
}
|
||||||
|
|
||||||
|
token, err := ts.tokenSource.Token()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if *token != ts.OldToken {
|
changed := *token != *ts.token
|
||||||
err = putToken(ts.Name, token)
|
ts.token = token
|
||||||
|
if changed {
|
||||||
|
err = putToken(ts.name, token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -130,8 +144,15 @@ func (ts *tokenSource) Token() (*oauth2.Token, error) {
|
||||||
return token, nil
|
return token, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Invalidate invalidates the token
|
||||||
|
func (ts *TokenSource) Invalidate() {
|
||||||
|
ts.mu.Lock()
|
||||||
|
ts.token.AccessToken = ""
|
||||||
|
ts.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// Check interface satisfied
|
// Check interface satisfied
|
||||||
var _ oauth2.TokenSource = (*tokenSource)(nil)
|
var _ oauth2.TokenSource = (*TokenSource)(nil)
|
||||||
|
|
||||||
// Context returns a context with our HTTP Client baked in for oauth2
|
// Context returns a context with our HTTP Client baked in for oauth2
|
||||||
func Context() context.Context {
|
func Context() context.Context {
|
||||||
|
@ -157,12 +178,12 @@ func overrideCredentials(name string, config *oauth2.Config) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient gets a token from the config file and configures a Client
|
// NewClient gets a token from the config file and configures a Client
|
||||||
// with it
|
// with it. It returns the client and a TokenSource which Invalidate may need to be called on
|
||||||
func NewClient(name string, config *oauth2.Config) (*http.Client, error) {
|
func NewClient(name string, config *oauth2.Config) (*http.Client, *TokenSource, error) {
|
||||||
overrideCredentials(name, config)
|
overrideCredentials(name, config)
|
||||||
token, err := getToken(name)
|
token, err := getToken(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set our own http client in the context
|
// Set our own http client in the context
|
||||||
|
@ -170,12 +191,13 @@ func NewClient(name string, config *oauth2.Config) (*http.Client, error) {
|
||||||
|
|
||||||
// Wrap the TokenSource in our TokenSource which saves changed
|
// Wrap the TokenSource in our TokenSource which saves changed
|
||||||
// tokens in the config file
|
// tokens in the config file
|
||||||
ts := &tokenSource{
|
ts := &TokenSource{
|
||||||
Name: name,
|
name: name,
|
||||||
OldToken: *token,
|
token: token,
|
||||||
TokenSource: config.TokenSource(ctx, token),
|
config: config,
|
||||||
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
return oauth2.NewClient(ctx, ts), nil
|
return oauth2.NewClient(ctx, ts), ts, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -170,7 +170,7 @@ func errorHandler(resp *http.Response) error {
|
||||||
// NewFs constructs an Fs from the path, container:path
|
// NewFs constructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string) (fs.Fs, error) {
|
func NewFs(name, root string) (fs.Fs, error) {
|
||||||
root = parsePath(root)
|
root = parsePath(root)
|
||||||
oAuthClient, err := oauthutil.NewClient(name, oauthConfig)
|
oAuthClient, _, err := oauthutil.NewClient(name, oauthConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to configure One Drive: %v", err)
|
log.Fatalf("Failed to configure One Drive: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue