forked from TrueCloudLab/restic
Merge pull request #1548 from ifedorenko/checker-backend-Test
checker: Optimize checker.Packs()
This commit is contained in:
commit
bcb6881ffb
2 changed files with 32 additions and 61 deletions
6
changelog/0.8.2/issue-1541
Normal file
6
changelog/0.8.2/issue-1541
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
Enhancement: Reduce number of remote requests during repository check
|
||||||
|
|
||||||
|
This change eliminates redundant remote repository calls and significantly
|
||||||
|
improves repository check time.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/issues/1541
|
|
@ -30,8 +30,7 @@ type Checker struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
M map[restic.ID]uint
|
M map[restic.ID]uint
|
||||||
}
|
}
|
||||||
indexes map[restic.ID]*repository.Index
|
indexes map[restic.ID]*repository.Index
|
||||||
orphanedPacks restic.IDs
|
|
||||||
|
|
||||||
masterIndex *repository.MasterIndex
|
masterIndex *repository.MasterIndex
|
||||||
|
|
||||||
|
@ -183,38 +182,6 @@ func (e PackError) Error() string {
|
||||||
return "pack " + e.ID.String() + ": " + e.Err.Error()
|
return "pack " + e.ID.String() + ": " + e.Err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
func packIDTester(ctx context.Context, repo restic.Repository, inChan <-chan restic.ID, errChan chan<- error, wg *sync.WaitGroup) {
|
|
||||||
debug.Log("worker start")
|
|
||||||
defer debug.Log("worker done")
|
|
||||||
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
for id := range inChan {
|
|
||||||
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
|
|
||||||
ok, err := repo.Backend().Test(ctx, h)
|
|
||||||
if err != nil {
|
|
||||||
err = PackError{ID: id, Err: err}
|
|
||||||
} else {
|
|
||||||
if !ok {
|
|
||||||
err = PackError{ID: id, Err: errors.New("does not exist")}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
debug.Log("error checking for pack %s: %v", id.Str(), err)
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case errChan <- err:
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("pack %s exists", id.Str())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Packs checks that all packs referenced in the index are still available and
|
// Packs checks that all packs referenced in the index are still available and
|
||||||
// there are no packs that aren't in an index. errChan is closed after all
|
// there are no packs that aren't in an index. errChan is closed after all
|
||||||
// packs have been checked.
|
// packs have been checked.
|
||||||
|
@ -222,35 +189,33 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||||
defer close(errChan)
|
defer close(errChan)
|
||||||
|
|
||||||
debug.Log("checking for %d packs", len(c.packs))
|
debug.Log("checking for %d packs", len(c.packs))
|
||||||
seenPacks := restic.NewIDSet()
|
|
||||||
|
|
||||||
var workerWG sync.WaitGroup
|
|
||||||
|
|
||||||
IDChan := make(chan restic.ID)
|
|
||||||
for i := 0; i < defaultParallelism; i++ {
|
|
||||||
workerWG.Add(1)
|
|
||||||
go packIDTester(ctx, c.repo, IDChan, errChan, &workerWG)
|
|
||||||
}
|
|
||||||
|
|
||||||
for id := range c.packs {
|
|
||||||
seenPacks.Insert(id)
|
|
||||||
IDChan <- id
|
|
||||||
}
|
|
||||||
close(IDChan)
|
|
||||||
|
|
||||||
debug.Log("waiting for %d workers to terminate", defaultParallelism)
|
|
||||||
workerWG.Wait()
|
|
||||||
debug.Log("workers terminated")
|
|
||||||
|
|
||||||
|
debug.Log("listing repository packs")
|
||||||
|
repoPacks := restic.NewIDSet()
|
||||||
for id := range c.repo.List(ctx, restic.DataFile) {
|
for id := range c.repo.List(ctx, restic.DataFile) {
|
||||||
debug.Log("check data blob %v", id.Str())
|
select {
|
||||||
if !seenPacks.Has(id) {
|
case <-ctx.Done():
|
||||||
c.orphanedPacks = append(c.orphanedPacks, id)
|
return
|
||||||
select {
|
default:
|
||||||
case <-ctx.Done():
|
}
|
||||||
return
|
repoPacks.Insert(id)
|
||||||
case errChan <- PackError{ID: id, Orphaned: true, Err: errors.New("not referenced in any index")}:
|
}
|
||||||
}
|
|
||||||
|
// orphaned: present in the repo but not in c.packs
|
||||||
|
for orphanID := range repoPacks.Sub(c.packs) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// missing: present in c.packs but not in the repo
|
||||||
|
for missingID := range c.packs.Sub(repoPacks) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- PackError{ID: missingID, Err: errors.New("does not exist")}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue