forked from TrueCloudLab/restic
Merge pull request #3174 from MichaelEischer/parallelize-lock-loading
Parallelize lock file loading
This commit is contained in:
commit
6aa7e9f9c6
2 changed files with 63 additions and 9 deletions
|
@ -1,4 +1,4 @@
|
||||||
Enhancement: Parallelize reading of snapshots
|
Enhancement: Parallelize reading of locks and snapshots
|
||||||
|
|
||||||
Restic used to read snapshots sequentially. For repositories containing
|
Restic used to read snapshots sequentially. For repositories containing
|
||||||
many snapshots this slowed down commands which have to read all snapshots.
|
many snapshots this slowed down commands which have to read all snapshots.
|
||||||
|
@ -6,4 +6,7 @@ Now the reading of snapshots is parallelized. This speeds up for example
|
||||||
`prune`, `backup` and other commands that search for snapshots with certain
|
`prune`, `backup` and other commands that search for snapshots with certain
|
||||||
properties or which have to find the `latest` snapshot.
|
properties or which have to find the `latest` snapshot.
|
||||||
|
|
||||||
|
The speed up also applies to locks stored in the backup repository.
|
||||||
|
|
||||||
https://github.com/restic/restic/pull/3130
|
https://github.com/restic/restic/pull/3130
|
||||||
|
https://github.com/restic/restic/pull/3174
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
)
|
)
|
||||||
|
@ -138,12 +139,7 @@ func (l *Lock) fillUserInfo() error {
|
||||||
// non-exclusive lock is to be created, an error is only returned when an
|
// non-exclusive lock is to be created, an error is only returned when an
|
||||||
// exclusive lock is found.
|
// exclusive lock is found.
|
||||||
func (l *Lock) checkForOtherLocks(ctx context.Context) error {
|
func (l *Lock) checkForOtherLocks(ctx context.Context) error {
|
||||||
return l.repo.List(ctx, LockFile, func(id ID, size int64) error {
|
return ForAllLocks(ctx, l.repo, l.lockID, func(id ID, lock *Lock, err error) error {
|
||||||
if l.lockID != nil && id.Equal(*l.lockID) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
lock, err := LoadLock(ctx, l.repo, id)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// ignore locks that cannot be loaded
|
// ignore locks that cannot be loaded
|
||||||
debug.Log("ignore lock %v: %v", id, err)
|
debug.Log("ignore lock %v: %v", id, err)
|
||||||
|
@ -275,8 +271,7 @@ func LoadLock(ctx context.Context, repo Repository, id ID) (*Lock, error) {
|
||||||
|
|
||||||
// RemoveStaleLocks deletes all locks detected as stale from the repository.
|
// RemoveStaleLocks deletes all locks detected as stale from the repository.
|
||||||
func RemoveStaleLocks(ctx context.Context, repo Repository) error {
|
func RemoveStaleLocks(ctx context.Context, repo Repository) error {
|
||||||
return repo.List(ctx, LockFile, func(id ID, size int64) error {
|
return ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error {
|
||||||
lock, err := LoadLock(ctx, repo, id)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// ignore locks that cannot be loaded
|
// ignore locks that cannot be loaded
|
||||||
debug.Log("ignore lock %v: %v", id, err)
|
debug.Log("ignore lock %v: %v", id, err)
|
||||||
|
@ -297,3 +292,59 @@ func RemoveAllLocks(ctx context.Context, repo Repository) error {
|
||||||
return repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
|
return repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const loadLockParallelism = 5
|
||||||
|
|
||||||
|
// ForAllLocks reads all locks in parallel and calls the given callback.
|
||||||
|
// It is guaranteed that the function is not run concurrently. If the
|
||||||
|
// callback returns an error, this function is cancelled and also returns that error.
|
||||||
|
// If a lock ID is passed via excludeID, it will be ignored.
|
||||||
|
func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error {
|
||||||
|
var m sync.Mutex
|
||||||
|
|
||||||
|
// track spawned goroutines using wg, create a new context which is
|
||||||
|
// cancelled as soon as an error occurs.
|
||||||
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
ch := make(chan ID)
|
||||||
|
|
||||||
|
// send list of lock files through ch, which is closed afterwards
|
||||||
|
wg.Go(func() error {
|
||||||
|
defer close(ch)
|
||||||
|
return repo.List(ctx, LockFile, func(id ID, size int64) error {
|
||||||
|
if excludeID != nil && id.Equal(*excludeID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ch <- id:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// a worker receives an snapshot ID from ch, loads the snapshot
|
||||||
|
// and runs fn with id, the snapshot and the error
|
||||||
|
worker := func() error {
|
||||||
|
for id := range ch {
|
||||||
|
debug.Log("load lock %v", id)
|
||||||
|
lock, err := LoadLock(ctx, repo, id)
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
err = fn(id, lock, err)
|
||||||
|
m.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < loadLockParallelism; i++ {
|
||||||
|
wg.Go(worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
return wg.Wait()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue