forked from TrueCloudLab/restic
repository: Load index in parallel
This commit is contained in:
parent
14d252dfba
commit
5c1fe5784a
1 changed files with 40 additions and 22 deletions
|
@ -472,52 +472,70 @@ func (r *Repository) SaveIndex() (backend.ID, error) {
|
||||||
return sid, nil
|
return sid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadIndex loads all index files from the backend and merges them with the
|
const loadIndexParallelism = 20
|
||||||
// current index.
|
|
||||||
|
// LoadIndex loads all index files from the backend in parallel and merges them
|
||||||
|
// with the current index. The first error that occurred is returned.
|
||||||
func (r *Repository) LoadIndex() error {
|
func (r *Repository) LoadIndex() error {
|
||||||
debug.Log("Repo.LoadIndex", "Loading index")
|
debug.Log("Repo.LoadIndex", "Loading index")
|
||||||
done := make(chan struct{})
|
|
||||||
defer close(done)
|
|
||||||
|
|
||||||
for id := range r.be.List(backend.Index, done) {
|
errCh := make(chan error, 1)
|
||||||
err := r.loadIndex(id)
|
indexes := make(chan *Index)
|
||||||
|
|
||||||
|
worker := func(id string, done <-chan struct{}) error {
|
||||||
|
idx, err := LoadIndex(r, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case indexes <- idx:
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(indexes)
|
||||||
|
errCh <- FilesInParallel(r.be, backend.Index, loadIndexParallelism, worker)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for idx := range indexes {
|
||||||
|
r.idx.Merge(idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadIndex loads the index id and merges it with the currently used index.
|
// LoadIndex loads the index id from backend and returns it.
|
||||||
func (r *Repository) loadIndex(id string) error {
|
func LoadIndex(repo *Repository, id string) (*Index, error) {
|
||||||
debug.Log("Repo.loadIndex", "Loading index %v", id[:8])
|
debug.Log("LoadIndex", "Loading index %v", id[:8])
|
||||||
before := len(r.idx.pack)
|
|
||||||
|
|
||||||
rd, err := r.be.Get(backend.Index, id)
|
rd, err := repo.be.Get(backend.Index, id)
|
||||||
defer rd.Close()
|
defer rd.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// decrypt
|
// decrypt
|
||||||
decryptRd, err := crypto.DecryptFrom(r.key, rd)
|
decryptRd, err := crypto.DecryptFrom(repo.key, rd)
|
||||||
defer decryptRd.Close()
|
defer decryptRd.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
idx, err := DecodeIndex(decryptRd)
|
idx, err := DecodeIndex(decryptRd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("Repo.loadIndex", "error while decoding index %v: %v", id, err)
|
debug.Log("LoadIndex", "error while decoding index %v: %v", id, err)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.idx.Merge(idx)
|
return idx, nil
|
||||||
|
|
||||||
after := len(r.idx.pack)
|
|
||||||
debug.Log("Repo.loadIndex", "Loaded index %v, added %v blobs", id[:8], after-before)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchKey finds a key with the supplied password, afterwards the config is
|
// SearchKey finds a key with the supplied password, afterwards the config is
|
||||||
|
|
Loading…
Reference in a new issue