lock: rework stale lock refresh to avoid data race
This commit is contained in:
parent
05e5e29a8c
commit
b2fcbc21cb
1 changed files with 37 additions and 15 deletions
|
@ -110,12 +110,12 @@ retryLoop:
|
|||
}
|
||||
lockInfo.refreshWG.Add(2)
|
||||
refreshChan := make(chan struct{})
|
||||
forcedRefreshChan := make(chan struct{})
|
||||
forceRefreshChan := make(chan refreshLockRequest)
|
||||
|
||||
globalLocks.Lock()
|
||||
globalLocks.locks[lock] = lockInfo
|
||||
go refreshLocks(ctx, lockInfo, refreshChan, forcedRefreshChan)
|
||||
go monitorLockRefresh(ctx, repo.Backend(), lockInfo, refreshChan, forcedRefreshChan)
|
||||
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan)
|
||||
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan)
|
||||
globalLocks.Unlock()
|
||||
|
||||
return lock, ctx, err
|
||||
|
@ -127,7 +127,11 @@ var refreshInterval = 5 * time.Minute
|
|||
// the difference allows to compensate for a small time drift between clients.
|
||||
var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
|
||||
|
||||
func refreshLocks(ctx context.Context, lockInfo *lockContext, refreshed chan<- struct{}, forcedRefresh <-chan struct{}) {
|
||||
type refreshLockRequest struct {
|
||||
result chan bool
|
||||
}
|
||||
|
||||
func refreshLocks(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest) {
|
||||
debug.Log("start")
|
||||
lock := lockInfo.lock
|
||||
ticker := time.NewTicker(refreshInterval)
|
||||
|
@ -154,9 +158,19 @@ func refreshLocks(ctx context.Context, lockInfo *lockContext, refreshed chan<- s
|
|||
debug.Log("terminate")
|
||||
return
|
||||
|
||||
case <-forcedRefresh:
|
||||
// update lock refresh time
|
||||
lastRefresh = lock.Time
|
||||
case req := <-forceRefresh:
|
||||
// keep on going if our current lock still exists
|
||||
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel)
|
||||
// inform refresh goroutine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case req.result <- success:
|
||||
}
|
||||
|
||||
if success {
|
||||
// update lock refresh time
|
||||
lastRefresh = lock.Time
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
if time.Since(lastRefresh) > refreshabilityTimeout {
|
||||
|
@ -180,7 +194,7 @@ func refreshLocks(ctx context.Context, lockInfo *lockContext, refreshed chan<- s
|
|||
}
|
||||
}
|
||||
|
||||
func monitorLockRefresh(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed <-chan struct{}, forcedRefresh chan<- struct{}) {
|
||||
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest) {
|
||||
// time.Now() might use a monotonic timer which is paused during standby
|
||||
// convert to unix time to ensure we compare real time values
|
||||
lastRefresh := time.Now().UnixNano()
|
||||
|
@ -212,14 +226,22 @@ func monitorLockRefresh(ctx context.Context, backend restic.Backend, lockInfo *l
|
|||
}
|
||||
|
||||
// keep on going if our current lock still exists
|
||||
if tryRefreshStaleLock(ctx, backend, lockInfo.lock, lockInfo.cancel) {
|
||||
lastRefresh = time.Now().UnixNano()
|
||||
refreshReq := refreshLockRequest{
|
||||
result: make(chan bool),
|
||||
}
|
||||
// inform refresh goroutine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case forceRefresh <- refreshReq:
|
||||
}
|
||||
var success bool
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case success = <-refreshReq.result:
|
||||
}
|
||||
|
||||
// inform refresh gorountine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case forcedRefresh <- struct{}{}:
|
||||
}
|
||||
if success {
|
||||
lastRefresh = time.Now().UnixNano()
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue