diff --git a/backend/amazonclouddrive/amazonclouddrive.go b/backend/amazonclouddrive/amazonclouddrive.go
index a1288aa67..a61227e23 100644
--- a/backend/amazonclouddrive/amazonclouddrive.go
+++ b/backend/amazonclouddrive/amazonclouddrive.go
@@ -155,7 +155,7 @@ type Fs struct {
noAuthClient *http.Client // unauthenticated http client
root string // the path we are working on
dirCache *dircache.DirCache // Map of directory path to directory id
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
trueRootID string // ID of true root directory
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
c: c,
- pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
+ pacer: fs.NewPacer(pacer.NewAmazonCloudDrive(pacer.MinSleep(minSleep))),
noAuthClient: fshttp.NewClient(fs.Config),
}
f.features = (&fs.Features{
diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 5c9defb3f..51aad65c4 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -144,7 +144,7 @@ type Fs struct {
containerOKMu sync.Mutex // mutex to protect container OK
containerOK bool // true if we have created the container
containerDeleted bool // true if we have deleted the container
- pacer *pacer.Pacer // To pace and retry the API calls
+ pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
}
@@ -347,7 +347,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
container: container,
root: directory,
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant).SetPacer(pacer.S3Pacer),
+ pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
client: fshttp.NewClient(fs.Config),
}
diff --git a/backend/b2/b2.go b/backend/b2/b2.go
index 6f3539f66..75dbd6783 100644
--- a/backend/b2/b2.go
+++ b/backend/b2/b2.go
@@ -167,7 +167,7 @@ type Fs struct {
uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadURLResponse // result of get upload URL calls
authMu sync.Mutex // lock for authorizing the account
- pacer *pacer.Pacer // To pace and retry the API calls
+ pacer *fs.Pacer // To pace and retry the API calls
bufferTokens chan []byte // control concurrency of multipart uploads
}
@@ -251,13 +251,7 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
fs.Errorf(f, "Malformed %s header %q: %v", retryAfterHeader, retryAfterString, err)
}
}
- retryAfterDuration := time.Duration(retryAfter) * time.Second
- if f.pacer.GetSleep() < retryAfterDuration {
- fs.Debugf(f, "Setting sleep to %v after error: %v", retryAfterDuration, err)
- // We set 1/2 the value here because the pacer will double it immediately
- f.pacer.SetSleep(retryAfterDuration / 2)
- }
- return true, err
+ return true, pacer.RetryAfterError(err, time.Duration(retryAfter)*time.Second)
}
return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
}
@@ -363,7 +357,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
bucket: bucket,
root: directory,
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
ReadMimeType: true,
diff --git a/backend/box/box.go b/backend/box/box.go
index 3bf242e77..f6f4a629c 100644
--- a/backend/box/box.go
+++ b/backend/box/box.go
@@ -111,7 +111,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the one drive server
dirCache *dircache.DirCache // Map of directory path to directory id
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
uploadToken *pacer.TokenDispenser // control concurrency
}
@@ -260,7 +260,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
}
f.features = (&fs.Features{
diff --git a/backend/drive/drive.go b/backend/drive/drive.go
index 3dfa1a51d..c45b8a35f 100644
--- a/backend/drive/drive.go
+++ b/backend/drive/drive.go
@@ -426,7 +426,7 @@ type Fs struct {
client *http.Client // authorized client
rootFolderID string // the id of the root folder
dirCache *dircache.DirCache // Map of directory path to directory id
- pacer *pacer.Pacer // To pace the API calls
+ pacer *fs.Pacer // To pace the API calls
exportExtensions []string // preferred extensions to download docs
importMimeTypes []string // MIME types to convert to docs
isTeamDrive bool // true if this is a team drive
@@ -789,8 +789,8 @@ func configTeamDrive(opt *Options, m configmap.Mapper, name string) error {
}
// newPacer makes a pacer configured for drive
-func newPacer(opt *Options) *pacer.Pacer {
- return pacer.New().SetMinSleep(time.Duration(opt.PacerMinSleep)).SetBurst(opt.PacerBurst).SetPacer(pacer.GoogleDrivePacer)
+func newPacer(opt *Options) *fs.Pacer {
+ return fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(opt.PacerMinSleep), pacer.Burst(opt.PacerBurst)))
}
func getServiceAccountClient(opt *Options, credentialsData []byte) (*http.Client, error) {
diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go
index fa17ba97e..7d4748d97 100644
--- a/backend/dropbox/dropbox.go
+++ b/backend/dropbox/dropbox.go
@@ -160,7 +160,7 @@ type Fs struct {
team team.Client // for the Teams API
slashRoot string // root with "/" prefix, lowercase
slashRootSlash string // root with "/" prefix and postfix, lowercase
- pacer *pacer.Pacer // To pace the API calls
+ pacer *fs.Pacer // To pace the API calls
ns string // The namespace we are using or "" for none
}
@@ -209,7 +209,7 @@ func shouldRetry(err error) (bool, error) {
case auth.RateLimitAPIError:
if e.RateLimitError.RetryAfter > 0 {
fs.Debugf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
- time.Sleep(time.Duration(e.RateLimitError.RetryAfter) * time.Second)
+ err = pacer.RetryAfterError(err, time.Duration(e.RateLimitError.RetryAfter)*time.Second)
}
return true, err
}
@@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f := &Fs{
name: name,
opt: *opt,
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
config := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go
index 5e7b65015..25866ff81 100644
--- a/backend/googlecloudstorage/googlecloudstorage.go
+++ b/backend/googlecloudstorage/googlecloudstorage.go
@@ -256,7 +256,7 @@ type Fs struct {
bucket string // the bucket we are working on
bucketOKMu sync.Mutex // mutex to protect bucket OK
bucketOK bool // true if we have created the bucket
- pacer *pacer.Pacer // To pace the API calls
+ pacer *fs.Pacer // To pace the API calls
}
// Object describes a storage object
@@ -395,7 +395,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
bucket: bucket,
root: directory,
opt: *opt,
- pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer),
+ pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))),
}
f.features = (&fs.Features{
ReadMimeType: true,
diff --git a/backend/jottacloud/jottacloud.go b/backend/jottacloud/jottacloud.go
index 99ab163aa..62bd394c2 100644
--- a/backend/jottacloud/jottacloud.go
+++ b/backend/jottacloud/jottacloud.go
@@ -190,7 +190,7 @@ type Fs struct {
endpointURL string
srv *rest.Client
apiSrv *rest.Client
- pacer *pacer.Pacer
+ pacer *fs.Pacer
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@@ -403,7 +403,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
apiSrv: rest.NewClient(oAuthClient).SetRoot(apiURL),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: true,
diff --git a/backend/mega/mega.go b/backend/mega/mega.go
index e61dd319c..be3d7f8f1 100644
--- a/backend/mega/mega.go
+++ b/backend/mega/mega.go
@@ -98,7 +98,7 @@ type Fs struct {
opt Options // parsed config options
features *fs.Features // optional features
srv *mega.Mega // the connection to the server
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
rootNodeMu sync.Mutex // mutex for _rootNode
_rootNode *mega.Node // root node - call findRoot to use this
mkdirMu sync.Mutex // used to serialize calls to mkdir / rmdir
@@ -217,7 +217,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: srv,
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
DuplicateFiles: true,
diff --git a/backend/onedrive/onedrive.go b/backend/onedrive/onedrive.go
index ee2440b5a..21395f1ec 100644
--- a/backend/onedrive/onedrive.go
+++ b/backend/onedrive/onedrive.go
@@ -261,7 +261,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the one drive server
dirCache *dircache.DirCache // Map of directory path to directory id
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
driveID string // ID to use for querying Microsoft Graph
driveType string // https://developer.microsoft.com/en-us/graph/docs/api-reference/v1.0/resources/drive
@@ -475,7 +475,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
driveID: opt.DriveID,
driveType: opt.DriveType,
srv: rest.NewClient(oAuthClient).SetRoot(graphURL + "/drives/" + opt.DriveID),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: true,
diff --git a/backend/opendrive/opendrive.go b/backend/opendrive/opendrive.go
index 5d8cc53a2..127fdc7e7 100644
--- a/backend/opendrive/opendrive.go
+++ b/backend/opendrive/opendrive.go
@@ -65,7 +65,7 @@ type Fs struct {
opt Options // parsed options
features *fs.Features // optional features
srv *rest.Client // the connection to the server
- pacer *pacer.Pacer // To pace and retry the API calls
+ pacer *fs.Pacer // To pace and retry the API calls
session UserSessionInfo // contains the session data
dirCache *dircache.DirCache // Map of directory path to directory id
}
@@ -144,7 +144,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.dirCache = dircache.New(root, "0", f)
diff --git a/backend/pcloud/pcloud.go b/backend/pcloud/pcloud.go
index 7a75932de..2caaa1444 100644
--- a/backend/pcloud/pcloud.go
+++ b/backend/pcloud/pcloud.go
@@ -95,7 +95,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the server
dirCache *dircache.DirCache // Map of directory path to directory id
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@@ -254,7 +254,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: false,
diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 8a545470c..f70d0b27b 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -782,7 +782,7 @@ type Fs struct {
bucketOKMu sync.Mutex // mutex to protect bucket OK
bucketOK bool // true if we have created the bucket
bucketDeleted bool // true if we have deleted the bucket
- pacer *pacer.Pacer // To pace the API calls
+ pacer *fs.Pacer // To pace the API calls
srv *http.Client // a plain http client
}
@@ -1055,7 +1055,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
c: c,
bucket: bucket,
ses: ses,
- pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
+ pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
srv: fshttp.NewClient(fs.Config),
}
f.features = (&fs.Features{
diff --git a/backend/swift/swift.go b/backend/swift/swift.go
index aa72adc57..1708db02b 100644
--- a/backend/swift/swift.go
+++ b/backend/swift/swift.go
@@ -216,7 +216,7 @@ type Fs struct {
containerOK bool // true if we have created the container
segmentsContainer string // container to store the segments (if any) in
noCheckContainer bool // don't check the container before creating it
- pacer *pacer.Pacer // To pace the API calls
+ pacer *fs.Pacer // To pace the API calls
}
// Object describes a swift object
@@ -401,7 +401,7 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
segmentsContainer: container + "_segments",
root: directory,
noCheckContainer: noCheckContainer,
- pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
+ pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
}
f.features = (&fs.Features{
ReadMimeType: true,
diff --git a/backend/webdav/webdav.go b/backend/webdav/webdav.go
index 889da8cdf..91205b00f 100644
--- a/backend/webdav/webdav.go
+++ b/backend/webdav/webdav.go
@@ -101,7 +101,7 @@ type Fs struct {
endpoint *url.URL // URL of the host
endpointURL string // endpoint as a string
srv *rest.Client // the connection to the one drive server
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
precision time.Duration // mod time precision
canStream bool // set if can stream
useOCMtime bool // set if can use X-OC-Mtime
@@ -318,7 +318,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
endpoint: u,
endpointURL: u.String(),
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetRoot(u.String()),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
precision: fs.ModTimeNotSupported,
}
f.features = (&fs.Features{
diff --git a/backend/yandex/yandex.go b/backend/yandex/yandex.go
index 624c425e6..864cb25f4 100644
--- a/backend/yandex/yandex.go
+++ b/backend/yandex/yandex.go
@@ -93,7 +93,7 @@ type Fs struct {
opt Options // parsed options
features *fs.Features // optional features
srv *rest.Client // the connection to the yandex server
- pacer *pacer.Pacer // pacer for API calls
+ pacer *fs.Pacer // pacer for API calls
diskRoot string // root path with "disk:/" container name
}
@@ -269,7 +269,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
name: name,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
- pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+ pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.setRoot(root)
f.features = (&fs.Features{
diff --git a/fs/fs.go b/fs/fs.go
index f8ded9ed6..ffdd6cf36 100644
--- a/fs/fs.go
+++ b/fs/fs.go
@@ -16,8 +16,10 @@ import (
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
+ "github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fspath"
"github.com/ncw/rclone/fs/hash"
+ "github.com/ncw/rclone/lib/pacer"
"github.com/pkg/errors"
)
@@ -1112,3 +1114,90 @@ func GetModifyWindow(fss ...Info) time.Duration {
}
return window
}
+
+// Pacer is a simple wrapper around a pacer.Pacer with logging.
+type Pacer struct {
+ *pacer.Pacer
+}
+
+type logCalculator struct {
+ pacer.Calculator
+}
+
+// NewPacer creates a Pacer for the given Fs and Calculator.
+func NewPacer(c pacer.Calculator) *Pacer {
+ p := &Pacer{
+ Pacer: pacer.New(
+ pacer.InvokerOption(pacerInvoker),
+ pacer.MaxConnectionsOption(Config.Checkers+Config.Transfers),
+ pacer.RetriesOption(Config.LowLevelRetries),
+ pacer.CalculatorOption(c),
+ ),
+ }
+ p.SetCalculator(c)
+ return p
+}
+
+func (d *logCalculator) Calculate(state pacer.State) time.Duration {
+ type causer interface {
+ Cause() error
+ }
+
+ if c, ok := state.LastError.(causer); ok {
+ state.LastError = c.Cause()
+ } else {
+ Logf("pacer", "Invalid error in fs.Pacer: %t", state.LastError)
+ }
+ oldSleepTime := state.SleepTime
+ newSleepTime := d.Calculator.Calculate(state)
+ if state.ConsecutiveRetries > 0 {
+ if newSleepTime != oldSleepTime {
+ Debugf("pacer", "Rate limited, increasing sleep to %v", newSleepTime)
+ }
+ } else {
+ if newSleepTime != oldSleepTime {
+ Debugf("pacer", "Reducing sleep to %v", newSleepTime)
+ }
+ }
+ return newSleepTime
+}
+
+// SetCalculator sets the pacing algorithm. Don't modify the Calculator object
+// afterwards, use the ModifyCalculator method when needed.
+//
+// It will choose the default algorithm if nil is passed in.
+func (p *Pacer) SetCalculator(c pacer.Calculator) {
+ switch c.(type) {
+ case *logCalculator:
+ Logf("pacer", "Invalid Calculator in fs.Pacer.SetCalculator")
+ case nil:
+ c = &logCalculator{pacer.NewDefault()}
+ default:
+ c = &logCalculator{c}
+ }
+
+ p.Pacer.SetCalculator(c)
+}
+
+// ModifyCalculator calls the given function with the currently configured
+// Calculator and the Pacer lock held.
+func (p *Pacer) ModifyCalculator(f func(pacer.Calculator)) {
+ p.ModifyCalculator(func(c pacer.Calculator) {
+ switch _c := c.(type) {
+ case *logCalculator:
+ f(_c.Calculator)
+ default:
+ Logf("pacer", "Invalid Calculator in fs.Pacer: %t", c)
+ f(c)
+ }
+ })
+}
+
+func pacerInvoker(try, retries int, f pacer.Paced) (retry bool, err error) {
+ retry, err = f()
+ if retry {
+ Debugf("pacer", "low level retry %d/%d (error %v)", try, retries, err)
+ err = fserrors.RetryError(err)
+ }
+ return
+}
diff --git a/fs/fs_test.go b/fs/fs_test.go
index 753614b0b..0fd15c22d 100644
--- a/fs/fs_test.go
+++ b/fs/fs_test.go
@@ -2,8 +2,15 @@ package fs
import (
"strings"
+ "sync"
"testing"
+ "time"
+ "github.com/stretchr/testify/require"
+
+ "github.com/ncw/rclone/fs/fserrors"
+ "github.com/ncw/rclone/lib/pacer"
+ "github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
)
@@ -70,3 +77,47 @@ func TestOption(t *testing.T) {
err = d.Set("sdfsdf")
assert.Error(t, err)
}
+
+var errFoo = errors.New("foo")
+
+type dummyPaced struct {
+ retry bool
+ called int
+ wait *sync.Cond
+}
+
+func (dp *dummyPaced) fn() (bool, error) {
+ if dp.wait != nil {
+ dp.wait.L.Lock()
+ dp.wait.Wait()
+ dp.wait.L.Unlock()
+ }
+ dp.called++
+ return dp.retry, errFoo
+}
+
+func TestPacerCall(t *testing.T) {
+ expectedCalled := Config.LowLevelRetries
+ if expectedCalled == 0 {
+ expectedCalled = 20
+ Config.LowLevelRetries = expectedCalled
+ defer func() {
+ Config.LowLevelRetries = 0
+ }()
+ }
+ p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
+
+ dp := &dummyPaced{retry: true}
+ err := p.Call(dp.fn)
+ require.Equal(t, expectedCalled, dp.called)
+ require.Implements(t, (*fserrors.Retrier)(nil), err)
+}
+
+func TestPacerCallNoRetry(t *testing.T) {
+ p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
+
+ dp := &dummyPaced{retry: true}
+ err := p.CallNoRetry(dp.fn)
+ require.Equal(t, 1, dp.called)
+ require.Implements(t, (*fserrors.Retrier)(nil), err)
+}
diff --git a/lib/pacer/pacer.go b/lib/pacer/pacer.go
index 3914950ad..7e58230f5 100644
--- a/lib/pacer/pacer.go
+++ b/lib/pacer/pacer.go
@@ -2,74 +2,69 @@
package pacer
import (
- "context"
- "math/rand"
"sync"
"time"
- "github.com/ncw/rclone/fs"
- "github.com/ncw/rclone/fs/fserrors"
- "golang.org/x/time/rate"
+ "github.com/ncw/rclone/lib/errors"
)
-// Pacer state
-type Pacer struct {
- mu sync.Mutex // Protecting read/writes
- minSleep time.Duration // minimum sleep time
- maxSleep time.Duration // maximum sleep time
- burst int // number of calls to send without rate limiting
- limiter *rate.Limiter // rate limiter for the minsleep
- decayConstant uint // decay constant
- attackConstant uint // attack constant
- pacer chan struct{} // To pace the operations
- sleepTime time.Duration // Time to sleep for each transaction
- retries int // Max number of retries
- maxConnections int // Maximum number of concurrent connections
- connTokens chan struct{} // Connection tokens
- calculatePace func(bool) // switchable pacing algorithm - call with mu held
- consecutiveRetries int // number of consecutive retries
+// State represents the public Pacer state that will be passed to the
+// configured Calculator
+type State struct {
+ SleepTime time.Duration // current time to sleep before adding the pacer token back
+ ConsecutiveRetries int // number of consecutive retries, will be 0 when the last invoker call returned false
+ LastError error // the error returned by the last invoker call or nil
}
-// Type is for selecting different pacing algorithms
-type Type int
+// Calculator is a generic calculation function for a Pacer.
+type Calculator interface {
+ // Calculate takes the current Pacer state and returns the sleep time after which
+ // the next Pacer call will be done.
+ Calculate(state State) time.Duration
+}
-const (
- // DefaultPacer is a truncated exponential attack and decay.
- //
- // On retries the sleep time is doubled, on non errors then
- // sleeptime decays according to the decay constant as set
- // with SetDecayConstant.
- //
- // The sleep never goes below that set with SetMinSleep or
- // above that set with SetMaxSleep.
- DefaultPacer = Type(iota)
+// Pacer is the primary type of the pacer package. It allows to retry calls
+// with a configurable delay in between.
+type Pacer struct {
+ pacerOptions
+ mu sync.Mutex // Protecting read/writes
+ pacer chan struct{} // To pace the operations
+ connTokens chan struct{} // Connection tokens
+ state State
+}
+type pacerOptions struct {
+ maxConnections int // Maximum number of concurrent connections
+ retries int // Max number of retries
+ calculator Calculator // switchable pacing algorithm - call with mu held
+ invoker InvokerFunc // wrapper function used to invoke the target function
+}
- // AmazonCloudDrivePacer is a specialised pacer for Amazon Drive
- //
- // It implements a truncated exponential backoff strategy with
- // randomization. Normally operations are paced at the
- // interval set with SetMinSleep. On errors the sleep timer
- // is set to 0..2**retries seconds.
- //
- // See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
- AmazonCloudDrivePacer
+// InvokerFunc is the signature of the wrapper function used to invoke the
+// target function in Pacer.
+type InvokerFunc func(try, tries int, f Paced) (bool, error)
- // GoogleDrivePacer is a specialised pacer for Google Drive
- //
- // It implements a truncated exponential backoff strategy with
- // randomization. Normally operations are paced at the
- // interval set with SetMinSleep. On errors the sleep timer
- // is set to (2 ^ n) + random_number_milliseconds seconds
- //
- // See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff
- GoogleDrivePacer
+// Option can be used in New to configure the Pacer.
+type Option func(*pacerOptions)
- // S3Pacer is a specialised pacer for S3
- //
- // It is basically the defaultPacer, but allows the sleep time to go to 0
- // when things are going well.
- S3Pacer
-)
+// CalculatorOption sets a Calculator for the new Pacer.
+func CalculatorOption(c Calculator) Option {
+ return func(p *pacerOptions) { p.calculator = c }
+}
+
+// RetriesOption sets the retries number for the new Pacer.
+func RetriesOption(retries int) Option {
+ return func(p *pacerOptions) { p.retries = retries }
+}
+
+// MaxConnectionsOption sets the maximum connections number for the new Pacer.
+func MaxConnectionsOption(maxConnections int) Option {
+ return func(p *pacerOptions) { p.maxConnections = maxConnections }
+}
+
+// InvokerOption sets a InvokerFunc for the new Pacer.
+func InvokerOption(invoker InvokerFunc) Option {
+ return func(p *pacerOptions) { p.invoker = invoker }
+}
// Paced is a function which is called by the Call and CallNoRetry
// methods. It should return a boolean, true if it would like to be
@@ -77,19 +72,27 @@ const (
// wrapped in a RetryError.
type Paced func() (bool, error)
-// New returns a Pacer with sensible defaults
-func New() *Pacer {
- p := &Pacer{
- maxSleep: 2 * time.Second,
- decayConstant: 2,
- attackConstant: 1,
- retries: fs.Config.LowLevelRetries,
- pacer: make(chan struct{}, 1),
+// New returns a Pacer with sensible defaults.
+func New(options ...Option) *Pacer {
+ opts := pacerOptions{
+ maxConnections: 10,
+ retries: 3,
}
- p.sleepTime = p.minSleep
- p.SetPacer(DefaultPacer)
- p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers)
- p.SetMinSleep(10 * time.Millisecond)
+ for _, o := range options {
+ o(&opts)
+ }
+ p := &Pacer{
+ pacerOptions: opts,
+ pacer: make(chan struct{}, 1),
+ }
+ if p.calculator == nil {
+ p.SetCalculator(nil)
+ }
+ p.state.SleepTime = p.calculator.Calculate(p.state)
+ if p.invoker == nil {
+ p.invoker = invoke
+ }
+ p.SetMaxConnections(p.maxConnections)
// Put the first pacing token in
p.pacer <- struct{}{}
@@ -97,54 +100,11 @@ func New() *Pacer {
return p
}
-// SetSleep sets the current sleep time
-func (p *Pacer) SetSleep(t time.Duration) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.sleepTime = t
- return p
-}
-
-// GetSleep gets the current sleep time
-func (p *Pacer) GetSleep() time.Duration {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.sleepTime
-}
-
-// SetMinSleep sets the minimum sleep time for the pacer
-func (p *Pacer) SetMinSleep(t time.Duration) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.minSleep = t
- p.sleepTime = p.minSleep
- p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst)
- return p
-}
-
-// SetBurst sets the burst with no limiting of the pacer
-func (p *Pacer) SetBurst(n int) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.burst = n
- p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst)
- return p
-}
-
-// SetMaxSleep sets the maximum sleep time for the pacer
-func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.maxSleep = t
- p.sleepTime = p.minSleep
- return p
-}
-
// SetMaxConnections sets the maximum number of concurrent connections.
// Setting the value to 0 will allow unlimited number of connections.
// Should not be changed once you have started calling the pacer.
// By default this will be set to fs.Config.Checkers.
-func (p *Pacer) SetMaxConnections(n int) *Pacer {
+func (p *Pacer) SetMaxConnections(n int) {
p.mu.Lock()
defer p.mu.Unlock()
p.maxConnections = n
@@ -156,61 +116,34 @@ func (p *Pacer) SetMaxConnections(n int) *Pacer {
p.connTokens <- struct{}{}
}
}
- return p
}
-// SetDecayConstant sets the decay constant for the pacer
-//
-// This is the speed the time falls back to the minimum after errors
-// have occurred.
-//
-// bigger for slower decay, exponential. 1 is halve, 0 is go straight to minimum
-func (p *Pacer) SetDecayConstant(decay uint) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.decayConstant = decay
- return p
-}
-
-// SetAttackConstant sets the attack constant for the pacer
-//
-// This is the speed the time grows from the minimum after errors have
-// occurred.
-//
-// bigger for slower attack, 1 is double, 0 is go straight to maximum
-func (p *Pacer) SetAttackConstant(attack uint) *Pacer {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.attackConstant = attack
- return p
-}
-
-// SetRetries sets the max number of tries for Call
-func (p *Pacer) SetRetries(retries int) *Pacer {
+// SetRetries sets the max number of retries for Call
+func (p *Pacer) SetRetries(retries int) {
p.mu.Lock()
defer p.mu.Unlock()
p.retries = retries
- return p
}
-// SetPacer sets the pacing algorithm
+// SetCalculator sets the pacing algorithm. Don't modify the Calculator object
+// afterwards, use the ModifyCalculator method when needed.
//
-// It will choose the default algorithm if an incorrect value is
-// passed in.
-func (p *Pacer) SetPacer(t Type) *Pacer {
+// It will choose the default algorithm if nil is passed in.
+func (p *Pacer) SetCalculator(c Calculator) {
p.mu.Lock()
defer p.mu.Unlock()
- switch t {
- case AmazonCloudDrivePacer:
- p.calculatePace = p.acdPacer
- case GoogleDrivePacer:
- p.calculatePace = p.drivePacer
- case S3Pacer:
- p.calculatePace = p.s3Pacer
- default:
- p.calculatePace = p.defaultPacer
+ if c == nil {
+ c = NewDefault()
}
- return p
+ p.calculator = c
+}
+
+// ModifyCalculator calls the given function with the currently configured
+// Calculator and the Pacer lock held.
+func (p *Pacer) ModifyCalculator(f func(Calculator)) {
+ p.mu.Lock()
+ f(p.calculator)
+ p.mu.Unlock()
}
// Start a call to the API
@@ -230,170 +163,29 @@ func (p *Pacer) beginCall() {
p.mu.Lock()
// Restart the timer
- go func(sleepTime, minSleep time.Duration) {
- // fs.Debugf(f, "New sleep for %v at %v", t, time.Now())
- // Sleep the minimum time with the rate limiter
- if minSleep > 0 && sleepTime >= minSleep {
- _ = p.limiter.Wait(context.Background())
- sleepTime -= minSleep
- }
- // Then sleep the remaining time
- if sleepTime > 0 {
- time.Sleep(sleepTime)
- }
+ go func(t time.Duration) {
+ time.Sleep(t)
p.pacer <- struct{}{}
- }(p.sleepTime, p.minSleep)
+ }(p.state.SleepTime)
p.mu.Unlock()
}
-// exponentialImplementation implements a exponentialImplementation up
-// and down pacing algorithm
-//
-// See the description for DefaultPacer
-//
-// This should calculate a new sleepTime. It takes a boolean as to
-// whether the operation should be retried or not.
-//
-// Call with p.mu held
-func (p *Pacer) defaultPacer(retry bool) {
- oldSleepTime := p.sleepTime
- if retry {
- if p.attackConstant == 0 {
- p.sleepTime = p.maxSleep
- } else {
- p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1)
- }
- if p.sleepTime > p.maxSleep {
- p.sleepTime = p.maxSleep
- }
- if p.sleepTime != oldSleepTime {
- fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
- }
- } else {
- p.sleepTime = (p.sleepTime<
> p.decayConstant
- if p.sleepTime < p.minSleep {
- p.sleepTime = p.minSleep
- }
- if p.sleepTime != oldSleepTime {
- fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime)
- }
- }
-}
-
-// acdPacer implements a truncated exponential backoff
-// strategy with randomization for Amazon Drive
-//
-// See the description for AmazonCloudDrivePacer
-//
-// This should calculate a new sleepTime. It takes a boolean as to
-// whether the operation should be retried or not.
-//
-// Call with p.mu held
-func (p *Pacer) acdPacer(retry bool) {
- consecutiveRetries := p.consecutiveRetries
- if consecutiveRetries == 0 {
- if p.sleepTime != p.minSleep {
- p.sleepTime = p.minSleep
- fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
- }
- } else {
- if consecutiveRetries > 9 {
- consecutiveRetries = 9
- }
- // consecutiveRetries starts at 1 so
- // maxSleep is 2**(consecutiveRetries-1) seconds
- maxSleep := time.Second << uint(consecutiveRetries-1)
- // actual sleep is random from 0..maxSleep
- p.sleepTime = time.Duration(rand.Int63n(int64(maxSleep)))
- if p.sleepTime < p.minSleep {
- p.sleepTime = p.minSleep
- }
- fs.Debugf("pacer", "Rate limited, sleeping for %v (%d consecutive low level retries)", p.sleepTime, p.consecutiveRetries)
- }
-}
-
-// drivePacer implements a truncated exponential backoff strategy with
-// randomization for Google Drive
-//
-// See the description for GoogleDrivePacer
-//
-// This should calculate a new sleepTime. It takes a boolean as to
-// whether the operation should be retried or not.
-//
-// Call with p.mu held
-func (p *Pacer) drivePacer(retry bool) {
- consecutiveRetries := p.consecutiveRetries
- if consecutiveRetries == 0 {
- if p.sleepTime != p.minSleep {
- p.sleepTime = p.minSleep
- fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
- }
- } else {
- if consecutiveRetries > 5 {
- consecutiveRetries = 5
- }
- // consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16
- // maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds
- p.sleepTime = time.Second< p.maxSleep {
- p.sleepTime = p.maxSleep
- }
- if p.sleepTime != oldSleepTime {
- fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
- }
- } else {
- p.sleepTime = (p.sleepTime<> p.decayConstant
- if p.sleepTime < p.minSleep {
- p.sleepTime = 0
- }
- if p.sleepTime != oldSleepTime {
- fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime)
- }
- }
-}
-
// endCall implements the pacing algorithm
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
-func (p *Pacer) endCall(retry bool) {
+func (p *Pacer) endCall(retry bool, err error) {
if p.maxConnections > 0 {
p.connTokens <- struct{}{}
}
p.mu.Lock()
if retry {
- p.consecutiveRetries++
+ p.state.ConsecutiveRetries++
} else {
- p.consecutiveRetries = 0
+ p.state.ConsecutiveRetries = 0
}
- p.calculatePace(retry)
+ p.state.LastError = err
+ p.state.SleepTime = p.calculator.Calculate(p.state)
p.mu.Unlock()
}
@@ -402,15 +194,11 @@ func (p *Pacer) call(fn Paced, retries int) (err error) {
var retry bool
for i := 1; i <= retries; i++ {
p.beginCall()
- retry, err = fn()
- p.endCall(retry)
+ retry, err = p.invoker(i, retries, fn)
+ p.endCall(retry, err)
if !retry {
break
}
- fs.Debugf("pacer", "low level retry %d/%d (error %v)", i, retries, err)
- }
- if retry {
- err = fserrors.RetryError(err)
}
return err
}
@@ -436,3 +224,41 @@ func (p *Pacer) Call(fn Paced) (err error) {
func (p *Pacer) CallNoRetry(fn Paced) error {
return p.call(fn, 1)
}
+
+func invoke(try, tries int, f Paced) (bool, error) {
+ return f()
+}
+
+type retryAfterError struct {
+ error
+ retryAfter time.Duration
+}
+
+func (r *retryAfterError) Error() string {
+ return r.error.Error()
+}
+
+func (r *retryAfterError) Cause() error {
+ return r.error
+}
+
+// RetryAfterError returns a wrapped error that can be used by Calculator implementations
+func RetryAfterError(err error, retryAfter time.Duration) error {
+ return &retryAfterError{
+ error: err,
+ retryAfter: retryAfter,
+ }
+}
+
+// IsRetryAfter returns true if the the error or any of it's Cause's is an error
+// returned by RetryAfterError. It also returns the associated Duration if possible.
+func IsRetryAfter(err error) (retryAfter time.Duration, isRetryAfter bool) {
+ errors.Walk(err, func(err error) bool {
+ if r, ok := err.(*retryAfterError); ok {
+ retryAfter, isRetryAfter = r.retryAfter, true
+ return true
+ }
+ return false
+ })
+ return
+}
diff --git a/lib/pacer/pacer_test.go b/lib/pacer/pacer_test.go
index 1b2793f7a..4a7646530 100644
--- a/lib/pacer/pacer_test.go
+++ b/lib/pacer/pacer_test.go
@@ -1,181 +1,85 @@
package pacer
import (
- "fmt"
+ "sync"
"testing"
"time"
- "github.com/ncw/rclone/fs"
- "github.com/ncw/rclone/fs/fserrors"
"github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
const expectedRetries = 7
- fs.Config.LowLevelRetries = expectedRetries
- p := New()
- if p.minSleep != 10*time.Millisecond {
- t.Errorf("minSleep")
- }
- if p.maxSleep != 2*time.Second {
- t.Errorf("maxSleep")
- }
- if p.sleepTime != p.minSleep {
- t.Errorf("sleepTime")
- }
- if p.retries != expectedRetries {
- t.Errorf("retries want %v got %v", expectedRetries, p.retries)
- }
- if p.decayConstant != 2 {
- t.Errorf("decayConstant")
- }
- if p.attackConstant != 1 {
- t.Errorf("attackConstant")
- }
- if cap(p.pacer) != 1 {
- t.Errorf("pacer 1")
- }
- if len(p.pacer) != 1 {
- t.Errorf("pacer 2")
- }
- if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
- t.Errorf("calculatePace")
- }
- if p.maxConnections != fs.Config.Checkers+fs.Config.Transfers {
- t.Errorf("maxConnections")
- }
- if cap(p.connTokens) != fs.Config.Checkers+fs.Config.Transfers {
- t.Errorf("connTokens")
- }
- if p.consecutiveRetries != 0 {
- t.Errorf("consecutiveRetries")
- }
-}
-
-func TestSetSleep(t *testing.T) {
- p := New().SetSleep(2 * time.Millisecond)
- if p.sleepTime != 2*time.Millisecond {
- t.Errorf("didn't set")
- }
-}
-
-func TestGetSleep(t *testing.T) {
- p := New().SetSleep(2 * time.Millisecond)
- if p.GetSleep() != 2*time.Millisecond {
- t.Errorf("didn't get")
- }
-}
-
-func TestSetMinSleep(t *testing.T) {
- p := New().SetMinSleep(1 * time.Millisecond)
- if p.minSleep != 1*time.Millisecond {
- t.Errorf("didn't set")
- }
-}
-
-func TestSetMaxSleep(t *testing.T) {
- p := New().SetMaxSleep(100 * time.Second)
- if p.maxSleep != 100*time.Second {
- t.Errorf("didn't set")
+ const expectedConnections = 9
+ p := New(RetriesOption(expectedRetries), MaxConnectionsOption(expectedConnections))
+ if d, ok := p.calculator.(*Default); ok {
+ assert.Equal(t, 10*time.Millisecond, d.minSleep)
+ assert.Equal(t, 2*time.Second, d.maxSleep)
+ assert.Equal(t, d.minSleep, p.state.SleepTime)
+ assert.Equal(t, uint(2), d.decayConstant)
+ assert.Equal(t, uint(1), d.attackConstant)
+ } else {
+ t.Errorf("calculator")
}
+ assert.Equal(t, expectedRetries, p.retries)
+ assert.Equal(t, 1, cap(p.pacer))
+ assert.Equal(t, 1, len(p.pacer))
+ assert.Equal(t, expectedConnections, p.maxConnections)
+ assert.Equal(t, expectedConnections, cap(p.connTokens))
+ assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
func TestMaxConnections(t *testing.T) {
- p := New().SetMaxConnections(20)
- if p.maxConnections != 20 {
- t.Errorf("maxConnections")
- }
- if cap(p.connTokens) != 20 {
- t.Errorf("connTokens")
- }
+ p := New()
+ p.SetMaxConnections(20)
+ assert.Equal(t, 20, p.maxConnections)
+ assert.Equal(t, 20, cap(p.connTokens))
p.SetMaxConnections(0)
- if p.maxConnections != 0 {
- t.Errorf("maxConnections is not 0")
- }
- if p.connTokens != nil {
- t.Errorf("connTokens is not nil")
- }
-}
-
-func TestSetDecayConstant(t *testing.T) {
- p := New().SetDecayConstant(17)
- if p.decayConstant != 17 {
- t.Errorf("didn't set")
- }
+ assert.Equal(t, 0, p.maxConnections)
+ assert.Nil(t, p.connTokens)
}
func TestDecay(t *testing.T) {
- p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second)
+ c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second))
for _, test := range []struct {
- in time.Duration
+ in State
attackConstant uint
want time.Duration
}{
- {8 * time.Millisecond, 1, 4 * time.Millisecond},
- {1 * time.Millisecond, 0, time.Microsecond},
- {1 * time.Millisecond, 2, (3 * time.Millisecond) / 4},
- {1 * time.Millisecond, 3, (7 * time.Millisecond) / 8},
+ {State{SleepTime: 8 * time.Millisecond}, 1, 4 * time.Millisecond},
+ {State{SleepTime: 1 * time.Millisecond}, 0, 1 * time.Microsecond},
+ {State{SleepTime: 1 * time.Millisecond}, 2, (3 * time.Millisecond) / 4},
+ {State{SleepTime: 1 * time.Millisecond}, 3, (7 * time.Millisecond) / 8},
} {
- p.sleepTime = test.in
- p.SetDecayConstant(test.attackConstant)
- p.defaultPacer(false)
- got := p.sleepTime
- if got != test.want {
- t.Errorf("bad sleep want %v got %v", test.want, got)
- }
- }
-}
-
-func TestSetAttackConstant(t *testing.T) {
- p := New().SetAttackConstant(19)
- if p.attackConstant != 19 {
- t.Errorf("didn't set")
+ c.decayConstant = test.attackConstant
+ got := c.Calculate(test.in)
+ assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestAttack(t *testing.T) {
- p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second)
+ c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second))
for _, test := range []struct {
- in time.Duration
+ in State
attackConstant uint
want time.Duration
}{
- {1 * time.Millisecond, 1, 2 * time.Millisecond},
- {1 * time.Millisecond, 0, time.Second},
- {1 * time.Millisecond, 2, (4 * time.Millisecond) / 3},
- {1 * time.Millisecond, 3, (8 * time.Millisecond) / 7},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 1, 2 * time.Millisecond},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 0, 1 * time.Second},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2, (4 * time.Millisecond) / 3},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 3, (8 * time.Millisecond) / 7},
} {
- p.sleepTime = test.in
- p.SetAttackConstant(test.attackConstant)
- p.defaultPacer(true)
- got := p.sleepTime
- if got != test.want {
- t.Errorf("bad sleep want %v got %v", test.want, got)
- }
+ c.attackConstant = test.attackConstant
+ got := c.Calculate(test.in)
+ assert.Equal(t, test.want, got, "test: %+v", test)
}
-
}
func TestSetRetries(t *testing.T) {
- p := New().SetRetries(18)
- if p.retries != 18 {
- t.Errorf("didn't set")
- }
-}
-
-func TestSetPacer(t *testing.T) {
- p := New().SetPacer(AmazonCloudDrivePacer)
- if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.acdPacer) {
- t.Errorf("calculatePace is not acdPacer")
- }
- p.SetPacer(GoogleDrivePacer)
- if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.drivePacer) {
- t.Errorf("calculatePace is not drivePacer")
- }
- p.SetPacer(DefaultPacer)
- if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
- t.Errorf("calculatePace is not defaultPacer")
- }
+ p := New()
+ p.SetRetries(18)
+ assert.Equal(t, 18, p.retries)
}
// emptyTokens empties the pacer of all its tokens
@@ -200,7 +104,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
}
func TestBeginCall(t *testing.T) {
- p := New().SetMaxConnections(10).SetMinSleep(1 * time.Millisecond)
+ p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
if !waitForPace(p, 10*time.Millisecond).IsZero() {
@@ -223,7 +127,7 @@ func TestBeginCall(t *testing.T) {
}
func TestBeginCallZeroConnections(t *testing.T) {
- p := New().SetMaxConnections(0).SetMinSleep(1 * time.Millisecond)
+ p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
if !waitForPace(p, 10*time.Millisecond).IsZero() {
@@ -241,155 +145,144 @@ func TestBeginCallZeroConnections(t *testing.T) {
}
func TestDefaultPacer(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+ c := NewDefault(MinSleep(1*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2))
for _, test := range []struct {
- in time.Duration
- retry bool
+ state State
want time.Duration
}{
- {time.Millisecond, true, 2 * time.Millisecond},
- {time.Second, true, time.Second},
- {(3 * time.Second) / 4, true, time.Second},
- {time.Second, false, 750 * time.Millisecond},
- {1000 * time.Microsecond, false, time.Millisecond},
- {1200 * time.Microsecond, false, time.Millisecond},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2 * time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second},
+ {State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second},
+ {State{SleepTime: 1 * time.Second}, 750 * time.Millisecond},
+ {State{SleepTime: 1000 * time.Microsecond}, 1 * time.Millisecond},
+ {State{SleepTime: 1200 * time.Microsecond}, 1 * time.Millisecond},
} {
- p.sleepTime = test.in
- p.defaultPacer(test.retry)
- got := p.sleepTime
- if got != test.want {
- t.Errorf("bad sleep want %v got %v", test.want, got)
- }
+ got := c.Calculate(test.state)
+ assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestAmazonCloudDrivePacer(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetPacer(AmazonCloudDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+ c := NewAmazonCloudDrive(MinSleep(1 * time.Millisecond))
// Do lots of times because of the random number!
for _, test := range []struct {
- in time.Duration
- consecutiveRetries int
- retry bool
- want time.Duration
+ state State
+ want time.Duration
}{
- {time.Millisecond, 0, true, time.Millisecond},
- {10 * time.Millisecond, 0, true, time.Millisecond},
- {1 * time.Second, 1, true, 500 * time.Millisecond},
- {1 * time.Second, 2, true, 1 * time.Second},
- {1 * time.Second, 3, true, 2 * time.Second},
- {1 * time.Second, 4, true, 4 * time.Second},
- {1 * time.Second, 5, true, 8 * time.Second},
- {1 * time.Second, 6, true, 16 * time.Second},
- {1 * time.Second, 7, true, 32 * time.Second},
- {1 * time.Second, 8, true, 64 * time.Second},
- {1 * time.Second, 9, true, 128 * time.Second},
- {1 * time.Second, 10, true, 128 * time.Second},
- {1 * time.Second, 11, true, 128 * time.Second},
+ {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond},
+ {State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 500 * time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 1 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 2 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 4 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 8 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 32 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 8}, 64 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 9}, 128 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 10}, 128 * time.Second},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 11}, 128 * time.Second},
} {
const n = 1000
var sum time.Duration
// measure average time over n cycles
for i := 0; i < n; i++ {
- p.sleepTime = test.in
- p.consecutiveRetries = test.consecutiveRetries
- p.acdPacer(test.retry)
- sum += p.sleepTime
+ sum += c.Calculate(test.state)
}
got := sum / n
- //t.Logf("%+v: got = %v", test, got)
- if got < (test.want*9)/10 || got > (test.want*11)/10 {
- t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got)
- }
+ assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v", test)
}
}
func TestGoogleDrivePacer(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetPacer(GoogleDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
// Do lots of times because of the random number!
for _, test := range []struct {
- in time.Duration
- consecutiveRetries int
- retry bool
- want time.Duration
+ state State
+ want time.Duration
}{
- {time.Millisecond, 0, true, time.Millisecond},
- {10 * time.Millisecond, 0, true, time.Millisecond},
- {1 * time.Second, 1, true, 1*time.Second + 500*time.Millisecond},
- {1 * time.Second, 2, true, 2*time.Second + 500*time.Millisecond},
- {1 * time.Second, 3, true, 4*time.Second + 500*time.Millisecond},
- {1 * time.Second, 4, true, 8*time.Second + 500*time.Millisecond},
- {1 * time.Second, 5, true, 16*time.Second + 500*time.Millisecond},
- {1 * time.Second, 6, true, 16*time.Second + 500*time.Millisecond},
- {1 * time.Second, 7, true, 16*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Millisecond}, 0},
+ {State{SleepTime: 10 * time.Millisecond}, 0},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 2*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 4*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 8*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 16*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16*time.Second + 500*time.Millisecond},
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 16*time.Second + 500*time.Millisecond},
} {
const n = 1000
var sum time.Duration
// measure average time over n cycles
for i := 0; i < n; i++ {
- p.sleepTime = test.in
- p.consecutiveRetries = test.consecutiveRetries
- p.drivePacer(test.retry)
- sum += p.sleepTime
+ c := NewGoogleDrive(MinSleep(1 * time.Millisecond))
+ sum += c.Calculate(test.state)
}
got := sum / n
- //t.Logf("%+v: got = %v", test, got)
- if got < (test.want*9)/10 || got > (test.want*11)/10 {
- t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got)
+ assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v, got: %v", test, got)
+ }
+
+ const minSleep = 2 * time.Millisecond
+ for _, test := range []struct {
+ calls int
+ want int
+ }{
+ {1, 0},
+ {9, 0},
+ {10, 0},
+ {11, 1},
+ {12, 2},
+ } {
+ c := NewGoogleDrive(MinSleep(minSleep), Burst(10))
+ count := 0
+ for i := 0; i < test.calls; i++ {
+ sleep := c.Calculate(State{})
+ time.Sleep(sleep)
+ if sleep != 0 {
+ count++
+ }
}
+ assert.Equalf(t, test.want, count, "test: %+v, got: %v", test, count)
}
}
func TestS3Pacer(t *testing.T) {
- p := New().SetMinSleep(10 * time.Millisecond).SetPacer(S3Pacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+ c := NewS3(MinSleep(10*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2))
for _, test := range []struct {
- in time.Duration
- retry bool
+ state State
want time.Duration
}{
- {0, true, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep
- {10 * time.Millisecond, true, 20 * time.Millisecond}, //Another fail, double the backoff
- {10 * time.Millisecond, false, 0}, //Things start going ok when we're at minSleep; should result in no sleep
- {12 * time.Millisecond, false, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0
- {0, false, 0}, //Things have been going ok; not retrying should keep sleep at 0
- {time.Second, true, time.Second}, //Check maxSleep is enforced
- {(3 * time.Second) / 4, true, time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep
- {time.Second, false, 750 * time.Millisecond}, //Check decay from maxSleep
- {48 * time.Millisecond, false, 36 * time.Millisecond}, //Check simple decay above minSleep
+ {State{SleepTime: 0, ConsecutiveRetries: 1}, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep
+ {State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 1}, 20 * time.Millisecond}, //Another fail, double the backoff
+ {State{SleepTime: 10 * time.Millisecond}, 0}, //Things start going ok when we're at minSleep; should result in no sleep
+ {State{SleepTime: 12 * time.Millisecond}, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0
+ {State{SleepTime: 0}, 0}, //Things have been going ok; not retrying should keep sleep at 0
+ {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second}, //Check maxSleep is enforced
+ {State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep
+ {State{SleepTime: 1 * time.Second}, 750 * time.Millisecond}, //Check decay from maxSleep
+ {State{SleepTime: 48 * time.Millisecond}, 36 * time.Millisecond}, //Check simple decay above minSleep
} {
- p.sleepTime = test.in
- p.s3Pacer(test.retry)
- got := p.sleepTime
- if got != test.want {
- t.Errorf("bad sleep for %v with retry %v: want %v got %v", test.in, test.retry, test.want, got)
- }
+ got := c.Calculate(test.state)
+ assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestEndCall(t *testing.T) {
- p := New().SetMaxConnections(5)
+ p := New(MaxConnectionsOption(5))
emptyTokens(p)
- p.consecutiveRetries = 1
- p.endCall(true)
- if len(p.connTokens) != 1 {
- t.Errorf("Expecting 1 token")
- }
- if p.consecutiveRetries != 2 {
- t.Errorf("Bad consecutive retries")
- }
+ p.state.ConsecutiveRetries = 1
+ p.endCall(true, nil)
+ assert.Equal(t, 1, len(p.connTokens))
+ assert.Equal(t, 2, p.state.ConsecutiveRetries)
}
func TestEndCallZeroConnections(t *testing.T) {
- p := New().SetMaxConnections(0)
+ p := New(MaxConnectionsOption(0))
emptyTokens(p)
- p.consecutiveRetries = 1
- p.endCall(false)
- if len(p.connTokens) != 0 {
- t.Errorf("Expecting 0 token")
- }
- if p.consecutiveRetries != 0 {
- t.Errorf("Bad consecutive retries")
- }
+ p.state.ConsecutiveRetries = 1
+ p.endCall(false, nil)
+ assert.Equal(t, 0, len(p.connTokens))
+ assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
var errFoo = errors.New("foo")
@@ -397,67 +290,79 @@ var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
+ wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
- dp.called++
+ if dp.wait != nil {
+ dp.wait.L.Lock()
+ dp.called++
+ dp.wait.Wait()
+ dp.wait.L.Unlock()
+ } else {
+ dp.called++
+ }
return dp.retry, errFoo
}
-func Test_callNoRetry(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
+func TestCallFixed(t *testing.T) {
+ p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: false}
err := p.call(dp.fn, 10)
- if dp.called != 1 {
- t.Errorf("called want %d got %d", 1, dp.called)
- }
- if err != errFoo {
- t.Errorf("err want %v got %v", errFoo, err)
- }
+ assert.Equal(t, 1, dp.called)
+ assert.Equal(t, errFoo, err)
}
func Test_callRetry(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
+ p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: true}
err := p.call(dp.fn, 10)
- if dp.called != 10 {
- t.Errorf("called want %d got %d", 10, dp.called)
- }
- if err == errFoo {
- t.Errorf("err didn't want %v got %v", errFoo, err)
- }
- _, ok := err.(fserrors.Retrier)
- if !ok {
- t.Errorf("didn't return a retry error")
- }
+ assert.Equal(t, 10, dp.called)
+ assert.Equal(t, errFoo, err)
}
func TestCall(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
+ p := New(RetriesOption(20), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
- if dp.called != 20 {
- t.Errorf("called want %d got %d", 20, dp.called)
- }
- _, ok := err.(fserrors.Retrier)
- if !ok {
- t.Errorf("didn't return a retry error")
- }
+ assert.Equal(t, 20, dp.called)
+ assert.Equal(t, errFoo, err)
}
-func TestCallNoRetry(t *testing.T) {
- p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
+func TestCallParallel(t *testing.T) {
+ p := New(MaxConnectionsOption(3), RetriesOption(1), CalculatorOption(NewDefault(MinSleep(100*time.Microsecond), MaxSleep(1*time.Millisecond))))
- dp := &dummyPaced{retry: true}
- err := p.CallNoRetry(dp.fn)
- if dp.called != 1 {
- t.Errorf("called want %d got %d", 1, dp.called)
+ wait := sync.NewCond(&sync.Mutex{})
+ funcs := make([]*dummyPaced, 5)
+ for i := range funcs {
+ dp := &dummyPaced{wait: wait}
+ funcs[i] = dp
+ go func() {
+ assert.Equal(t, errFoo, p.CallNoRetry(dp.fn))
+ }()
}
- _, ok := err.(fserrors.Retrier)
- if !ok {
- t.Errorf("didn't return a retry error")
+ time.Sleep(10 * time.Millisecond)
+ called := 0
+ wait.L.Lock()
+ for _, dp := range funcs {
+ called += dp.called
}
+ wait.L.Unlock()
+
+ assert.Equal(t, 3, called)
+ wait.Broadcast()
+ time.Sleep(20 * time.Millisecond)
+
+ called = 0
+ wait.L.Lock()
+ for _, dp := range funcs {
+ called += dp.called
+ }
+ wait.L.Unlock()
+
+ assert.Equal(t, 5, called)
+ wait.Broadcast()
}
diff --git a/lib/pacer/pacers.go b/lib/pacer/pacers.go
new file mode 100644
index 000000000..9c42be198
--- /dev/null
+++ b/lib/pacer/pacers.go
@@ -0,0 +1,326 @@
+package pacer
+
+import (
+ "math/rand"
+ "time"
+
+ "golang.org/x/time/rate"
+)
+
+type (
+ // MinSleep configures the minimum sleep time of a Calculator
+ MinSleep time.Duration
+ // MaxSleep configures the maximum sleep time of a Calculator
+ MaxSleep time.Duration
+ // DecayConstant configures the decay constant time of a Calculator
+ DecayConstant uint
+ // AttackConstant configures the attack constant of a Calculator
+ AttackConstant uint
+ // Burst configures the number of API calls to allow without sleeping
+ Burst int
+)
+
+// Default is a truncated exponential attack and decay.
+//
+// On retries the sleep time is doubled, on non errors then sleeptime decays
+// according to the decay constant as set with SetDecayConstant.
+//
+// The sleep never goes below that set with SetMinSleep or above that set
+// with SetMaxSleep.
+type Default struct {
+ minSleep time.Duration // minimum sleep time
+ maxSleep time.Duration // maximum sleep time
+ decayConstant uint // decay constant
+ attackConstant uint // attack constant
+}
+
+// DefaultOption is the interface implemented by all options for the Default Calculator
+type DefaultOption interface {
+ ApplyDefault(*Default)
+}
+
+// NewDefault creates a Calculator used by Pacer as the default.
+func NewDefault(opts ...DefaultOption) *Default {
+ c := &Default{
+ minSleep: 10 * time.Millisecond,
+ maxSleep: 2 * time.Second,
+ decayConstant: 2,
+ attackConstant: 1,
+ }
+ c.Update(opts...)
+ return c
+}
+
+// Update applies the Calculator options.
+func (c *Default) Update(opts ...DefaultOption) {
+ for _, opt := range opts {
+ opt.ApplyDefault(c)
+ }
+}
+
+// ApplyDefault updates the value on the Calculator
+func (o MinSleep) ApplyDefault(c *Default) {
+ c.minSleep = time.Duration(o)
+}
+
+// ApplyDefault updates the value on the Calculator
+func (o MaxSleep) ApplyDefault(c *Default) {
+ c.maxSleep = time.Duration(o)
+}
+
+// ApplyDefault updates the value on the Calculator
+func (o DecayConstant) ApplyDefault(c *Default) {
+ c.decayConstant = uint(o)
+}
+
+// ApplyDefault updates the value on the Calculator
+func (o AttackConstant) ApplyDefault(c *Default) {
+ c.attackConstant = uint(o)
+}
+
+// Calculate takes the current Pacer state and return the wait time until the next try.
+func (c *Default) Calculate(state State) time.Duration {
+ if t, ok := IsRetryAfter(state.LastError); ok {
+ if t < c.minSleep {
+ return c.minSleep
+ }
+ return t
+ }
+
+ if state.ConsecutiveRetries > 0 {
+ sleepTime := c.maxSleep
+ if c.attackConstant != 0 {
+ sleepTime = (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1)
+ }
+ if sleepTime > c.maxSleep {
+ sleepTime = c.maxSleep
+ }
+ return sleepTime
+ }
+ sleepTime := (state.SleepTime<> c.decayConstant
+ if sleepTime < c.minSleep {
+ sleepTime = c.minSleep
+ }
+ return sleepTime
+}
+
+// AmazonCloudDrive is a specialized pacer for Amazon Drive
+//
+// It implements a truncated exponential backoff strategy with randomization.
+// Normally operations are paced at the interval set with SetMinSleep. On errors
+// the sleep timer is set to 0..2**retries seconds.
+//
+// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
+type AmazonCloudDrive struct {
+ minSleep time.Duration // minimum sleep time
+}
+
+// AmazonCloudDriveOption is the interface implemented by all options for the AmazonCloudDrive Calculator
+type AmazonCloudDriveOption interface {
+ ApplyAmazonCloudDrive(*AmazonCloudDrive)
+}
+
+// NewAmazonCloudDrive returns a new AmazonCloudDrive Calculator with default values
+func NewAmazonCloudDrive(opts ...AmazonCloudDriveOption) *AmazonCloudDrive {
+ c := &AmazonCloudDrive{
+ minSleep: 10 * time.Millisecond,
+ }
+ c.Update(opts...)
+ return c
+}
+
+// Update applies the Calculator options.
+func (c *AmazonCloudDrive) Update(opts ...AmazonCloudDriveOption) {
+ for _, opt := range opts {
+ opt.ApplyAmazonCloudDrive(c)
+ }
+}
+
+// ApplyAmazonCloudDrive updates the value on the Calculator
+func (o MinSleep) ApplyAmazonCloudDrive(c *AmazonCloudDrive) {
+ c.minSleep = time.Duration(o)
+}
+
+// Calculate takes the current Pacer state and return the wait time until the next try.
+func (c *AmazonCloudDrive) Calculate(state State) time.Duration {
+ if t, ok := IsRetryAfter(state.LastError); ok {
+ if t < c.minSleep {
+ return c.minSleep
+ }
+ return t
+ }
+
+ consecutiveRetries := state.ConsecutiveRetries
+ if consecutiveRetries == 0 {
+ return c.minSleep
+ }
+ if consecutiveRetries > 9 {
+ consecutiveRetries = 9
+ }
+ // consecutiveRetries starts at 1 so
+ // maxSleep is 2**(consecutiveRetries-1) seconds
+ maxSleep := time.Second << uint(consecutiveRetries-1)
+ // actual sleep is random from 0..maxSleep
+ sleepTime := time.Duration(rand.Int63n(int64(maxSleep)))
+ if sleepTime < c.minSleep {
+ sleepTime = c.minSleep
+ }
+ return sleepTime
+}
+
+// GoogleDrive is a specialized pacer for Google Drive
+//
+// It implements a truncated exponential backoff strategy with randomization.
+// Normally operations are paced at the interval set with SetMinSleep. On errors
+// the sleep timer is set to (2 ^ n) + random_number_milliseconds seconds.
+//
+// See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff
+type GoogleDrive struct {
+ minSleep time.Duration // minimum sleep time
+ burst int // number of requests without sleeping
+ limiter *rate.Limiter // rate limiter for the minSleep
+}
+
+// GoogleDriveOption is the interface implemented by all options for the GoogleDrive Calculator
+type GoogleDriveOption interface {
+ ApplyGoogleDrive(*GoogleDrive)
+}
+
+// NewGoogleDrive returns a new GoogleDrive Calculator with default values
+func NewGoogleDrive(opts ...GoogleDriveOption) *GoogleDrive {
+ c := &GoogleDrive{
+ minSleep: 10 * time.Millisecond,
+ burst: 1,
+ }
+ c.Update(opts...)
+ return c
+}
+
+// Update applies the Calculator options.
+func (c *GoogleDrive) Update(opts ...GoogleDriveOption) {
+ for _, opt := range opts {
+ opt.ApplyGoogleDrive(c)
+ }
+ if c.burst <= 0 {
+ c.burst = 1
+ }
+ c.limiter = rate.NewLimiter(rate.Every(c.minSleep), c.burst)
+}
+
+// ApplyGoogleDrive updates the value on the Calculator
+func (o MinSleep) ApplyGoogleDrive(c *GoogleDrive) {
+ c.minSleep = time.Duration(o)
+}
+
+// ApplyGoogleDrive updates the value on the Calculator
+func (o Burst) ApplyGoogleDrive(c *GoogleDrive) {
+ c.burst = int(o)
+}
+
+// Calculate takes the current Pacer state and return the wait time until the next try.
+func (c *GoogleDrive) Calculate(state State) time.Duration {
+ if t, ok := IsRetryAfter(state.LastError); ok {
+ if t < c.minSleep {
+ return c.minSleep
+ }
+ return t
+ }
+
+ consecutiveRetries := state.ConsecutiveRetries
+ if consecutiveRetries == 0 {
+ return c.limiter.Reserve().Delay()
+ }
+ if consecutiveRetries > 5 {
+ consecutiveRetries = 5
+ }
+ // consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16
+ // maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds
+ return time.Second< 0 {
+ if c.attackConstant == 0 {
+ return c.maxSleep
+ }
+ if state.SleepTime == 0 {
+ return c.minSleep
+ }
+ sleepTime := (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1)
+ if sleepTime > c.maxSleep {
+ sleepTime = c.maxSleep
+ }
+ return sleepTime
+ }
+ sleepTime := (state.SleepTime<> c.decayConstant
+ if sleepTime < c.minSleep {
+ sleepTime = 0
+ }
+ return sleepTime
+}