diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index 118a89d7a..f44cb0716 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -82,12 +82,11 @@ func init() { // FsAcd represents a remote acd server type FsAcd struct { - name string // name of this remote - c *acd.Client // the connection to the acd server - root string // the path we are working on - dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls - connTokens chan struct{} // Connection tokens for directory listings + name string // name of this remote + c *acd.Client // the connection to the acd server + root string // the path we are working on + dirCache *dircache.DirCache // Map of directory path to directory id + pacer *pacer.Pacer // pacer for API calls } // FsObjectAcd describes a acd object @@ -170,16 +169,10 @@ func NewFs(name, root string) (fs.Fs, error) { c := acd.NewClient(oAuthClient) c.UserAgent = fs.UserAgent f := &FsAcd{ - name: name, - root: root, - c: c, - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), - connTokens: make(chan struct{}, fs.Config.Checkers), - } - - // Insert connection tokens. - for i := 0; i < fs.Config.Checkers; i++ { - f.connTokens <- struct{}{} + name: name, + root: root, + c: c, + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), } // Update endpoints @@ -332,14 +325,10 @@ func (f *FsAcd) listAll(dirId string, title string, directoriesOnly bool, filesO OUTER: for { var resp *http.Response - // Get a token - _ = <-f.connTokens err = f.pacer.Call(func() (bool, error) { nodes, resp, err = f.c.Nodes.GetNodes(&opts) return shouldRetry(resp, err) }) - // Reinsert token - f.connTokens <- struct{}{} if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't list files: %v", err) diff --git a/pacer/pacer.go b/pacer/pacer.go index c69e9e01f..d51da5dbb 100644 --- a/pacer/pacer.go +++ b/pacer/pacer.go @@ -9,13 +9,15 @@ import ( ) type Pacer struct { - minSleep time.Duration // minimum sleep time - maxSleep time.Duration // maximum sleep time - decayConstant uint // decay constant - pacer chan struct{} // To pace the operations - sleepTime time.Duration // Time to sleep for each transaction - retries int // Max number of retries - mu sync.Mutex // Protecting read/writes + minSleep time.Duration // minimum sleep time + maxSleep time.Duration // maximum sleep time + decayConstant uint // decay constant + pacer chan struct{} // To pace the operations + sleepTime time.Duration // Time to sleep for each transaction + retries int // Max number of retries + mu sync.Mutex // Protecting read/writes + maxConnections int // Maximum number of concurrent connections + connTokens chan struct{} // Connection tokens } // Paced is a function which is called by the Call and CallNoRetry @@ -34,7 +36,7 @@ func New() *Pacer { pacer: make(chan struct{}, 1), } p.sleepTime = p.minSleep - + p.SetMaxConnections(fs.Config.Checkers) // Put the first pacing token in p.pacer <- struct{}{} @@ -59,6 +61,25 @@ func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { return p } +// SetMaxConnections sets the maximum number of concurrent connections. +// Setting the value to 0 will allow unlimited number of connections. +// Should not be changed once you have started calling the pacer. +// By default this will be set to fs.Config.Checkers. +func (p *Pacer) SetMaxConnections(n int) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() + p.maxConnections = n + if n <= 0 { + p.connTokens = nil + } else { + p.connTokens = make(chan struct{}, n) + for i := 0; i < n; i++ { + p.connTokens <- struct{}{} + } + } + return p +} + // SetDecayConstant sets the decay constant for the pacer // // This is the speed the time falls back to the minimum after errors @@ -91,6 +112,9 @@ func (p *Pacer) beginCall() { // Ticker more accurately, but then we'd have to work out how // not to run it when it wasn't needed <-p.pacer + if p.maxConnections > 0 { + <-p.connTokens + } p.mu.Lock() // Restart the timer @@ -107,6 +131,9 @@ func (p *Pacer) beginCall() { // Refresh the pace given an error that was returned. It returns a // boolean as to whether the operation should be retried. func (p *Pacer) endCall(again bool) { + if p.maxConnections > 0 { + p.connTokens <- struct{}{} + } p.mu.Lock() oldSleepTime := p.sleepTime if again {