repository: deduplicate index loading implementation

This commit is contained in:
Michael Eischer 2020-11-07 18:50:19 +01:00
parent ccc84af73d
commit 24474a36f4

View file

@ -434,88 +434,35 @@ func (r *Repository) SaveFullIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...)
}
const loadIndexParallelism = 4
// LoadIndex loads all index files from the backend in parallel and stores them
// in the master index. The first error that occurred is returned.
func (r *Repository) LoadIndex(ctx context.Context) error {
debug.Log("Loading index")
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
type FileInfo struct {
restic.ID
Size int64
}
ch := make(chan FileInfo)
indexCh := make(chan *Index)
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return nil
case ch <- FileInfo{id, size}:
}
return nil
})
})
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
worker := func() error {
var buf []byte
for fi := range ch {
var err error
buf, err = r.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
if err != nil {
return errors.Wrapf(err, "unable to load index %s", fi.ID.Str())
}
idx, _, err := DecodeIndex(buf, fi.ID)
if err != nil {
return errors.Wrapf(err, "unable to decode index %s", fi.ID.Str())
}
select {
case indexCh <- idx:
case <-ctx.Done():
}
}
return nil
}
// run workers on ch
wg.Go(func() error {
defer close(indexCh)
return RunWorkers(loadIndexParallelism, worker)
})
// receive decoded indexes
validIndex := restic.NewIDSet()
wg.Go(func() error {
for idx := range indexCh {
ids, err := idx.IDs()
if err == nil {
for _, id := range ids {
validIndex.Insert(id)
}
}
r.idx.Insert(idx)
err := ForAllIndexes(ctx, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error {
if err != nil {
return err
}
r.idx.MergeFinalIndexes()
ids, err := idx.IDs()
if err != nil {
return err
}
for _, id := range ids {
validIndex.Insert(id)
}
r.idx.Insert(idx)
return nil
})
err := wg.Wait()
if err != nil {
return errors.Fatal(err.Error())
}
r.idx.MergeFinalIndexes()
// remove index files from the cache which have been removed in the repo
return r.PrepareCache(validIndex)
}