Merge pull request #4784 from MichaelEischer/rework-backend-retries

Rework backend retries
This commit is contained in:
Michael Eischer 2024-05-24 20:29:54 +02:00 committed by GitHub
commit 16ef4d515b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 198 additions and 49 deletions

View file

@ -9,7 +9,9 @@ downloading are now forcibly interrupted. This ensures that stuck requests are
retried after a short timeout. retried after a short timeout.
Attempts to access a missing file or a truncated file will no longer be retried. Attempts to access a missing file or a truncated file will no longer be retried.
This avoids unnecessary retries in those cases. This avoids unnecessary retries in those cases. All other backend requests are
retried for up to 15 minutes. This ensures that a temporarily interrupted network
connections can be tolerated.
If a download yields a corrupt file or blob, then the download will be retried once. If a download yields a corrupt file or blob, then the download will be retried once.
@ -26,3 +28,4 @@ https://github.com/restic/restic/issues/4515
https://github.com/restic/restic/issues/1523 https://github.com/restic/restic/issues/1523
https://github.com/restic/restic/pull/4520 https://github.com/restic/restic/pull/4520
https://github.com/restic/restic/pull/4800 https://github.com/restic/restic/pull/4800
https://github.com/restic/restic/pull/4784

View file

@ -416,12 +416,16 @@ func OpenRepository(ctx context.Context, opts GlobalOptions) (*repository.Reposi
} }
report := func(msg string, err error, d time.Duration) { report := func(msg string, err error, d time.Duration) {
if d >= 0 {
Warnf("%v returned error, retrying after %v: %v\n", msg, d, err) Warnf("%v returned error, retrying after %v: %v\n", msg, d, err)
} else {
Warnf("%v failed: %v\n", msg, err)
}
} }
success := func(msg string, retries int) { success := func(msg string, retries int) {
Warnf("%v operation successful after %d retries\n", msg, retries) Warnf("%v operation successful after %d retries\n", msg, retries)
} }
be = retry.New(be, 10, report, success) be = retry.New(be, 15*time.Minute, report, success)
// wrap backend if a test specified a hook // wrap backend if a test specified a hook
if opts.backendTestHook != nil { if opts.backendTestHook != nil {

View file

@ -18,7 +18,7 @@ import (
// backoff. // backoff.
type Backend struct { type Backend struct {
backend.Backend backend.Backend
MaxTries int MaxElapsedTime time.Duration
Report func(string, error, time.Duration) Report func(string, error, time.Duration)
Success func(string, int) Success func(string, int)
@ -32,10 +32,10 @@ var _ backend.Backend = &Backend{}
// backoff. report is called with a description and the error, if one occurred. // backoff. report is called with a description and the error, if one occurred.
// success is called with the number of retries before a successful operation // success is called with the number of retries before a successful operation
// (it is not called if it succeeded on the first try) // (it is not called if it succeeded on the first try)
func New(be backend.Backend, maxTries int, report func(string, error, time.Duration), success func(string, int)) *Backend { func New(be backend.Backend, maxElapsedTime time.Duration, report func(string, error, time.Duration), success func(string, int)) *Backend {
return &Backend{ return &Backend{
Backend: be, Backend: be,
MaxTries: maxTries, MaxElapsedTime: maxElapsedTime,
Report: report, Report: report,
Success: success, Success: success,
} }
@ -44,11 +44,12 @@ func New(be backend.Backend, maxTries int, report func(string, error, time.Durat
// retryNotifyErrorWithSuccess is an extension of backoff.RetryNotify with notification of success after an error. // retryNotifyErrorWithSuccess is an extension of backoff.RetryNotify with notification of success after an error.
// success is NOT notified on the first run of operation (only after an error). // success is NOT notified on the first run of operation (only after an error).
func retryNotifyErrorWithSuccess(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, success func(retries int)) error { func retryNotifyErrorWithSuccess(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, success func(retries int)) error {
var operationWrapper backoff.Operation
if success == nil { if success == nil {
return backoff.RetryNotify(operation, b, notify) operationWrapper = operation
} } else {
retries := 0 retries := 0
operationWrapper := func() error { operationWrapper = func() error {
err := operation() err := operation()
if err != nil { if err != nil {
retries++ retries++
@ -57,7 +58,38 @@ func retryNotifyErrorWithSuccess(operation backoff.Operation, b backoff.BackOff,
} }
return err return err
} }
return backoff.RetryNotify(operationWrapper, b, notify) }
err := backoff.RetryNotify(operationWrapper, b, notify)
if err != nil && notify != nil {
// log final error
notify(err, -1)
}
return err
}
func withRetryAtLeastOnce(delegate *backoff.ExponentialBackOff) *retryAtLeastOnce {
return &retryAtLeastOnce{delegate: delegate}
}
type retryAtLeastOnce struct {
delegate *backoff.ExponentialBackOff
numTries uint64
}
func (b *retryAtLeastOnce) NextBackOff() time.Duration {
delay := b.delegate.NextBackOff()
b.numTries++
if b.numTries == 1 && b.delegate.Stop == delay {
return b.delegate.InitialInterval
}
return delay
}
func (b *retryAtLeastOnce) Reset() {
b.numTries = 0
b.delegate.Reset()
} }
var fastRetries = false var fastRetries = false
@ -74,9 +106,25 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
} }
bo := backoff.NewExponentialBackOff() bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = be.MaxElapsedTime
if feature.Flag.Enabled(feature.BackendErrorRedesign) {
bo.InitialInterval = 1 * time.Second
bo.Multiplier = 2
}
if fastRetries { if fastRetries {
// speed up integration tests // speed up integration tests
bo.InitialInterval = 1 * time.Millisecond bo.InitialInterval = 1 * time.Millisecond
maxElapsedTime := 200 * time.Millisecond
if bo.MaxElapsedTime > maxElapsedTime {
bo.MaxElapsedTime = maxElapsedTime
}
}
var b backoff.BackOff = withRetryAtLeastOnce(bo)
if !feature.Flag.Enabled(feature.BackendErrorRedesign) {
// deprecated behavior
b = backoff.WithMaxRetries(b, 10)
} }
err := retryNotifyErrorWithSuccess( err := retryNotifyErrorWithSuccess(
@ -89,7 +137,7 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
} }
return err return err
}, },
backoff.WithContext(backoff.WithMaxRetries(bo, uint64(be.MaxTries)), ctx), backoff.WithContext(b, ctx),
func(err error, d time.Duration) { func(err error, d time.Duration) {
if be.Report != nil { if be.Report != nil {
be.Report(msg, err, d) be.Report(msg, err, d)

View file

@ -193,8 +193,9 @@ func TestBackendListRetryErrorBackend(t *testing.T) {
} }
TestFastRetries(t) TestFastRetries(t)
const maxRetries = 2 const maxElapsedTime = 10 * time.Millisecond
retryBackend := New(be, maxRetries, nil, nil) now := time.Now()
retryBackend := New(be, maxElapsedTime, nil, nil)
var listed []string var listed []string
err := retryBackend.List(context.TODO(), backend.PackFile, func(fi backend.FileInfo) error { err := retryBackend.List(context.TODO(), backend.PackFile, func(fi backend.FileInfo) error {
@ -207,8 +208,9 @@ func TestBackendListRetryErrorBackend(t *testing.T) {
t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err) t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err)
} }
if retries != maxRetries+1 { duration := time.Since(now)
t.Fatalf("List was called %d times, wanted %v", retries, maxRetries+1) if duration > 100*time.Millisecond {
t.Fatalf("list retries took %v, expected at most 10ms", duration)
} }
test.Equals(t, names[:2], listed) test.Equals(t, names[:2], listed)
@ -327,7 +329,7 @@ func TestBackendLoadCircuitBreaker(t *testing.T) {
// trip the circuit breaker for file "other" // trip the circuit breaker for file "other"
err := retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd) err := retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
test.Equals(t, otherError, err, "unexpected error") test.Equals(t, otherError, err, "unexpected error")
test.Equals(t, 3, attempt) test.Equals(t, 2, attempt)
attempt = 0 attempt = 0
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd) err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
@ -407,7 +409,7 @@ func TestBackendRetryPermanent(t *testing.T) {
return errors.New("something") return errors.New("something")
}) })
test.Assert(t, !be.IsPermanentErrorFn(err), "error unexpectedly considered permanent %v", err) test.Assert(t, !be.IsPermanentErrorFn(err), "error unexpectedly considered permanent %v", err)
test.Equals(t, 3, attempt) test.Equals(t, 2, attempt)
} }
@ -497,3 +499,64 @@ func TestNotifyWithSuccessIsCalled(t *testing.T) {
t.Fatalf("Success should have been called only once, but was called %d times instead", successCalled) t.Fatalf("Success should have been called only once, but was called %d times instead", successCalled)
} }
} }
func TestNotifyWithSuccessFinalError(t *testing.T) {
operation := func() error {
return errors.New("expected error in test")
}
notifyCalled := 0
notify := func(error, time.Duration) {
notifyCalled++
}
successCalled := 0
success := func(retries int) {
successCalled++
}
err := retryNotifyErrorWithSuccess(operation, backoff.WithMaxRetries(&backoff.ZeroBackOff{}, 5), notify, success)
test.Assert(t, err.Error() == "expected error in test", "wrong error message %v", err)
test.Equals(t, 6, notifyCalled, "notify should have been called 6 times")
test.Equals(t, 0, successCalled, "success should not have been called")
}
type testClock struct {
Time time.Time
}
func (c *testClock) Now() time.Time {
return c.Time
}
func TestRetryAtLeastOnce(t *testing.T) {
expBackOff := backoff.NewExponentialBackOff()
expBackOff.InitialInterval = 500 * time.Millisecond
expBackOff.RandomizationFactor = 0
expBackOff.MaxElapsedTime = 5 * time.Second
expBackOff.Multiplier = 2 // guarantee numerical stability
clock := &testClock{Time: time.Now()}
expBackOff.Clock = clock
expBackOff.Reset()
retry := withRetryAtLeastOnce(expBackOff)
// expire backoff
clock.Time = clock.Time.Add(10 * time.Second)
delay := retry.NextBackOff()
test.Equals(t, expBackOff.InitialInterval, delay, "must retry at least once")
delay = retry.NextBackOff()
test.Equals(t, expBackOff.Stop, delay, "must not retry more than once")
// test reset behavior
retry.Reset()
test.Equals(t, uint64(0), retry.numTries, "numTries should be reset to 0")
// Verify that after reset, NextBackOff returns the initial interval again
delay = retry.NextBackOff()
test.Equals(t, expBackOff.InitialInterval, delay, "retries must work after reset")
delay = retry.NextBackOff()
test.Equals(t, expBackOff.InitialInterval*time.Duration(expBackOff.Multiplier), delay, "retries must work after reset")
}

View file

@ -132,7 +132,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
// remove the lock from the repo // remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock) debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(); err != nil { if err := lock.Unlock(ctx); err != nil {
debug.Log("error while unlocking: %v", err) debug.Log("error while unlocking: %v", err)
logger("error while unlocking: %v", err) logger("error while unlocking: %v", err)
} }

