// restic/internal/restic/lock.go
package restic
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"os/user"
	"sync"
	"sync/atomic"
	"syscall"
	"testing"
	"time"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
)
// Lock represents a process locking the repository for an operation.
//
// There are two types of locks: exclusive and non-exclusive. There may be many
// different non-exclusive locks, but at most one exclusive lock, which can
// only be acquired while no non-exclusive lock is held.
//
// A lock must be refreshed regularly to not be considered stale, this must be
// triggered by regularly calling Refresh.
type Lock struct {
	// lock guards the mutable fields (Time, lockID) against concurrent
	// Refresh/Stale/String calls.
	lock sync.Mutex

	Time      time.Time `json:"time"`      // creation time, updated by Refresh
	Exclusive bool      `json:"exclusive"` // true for an exclusive lock
	Hostname  string    `json:"hostname"`  // host the locking process runs on
	Username  string    `json:"username"`  // user that created the lock
	PID       int       `json:"pid"`       // process ID of the locking process
	UID       uint32    `json:"uid,omitempty"`
	GID       uint32    `json:"gid,omitempty"`

	repo   Repository // repository the lock file is stored in
	lockID *ID        // storage ID of the lock file; nil until the lock was written
}
// alreadyLockedError is returned when NewLock or NewExclusiveLock are unable to
// acquire the desired lock.
type alreadyLockedError struct {
	otherLock *Lock // the conflicting lock found in the repository
}
// Error describes the conflict, mentioning whether the other lock is
// held exclusively.
func (e *alreadyLockedError) Error() string {
	qualifier := ""
	if e.otherLock.Exclusive {
		qualifier = "exclusively "
	}
	return fmt.Sprintf("repository is already locked %sby %v", qualifier, e.otherLock)
}
// IsAlreadyLocked returns true iff err indicates that a repository is
// already locked.
func IsAlreadyLocked(err error) bool {
	var target *alreadyLockedError
	return errors.As(err, &target)
}
2015-06-24 16:17:01 +00:00
// NewLock returns a new, non-exclusive lock for the repository. If an
// exclusive lock is already held by another process, it returns an error
// that satisfies IsAlreadyLocked.
2017-06-04 09:16:55 +00:00
func NewLock(ctx context.Context, repo Repository) (*Lock, error) {
return newLock(ctx, repo, false)
2015-06-24 16:17:01 +00:00
}
// NewExclusiveLock returns a new, exclusive lock for the repository. If
// another lock (normal and exclusive) is already held by another process,
// it returns an error that satisfies IsAlreadyLocked.
2017-06-04 09:16:55 +00:00
func NewExclusiveLock(ctx context.Context, repo Repository) (*Lock, error) {
return newLock(ctx, repo, true)
2015-06-24 16:17:01 +00:00
}
// waitBeforeLockCheck is the delay between writing our own lock file and
// re-checking the repository for concurrently created conflicting locks.
var waitBeforeLockCheck = 200 * time.Millisecond

