cmd: implement RetryAfter errors which cause a sleep before a retry
Use NewRetryAfterError to return an error which will cause a high level retry after the delay specified.
This commit is contained in:
parent
d3e3bbedf3
commit
2065e73d0b
6 changed files with 177 additions and 7 deletions
19
cmd/cmd.go
19
cmd/cmd.go
|
@ -230,22 +230,31 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
SigInfoHandler()
|
||||
for try := 1; try <= *retries; try++ {
|
||||
err = f()
|
||||
if !Retry || (err == nil && !accounting.Stats.Errored()) {
|
||||
fs.CountError(err)
|
||||
if !Retry || !accounting.Stats.Errored() {
|
||||
if try > 1 {
|
||||
fs.Errorf(nil, "Attempt %d/%d succeeded", try, *retries)
|
||||
}
|
||||
break
|
||||
}
|
||||
if fserrors.IsFatalError(err) || accounting.Stats.HadFatalError() {
|
||||
if accounting.Stats.HadFatalError() {
|
||||
fs.Errorf(nil, "Fatal error received - not attempting retries")
|
||||
break
|
||||
}
|
||||
if fserrors.IsNoRetryError(err) || (accounting.Stats.Errored() && !accounting.Stats.HadRetryError()) {
|
||||
if accounting.Stats.Errored() && !accounting.Stats.HadRetryError() {
|
||||
fs.Errorf(nil, "Can't retry this error - not attempting retries")
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.Stats.GetErrors(), err)
|
||||
if retryAfter := accounting.Stats.RetryAfter(); !retryAfter.IsZero() {
|
||||
d := retryAfter.Sub(time.Now())
|
||||
if d > 0 {
|
||||
fs.Logf(nil, "Received retry after error - sleeping until %s (%v)", retryAfter.Format(time.RFC3339Nano), d)
|
||||
time.Sleep(d)
|
||||
}
|
||||
}
|
||||
lastErr := accounting.Stats.GetLastError()
|
||||
if lastErr != nil {
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.Stats.GetErrors(), lastErr)
|
||||
} else {
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.Stats.GetErrors())
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ type StatsInfo struct {
|
|||
lastError error
|
||||
fatalError bool
|
||||
retryError bool
|
||||
retryAfter time.Time
|
||||
checks int64
|
||||
checking *stringSet
|
||||
checkQueue int
|
||||
|
@ -373,6 +374,7 @@ func (s *StatsInfo) ResetCounters() {
|
|||
s.lastError = nil
|
||||
s.fatalError = false
|
||||
s.retryError = false
|
||||
s.retryAfter = time.Time{}
|
||||
s.checks = 0
|
||||
s.transfers = 0
|
||||
s.deletes = 0
|
||||
|
@ -386,6 +388,7 @@ func (s *StatsInfo) ResetErrors() {
|
|||
s.lastError = nil
|
||||
s.fatalError = false
|
||||
s.retryError = false
|
||||
s.retryAfter = time.Time{}
|
||||
}
|
||||
|
||||
// Errored returns whether there have been any errors
|
||||
|
@ -397,6 +400,9 @@ func (s *StatsInfo) Errored() bool {
|
|||
|
||||
// Error adds a single error into the stats, assigns lastError and eventually sets fatalError or retryError
|
||||
func (s *StatsInfo) Error(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.errors++
|
||||
|
@ -404,11 +410,25 @@ func (s *StatsInfo) Error(err error) {
|
|||
switch {
|
||||
case fserrors.IsFatalError(err):
|
||||
s.fatalError = true
|
||||
case fserrors.IsRetryAfterError(err):
|
||||
retryAfter := fserrors.RetryAfterErrorTime(err)
|
||||
if s.retryAfter.IsZero() || retryAfter.Sub(s.retryAfter) > 0 {
|
||||
s.retryAfter = retryAfter
|
||||
}
|
||||
s.retryError = true
|
||||
case !fserrors.IsNoRetryError(err):
|
||||
s.retryError = true
|
||||
}
|
||||
}
|
||||
|
||||
// RetryAfter returns the time to retry after if it is set. It will
|
||||
// be Zero if it isn't set.
|
||||
func (s *StatsInfo) RetryAfter() time.Time {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.retryAfter
|
||||
}
|
||||
|
||||
// Checking adds a check into the stats
|
||||
func (s *StatsInfo) Checking(remote string) {
|
||||
s.checking.add(remote)
|
||||
|
|
|
@ -2,9 +2,12 @@ package accounting
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs/fserrors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
@ -49,3 +52,68 @@ func TestPercentage(t *testing.T) {
|
|||
assert.Equal(t, percent(-100, 100), "-")
|
||||
assert.Equal(t, percent(-100, -100), "-")
|
||||
}
|
||||
|
||||
func TestStatsError(t *testing.T) {
|
||||
s := NewStats()
|
||||
assert.Equal(t, int64(0), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.False(t, s.HadRetryError())
|
||||
assert.Equal(t, time.Time{}, s.RetryAfter())
|
||||
assert.Equal(t, nil, s.GetLastError())
|
||||
assert.False(t, s.Errored())
|
||||
|
||||
t0 := time.Now()
|
||||
t1 := t0.Add(time.Second)
|
||||
|
||||
s.Error(nil)
|
||||
assert.Equal(t, int64(0), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.False(t, s.HadRetryError())
|
||||
assert.Equal(t, time.Time{}, s.RetryAfter())
|
||||
assert.Equal(t, nil, s.GetLastError())
|
||||
assert.False(t, s.Errored())
|
||||
|
||||
s.Error(io.EOF)
|
||||
assert.Equal(t, int64(1), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.True(t, s.HadRetryError())
|
||||
assert.Equal(t, time.Time{}, s.RetryAfter())
|
||||
assert.Equal(t, io.EOF, s.GetLastError())
|
||||
assert.True(t, s.Errored())
|
||||
|
||||
e := fserrors.ErrorRetryAfter(t0)
|
||||
s.Error(e)
|
||||
assert.Equal(t, int64(2), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.True(t, s.HadRetryError())
|
||||
assert.Equal(t, t0, s.RetryAfter())
|
||||
assert.Equal(t, e, s.GetLastError())
|
||||
|
||||
err := errors.Wrap(fserrors.ErrorRetryAfter(t1), "potato")
|
||||
s.Error(err)
|
||||
assert.Equal(t, int64(3), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.True(t, s.HadRetryError())
|
||||
assert.Equal(t, t1, s.RetryAfter())
|
||||
assert.Equal(t, t1, fserrors.RetryAfterErrorTime(err))
|
||||
|
||||
s.Error(fserrors.FatalError(io.EOF))
|
||||
assert.Equal(t, int64(4), s.GetErrors())
|
||||
assert.True(t, s.HadFatalError())
|
||||
assert.True(t, s.HadRetryError())
|
||||
assert.Equal(t, t1, s.RetryAfter())
|
||||
|
||||
s.ResetErrors()
|
||||
assert.Equal(t, int64(0), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.False(t, s.HadRetryError())
|
||||
assert.Equal(t, time.Time{}, s.RetryAfter())
|
||||
assert.Equal(t, nil, s.GetLastError())
|
||||
assert.False(t, s.Errored())
|
||||
|
||||
s.Error(fserrors.NoRetryError(io.EOF))
|
||||
assert.Equal(t, int64(1), s.GetErrors())
|
||||
assert.False(t, s.HadFatalError())
|
||||
assert.False(t, s.HadRetryError())
|
||||
assert.Equal(t, time.Time{}, s.RetryAfter())
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -166,6 +167,58 @@ func IsNoRetryError(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// RetryAfter is an optional interface for error as to whether the
|
||||
// operation should be retried after a given delay
|
||||
//
|
||||
// This should be returned from Update or Put methods as required and
|
||||
// will cause the entire sync to be retried after a delay.
|
||||
type RetryAfter interface {
|
||||
error
|
||||
RetryAfter() time.Time
|
||||
}
|
||||
|
||||
// ErrorRetryAfter is an error which expresses a time that should be
|
||||
// waited for until trying again
|
||||
type ErrorRetryAfter time.Time
|
||||
|
||||
// NewErrorRetryAfter returns an ErrorRetryAfter with the given
|
||||
// duration as an endpoint
|
||||
func NewErrorRetryAfter(d time.Duration) ErrorRetryAfter {
|
||||
return ErrorRetryAfter(time.Now().Add(d))
|
||||
}
|
||||
|
||||
// Error returns the textual version of the error
|
||||
func (e ErrorRetryAfter) Error() string {
|
||||
return fmt.Sprintf("try again after %v (%v)", time.Time(e).Format(time.RFC3339Nano), time.Time(e).Sub(time.Now()))
|
||||
}
|
||||
|
||||
// RetryAfter returns the time the operation should be retried at or
|
||||
// after
|
||||
func (e ErrorRetryAfter) RetryAfter() time.Time {
|
||||
return time.Time(e)
|
||||
}
|
||||
|
||||
// Check interface
|
||||
var _ RetryAfter = ErrorRetryAfter{}
|
||||
|
||||
// RetryAfterErrorTime returns the time that the RetryAfter error
|
||||
// indicates or a Zero time.Time
|
||||
func RetryAfterErrorTime(err error) time.Time {
|
||||
if err == nil {
|
||||
return time.Time{}
|
||||
}
|
||||
_, err = Cause(err)
|
||||
if do, ok := err.(RetryAfter); ok {
|
||||
return do.RetryAfter()
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
// IsRetryAfterError returns true if err is an ErrorRetryAfter
|
||||
func IsRetryAfterError(err error) bool {
|
||||
return !RetryAfterErrorTime(err).IsZero()
|
||||
}
|
||||
|
||||
// Cause is a souped up errors.Cause which can unwrap some standard
|
||||
// library errors too. It returns true if any of the intermediate
|
||||
// errors had a Timeout() or Temporary() method which returned true.
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -143,3 +144,21 @@ func TestShouldRetry(t *testing.T) {
|
|||
assert.Equal(t, test.want, got, fmt.Sprintf("test #%d: %v", i, test.err))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryAfter(t *testing.T) {
|
||||
e := NewErrorRetryAfter(time.Second)
|
||||
after := e.RetryAfter()
|
||||
dt := after.Sub(time.Now())
|
||||
assert.True(t, dt >= 900*time.Millisecond && dt <= 1100*time.Millisecond)
|
||||
assert.True(t, IsRetryAfterError(e))
|
||||
assert.False(t, IsRetryAfterError(io.EOF))
|
||||
assert.Equal(t, time.Time{}, RetryAfterErrorTime(io.EOF))
|
||||
assert.False(t, IsRetryAfterError(nil))
|
||||
assert.Contains(t, e.Error(), "try again after")
|
||||
|
||||
t0 := time.Now()
|
||||
err := errors.Wrap(ErrorRetryAfter(t0), "potato")
|
||||
assert.Equal(t, t0, RetryAfterErrorTime(err))
|
||||
assert.True(t, IsRetryAfterError(err))
|
||||
assert.Contains(t, e.Error(), "try again after")
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/ncw/rclone/fs/hash"
|
||||
"github.com/ncw/rclone/fs/operations"
|
||||
"github.com/ncw/rclone/fstest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/text/unicode/norm"
|
||||
|
@ -468,7 +469,7 @@ func TestSyncIgnoreErrors(t *testing.T) {
|
|||
)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
fs.CountError(nil)
|
||||
fs.CountError(errors.New("boom"))
|
||||
assert.NoError(t, Sync(r.Fremote, r.Flocal, false))
|
||||
|
||||
fstest.CheckListingWithPrecision(
|
||||
|
@ -778,7 +779,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDirWithErrors(t *testing.T) {
|
|||
)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
fs.CountError(nil)
|
||||
fs.CountError(errors.New("boom"))
|
||||
err := Sync(r.Fremote, r.Flocal, false)
|
||||
assert.Equal(t, fs.ErrorNotDeleting, err)
|
||||
|
||||
|
|
Loading…
Reference in a new issue