View file

@ -17,6 +17,10 @@ import (
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
) )
// UnlockCancelDelay bounds the duration how long lock cleanup operations will wait
// if the passed in context was canceled.
const UnlockCancelDelay time.Duration = 1 * time.Minute
// Lock represents a process locking the repository for an operation. // Lock represents a process locking the repository for an operation.
// //
// There are two types of locks: exclusive and non-exclusive. There may be many // There are two types of locks: exclusive and non-exclusive. There may be many
@ -136,7 +140,7 @@ func newLock(ctx context.Context, repo Unpacked, excl bool) (*Lock, error) {
time.Sleep(waitBeforeLockCheck) time.Sleep(waitBeforeLockCheck)
if err = lock.checkForOtherLocks(ctx); err != nil { if err = lock.checkForOtherLocks(ctx); err != nil {
_ = lock.Unlock() _ = lock.Unlock(ctx)
return nil, err return nil, err
} }
@ -220,12 +224,15 @@ func (l *Lock) createLock(ctx context.Context) (ID, error) {
} }
// Unlock removes the lock from the repository. // Unlock removes the lock from the repository.
func (l *Lock) Unlock() error { func (l *Lock) Unlock(ctx context.Context) error {
if l == nil || l.lockID == nil { if l == nil || l.lockID == nil {
return nil return nil
} }
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *l.lockID) ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
return l.repo.RemoveUnpacked(ctx, LockFile, *l.lockID)
} }
var StaleLockTimeout = 30 * time.Minute var StaleLockTimeout = 30 * time.Minute
@ -266,6 +273,23 @@ func (l *Lock) Stale() bool {
return false return false
} }
func delayedCancelContext(parentCtx context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-parentCtx.Done():
case <-ctx.Done():
return
}
time.Sleep(delay)
cancel()
}()
return ctx, cancel
}
// Refresh refreshes the lock by creating a new file in the backend with a new // Refresh refreshes the lock by creating a new file in the backend with a new
// timestamp. Afterwards the old lock is removed. // timestamp. Afterwards the old lock is removed.
func (l *Lock) Refresh(ctx context.Context) error { func (l *Lock) Refresh(ctx context.Context) error {
@ -285,7 +309,10 @@ func (l *Lock) Refresh(ctx context.Context) error {
oldLockID := l.lockID oldLockID := l.lockID
l.lockID = &id l.lockID = &id
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *oldLockID) ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
return l.repo.RemoveUnpacked(ctx, LockFile, *oldLockID)
} }
// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files. // RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
@ -312,15 +339,19 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
time.Sleep(waitBeforeLockCheck) time.Sleep(waitBeforeLockCheck)
exists, err = l.checkExistence(ctx) exists, err = l.checkExistence(ctx)
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
if err != nil { if err != nil {
// cleanup replacement lock // cleanup replacement lock
_ = l.repo.RemoveUnpacked(context.TODO(), LockFile, id) _ = l.repo.RemoveUnpacked(ctx, LockFile, id)
return err return err
} }
if !exists { if !exists {
// cleanup replacement lock // cleanup replacement lock
_ = l.repo.RemoveUnpacked(context.TODO(), LockFile, id) _ = l.repo.RemoveUnpacked(ctx, LockFile, id)
return ErrRemovedLock return ErrRemovedLock
} }
@ -331,7 +362,7 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
oldLockID := l.lockID oldLockID := l.lockID
l.lockID = &id l.lockID = &id
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *oldLockID) return l.repo.RemoveUnpacked(ctx, LockFile, *oldLockID)
} }
func (l *Lock) checkExistence(ctx context.Context) (bool, error) { func (l *Lock) checkExistence(ctx context.Context) (bool, error) {

View file

@ -22,7 +22,7 @@ func TestLock(t *testing.T) {
lock, err := restic.NewLock(context.TODO(), repo) lock, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, lock.Unlock()) rtest.OK(t, lock.Unlock(context.TODO()))
} }
func TestDoubleUnlock(t *testing.T) { func TestDoubleUnlock(t *testing.T) {
@ -32,9 +32,9 @@ func TestDoubleUnlock(t *testing.T) {
lock, err := restic.NewLock(context.TODO(), repo) lock, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, lock.Unlock()) rtest.OK(t, lock.Unlock(context.TODO()))
err = lock.Unlock() err = lock.Unlock(context.TODO())
rtest.Assert(t, err != nil, rtest.Assert(t, err != nil,
"double unlock didn't return an error, got %v", err) "double unlock didn't return an error, got %v", err)
} }
@ -49,8 +49,8 @@ func TestMultipleLock(t *testing.T) {
lock2, err := restic.NewLock(context.TODO(), repo) lock2, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, lock1.Unlock()) rtest.OK(t, lock1.Unlock(context.TODO()))
rtest.OK(t, lock2.Unlock()) rtest.OK(t, lock2.Unlock(context.TODO()))
} }
type failLockLoadingBackend struct { type failLockLoadingBackend struct {
@ -75,7 +75,7 @@ func TestMultipleLockFailure(t *testing.T) {
_, err = restic.NewLock(context.TODO(), repo) _, err = restic.NewLock(context.TODO(), repo)
rtest.Assert(t, err != nil, "unreadable lock file did not result in an error") rtest.Assert(t, err != nil, "unreadable lock file did not result in an error")
rtest.OK(t, lock1.Unlock()) rtest.OK(t, lock1.Unlock(context.TODO()))
} }
func TestLockExclusive(t *testing.T) { func TestLockExclusive(t *testing.T) {
@ -83,7 +83,7 @@ func TestLockExclusive(t *testing.T) {
elock, err := restic.NewExclusiveLock(context.TODO(), repo) elock, err := restic.NewExclusiveLock(context.TODO(), repo)
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, elock.Unlock()) rtest.OK(t, elock.Unlock(context.TODO()))
} }
func TestLockOnExclusiveLockedRepo(t *testing.T) { func TestLockOnExclusiveLockedRepo(t *testing.T) {
@ -99,8 +99,8 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) {
rtest.Assert(t, restic.IsAlreadyLocked(err), rtest.Assert(t, restic.IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error") "create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock()) rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock()) rtest.OK(t, elock.Unlock(context.TODO()))
} }
func TestExclusiveLockOnLockedRepo(t *testing.T) { func TestExclusiveLockOnLockedRepo(t *testing.T) {
@ -116,8 +116,8 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) {
rtest.Assert(t, restic.IsAlreadyLocked(err), rtest.Assert(t, restic.IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error") "create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock()) rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock()) rtest.OK(t, elock.Unlock(context.TODO()))
} }
func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID, error) { func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID, error) {
@ -296,7 +296,7 @@ func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
rtest.OK(t, err) rtest.OK(t, err)
rtest.Assert(t, lock2.Time.After(time0), rtest.Assert(t, lock2.Time.After(time0),
"expected a later timestamp after lock refresh") "expected a later timestamp after lock refresh")
rtest.OK(t, lock.Unlock()) rtest.OK(t, lock.Unlock(context.TODO()))
} }
func TestLockRefresh(t *testing.T) { func TestLockRefresh(t *testing.T) {