// Package pacer makes pacing and retrying API calls easy package pacer import ( "context" "math/rand" "sync" "time" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/fserrors" "golang.org/x/time/rate" ) // Pacer state type Pacer struct { mu sync.Mutex // Protecting read/writes minSleep time.Duration // minimum sleep time maxSleep time.Duration // maximum sleep time burst int // number of calls to send without rate limiting limiter *rate.Limiter // rate limiter for the minsleep decayConstant uint // decay constant attackConstant uint // attack constant pacer chan struct{} // To pace the operations sleepTime time.Duration // Time to sleep for each transaction retries int // Max number of retries maxConnections int // Maximum number of concurrent connections connTokens chan struct{} // Connection tokens calculatePace func(bool) // switchable pacing algorithm - call with mu held consecutiveRetries int // number of consecutive retries } // Type is for selecting different pacing algorithms type Type int const ( // DefaultPacer is a truncated exponential attack and decay. // // On retries the sleep time is doubled, on non errors then // sleeptime decays according to the decay constant as set // with SetDecayConstant. // // The sleep never goes below that set with SetMinSleep or // above that set with SetMaxSleep. DefaultPacer = Type(iota) // AmazonCloudDrivePacer is a specialised pacer for Amazon Drive // // It implements a truncated exponential backoff strategy with // randomization. Normally operations are paced at the // interval set with SetMinSleep. On errors the sleep timer // is set to 0..2**retries seconds. // // See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices AmazonCloudDrivePacer // GoogleDrivePacer is a specialised pacer for Google Drive // // It implements a truncated exponential backoff strategy with // randomization. Normally operations are paced at the // interval set with SetMinSleep. On errors the sleep timer // is set to (2 ^ n) + random_number_milliseconds seconds // // See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff GoogleDrivePacer // S3Pacer is a specialised pacer for S3 // // It is basically the defaultPacer, but allows the sleep time to go to 0 // when things are going well. S3Pacer ) // Paced is a function which is called by the Call and CallNoRetry // methods. It should return a boolean, true if it would like to be // retried, and an error. This error may be returned or returned // wrapped in a RetryError. type Paced func() (bool, error) // New returns a Pacer with sensible defaults func New() *Pacer { p := &Pacer{ maxSleep: 2 * time.Second, decayConstant: 2, attackConstant: 1, retries: fs.Config.LowLevelRetries, pacer: make(chan struct{}, 1), } p.sleepTime = p.minSleep p.SetPacer(DefaultPacer) p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers) p.SetMinSleep(10 * time.Millisecond) // Put the first pacing token in p.pacer <- struct{}{} return p } // SetSleep sets the current sleep time func (p *Pacer) SetSleep(t time.Duration) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.sleepTime = t return p } // GetSleep gets the current sleep time func (p *Pacer) GetSleep() time.Duration { p.mu.Lock() defer p.mu.Unlock() return p.sleepTime } // SetMinSleep sets the minimum sleep time for the pacer func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.minSleep = t p.sleepTime = p.minSleep p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) return p } // SetBurst sets the burst with no limiting of the pacer func (p *Pacer) SetBurst(n int) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.burst = n p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) return p } // SetMaxSleep sets the maximum sleep time for the pacer func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.maxSleep = t p.sleepTime = p.minSleep return p } // SetMaxConnections sets the maximum number of concurrent connections. // Setting the value to 0 will allow unlimited number of connections. // Should not be changed once you have started calling the pacer. // By default this will be set to fs.Config.Checkers. func (p *Pacer) SetMaxConnections(n int) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.maxConnections = n if n <= 0 { p.connTokens = nil } else { p.connTokens = make(chan struct{}, n) for i := 0; i < n; i++ { p.connTokens <- struct{}{} } } return p } // SetDecayConstant sets the decay constant for the pacer // // This is the speed the time falls back to the minimum after errors // have occurred. // // bigger for slower decay, exponential. 1 is halve, 0 is go straight to minimum func (p *Pacer) SetDecayConstant(decay uint) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.decayConstant = decay return p } // SetAttackConstant sets the attack constant for the pacer // // This is the speed the time grows from the minimum after errors have // occurred. // // bigger for slower attack, 1 is double, 0 is go straight to maximum func (p *Pacer) SetAttackConstant(attack uint) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.attackConstant = attack return p } // SetRetries sets the max number of tries for Call func (p *Pacer) SetRetries(retries int) *Pacer { p.mu.Lock() defer p.mu.Unlock() p.retries = retries return p } // SetPacer sets the pacing algorithm // // It will choose the default algorithm if an incorrect value is // passed in. func (p *Pacer) SetPacer(t Type) *Pacer { p.mu.Lock() defer p.mu.Unlock() switch t { case AmazonCloudDrivePacer: p.calculatePace = p.acdPacer case GoogleDrivePacer: p.calculatePace = p.drivePacer case S3Pacer: p.calculatePace = p.s3Pacer default: p.calculatePace = p.defaultPacer } return p } // Start a call to the API // // This must be called as a pair with endCall // // This waits for the pacer token func (p *Pacer) beginCall() { // pacer starts with a token in and whenever we take one out // XXX ms later we put another in. We could do this with a // Ticker more accurately, but then we'd have to work out how // not to run it when it wasn't needed <-p.pacer if p.maxConnections > 0 { <-p.connTokens } p.mu.Lock() // Restart the timer go func(sleepTime, minSleep time.Duration) { // fs.Debugf(f, "New sleep for %v at %v", t, time.Now()) // Sleep the minimum time with the rate limiter if minSleep > 0 && sleepTime >= minSleep { _ = p.limiter.Wait(context.Background()) sleepTime -= minSleep } // Then sleep the remaining time if sleepTime > 0 { time.Sleep(sleepTime) } p.pacer <- struct{}{} }(p.sleepTime, p.minSleep) p.mu.Unlock() } // exponentialImplementation implements a exponentialImplementation up // and down pacing algorithm // // See the description for DefaultPacer // // This should calculate a new sleepTime. It takes a boolean as to // whether the operation should be retried or not. // // Call with p.mu held func (p *Pacer) defaultPacer(retry bool) { oldSleepTime := p.sleepTime if retry { if p.attackConstant == 0 { p.sleepTime = p.maxSleep } else { p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1) } if p.sleepTime > p.maxSleep { p.sleepTime = p.maxSleep } if p.sleepTime != oldSleepTime { fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime) } } else { p.sleepTime = (p.sleepTime<