forked from TrueCloudLab/restic
repository: parallelize lock tests
This commit is contained in:
parent
e8df50fa3c
commit
044e8bf821
2 changed files with 72 additions and 57 deletions
|
@ -18,21 +18,31 @@ type lockContext struct {
|
||||||
refreshWG sync.WaitGroup
|
refreshWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
type locker struct {
|
||||||
retrySleepStart = 5 * time.Second
|
retrySleepStart time.Duration
|
||||||
retrySleepMax = 60 * time.Second
|
retrySleepMax time.Duration
|
||||||
)
|
refreshInterval time.Duration
|
||||||
|
refreshabilityTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
func minDuration(a, b time.Duration) time.Duration {
|
const defaultRefreshInterval = 5 * time.Minute
|
||||||
if a <= b {
|
|
||||||
return a
|
var lockerInst = &locker{
|
||||||
}
|
retrySleepStart: 5 * time.Second,
|
||||||
return b
|
retrySleepMax: 60 * time.Second,
|
||||||
|
refreshInterval: defaultRefreshInterval,
|
||||||
|
// consider a lock refresh failed a bit before the lock actually becomes stale
|
||||||
|
// the difference allows to compensate for a small time drift between clients.
|
||||||
|
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
|
||||||
|
}
|
||||||
|
|
||||||
|
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
||||||
|
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
||||||
// cancelling the original context also stops the lock refresh
|
// cancelling the original context also stops the lock refresh
|
||||||
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
func (l *locker) Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
||||||
|
|
||||||
lockFn := restic.NewLock
|
lockFn := restic.NewLock
|
||||||
if exclusive {
|
if exclusive {
|
||||||
|
@ -42,7 +52,7 @@ func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock
|
||||||
var lock *restic.Lock
|
var lock *restic.Lock
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
retrySleep := minDuration(retrySleepStart, retryLock)
|
retrySleep := minDuration(l.retrySleepStart, retryLock)
|
||||||
retryMessagePrinted := false
|
retryMessagePrinted := false
|
||||||
retryTimeout := time.After(retryLock)
|
retryTimeout := time.After(retryLock)
|
||||||
|
|
||||||
|
@ -68,7 +78,7 @@ retryLoop:
|
||||||
lock, err = lockFn(ctx, repo)
|
lock, err = lockFn(ctx, repo)
|
||||||
break retryLoop
|
break retryLoop
|
||||||
case <-retrySleepCh:
|
case <-retrySleepCh:
|
||||||
retrySleep = minDuration(retrySleep*2, retrySleepMax)
|
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// anything else, either a successful lock or another error
|
// anything else, either a successful lock or another error
|
||||||
|
@ -92,26 +102,27 @@ retryLoop:
|
||||||
refreshChan := make(chan struct{})
|
refreshChan := make(chan struct{})
|
||||||
forceRefreshChan := make(chan refreshLockRequest)
|
forceRefreshChan := make(chan refreshLockRequest)
|
||||||
|
|
||||||
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
|
go l.refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
|
||||||
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
||||||
|
|
||||||
return &Unlocker{lockInfo}, ctx, nil
|
return &Unlocker{lockInfo}, ctx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var refreshInterval = 5 * time.Minute
|
func minDuration(a, b time.Duration) time.Duration {
|
||||||
|
if a <= b {
|
||||||
// consider a lock refresh failed a bit before the lock actually becomes stale
|
return a
|
||||||
// the difference allows to compensate for a small time drift between clients.
|
}
|
||||||
var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
type refreshLockRequest struct {
|
type refreshLockRequest struct {
|
||||||
result chan bool
|
result chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||||
debug.Log("start")
|
debug.Log("start")
|
||||||
lock := lockInfo.lock
|
lock := lockInfo.lock
|
||||||
ticker := time.NewTicker(refreshInterval)
|
ticker := time.NewTicker(l.refreshInterval)
|
||||||
lastRefresh := lock.Time
|
lastRefresh := lock.Time
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -151,7 +162,7 @@ func refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockCo
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if time.Since(lastRefresh) > refreshabilityTimeout {
|
if time.Since(lastRefresh) > l.refreshabilityTimeout {
|
||||||
// the lock is too old, wait until the expiry monitor cancels the context
|
// the lock is too old, wait until the expiry monitor cancels the context
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -172,14 +183,14 @@ func refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockCo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||||
// time.Now() might use a monotonic timer which is paused during standby
|
// time.Now() might use a monotonic timer which is paused during standby
|
||||||
// convert to unix time to ensure we compare real time values
|
// convert to unix time to ensure we compare real time values
|
||||||
lastRefresh := time.Now().UnixNano()
|
lastRefresh := time.Now().UnixNano()
|
||||||
pollDuration := 1 * time.Second
|
pollDuration := 1 * time.Second
|
||||||
if refreshInterval < pollDuration {
|
if l.refreshInterval < pollDuration {
|
||||||
// required for TestLockFailedRefresh
|
// required for TestLockFailedRefresh
|
||||||
pollDuration = refreshInterval / 5
|
pollDuration = l.refreshInterval / 5
|
||||||
}
|
}
|
||||||
// timers are paused during standby, which is a problem as the refresh timeout
|
// timers are paused during standby, which is a problem as the refresh timeout
|
||||||
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
||||||
|
@ -205,7 +216,7 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
|
||||||
}
|
}
|
||||||
lastRefresh = time.Now().UnixNano()
|
lastRefresh = time.Now().UnixNano()
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,8 +37,8 @@ func openLockTestRepo(t *testing.T, wrapper backendWrapper) restic.Repository {
|
||||||
return repo
|
return repo
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, retryLock time.Duration) (*Unlocker, context.Context) {
|
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, lockerInst *locker, retryLock time.Duration) (*Unlocker, context.Context) {
|
||||||
lock, wrappedCtx, err := Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
lock, wrappedCtx, err := lockerInst.Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
test.OK(t, wrappedCtx.Err())
|
test.OK(t, wrappedCtx.Err())
|
||||||
if lock.info.lock.Stale() {
|
if lock.info.lock.Stale() {
|
||||||
|
@ -48,9 +48,10 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository,
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLock(t *testing.T) {
|
func TestLock(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, lockerInst, 0)
|
||||||
lock.Unlock()
|
lock.Unlock()
|
||||||
if wrappedCtx.Err() == nil {
|
if wrappedCtx.Err() == nil {
|
||||||
t.Fatal("unlock did not cancel context")
|
t.Fatal("unlock did not cancel context")
|
||||||
|
@ -58,11 +59,12 @@ func TestLock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockCancel(t *testing.T) {
|
func TestLockCancel(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
lock, wrappedCtx := checkedLockRepo(ctx, t, repo, 0)
|
lock, wrappedCtx := checkedLockRepo(ctx, t, repo, lockerInst, 0)
|
||||||
cancel()
|
cancel()
|
||||||
if wrappedCtx.Err() == nil {
|
if wrappedCtx.Err() == nil {
|
||||||
t.Fatal("canceled parent context did not cancel context")
|
t.Fatal("canceled parent context did not cancel context")
|
||||||
|
@ -73,6 +75,7 @@ func TestLockCancel(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockConflict(t *testing.T) {
|
func TestLockConflict(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
repo2, err := New(repo.Backend(), Options{})
|
repo2, err := New(repo.Backend(), Options{})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
|
@ -102,19 +105,19 @@ func (b *writeOnceBackend) Save(ctx context.Context, h backend.Handle, rd backen
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockFailedRefresh(t *testing.T) {
|
func TestLockFailedRefresh(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
||||||
return &writeOnceBackend{Backend: r}, nil
|
return &writeOnceBackend{Backend: r}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// reduce locking intervals to be suitable for testing
|
// reduce locking intervals to be suitable for testing
|
||||||
ri, rt := refreshInterval, refreshabilityTimeout
|
li := &locker{
|
||||||
refreshInterval = 20 * time.Millisecond
|
retrySleepStart: lockerInst.retrySleepStart,
|
||||||
refreshabilityTimeout = 100 * time.Millisecond
|
retrySleepMax: lockerInst.retrySleepMax,
|
||||||
defer func() {
|
refreshInterval: 20 * time.Millisecond,
|
||||||
refreshInterval, refreshabilityTimeout = ri, rt
|
refreshabilityTimeout: 100 * time.Millisecond,
|
||||||
}()
|
}
|
||||||
|
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-wrappedCtx.Done():
|
case <-wrappedCtx.Done():
|
||||||
|
@ -139,6 +142,7 @@ func (b *loggingBackend) Save(ctx context.Context, h backend.Handle, rd backend.
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockSuccessfulRefresh(t *testing.T) {
|
func TestLockSuccessfulRefresh(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
||||||
return &loggingBackend{
|
return &loggingBackend{
|
||||||
Backend: r,
|
Backend: r,
|
||||||
|
@ -148,14 +152,13 @@ func TestLockSuccessfulRefresh(t *testing.T) {
|
||||||
|
|
||||||
t.Logf("test for successful lock refresh %v", time.Now())
|
t.Logf("test for successful lock refresh %v", time.Now())
|
||||||
// reduce locking intervals to be suitable for testing
|
// reduce locking intervals to be suitable for testing
|
||||||
ri, rt := refreshInterval, refreshabilityTimeout
|
li := &locker{
|
||||||
refreshInterval = 60 * time.Millisecond
|
retrySleepStart: lockerInst.retrySleepStart,
|
||||||
refreshabilityTimeout = 500 * time.Millisecond
|
retrySleepMax: lockerInst.retrySleepMax,
|
||||||
defer func() {
|
refreshInterval: 60 * time.Millisecond,
|
||||||
refreshInterval, refreshabilityTimeout = ri, rt
|
refreshabilityTimeout: 500 * time.Millisecond,
|
||||||
}()
|
}
|
||||||
|
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-wrappedCtx.Done():
|
case <-wrappedCtx.Done():
|
||||||
|
@ -168,7 +171,7 @@ func TestLockSuccessfulRefresh(t *testing.T) {
|
||||||
buf = buf[:n]
|
buf = buf[:n]
|
||||||
t.Log(string(buf))
|
t.Log(string(buf))
|
||||||
|
|
||||||
case <-time.After(2 * refreshabilityTimeout):
|
case <-time.After(2 * li.refreshabilityTimeout):
|
||||||
// expected lock refresh to work
|
// expected lock refresh to work
|
||||||
}
|
}
|
||||||
// Unlock should not crash
|
// Unlock should not crash
|
||||||
|
@ -190,6 +193,7 @@ func (b *slowBackend) Save(ctx context.Context, h backend.Handle, rd backend.Rew
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
var sb *slowBackend
|
var sb *slowBackend
|
||||||
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
|
||||||
sb = &slowBackend{Backend: r}
|
sb = &slowBackend{Backend: r}
|
||||||
|
@ -198,17 +202,17 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||||
|
|
||||||
t.Logf("test for successful lock refresh %v", time.Now())
|
t.Logf("test for successful lock refresh %v", time.Now())
|
||||||
// reduce locking intervals to be suitable for testing
|
// reduce locking intervals to be suitable for testing
|
||||||
ri, rt := refreshInterval, refreshabilityTimeout
|
li := &locker{
|
||||||
refreshInterval = 10 * time.Millisecond
|
retrySleepStart: lockerInst.retrySleepStart,
|
||||||
refreshabilityTimeout = 50 * time.Millisecond
|
retrySleepMax: lockerInst.retrySleepMax,
|
||||||
defer func() {
|
refreshInterval: 10 * time.Millisecond,
|
||||||
refreshInterval, refreshabilityTimeout = ri, rt
|
refreshabilityTimeout: 50 * time.Millisecond,
|
||||||
}()
|
}
|
||||||
|
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
|
||||||
// delay lock refreshing long enough that the lock would expire
|
// delay lock refreshing long enough that the lock would expire
|
||||||
sb.m.Lock()
|
sb.m.Lock()
|
||||||
sb.sleep = refreshabilityTimeout + refreshInterval
|
sb.sleep = li.refreshabilityTimeout + li.refreshInterval
|
||||||
sb.m.Unlock()
|
sb.m.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -216,7 +220,7 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||||
// don't call t.Fatal to allow the lock to be properly cleaned up
|
// don't call t.Fatal to allow the lock to be properly cleaned up
|
||||||
t.Error("lock refresh failed", time.Now())
|
t.Error("lock refresh failed", time.Now())
|
||||||
|
|
||||||
case <-time.After(refreshabilityTimeout):
|
case <-time.After(li.refreshabilityTimeout):
|
||||||
}
|
}
|
||||||
// reset slow backend
|
// reset slow backend
|
||||||
sb.m.Lock()
|
sb.m.Lock()
|
||||||
|
@ -229,7 +233,7 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||||
// don't call t.Fatal to allow the lock to be properly cleaned up
|
// don't call t.Fatal to allow the lock to be properly cleaned up
|
||||||
t.Error("lock refresh failed", time.Now())
|
t.Error("lock refresh failed", time.Now())
|
||||||
|
|
||||||
case <-time.After(3 * refreshabilityTimeout):
|
case <-time.After(3 * li.refreshabilityTimeout):
|
||||||
// expected lock refresh to work
|
// expected lock refresh to work
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue