forked from TrueCloudLab/restic
Merge pull request #4304 from MichaelEischer/unlimited-lock-refresh
lock: Do not limit backend concurrency for lock files
This commit is contained in:
commit
2f518b7241
3 changed files with 46 additions and 15 deletions
5
changelog/unreleased/pull-4304
Normal file
5
changelog/unreleased/pull-4304
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
Bugfix: Avoid lock refresh issues with slow network connections
|
||||||
|
|
||||||
|
On network connections with a low upload speed, restic could often fail backups and other operations with `Fatal: failed to refresh lock in time`. We've reworked the lock refresh to avoid this error.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/pull/4304
|
|
@ -31,14 +31,24 @@ func NewBackend(be restic.Backend) restic.Backend {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// typeDependentLimit acquire a token unless the FileType is a lock file. The returned function
|
||||||
|
// must be called to release the token.
|
||||||
|
func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() {
|
||||||
|
// allow concurrent lock file operations to ensure that the lock refresh is always possible
|
||||||
|
if t == restic.LockFile {
|
||||||
|
return func() {}
|
||||||
|
}
|
||||||
|
be.sem.GetToken()
|
||||||
|
return be.sem.ReleaseToken
|
||||||
|
}
|
||||||
|
|
||||||
// Save adds new Data to the backend.
|
// Save adds new Data to the backend.
|
||||||
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||||
if err := h.Valid(); err != nil {
|
if err := h.Valid(); err != nil {
|
||||||
return backoff.Permanent(err)
|
return backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
defer be.typeDependentLimit(h.Type)()
|
||||||
defer be.sem.ReleaseToken()
|
|
||||||
|
|
||||||
return be.Backend.Save(ctx, h, rd)
|
return be.Backend.Save(ctx, h, rd)
|
||||||
}
|
}
|
||||||
|
@ -56,8 +66,7 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
|
||||||
return backoff.Permanent(errors.Errorf("invalid length %d", length))
|
return backoff.Permanent(errors.Errorf("invalid length %d", length))
|
||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
defer be.typeDependentLimit(h.Type)()
|
||||||
defer be.sem.ReleaseToken()
|
|
||||||
|
|
||||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||||
}
|
}
|
||||||
|
@ -68,8 +77,7 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
|
||||||
return restic.FileInfo{}, backoff.Permanent(err)
|
return restic.FileInfo{}, backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
defer be.typeDependentLimit(h.Type)()
|
||||||
defer be.sem.ReleaseToken()
|
|
||||||
|
|
||||||
return be.Backend.Stat(ctx, h)
|
return be.Backend.Stat(ctx, h)
|
||||||
}
|
}
|
||||||
|
@ -80,8 +88,7 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
|
||||||
return backoff.Permanent(err)
|
return backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
defer be.typeDependentLimit(h.Type)()
|
||||||
defer be.sem.ReleaseToken()
|
|
||||||
|
|
||||||
return be.Backend.Remove(ctx, h)
|
return be.Backend.Remove(ctx, h)
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ func countingBlocker() (func(), func(int) int) {
|
||||||
unblock := func(expected int) int {
|
unblock := func(expected int) int {
|
||||||
// give goroutines enough time to block
|
// give goroutines enough time to block
|
||||||
var blocked int64
|
var blocked int64
|
||||||
for i := 0; i < 100 && blocked != int64(expected); i++ {
|
for i := 0; i < 100 && blocked < int64(expected); i++ {
|
||||||
time.Sleep(100 * time.Microsecond)
|
time.Sleep(100 * time.Microsecond)
|
||||||
blocked = atomic.LoadInt64(&ctr)
|
blocked = atomic.LoadInt64(&ctr)
|
||||||
}
|
}
|
||||||
|
@ -99,8 +99,9 @@ func countingBlocker() (func(), func(int) int) {
|
||||||
return wait, unblock
|
return wait, unblock
|
||||||
}
|
}
|
||||||
|
|
||||||
func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) {
|
func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int, isUnlimited bool) {
|
||||||
expectBlocked := int(2)
|
expectBlocked := int(2)
|
||||||
|
workerCount := expectBlocked + 1
|
||||||
|
|
||||||
m := mock.NewBackend()
|
m := mock.NewBackend()
|
||||||
setup(m)
|
setup(m)
|
||||||
|
@ -108,10 +109,13 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
|
||||||
be := sema.NewBackend(m)
|
be := sema.NewBackend(m)
|
||||||
|
|
||||||
var wg errgroup.Group
|
var wg errgroup.Group
|
||||||
for i := 0; i < int(expectBlocked+1); i++ {
|
for i := 0; i < workerCount; i++ {
|
||||||
wg.Go(handler(be))
|
wg.Go(handler(be))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if isUnlimited {
|
||||||
|
expectBlocked = workerCount
|
||||||
|
}
|
||||||
blocked := unblock(expectBlocked)
|
blocked := unblock(expectBlocked)
|
||||||
test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked)
|
test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked)
|
||||||
test.OK(t, wg.Wait())
|
test.OK(t, wg.Wait())
|
||||||
|
@ -129,7 +133,7 @@ func TestConcurrencyLimitSave(t *testing.T) {
|
||||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||||
return be.Save(context.TODO(), h, nil)
|
return be.Save(context.TODO(), h, nil)
|
||||||
}
|
}
|
||||||
}, unblock)
|
}, unblock, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrencyLimitLoad(t *testing.T) {
|
func TestConcurrencyLimitLoad(t *testing.T) {
|
||||||
|
@ -145,7 +149,7 @@ func TestConcurrencyLimitLoad(t *testing.T) {
|
||||||
nilCb := func(rd io.Reader) error { return nil }
|
nilCb := func(rd io.Reader) error { return nil }
|
||||||
return be.Load(context.TODO(), h, 10, 0, nilCb)
|
return be.Load(context.TODO(), h, 10, 0, nilCb)
|
||||||
}
|
}
|
||||||
}, unblock)
|
}, unblock, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrencyLimitStat(t *testing.T) {
|
func TestConcurrencyLimitStat(t *testing.T) {
|
||||||
|
@ -161,7 +165,7 @@ func TestConcurrencyLimitStat(t *testing.T) {
|
||||||
_, err := be.Stat(context.TODO(), h)
|
_, err := be.Stat(context.TODO(), h)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}, unblock)
|
}, unblock, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrencyLimitDelete(t *testing.T) {
|
func TestConcurrencyLimitDelete(t *testing.T) {
|
||||||
|
@ -176,5 +180,20 @@ func TestConcurrencyLimitDelete(t *testing.T) {
|
||||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||||
return be.Remove(context.TODO(), h)
|
return be.Remove(context.TODO(), h)
|
||||||
}
|
}
|
||||||
}, unblock)
|
}, unblock, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrencyUnlimitedLockSave(t *testing.T) {
|
||||||
|
wait, unblock := countingBlocker()
|
||||||
|
concurrencyTester(t, func(m *mock.Backend) {
|
||||||
|
m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||||
|
wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}, func(be restic.Backend) func() error {
|
||||||
|
return func() error {
|
||||||
|
h := restic.Handle{Type: restic.LockFile, Name: "foobar"}
|
||||||
|
return be.Save(context.TODO(), h, nil)
|
||||||
|
}
|
||||||
|
}, unblock, true)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue