From f8dbf8292a070bb0ae72130dc71f1c910b4cacc9 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 18 Jan 2019 16:29:35 +0000 Subject: [PATCH] pacer: control the minSleep with a rate limiter and allow burst This will mean rclone tracks the minimum sleep values more precisely when it isn't rate limiting. Allowing burst is good for some backends (eg Google Drive). --- lib/pacer/pacer.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/lib/pacer/pacer.go b/lib/pacer/pacer.go index 8d94e5fe8..6322ae86e 100644 --- a/lib/pacer/pacer.go +++ b/lib/pacer/pacer.go @@ -2,12 +2,14 @@ package pacer import ( + "context" "math/rand" "sync" "time" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/fserrors" + "golang.org/x/time/rate" ) // Pacer state @@ -15,6 +17,8 @@ type Pacer struct { mu sync.Mutex // Protecting read/writes minSleep time.Duration // minimum sleep time maxSleep time.Duration // maximum sleep time + burst int // number of calls to send without rate limiting + limiter *rate.Limiter // rate limiter for the minsleep decayConstant uint // decay constant attackConstant uint // attack constant pacer chan struct{} // To pace the operations @@ -76,7 +80,6 @@ type Paced func() (bool, error) // New returns a Pacer with sensible defaults func New() *Pacer { p := &Pacer{ - minSleep: 10 * time.Millisecond, maxSleep: 2 * time.Second, decayConstant: 2, attackConstant: 1, @@ -86,6 +89,7 @@ func New() *Pacer { p.sleepTime = p.minSleep p.SetPacer(DefaultPacer) p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers) + p.SetMinSleep(10 * time.Millisecond) // Put the first pacing token in p.pacer <- struct{}{} @@ -114,6 +118,16 @@ func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { defer p.mu.Unlock() p.minSleep = t p.sleepTime = p.minSleep + p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) + return p +} + +// SetBurst sets the burst with no limiting of the pacer +func (p *Pacer) SetBurst(n int) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() + p.burst = n + p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) return p } @@ -216,11 +230,19 @@ func (p *Pacer) beginCall() { p.mu.Lock() // Restart the timer - go func(t time.Duration) { + go func(sleepTime, minSleep time.Duration) { // fs.Debugf(f, "New sleep for %v at %v", t, time.Now()) - time.Sleep(t) + // Sleep the minimum time with the rate limiter + if minSleep > 0 && sleepTime >= minSleep { + p.limiter.Wait(context.Background()) + sleepTime -= minSleep + } + // Then sleep the remaining time + if sleepTime > 0 { + time.Sleep(sleepTime) + } p.pacer <- struct{}{} - }(p.sleepTime) + }(p.sleepTime, p.minSleep) p.mu.Unlock() }