forked from TrueCloudLab/restic
check: extract parallel index loading
This commit is contained in:
parent
69f9d269eb
commit
96904f8972
2 changed files with 104 additions and 91 deletions
|
@ -74,102 +74,38 @@ func (err ErrOldIndexFormat) Error() string {
|
|||
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
||||
debug.Log("Start")
|
||||
|
||||
// track spawned goroutines using wg, create a new context which is
|
||||
// cancelled as soon as an error occurs.
|
||||
wg, wgCtx := errgroup.WithContext(ctx)
|
||||
|
||||
type FileInfo struct {
|
||||
restic.ID
|
||||
Size int64
|
||||
}
|
||||
|
||||
type Result struct {
|
||||
*repository.Index
|
||||
restic.ID
|
||||
Err error
|
||||
}
|
||||
|
||||
ch := make(chan FileInfo)
|
||||
resultCh := make(chan Result)
|
||||
|
||||
// send list of index files through ch, which is closed afterwards
|
||||
wg.Go(func() error {
|
||||
defer close(ch)
|
||||
return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error {
|
||||
select {
|
||||
case <-wgCtx.Done():
|
||||
return nil
|
||||
case ch <- FileInfo{id, size}:
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
||||
worker := func() error {
|
||||
var buf []byte
|
||||
for fi := range ch {
|
||||
debug.Log("worker got file %v", fi.ID.Str())
|
||||
var err error
|
||||
var idx *repository.Index
|
||||
oldFormat := false
|
||||
|
||||
buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID)
|
||||
if err == nil {
|
||||
idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID)
|
||||
}
|
||||
packToIndex := make(map[restic.ID]restic.IDSet)
|
||||
err := repository.ForAllIndexes(ctx, c.repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error {
|
||||
debug.Log("process index %v, err %v", id, err)
|
||||
|
||||
if oldFormat {
|
||||
debug.Log("index %v has old format", fi.ID.Str())
|
||||
hints = append(hints, ErrOldIndexFormat{fi.ID})
|
||||
debug.Log("index %v has old format", id.Str())
|
||||
hints = append(hints, ErrOldIndexFormat{id})
|
||||
}
|
||||
|
||||
err = errors.Wrapf(err, "error loading index %v", fi.ID.Str())
|
||||
err = errors.Wrapf(err, "error loading index %v", id.Str())
|
||||
|
||||
select {
|
||||
case resultCh <- Result{idx, fi.ID, err}:
|
||||
case <-wgCtx.Done():
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
// run workers on ch
|
||||
wg.Go(func() error {
|
||||
defer close(resultCh)
|
||||
return repository.RunWorkers(defaultParallelism, worker)
|
||||
})
|
||||
|
||||
// receive decoded indexes
|
||||
packToIndex := make(map[restic.ID]restic.IDSet)
|
||||
wg.Go(func() error {
|
||||
for res := range resultCh {
|
||||
debug.Log("process index %v, err %v", res.ID, res.Err)
|
||||
|
||||
if res.Err != nil {
|
||||
errs = append(errs, res.Err)
|
||||
continue
|
||||
}
|
||||
|
||||
c.masterIndex.Insert(res.Index)
|
||||
c.masterIndex.Insert(index)
|
||||
|
||||
debug.Log("process blobs")
|
||||
cnt := 0
|
||||
for blob := range res.Index.Each(wgCtx) {
|
||||
for blob := range index.Each(ctx) {
|
||||
cnt++
|
||||
|
||||
if _, ok := packToIndex[blob.PackID]; !ok {
|
||||
packToIndex[blob.PackID] = restic.NewIDSet()
|
||||
}
|
||||
packToIndex[blob.PackID].Insert(res.ID)
|
||||
packToIndex[blob.PackID].Insert(id)
|
||||
}
|
||||
|
||||
debug.Log("%d blobs processed", cnt)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := wg.Wait()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
|
77
internal/repository/index_parallel.go
Normal file
77
internal/repository/index_parallel.go
Normal file
|
@ -0,0 +1,77 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const loadIndexParallelism = 5
|
||||
|
||||
// ForAllIndexes loads all index files in parallel and calls the given callback.
|
||||
// It is guaranteed that the function is not run concurrently. If the callback
|
||||
// returns an error, this function is cancelled and also returns that error.
|
||||
func ForAllIndexes(ctx context.Context, repo restic.Repository,
|
||||
fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
|
||||
|
||||
debug.Log("Start")
|
||||
|
||||
type FileInfo struct {
|
||||
restic.ID
|
||||
Size int64
|
||||
}
|
||||
|
||||
var m sync.Mutex
|
||||
|
||||
// track spawned goroutines using wg, create a new context which is
|
||||
// cancelled as soon as an error occurs.
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
ch := make(chan FileInfo)
|
||||
// send list of index files through ch, which is closed afterwards
|
||||
wg.Go(func() error {
|
||||
defer close(ch)
|
||||
return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case ch <- FileInfo{id, size}:
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
||||
worker := func() error {
|
||||
var buf []byte
|
||||
for fi := range ch {
|
||||
debug.Log("worker got file %v", fi.ID.Str())
|
||||
var err error
|
||||
var idx *Index
|
||||
oldFormat := false
|
||||
|
||||
buf, err = repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
|
||||
if err == nil {
|
||||
idx, oldFormat, err = DecodeIndex(buf, fi.ID)
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
err = fn(fi.ID, idx, oldFormat, err)
|
||||
m.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// run workers on ch
|
||||
wg.Go(func() error {
|
||||
return RunWorkers(loadIndexParallelism, worker)
|
||||
})
|
||||
|
||||
return wg.Wait()
|
||||
}
|
Loading…
Reference in a new issue