Parallelize ForAllSnapshots
This commit is contained in:
parent
5b9ee56335
commit
26f85779be
2 changed files with 71 additions and 86 deletions
|
@ -298,86 +298,6 @@ func (e Error) Error() string {
|
|||
return e.Err.Error()
|
||||
}
|
||||
|
||||
func loadTreeFromSnapshot(ctx context.Context, repo restic.Repository, id restic.ID) (restic.ID, error) {
|
||||
sn, err := restic.LoadSnapshot(ctx, repo, id)
|
||||
if err != nil {
|
||||
debug.Log("error loading snapshot %v: %v", id, err)
|
||||
return restic.ID{}, err
|
||||
}
|
||||
|
||||
if sn.Tree == nil {
|
||||
debug.Log("snapshot %v has no tree", id)
|
||||
return restic.ID{}, errors.Errorf("snapshot %v has no tree", id)
|
||||
}
|
||||
|
||||
return *sn.Tree, nil
|
||||
}
|
||||
|
||||
// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
|
||||
func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.IDs, []error) {
|
||||
var trees struct {
|
||||
IDs restic.IDs
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
var errs struct {
|
||||
errs []error
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// track spawned goroutines using wg, create a new context which is
|
||||
// cancelled as soon as an error occurs.
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
ch := make(chan restic.ID)
|
||||
|
||||
// send list of index files through ch, which is closed afterwards
|
||||
wg.Go(func() error {
|
||||
defer close(ch)
|
||||
return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case ch <- id:
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
// a worker receives an index ID from ch, loads the snapshot and the tree,
|
||||
// and adds the result to errs and trees.
|
||||
worker := func() error {
|
||||
for id := range ch {
|
||||
debug.Log("load snapshot %v", id)
|
||||
|
||||
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
|
||||
if err != nil {
|
||||
errs.Lock()
|
||||
errs.errs = append(errs.errs, err)
|
||||
errs.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
debug.Log("snapshot %v has tree %v", id, treeID)
|
||||
trees.Lock()
|
||||
trees.IDs = append(trees.IDs, treeID)
|
||||
trees.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < defaultParallelism; i++ {
|
||||
wg.Go(worker)
|
||||
}
|
||||
|
||||
err := wg.Wait()
|
||||
if err != nil {
|
||||
errs.errs = append(errs.errs, err)
|
||||
}
|
||||
|
||||
return trees.IDs, errs.errs
|
||||
}
|
||||
|
||||
// TreeError collects several errors that occurred while processing a tree.
|
||||
type TreeError struct {
|
||||
ID restic.ID
|
||||
|
@ -586,6 +506,24 @@ func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderCha
|
|||
}
|
||||
}
|
||||
|
||||
func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) {
|
||||
err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return nil
|
||||
}
|
||||
treeID := *sn.Tree
|
||||
debug.Log("snapshot %v has tree %v", id, treeID)
|
||||
ids = append(ids, treeID)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
return ids, errs
|
||||
}
|
||||
|
||||
// Structure checks that for all snapshots all referenced data blobs and
|
||||
// subtrees are available in the index. errChan is closed after all trees have
|
||||
// been traversed.
|
||||
|
|
|
@ -5,8 +5,11 @@ import (
|
|||
"fmt"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
)
|
||||
|
||||
|
@ -66,17 +69,61 @@ func LoadSnapshot(ctx context.Context, repo Repository, id ID) (*Snapshot, error
|
|||
return sn, nil
|
||||
}
|
||||
|
||||
// ForAllSnapshots reads all snapshots and calls the given function.
|
||||
const loadSnapshotParallelism = 5
|
||||
|
||||
// ForAllSnapshots reads all snapshots in parallel and calls the
|
||||
// given function. It is guaranteed that the function is not run concurrently.
|
||||
// If the called function returns an error, this function is cancelled and
|
||||
// also returns this error.
|
||||
// If a snapshot ID is in excludeIDs, it will be ignored.
|
||||
func ForAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error {
|
||||
return repo.List(ctx, SnapshotFile, func(id ID, size int64) error {
|
||||
if excludeIDs.Has(id) {
|
||||
var m sync.Mutex
|
||||
|
||||
// track spawned goroutines using wg, create a new context which is
|
||||
// cancelled as soon as an error occurs.
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
ch := make(chan ID)
|
||||
|
||||
// send list of snapshot files through ch, which is closed afterwards
|
||||
wg.Go(func() error {
|
||||
defer close(ch)
|
||||
return repo.List(ctx, SnapshotFile, func(id ID, size int64) error {
|
||||
if excludeIDs.Has(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case ch <- id:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
sn, err := LoadSnapshot(ctx, repo, id)
|
||||
return fn(id, sn, err)
|
||||
})
|
||||
})
|
||||
|
||||
// a worker receives an snapshot ID from ch, loads the snapshot
|
||||
// and runs fn with id, the snapshot and the error
|
||||
worker := func() error {
|
||||
for id := range ch {
|
||||
debug.Log("load snapshot %v", id)
|
||||
sn, err := LoadSnapshot(ctx, repo, id)
|
||||
|
||||
m.Lock()
|
||||
err = fn(id, sn, err)
|
||||
m.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < loadSnapshotParallelism; i++ {
|
||||
wg.Go(worker)
|
||||
}
|
||||
|
||||
return wg.Wait()
|
||||
}
|
||||
|
||||
// LoadAllSnapshots returns a list of all snapshots in the repo.
|
||||
|
|
Loading…
Reference in a new issue