// TestSetLockTimeout can be used to reduce the lock wait timeout for tests.
func TestSetLockTimeout(t testing.TB, d time.Duration) {
	t.Logf("setting lock timeout to %v", d)
	waitBeforeLockCheck = d
}
2015-06-27 12:26:33 +00:00
2017-06-04 09:16:55 +00:00
func newLock(ctx context.Context, repo Repository, excl bool) (*Lock, error) {
2015-06-24 16:17:01 +00:00
lock := &Lock{
Time: time.Now(),
PID: os.Getpid(),
Exclusive: excl,
repo: repo,
}
hn, err := os.Hostname()
if err == nil {
lock.Hostname = hn
}
if err = lock.fillUserInfo(); err != nil {
return nil, err
}
2017-06-04 09:16:55 +00:00
if err = lock.checkForOtherLocks(ctx); err != nil {
2015-06-24 16:17:01 +00:00
return nil, err
}
2017-06-04 09:16:55 +00:00
lockID, err := lock.createLock(ctx)
2015-06-24 16:17:01 +00:00
if err != nil {
return nil, err
}
lock.lockID = &lockID
2015-06-27 12:26:33 +00:00
time.Sleep(waitBeforeLockCheck)
2017-06-04 09:16:55 +00:00
if err = lock.checkForOtherLocks(ctx); err != nil {
_ = lock.Unlock()
return nil, err
2015-06-27 12:26:33 +00:00
}
2015-06-24 16:17:01 +00:00
return lock, nil
}
// fillUserInfo sets the Username, UID and GID fields from the current OS
// user. When the current user cannot be determined at all, the lock is left
// without user information and no error is reported — locking must still
// work in such environments (deliberate best effort).
func (l *Lock) fillUserInfo() error {
	usr, err := user.Current()
	if err != nil {
		// deliberately ignore the lookup error and proceed without user info
		return nil
	}
	l.Username = usr.Username

	// uidGidInt may fail (e.g. non-numeric IDs on some platforms); that
	// error is reported to the caller.
	l.UID, l.GID, err = uidGidInt(usr)
	return err
}
// checkForOtherLocks looks for other locks that currently exist in the repository.
//
// If an exclusive lock is to be created, checkForOtherLocks returns an error
// if there are any other locks, regardless if exclusive or not. If a
// non-exclusive lock is to be created, an error is only returned when an
// exclusive lock is found.
2017-06-04 09:16:55 +00:00
func (l *Lock) checkForOtherLocks(ctx context.Context) error {
var err error
// retry locking a few times
for i := 0; i < 3; i++ {
err = ForAllLocks(ctx, l.repo, l.lockID, func(id ID, lock *Lock, err error) error {
if err != nil {
// if we cannot load a lock then it is unclear whether it can be ignored
// it could either be invalid or just unreadable due to network/permission problems
debug.Log("ignore lock %v: %v", id, err)
return errors.Fatal(err.Error())
}
if l.Exclusive {
return &alreadyLockedError{otherLock: lock}
}
if !l.Exclusive && lock.Exclusive {
return &alreadyLockedError{otherLock: lock}
}
2015-06-24 16:17:01 +00:00
return nil
})
// no lock detected
if err == nil {
return nil
2015-06-24 16:17:01 +00:00
}
// lock conflicts are permanent
if _, ok := err.(*alreadyLockedError); ok {
return err
2015-06-24 16:17:01 +00:00
}
}
return err
2015-06-24 16:17:01 +00:00
}
// createLock acquires the lock by creating a file in the repository.
2017-06-04 09:16:55 +00:00
func (l *Lock) createLock(ctx context.Context) (ID, error) {
id, err := SaveJSONUnpacked(ctx, l.repo, LockFile, l)
2015-06-24 16:17:01 +00:00
if err != nil {
2016-08-31 18:29:54 +00:00
return ID{}, err
2015-06-24 16:17:01 +00:00
}
2015-07-12 19:02:00 +00:00
return id, nil
2015-06-24 16:17:01 +00:00
}
// Unlock removes the lock from the repository.
func (l *Lock) Unlock() error {
if l == nil || l.lockID == nil {
return nil
}
2017-06-03 15:39:57 +00:00
return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: l.lockID.String()})
2015-06-24 16:17:01 +00:00
}
var StaleLockTimeout = 30 * time.Minute
2015-06-24 16:17:01 +00:00
// Stale returns true if the lock is stale. A lock is stale if the timestamp is
// older than 30 minutes or if it was created on the current machine and the
// process isn't alive any more.
func (l *Lock) Stale() bool {
l.lock.Lock()
defer l.lock.Unlock()
2016-09-27 20:35:08 +00:00
debug.Log("testing if lock %v for process %d is stale", l, l.PID)
if time.Since(l.Time) > StaleLockTimeout {
2016-09-27 20:35:08 +00:00
debug.Log("lock is stale, timestamp is too old: %v\n", l.Time)
2015-06-24 16:17:01 +00:00
return true
}
hn, err := os.Hostname()
if err != nil {
2017-02-08 23:43:10 +00:00
debug.Log("unable to find current hostname: %v", err)
// since we cannot find the current hostname, assume that the lock is
// not stale.
return false
}
if hn != l.Hostname {
// lock was created on a different host, assume the lock is not stale.
return false
}
// check if we can reach the process retaining the lock
exists := l.processExists()
if !exists {
2016-09-27 20:35:08 +00:00
debug.Log("could not reach process, %d, lock is probably stale\n", l.PID)
2015-06-24 16:17:01 +00:00
return true
}
2016-09-27 20:35:08 +00:00
debug.Log("lock not stale\n")
2015-06-24 16:17:01 +00:00
return false
}
// Refresh refreshes the lock by creating a new file in the backend with a new
// timestamp. Afterwards the old lock is removed.
func (l *Lock) Refresh(ctx context.Context) error {
	debug.Log("refreshing lock %v", l.lockID)
	// update the timestamp under the mutex, then release it: the mutex is
	// not held while writing the new lock file, so Stale/String stay usable
	l.lock.Lock()
	l.Time = time.Now()
	l.lock.Unlock()
	id, err := l.createLock(ctx)
	if err != nil {
		return err
	}

	// swap in the new lock ID before removing the old lock file
	l.lock.Lock()
	defer l.lock.Unlock()

	debug.Log("new lock ID %v", id)
	oldLockID := l.lockID
	l.lockID = &id

	// NOTE(review): removal uses context.TODO() rather than ctx, presumably
	// so the stale old lock file is removed even if ctx was cancelled — confirm.
	return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
}
// String returns a human-readable description of the lock: owner, creation
// time, age and storage ID. Assumes the lock file was written (lockID set).
func (l *Lock) String() string {
	l.lock.Lock()
	defer l.lock.Unlock()

	return fmt.Sprintf("PID %d on %s by %s (UID %d, GID %d)\nlock was created at %s (%s ago)\nstorage ID %v",
		l.PID, l.Hostname, l.Username, l.UID, l.GID,
		l.Time.Format("2006-01-02 15:04:05"), time.Since(l.Time),
		l.lockID.Str())
}
// listen for incoming SIGHUP and ignore
var ignoreSIGHUP sync.Once

func init() {
	// NOTE(review): the sync.Once is redundant here — init runs exactly once
	// per process. Kept as-is to preserve behavior.
	ignoreSIGHUP.Do(func() {
		// background goroutine lives for the whole process lifetime and
		// drains SIGHUP so the signal never terminates the process
		go func() {
			c := make(chan os.Signal, 1)
			signal.Notify(c, syscall.SIGHUP)
			for s := range c {
				debug.Log("Signal received: %v\n", s)
			}
		}()
	})
}
// LoadLock loads and unserializes a lock from a repository.
2017-06-04 09:16:55 +00:00
func LoadLock(ctx context.Context, repo Repository, id ID) (*Lock, error) {
2015-06-24 16:17:01 +00:00
lock := &Lock{}
if err := LoadJSONUnpacked(ctx, repo, LockFile, id, lock); err != nil {
2015-06-24 16:17:01 +00:00
return nil, err
}
lock.lockID = &id
2015-06-24 16:17:01 +00:00
return lock, nil
}
// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
var processed uint
err := ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error {
2015-06-24 16:17:01 +00:00
if err != nil {
// ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err)
2015-06-24 16:17:01 +00:00
return nil
}
if lock.Stale() {
err = repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
if err == nil {
processed++
}
return err
2015-06-24 16:17:01 +00:00
}
return nil
})
return processed, err
2015-06-24 16:17:01 +00:00
}
2015-07-12 19:02:00 +00:00
// RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
2022-10-15 15:25:45 +00:00
var processed uint32
err := ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
err := repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
if err == nil {
2022-10-15 15:25:45 +00:00
atomic.AddUint32(&processed, 1)
}
return err
})
2022-10-15 15:25:45 +00:00
return uint(processed), err
}
2020-12-18 19:46:16 +00:00
// ForAllLocks reads all locks in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the
// callback returns an error, this function is cancelled and also returns that error.
// If a lock ID is passed via excludeID, it will be ignored.
func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error {
var m sync.Mutex
// For locks decoding is nearly for free, thus just assume were only limited by IO
return ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
if excludeID != nil && id.Equal(*excludeID) {
2020-12-18 19:46:16 +00:00
return nil
}
lock, err := LoadLock(ctx, repo, id)
2020-12-18 19:46:16 +00:00
m.Lock()
defer m.Unlock()
return fn(id, lock, err)
})
2020-12-18 19:46:16 +00:00
}