checker: Optimize checker.Packs()

Use result of single repository.List() to find both missing and
orphaned data packs. For 500GB repository this eliminates ~100K
repository.Test() calls and improves check time by >30M in my
environment (~45min before this change and ~7min after).

Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
This commit is contained in:
Igor Fedorenko 2018-01-11 21:00:48 -05:00
parent 5723636b35
commit 231076fa4a
2 changed files with 32 additions and 61 deletions

View file

@ -0,0 +1,6 @@
Enhancement: Reduce number of remote requests during repository check
This change eliminates redundant remote repository calls and significantly
improves repository check time.
https://github.com/restic/restic/issues/1541

View file

@ -31,7 +31,6 @@ type Checker struct {
M map[restic.ID]uint
}
indexes map[restic.ID]*repository.Index
orphanedPacks restic.IDs
masterIndex *repository.MasterIndex
@ -183,38 +182,6 @@ func (e PackError) Error() string {
return "pack " + e.ID.String() + ": " + e.Err.Error()
}
func packIDTester(ctx context.Context, repo restic.Repository, inChan <-chan restic.ID, errChan chan<- error, wg *sync.WaitGroup) {
debug.Log("worker start")
defer debug.Log("worker done")
defer wg.Done()
for id := range inChan {
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
ok, err := repo.Backend().Test(ctx, h)
if err != nil {
err = PackError{ID: id, Err: err}
} else {
if !ok {
err = PackError{ID: id, Err: errors.New("does not exist")}
}
}
if err != nil {
debug.Log("error checking for pack %s: %v", id.Str(), err)
select {
case <-ctx.Done():
return
case errChan <- err:
}
continue
}
debug.Log("pack %s exists", id.Str())
}
}
// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index. errChan is closed after all
// packs have been checked.
@ -222,35 +189,33 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
defer close(errChan)
debug.Log("checking for %d packs", len(c.packs))
seenPacks := restic.NewIDSet()
var workerWG sync.WaitGroup
IDChan := make(chan restic.ID)
for i := 0; i < defaultParallelism; i++ {
workerWG.Add(1)
go packIDTester(ctx, c.repo, IDChan, errChan, &workerWG)
}
for id := range c.packs {
seenPacks.Insert(id)
IDChan <- id
}
close(IDChan)
debug.Log("waiting for %d workers to terminate", defaultParallelism)
workerWG.Wait()
debug.Log("workers terminated")
debug.Log("listing repository packs")
repoPacks := restic.NewIDSet()
for id := range c.repo.List(ctx, restic.DataFile) {
debug.Log("check data blob %v", id.Str())
if !seenPacks.Has(id) {
c.orphanedPacks = append(c.orphanedPacks, id)
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: id, Orphaned: true, Err: errors.New("not referenced in any index")}:
default:
}
repoPacks.Insert(id)
}
// orphaned: present in the repo but not in c.packs
for orphanID := range repoPacks.Sub(c.packs) {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
}
}
// missing: present in c.packs but not in the repo
for missingID := range c.packs.Sub(repoPacks) {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: missingID, Err: errors.New("does not exist")}:
}
}
}