forked from TrueCloudLab/rclone
Implement compliant pacing scheme for Amazon Cloud Drive
* Implement switchable pacing algorithm * Add tests for pacer
This commit is contained in:
parent
e391311512
commit
05050d53ad
3 changed files with 433 additions and 30 deletions
|
@ -40,8 +40,6 @@ const (
|
|||
statusAvailable = "AVAILABLE"
|
||||
timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z
|
||||
minSleep = 20 * time.Millisecond
|
||||
maxSleep = 15 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
)
|
||||
|
||||
// Globals
|
||||
|
@ -129,6 +127,7 @@ var retryErrorCodes = []int{
|
|||
429, // Rate exceeded.
|
||||
500, // Get occasional 500 Internal Server Error
|
||||
409, // Conflict - happens in the unit tests a lot
|
||||
503, // Service Unavailable
|
||||
}
|
||||
|
||||
// shouldRetry returns a boolean as to whether this resp and err
|
||||
|
@ -172,7 +171,7 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||
name: name,
|
||||
root: root,
|
||||
c: c,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
|
||||
}
|
||||
|
||||
// Update endpoints
|
||||
|
|
151
pacer/pacer.go
151
pacer/pacer.go
|
@ -1,25 +1,54 @@
|
|||
// pacer is a utility package to make pacing and retrying API calls easy
|
||||
// Package pacer makes pacing and retrying API calls easy
|
||||
package pacer
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
)
|
||||
|
||||
// Pacer state
|
||||
type Pacer struct {
|
||||
minSleep time.Duration // minimum sleep time
|
||||
maxSleep time.Duration // maximum sleep time
|
||||
decayConstant uint // decay constant
|
||||
pacer chan struct{} // To pace the operations
|
||||
sleepTime time.Duration // Time to sleep for each transaction
|
||||
retries int // Max number of retries
|
||||
mu sync.Mutex // Protecting read/writes
|
||||
maxConnections int // Maximum number of concurrent connections
|
||||
connTokens chan struct{} // Connection tokens
|
||||
mu sync.Mutex // Protecting read/writes
|
||||
minSleep time.Duration // minimum sleep time
|
||||
maxSleep time.Duration // maximum sleep time
|
||||
decayConstant uint // decay constant
|
||||
pacer chan struct{} // To pace the operations
|
||||
sleepTime time.Duration // Time to sleep for each transaction
|
||||
retries int // Max number of retries
|
||||
maxConnections int // Maximum number of concurrent connections
|
||||
connTokens chan struct{} // Connection tokens
|
||||
calculatePace func(bool) // switchable pacing algorithm - call with mu held
|
||||
consecutiveRetries int // number of consecutive retries
|
||||
}
|
||||
|
||||
// Type is for selecting different pacing algorithms
|
||||
type Type int
|
||||
|
||||
const (
|
||||
// DefaultPacer is a truncated exponential attack and decay.
|
||||
//
|
||||
// On retries the sleep time is doubled, on non errors then
|
||||
// sleeptime decays according to the decay constant as set
|
||||
// with SetDecayConstant.
|
||||
//
|
||||
// The sleep never goes below that set with SetMinSleep or
|
||||
// above that set with SetMaxSleep.
|
||||
DefaultPacer = Type(iota)
|
||||
|
||||
// AmazonCloudDrivePacer is a specialised pacer for Amazon Cloud Drive
|
||||
//
|
||||
// It implements a truncated exponential backoff strategy with
|
||||
// randomization. Normally operations are paced at the
|
||||
// interval set with SetMinSleep. On errors the sleep timer
|
||||
// is set to 0..2**retries seconds.
|
||||
//
|
||||
// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
|
||||
AmazonCloudDrivePacer
|
||||
)
|
||||
|
||||
// Paced is a function which is called by the Call and CallNoRetry
|
||||
// methods. It should return a boolean, true if it would like to be
|
||||
// retried, and an error. This error may be returned or returned
|
||||
|
@ -36,7 +65,9 @@ func New() *Pacer {
|
|||
pacer: make(chan struct{}, 1),
|
||||
}
|
||||
p.sleepTime = p.minSleep
|
||||
p.SetMaxConnections(fs.Config.Checkers)
|
||||
p.SetPacer(DefaultPacer)
|
||||
p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers)
|
||||
|
||||
// Put the first pacing token in
|
||||
p.pacer <- struct{}{}
|
||||
|
||||
|
@ -101,6 +132,22 @@ func (p *Pacer) SetRetries(retries int) *Pacer {
|
|||
return p
|
||||
}
|
||||
|
||||
// SetPacer sets the pacing algorithm
|
||||
//
|
||||
// It will choose the default algorithm if an incorrect value is
|
||||
// passed in.
|
||||
func (p *Pacer) SetPacer(t Type) *Pacer {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
switch t {
|
||||
case AmazonCloudDrivePacer:
|
||||
p.calculatePace = p.acdPacer
|
||||
default:
|
||||
p.calculatePace = p.defaultPacer
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// Start a call to the API
|
||||
//
|
||||
// This must be called as a pair with endCall
|
||||
|
@ -126,17 +173,18 @@ func (p *Pacer) beginCall() {
|
|||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// End a call to the API
|
||||
// exponentialImplementation implements a exponentialImplementation up
|
||||
// and down pacing algorithm
|
||||
//
|
||||
// Refresh the pace given an error that was returned. It returns a
|
||||
// boolean as to whether the operation should be retried.
|
||||
func (p *Pacer) endCall(again bool) {
|
||||
if p.maxConnections > 0 {
|
||||
p.connTokens <- struct{}{}
|
||||
}
|
||||
p.mu.Lock()
|
||||
// See the description for DefaultPacer
|
||||
//
|
||||
// This should calculate a new sleepTime. It takes a boolean as to
|
||||
// whether the operation should be retried or not.
|
||||
//
|
||||
// Call with p.mu held
|
||||
func (p *Pacer) defaultPacer(retry bool) {
|
||||
oldSleepTime := p.sleepTime
|
||||
if again {
|
||||
if retry {
|
||||
p.sleepTime *= 2
|
||||
if p.sleepTime > p.maxSleep {
|
||||
p.sleepTime = p.maxSleep
|
||||
|
@ -153,21 +201,70 @@ func (p *Pacer) endCall(again bool) {
|
|||
fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// acdPacer implements a truncated exponential backoff
|
||||
// strategy with randomization for Amazon Cloud Drive
|
||||
//
|
||||
// See the description for AmazonCloudDrivePacer
|
||||
//
|
||||
// This should calculate a new sleepTime. It takes a boolean as to
|
||||
// whether the operation should be retried or not.
|
||||
//
|
||||
// Call with p.mu held
|
||||
func (p *Pacer) acdPacer(retry bool) {
|
||||
consecutiveRetries := p.consecutiveRetries
|
||||
if consecutiveRetries == 0 {
|
||||
if p.sleepTime != p.minSleep {
|
||||
p.sleepTime = p.minSleep
|
||||
fs.Debug("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
|
||||
}
|
||||
} else {
|
||||
if consecutiveRetries > 9 {
|
||||
consecutiveRetries = 9
|
||||
}
|
||||
// consecutiveRetries starts at 1 so
|
||||
// maxSleep is 2**(consecutiveRetries-1) seconds
|
||||
maxSleep := time.Second << uint(consecutiveRetries-1)
|
||||
// actual sleep is random from 0..maxSleep
|
||||
p.sleepTime = time.Duration(rand.Int63n(int64(maxSleep)))
|
||||
if p.sleepTime < p.minSleep {
|
||||
p.sleepTime = p.minSleep
|
||||
}
|
||||
fs.Debug("pacer", "Rate limited, sleeping for %v (%d retries)", p.sleepTime, consecutiveRetries)
|
||||
}
|
||||
}
|
||||
|
||||
// endCall implements the pacing algorithm
|
||||
//
|
||||
// This should calculate a new sleepTime. It takes a boolean as to
|
||||
// whether the operation should be retried or not.
|
||||
func (p *Pacer) endCall(retry bool) {
|
||||
if p.maxConnections > 0 {
|
||||
p.connTokens <- struct{}{}
|
||||
}
|
||||
p.mu.Lock()
|
||||
if retry {
|
||||
p.consecutiveRetries++
|
||||
} else {
|
||||
p.consecutiveRetries = 0
|
||||
}
|
||||
p.calculatePace(retry)
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// call implements Call but with settable retries
|
||||
func (p *Pacer) call(fn Paced, retries int) (err error) {
|
||||
var again bool
|
||||
var retry bool
|
||||
for i := 0; i < retries; i++ {
|
||||
p.beginCall()
|
||||
again, err = fn()
|
||||
p.endCall(again)
|
||||
if !again {
|
||||
retry, err = fn()
|
||||
p.endCall(retry)
|
||||
if !retry {
|
||||
break
|
||||
}
|
||||
}
|
||||
if again {
|
||||
if retry {
|
||||
err = fs.RetryError(err)
|
||||
}
|
||||
return err
|
||||
|
@ -186,8 +283,8 @@ func (p *Pacer) Call(fn Paced) (err error) {
|
|||
return p.call(fn, retries)
|
||||
}
|
||||
|
||||
// Pace the remote operations to not exceed Amazon's limits and return
|
||||
// a retry error on rate limit exceeded
|
||||
// CallNoRetry paces the remote operations to not exceed the limits
|
||||
// and return a retry error on rate limit exceeded
|
||||
//
|
||||
// This calls fn and wraps the output in a RetryError if it would like
|
||||
// it to be retried
|
||||
|
|
307
pacer/pacer_test.go
Normal file
307
pacer/pacer_test.go
Normal file
|
@ -0,0 +1,307 @@
|
|||
package pacer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
p := New()
|
||||
if p.minSleep != 10*time.Millisecond {
|
||||
t.Errorf("minSleep")
|
||||
}
|
||||
if p.maxSleep != 2*time.Second {
|
||||
t.Errorf("maxSleep")
|
||||
}
|
||||
if p.sleepTime != p.minSleep {
|
||||
t.Errorf("sleepTime")
|
||||
}
|
||||
if p.retries != 10 {
|
||||
t.Errorf("retries")
|
||||
}
|
||||
if p.decayConstant != 2 {
|
||||
t.Errorf("decayConstant")
|
||||
}
|
||||
if cap(p.pacer) != 1 {
|
||||
t.Errorf("pacer 1")
|
||||
}
|
||||
if len(p.pacer) != 1 {
|
||||
t.Errorf("pacer 2")
|
||||
}
|
||||
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
|
||||
t.Errorf("calculatePace")
|
||||
}
|
||||
if p.maxConnections != fs.Config.Checkers+fs.Config.Transfers {
|
||||
t.Errorf("maxConnections")
|
||||
}
|
||||
if cap(p.connTokens) != fs.Config.Checkers+fs.Config.Transfers {
|
||||
t.Errorf("connTokens")
|
||||
}
|
||||
if p.consecutiveRetries != 0 {
|
||||
t.Errorf("consecutiveRetries")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetMinSleep(t *testing.T) {
|
||||
p := New().SetMinSleep(1 * time.Millisecond)
|
||||
if p.minSleep != 1*time.Millisecond {
|
||||
t.Errorf("didn't set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetMaxSleep(t *testing.T) {
|
||||
p := New().SetMaxSleep(100 * time.Second)
|
||||
if p.maxSleep != 100*time.Second {
|
||||
t.Errorf("didn't set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxConnections(t *testing.T) {
|
||||
p := New().SetMaxConnections(20)
|
||||
if p.maxConnections != 20 {
|
||||
t.Errorf("maxConnections")
|
||||
}
|
||||
if cap(p.connTokens) != 20 {
|
||||
t.Errorf("connTokens")
|
||||
}
|
||||
p.SetMaxConnections(0)
|
||||
if p.maxConnections != 0 {
|
||||
t.Errorf("maxConnections is not 0")
|
||||
}
|
||||
if p.connTokens != nil {
|
||||
t.Errorf("connTokens is not nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDecayConstant(t *testing.T) {
|
||||
p := New().SetDecayConstant(17)
|
||||
if p.decayConstant != 17 {
|
||||
t.Errorf("didn't set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetRetries(t *testing.T) {
|
||||
p := New().SetRetries(18)
|
||||
if p.retries != 18 {
|
||||
t.Errorf("didn't set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetPacer(t *testing.T) {
|
||||
p := New().SetPacer(AmazonCloudDrivePacer)
|
||||
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.acdPacer) {
|
||||
t.Errorf("calculatePace is not acdPacer")
|
||||
}
|
||||
p.SetPacer(DefaultPacer)
|
||||
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
|
||||
t.Errorf("calculatePace is not defaultPacer")
|
||||
}
|
||||
}
|
||||
|
||||
// emptyTokens empties the pacer of all its tokens
|
||||
func emptyTokens(p *Pacer) {
|
||||
for len(p.pacer) != 0 {
|
||||
<-p.pacer
|
||||
}
|
||||
for len(p.connTokens) != 0 {
|
||||
<-p.connTokens
|
||||
}
|
||||
}
|
||||
|
||||
func TestBeginCall(t *testing.T) {
|
||||
p := New().SetMaxConnections(10).SetMinSleep(1 * time.Millisecond)
|
||||
emptyTokens(p)
|
||||
go p.beginCall()
|
||||
time.Sleep(2 * p.minSleep)
|
||||
if len(p.pacer) != 0 {
|
||||
t.Errorf("beginSleep fired too early #1")
|
||||
}
|
||||
p.pacer <- struct{}{}
|
||||
time.Sleep(2 * p.minSleep)
|
||||
if len(p.pacer) != 0 {
|
||||
t.Errorf("beginSleep fired too early #2")
|
||||
}
|
||||
p.connTokens <- struct{}{}
|
||||
time.Sleep(2 * p.minSleep)
|
||||
if len(p.pacer) == 0 {
|
||||
t.Errorf("beginSleep didn't fire")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBeginCallZeroConnections(t *testing.T) {
|
||||
p := New().SetMaxConnections(0).SetMinSleep(1 * time.Millisecond)
|
||||
emptyTokens(p)
|
||||
go p.beginCall()
|
||||
time.Sleep(2 * p.minSleep)
|
||||
if len(p.pacer) != 0 {
|
||||
t.Errorf("beginSleep fired too early #1")
|
||||
}
|
||||
p.pacer <- struct{}{}
|
||||
time.Sleep(2 * p.minSleep)
|
||||
if len(p.pacer) == 0 {
|
||||
t.Errorf("beginSleep didn't fire")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultPacer(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second).SetDecayConstant(2)
|
||||
for _, test := range []struct {
|
||||
in time.Duration
|
||||
retry bool
|
||||
want time.Duration
|
||||
}{
|
||||
{time.Millisecond, true, 2 * time.Millisecond},
|
||||
{time.Second, true, time.Second},
|
||||
{(3 * time.Second) / 4, true, time.Second},
|
||||
{time.Second, false, 750 * time.Millisecond},
|
||||
{1000 * time.Microsecond, false, time.Millisecond},
|
||||
{1200 * time.Microsecond, false, time.Millisecond},
|
||||
} {
|
||||
p.sleepTime = test.in
|
||||
p.defaultPacer(test.retry)
|
||||
got := p.sleepTime
|
||||
if got != test.want {
|
||||
t.Errorf("bad sleep want %v got %v", test.want, got)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAmazonCloudDrivePacer(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetPacer(AmazonCloudDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
|
||||
// Do lots of times because of the random number!
|
||||
for _, test := range []struct {
|
||||
in time.Duration
|
||||
consecutiveRetries int
|
||||
retry bool
|
||||
want time.Duration
|
||||
}{
|
||||
{time.Millisecond, 0, true, time.Millisecond},
|
||||
{10 * time.Millisecond, 0, true, time.Millisecond},
|
||||
{1 * time.Second, 1, true, 500 * time.Millisecond},
|
||||
{1 * time.Second, 2, true, 1 * time.Second},
|
||||
{1 * time.Second, 3, true, 2 * time.Second},
|
||||
{1 * time.Second, 4, true, 4 * time.Second},
|
||||
{1 * time.Second, 5, true, 8 * time.Second},
|
||||
{1 * time.Second, 6, true, 16 * time.Second},
|
||||
{1 * time.Second, 7, true, 32 * time.Second},
|
||||
{1 * time.Second, 8, true, 64 * time.Second},
|
||||
{1 * time.Second, 9, true, 128 * time.Second},
|
||||
{1 * time.Second, 10, true, 128 * time.Second},
|
||||
{1 * time.Second, 11, true, 128 * time.Second},
|
||||
} {
|
||||
const n = 1000
|
||||
var sum time.Duration
|
||||
// measure average time over n cycles
|
||||
for i := 0; i < n; i++ {
|
||||
p.sleepTime = test.in
|
||||
p.consecutiveRetries = test.consecutiveRetries
|
||||
p.acdPacer(test.retry)
|
||||
sum += p.sleepTime
|
||||
}
|
||||
got := sum / n
|
||||
//t.Logf("%+v: got = %v", test, got)
|
||||
if got < (test.want*9)/10 || got > (test.want*11)/10 {
|
||||
t.Fatalf("%+v: bad sleep want %v+/-10% got %v", test, test.want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndCall(t *testing.T) {
|
||||
p := New().SetMaxConnections(5)
|
||||
emptyTokens(p)
|
||||
p.consecutiveRetries = 1
|
||||
p.endCall(true)
|
||||
if len(p.connTokens) != 1 {
|
||||
t.Errorf("Expecting 1 token")
|
||||
}
|
||||
if p.consecutiveRetries != 2 {
|
||||
t.Errorf("Bad consecutive retries")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndCallZeroConnections(t *testing.T) {
|
||||
p := New().SetMaxConnections(0)
|
||||
emptyTokens(p)
|
||||
p.consecutiveRetries = 1
|
||||
p.endCall(false)
|
||||
if len(p.connTokens) != 0 {
|
||||
t.Errorf("Expecting 0 token")
|
||||
}
|
||||
if p.consecutiveRetries != 0 {
|
||||
t.Errorf("Bad consecutive retries")
|
||||
}
|
||||
}
|
||||
|
||||
var errFoo = fmt.Errorf("Foo")
|
||||
|
||||
type dummyPaced struct {
|
||||
retry bool
|
||||
called int
|
||||
}
|
||||
|
||||
func (dp *dummyPaced) fn() (bool, error) {
|
||||
dp.called++
|
||||
return dp.retry, errFoo
|
||||
}
|
||||
|
||||
func Test_callNoRetry(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
|
||||
|
||||
dp := &dummyPaced{retry: false}
|
||||
err := p.call(dp.fn, 10)
|
||||
if dp.called != 1 {
|
||||
t.Errorf("called want %d got %d", 1, dp.called)
|
||||
}
|
||||
if err != errFoo {
|
||||
t.Errorf("err want %v got %v", errFoo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_callRetry(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.call(dp.fn, 10)
|
||||
if dp.called != 10 {
|
||||
t.Errorf("called want %d got %d", 10, dp.called)
|
||||
}
|
||||
if err == errFoo {
|
||||
t.Errorf("err didn't want %v got %v", errFoo, err)
|
||||
}
|
||||
_, ok := err.(fs.Retry)
|
||||
if !ok {
|
||||
t.Errorf("didn't return a retry error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCall(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.Call(dp.fn)
|
||||
if dp.called != 20 {
|
||||
t.Errorf("called want %d got %d", 20, dp.called)
|
||||
}
|
||||
_, ok := err.(fs.Retry)
|
||||
if !ok {
|
||||
t.Errorf("didn't return a retry error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallNoRetry(t *testing.T) {
|
||||
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.CallNoRetry(dp.fn)
|
||||
if dp.called != 1 {
|
||||
t.Errorf("called want %d got %d", 1, dp.called)
|
||||
}
|
||||
_, ok := err.(fs.Retry)
|
||||
if !ok {
|
||||
t.Errorf("didn't return a retry error")
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue