Merge pull request #3173 from MichaelEischer/unify-index-loading
Unify index loading
This commit is contained in:
commit
bdfedf1f5b
7 changed files with 185 additions and 181 deletions
|
@ -110,10 +110,8 @@ func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func dumpIndexes(ctx context.Context, repo restic.Repository, wr io.Writer) error {
|
func dumpIndexes(ctx context.Context, repo restic.Repository, wr io.Writer) error {
|
||||||
return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
return repository.ForAllIndexes(ctx, repo, func(id restic.ID, idx *repository.Index, oldFormat bool, err error) error {
|
||||||
Printf("index_id: %v\n", id)
|
Printf("index_id: %v\n", id)
|
||||||
|
|
||||||
idx, err := repository.LoadIndex(ctx, repo, id)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,8 +60,7 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error {
|
||||||
case "locks":
|
case "locks":
|
||||||
t = restic.LockFile
|
t = restic.LockFile
|
||||||
case "blobs":
|
case "blobs":
|
||||||
return repo.List(opts.ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
return repository.ForAllIndexes(opts.ctx, repo, func(id restic.ID, idx *repository.Index, oldFormat bool, err error) error {
|
||||||
idx, err := repository.LoadIndex(opts.ctx, repo, id)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -70,7 +69,6 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return errors.Fatal("invalid type")
|
return errors.Fatal("invalid type")
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,102 +74,38 @@ func (err ErrOldIndexFormat) Error() string {
|
||||||
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
||||||
debug.Log("Start")
|
debug.Log("Start")
|
||||||
|
|
||||||
// track spawned goroutines using wg, create a new context which is
|
packToIndex := make(map[restic.ID]restic.IDSet)
|
||||||
// cancelled as soon as an error occurs.
|
err := repository.ForAllIndexes(ctx, c.repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error {
|
||||||
wg, wgCtx := errgroup.WithContext(ctx)
|
debug.Log("process index %v, err %v", id, err)
|
||||||
|
|
||||||
type FileInfo struct {
|
|
||||||
restic.ID
|
|
||||||
Size int64
|
|
||||||
}
|
|
||||||
|
|
||||||
type Result struct {
|
|
||||||
*repository.Index
|
|
||||||
restic.ID
|
|
||||||
Err error
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan FileInfo)
|
|
||||||
resultCh := make(chan Result)
|
|
||||||
|
|
||||||
// send list of index files through ch, which is closed afterwards
|
|
||||||
wg.Go(func() error {
|
|
||||||
defer close(ch)
|
|
||||||
return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error {
|
|
||||||
select {
|
|
||||||
case <-wgCtx.Done():
|
|
||||||
return nil
|
|
||||||
case ch <- FileInfo{id, size}:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
|
||||||
worker := func() error {
|
|
||||||
var buf []byte
|
|
||||||
for fi := range ch {
|
|
||||||
debug.Log("worker got file %v", fi.ID.Str())
|
|
||||||
var err error
|
|
||||||
var idx *repository.Index
|
|
||||||
oldFormat := false
|
|
||||||
|
|
||||||
buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID)
|
|
||||||
if err == nil {
|
|
||||||
idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if oldFormat {
|
if oldFormat {
|
||||||
debug.Log("index %v has old format", fi.ID.Str())
|
debug.Log("index %v has old format", id.Str())
|
||||||
hints = append(hints, ErrOldIndexFormat{fi.ID})
|
hints = append(hints, ErrOldIndexFormat{id})
|
||||||
}
|
}
|
||||||
|
|
||||||
err = errors.Wrapf(err, "error loading index %v", fi.ID.Str())
|
err = errors.Wrapf(err, "error loading index %v", id.Str())
|
||||||
|
|
||||||
select {
|
if err != nil {
|
||||||
case resultCh <- Result{idx, fi.ID, err}:
|
errs = append(errs, err)
|
||||||
case <-wgCtx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// run workers on ch
|
c.masterIndex.Insert(index)
|
||||||
wg.Go(func() error {
|
|
||||||
defer close(resultCh)
|
|
||||||
return repository.RunWorkers(defaultParallelism, worker)
|
|
||||||
})
|
|
||||||
|
|
||||||
// receive decoded indexes
|
|
||||||
packToIndex := make(map[restic.ID]restic.IDSet)
|
|
||||||
wg.Go(func() error {
|
|
||||||
for res := range resultCh {
|
|
||||||
debug.Log("process index %v, err %v", res.ID, res.Err)
|
|
||||||
|
|
||||||
if res.Err != nil {
|
|
||||||
errs = append(errs, res.Err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
c.masterIndex.Insert(res.Index)
|
|
||||||
|
|
||||||
debug.Log("process blobs")
|
debug.Log("process blobs")
|
||||||
cnt := 0
|
cnt := 0
|
||||||
for blob := range res.Index.Each(wgCtx) {
|
for blob := range index.Each(ctx) {
|
||||||
cnt++
|
cnt++
|
||||||
|
|
||||||
if _, ok := packToIndex[blob.PackID]; !ok {
|
if _, ok := packToIndex[blob.PackID]; !ok {
|
||||||
packToIndex[blob.PackID] = restic.NewIDSet()
|
packToIndex[blob.PackID] = restic.NewIDSet()
|
||||||
}
|
}
|
||||||
packToIndex[blob.PackID].Insert(res.ID)
|
packToIndex[blob.PackID].Insert(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
debug.Log("%d blobs processed", cnt)
|
debug.Log("%d blobs processed", cnt)
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
err := wg.Wait()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
|
|
77
internal/repository/index_parallel.go
Normal file
77
internal/repository/index_parallel.go
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/debug"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const loadIndexParallelism = 5
|
||||||
|
|
||||||
|
// ForAllIndexes loads all index files in parallel and calls the given callback.
|
||||||
|
// It is guaranteed that the function is not run concurrently. If the callback
|
||||||
|
// returns an error, this function is cancelled and also returns that error.
|
||||||
|
func ForAllIndexes(ctx context.Context, repo restic.Repository,
|
||||||
|
fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
|
||||||
|
|
||||||
|
debug.Log("Start")
|
||||||
|
|
||||||
|
type FileInfo struct {
|
||||||
|
restic.ID
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var m sync.Mutex
|
||||||
|
|
||||||
|
// track spawned goroutines using wg, create a new context which is
|
||||||
|
// cancelled as soon as an error occurs.
|
||||||
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
ch := make(chan FileInfo)
|
||||||
|
// send list of index files through ch, which is closed afterwards
|
||||||
|
wg.Go(func() error {
|
||||||
|
defer close(ch)
|
||||||
|
return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ch <- FileInfo{id, size}:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
||||||
|
worker := func() error {
|
||||||
|
var buf []byte
|
||||||
|
for fi := range ch {
|
||||||
|
debug.Log("worker got file %v", fi.ID.Str())
|
||||||
|
var err error
|
||||||
|
var idx *Index
|
||||||
|
oldFormat := false
|
||||||
|
|
||||||
|
buf, err = repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
|
||||||
|
if err == nil {
|
||||||
|
idx, oldFormat, err = DecodeIndex(buf, fi.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
err = fn(fi.ID, idx, oldFormat, err)
|
||||||
|
m.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// run workers on ch
|
||||||
|
wg.Go(func() error {
|
||||||
|
return RunWorkers(loadIndexParallelism, worker)
|
||||||
|
})
|
||||||
|
|
||||||
|
return wg.Wait()
|
||||||
|
}
|
46
internal/repository/index_parallel_test.go
Normal file
46
internal/repository/index_parallel_test.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package repository_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/repository"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
rtest "github.com/restic/restic/internal/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRepositoryForAllIndexes(t *testing.T) {
|
||||||
|
repodir, cleanup := rtest.Env(t, repoFixture)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
repo := repository.TestOpenLocal(t, repodir)
|
||||||
|
|
||||||
|
expectedIndexIDs := restic.NewIDSet()
|
||||||
|
rtest.OK(t, repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
|
||||||
|
expectedIndexIDs.Insert(id)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
|
||||||
|
// check that all expected indexes are loaded without errors
|
||||||
|
indexIDs := restic.NewIDSet()
|
||||||
|
var indexErr error
|
||||||
|
rtest.OK(t, repository.ForAllIndexes(context.TODO(), repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
indexErr = err
|
||||||
|
}
|
||||||
|
indexIDs.Insert(id)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
rtest.OK(t, indexErr)
|
||||||
|
rtest.Equals(t, expectedIndexIDs, indexIDs)
|
||||||
|
|
||||||
|
// must failed with the returned error
|
||||||
|
iterErr := errors.New("error to pass upwards")
|
||||||
|
|
||||||
|
err := repository.ForAllIndexes(context.TODO(), repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error {
|
||||||
|
return iterErr
|
||||||
|
})
|
||||||
|
|
||||||
|
rtest.Equals(t, iterErr, err)
|
||||||
|
}
|
|
@ -434,88 +434,35 @@ func (r *Repository) SaveFullIndex(ctx context.Context) error {
|
||||||
return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...)
|
return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
const loadIndexParallelism = 4
|
|
||||||
|
|
||||||
// LoadIndex loads all index files from the backend in parallel and stores them
|
// LoadIndex loads all index files from the backend in parallel and stores them
|
||||||
// in the master index. The first error that occurred is returned.
|
// in the master index. The first error that occurred is returned.
|
||||||
func (r *Repository) LoadIndex(ctx context.Context) error {
|
func (r *Repository) LoadIndex(ctx context.Context) error {
|
||||||
debug.Log("Loading index")
|
debug.Log("Loading index")
|
||||||
|
|
||||||
// track spawned goroutines using wg, create a new context which is
|
|
||||||
// cancelled as soon as an error occurs.
|
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
type FileInfo struct {
|
|
||||||
restic.ID
|
|
||||||
Size int64
|
|
||||||
}
|
|
||||||
ch := make(chan FileInfo)
|
|
||||||
indexCh := make(chan *Index)
|
|
||||||
|
|
||||||
// send list of index files through ch, which is closed afterwards
|
|
||||||
wg.Go(func() error {
|
|
||||||
defer close(ch)
|
|
||||||
return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case ch <- FileInfo{id, size}:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
|
||||||
worker := func() error {
|
|
||||||
var buf []byte
|
|
||||||
for fi := range ch {
|
|
||||||
var err error
|
|
||||||
buf, err = r.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "unable to load index %s", fi.ID.Str())
|
|
||||||
}
|
|
||||||
idx, _, err := DecodeIndex(buf, fi.ID)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "unable to decode index %s", fi.ID.Str())
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case indexCh <- idx:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// run workers on ch
|
|
||||||
wg.Go(func() error {
|
|
||||||
defer close(indexCh)
|
|
||||||
return RunWorkers(loadIndexParallelism, worker)
|
|
||||||
})
|
|
||||||
|
|
||||||
// receive decoded indexes
|
|
||||||
validIndex := restic.NewIDSet()
|
validIndex := restic.NewIDSet()
|
||||||
wg.Go(func() error {
|
err := ForAllIndexes(ctx, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error {
|
||||||
for idx := range indexCh {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
ids, err := idx.IDs()
|
ids, err := idx.IDs()
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
validIndex.Insert(id)
|
validIndex.Insert(id)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
r.idx.Insert(idx)
|
r.idx.Insert(idx)
|
||||||
}
|
|
||||||
r.idx.MergeFinalIndexes()
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
err := wg.Wait()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatal(err.Error())
|
return errors.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.idx.MergeFinalIndexes()
|
||||||
|
|
||||||
// remove index files from the cache which have been removed in the repo
|
// remove index files from the cache which have been removed in the repo
|
||||||
return r.PrepareCache(validIndex)
|
return r.PrepareCache(validIndex)
|
||||||
}
|
}
|
||||||
|
@ -648,20 +595,6 @@ func (r *Repository) PrepareCache(indexIDs restic.IDSet) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadIndex loads the index id from backend and returns it.
|
|
||||||
func LoadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*Index, error) {
|
|
||||||
buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
idx, oldFormat, err := DecodeIndex(buf, id)
|
|
||||||
if oldFormat {
|
|
||||||
fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str())
|
|
||||||
}
|
|
||||||
return idx, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// SearchKey finds a key with the supplied password, afterwards the config is
|
// SearchKey finds a key with the supplied password, afterwards the config is
|
||||||
// read and parsed. It tries at most maxKeys key files in the repo.
|
// read and parsed. It tries at most maxKeys key files in the repo.
|
||||||
func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int, keyHint string) error {
|
func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int, keyHint string) error {
|
||||||
|
|
|
@ -4,8 +4,10 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -293,6 +295,20 @@ func TestRepositoryLoadIndex(t *testing.T) {
|
||||||
rtest.OK(t, repo.LoadIndex(context.TODO()))
|
rtest.OK(t, repo.LoadIndex(context.TODO()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loadIndex loads the index id from backend and returns it.
|
||||||
|
func loadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*repository.Index, error) {
|
||||||
|
buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, oldFormat, err := repository.DecodeIndex(buf, id)
|
||||||
|
if oldFormat {
|
||||||
|
fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str())
|
||||||
|
}
|
||||||
|
return idx, err
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkLoadIndex(b *testing.B) {
|
func BenchmarkLoadIndex(b *testing.B) {
|
||||||
repository.TestUseLowSecurityKDFParameters(b)
|
repository.TestUseLowSecurityKDFParameters(b)
|
||||||
|
|
||||||
|
@ -323,7 +339,7 @@ func BenchmarkLoadIndex(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, err := repository.LoadIndex(context.TODO(), repo, id)
|
_, err := loadIndex(context.TODO(), repo, id)
|
||||||
rtest.OK(b, err)
|
rtest.OK(b, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -373,7 +389,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
|
||||||
packEntries := make(map[restic.ID]map[restic.ID]struct{})
|
packEntries := make(map[restic.ID]map[restic.ID]struct{})
|
||||||
|
|
||||||
err := repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
|
err := repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
|
||||||
idx, err := repository.LoadIndex(context.TODO(), repo, id)
|
idx, err := loadIndex(context.TODO(), repo, id)
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
|
|
||||||
for pb := range idx.Each(context.TODO()) {
|
for pb := range idx.Each(context.TODO()) {
|
||||||
|
|
Loading…
Reference in a new issue