Merge pull request #2328 from MichaelEischer/no-repeated-checks

Fix duplicate tree checks within `restic check`
This commit is contained in:
MichaelEischer 2020-07-22 22:08:02 +02:00 committed by GitHub
commit 34181b13a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 302 additions and 56 deletions

View file

@ -0,0 +1,8 @@
Enhancement: Improve speed of check command
We've improved the check command to traverse trees only once independent of
whether they are contained in multiple snapshots. The check command is now much
faster for repositories with a large number of snapshots.
https://github.com/restic/restic/pull/2328
https://github.com/restic/restic/issues/2284

View file

@ -259,7 +259,7 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
if opts.CheckUnused {
for _, id := range chkr.UnusedBlobs() {
Verbosef("unused blob %v\n", id.Str())
Verbosef("unused blob %v\n", id)
errorsFound = true
}
}

View file

@ -22,29 +22,33 @@ import (
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
packs restic.IDSet
blobs restic.IDSet
blobRefs struct {
sync.Mutex
M map[restic.ID]uint
// see flags below
M map[restic.BlobHandle]blobStatus
}
indexes map[restic.ID]*repository.Index
masterIndex *repository.MasterIndex
repo restic.Repository
}
type blobStatus uint8
const (
blobStatusExists blobStatus = 1 << iota
blobStatusReferenced
)
// New returns a new checker which runs on repo.
func New(repo restic.Repository) *Checker {
c := &Checker{
packs: restic.NewIDSet(),
blobs: restic.NewIDSet(),
masterIndex: repository.NewMasterIndex(),
indexes: make(map[restic.ID]*repository.Index),
repo: repo,
}
c.blobRefs.M = make(map[restic.ID]uint)
c.blobRefs.M = make(map[restic.BlobHandle]blobStatus)
return c
}
@ -152,15 +156,14 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
continue
}
c.indexes[res.ID] = res.Index
c.masterIndex.Insert(res.Index)
debug.Log("process blobs")
cnt := 0
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = 0
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
c.blobRefs.M[h] = blobStatusExists
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
@ -447,20 +450,6 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
return
}
id := job.ID
alreadyChecked := false
c.blobRefs.Lock()
if c.blobRefs.M[id] > 0 {
alreadyChecked = true
}
c.blobRefs.M[id]++
debug.Log("tree %v refcount %d", job.ID, c.blobRefs.M[id])
c.blobRefs.Unlock()
if alreadyChecked {
continue
}
debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error)
var errs []error
@ -485,7 +474,7 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
}
}
func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
defer func() {
debug.Log("closing output channels")
close(loaderChan)
@ -506,8 +495,21 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
for {
if loadCh == nil && len(backlog) > 0 {
// process last added ids first, that is traverse the tree in depth-first order
ln := len(backlog) - 1
nextTreeID, backlog = backlog[ln], backlog[:ln]
// use a separate flag for processed trees to ensure that check still processes trees
// even when a file references a tree blob
c.blobRefs.Lock()
h := restic.BlobHandle{ID: nextTreeID, Type: restic.TreeBlob}
status := c.blobRefs.M[h]
c.blobRefs.Unlock()
if (status & blobStatusReferenced) != 0 {
continue
}
loadCh = loaderChan
nextTreeID, backlog = backlog[0], backlog[1:]
}
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
@ -522,6 +524,10 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
case loadCh <- nextTreeID:
outstandingLoadTreeJobs++
loadCh = nil
c.blobRefs.Lock()
h := restic.BlobHandle{ID: nextTreeID, Type: restic.TreeBlob}
c.blobRefs.M[h] |= blobStatusReferenced
c.blobRefs.Unlock()
case j, ok := <-inCh:
if !ok {
@ -535,16 +541,18 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
debug.Log("input job tree %v", j.ID)
var err error
if j.error != nil {
debug.Log("received job with error: %v (tree %v, ID %v)", j.error, j.Tree, j.ID)
} else if j.Tree == nil {
debug.Log("received job with nil tree pointer: %v (ID %v)", j.error, j.ID)
err = errors.New("tree is nil and error is nil")
// send a new job with the new error instead of the old one
j = treeJob{ID: j.ID, error: errors.New("tree is nil and error is nil")}
} else {
debug.Log("subtrees for tree %v: %v", j.ID, j.Tree.Subtrees())
for _, id := range j.Tree.Subtrees() {
subtrees := j.Tree.Subtrees()
debug.Log("subtrees for tree %v: %v", j.ID, subtrees)
// iterate backwards over subtree to compensate backwards traversal order of nextTreeID selection
for i := len(subtrees) - 1; i >= 0; i-- {
id := subtrees[i]
if id.IsNull() {
// We do not need to raise this error here, it is
// checked when the tree is checked. Just make sure
@ -556,11 +564,6 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
}
}
if err != nil {
// send a new job with the new error instead of the old one
j = treeJob{ID: j.ID, error: err}
}
job = j
outCh = out
inCh = nil
@ -601,7 +604,7 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg)
}
filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2)
c.filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2)
wg.Wait()
}
@ -656,28 +659,27 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
for _, blobID := range blobs {
c.blobRefs.Lock()
c.blobRefs.M[blobID]++
debug.Log("blob %v refcount %d", blobID, c.blobRefs.M[blobID])
c.blobRefs.Unlock()
if !c.blobs.Has(blobID) {
h := restic.BlobHandle{ID: blobID, Type: restic.DataBlob}
if (c.blobRefs.M[h] & blobStatusExists) == 0 {
debug.Log("tree %v references blob %v which isn't contained in index", id, blobID)
errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
}
c.blobRefs.M[h] |= blobStatusReferenced
debug.Log("blob %v is referenced", blobID)
c.blobRefs.Unlock()
}
return errs
}
// UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs() (blobs restic.IDs) {
func (c *Checker) UnusedBlobs() (blobs restic.BlobHandles) {
c.blobRefs.Lock()
defer c.blobRefs.Unlock()
debug.Log("checking %d blobs", len(c.blobs))
for id := range c.blobs {
if c.blobRefs.M[id] == 0 {
debug.Log("checking %d blobs", len(c.blobRefs.M))
for id, flags := range c.blobRefs.M {
if (flags & blobStatusReferenced) == 0 {
debug.Log("blob %v not referenced", id)
blobs = append(blobs, id)
}

View file

@ -8,10 +8,14 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"testing"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
@ -153,13 +157,13 @@ func TestUnreferencedBlobs(t *testing.T) {
}
test.OK(t, repo.Backend().Remove(context.TODO(), snapshotHandle))
unusedBlobsBySnapshot := restic.IDs{
restic.TestParseID("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849"),
restic.TestParseID("988a272ab9768182abfd1fe7d7a7b68967825f0b861d3b36156795832c772235"),
restic.TestParseID("c01952de4d91da1b1b80bc6e06eaa4ec21523f4853b69dc8231708b9b7ec62d8"),
restic.TestParseID("bec3a53d7dc737f9a9bee68b107ec9e8ad722019f649b34d474b9982c3a3fec7"),
restic.TestParseID("2a6f01e5e92d8343c4c6b78b51c5a4dc9c39d42c04e26088c7614b13d8d0559d"),
restic.TestParseID("18b51b327df9391732ba7aaf841a4885f350d8a557b2da8352c9acf8898e3f10"),
unusedBlobsBySnapshot := restic.BlobHandles{
restic.TestParseHandle("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849", restic.DataBlob),
restic.TestParseHandle("988a272ab9768182abfd1fe7d7a7b68967825f0b861d3b36156795832c772235", restic.DataBlob),
restic.TestParseHandle("c01952de4d91da1b1b80bc6e06eaa4ec21523f4853b69dc8231708b9b7ec62d8", restic.TreeBlob),
restic.TestParseHandle("bec3a53d7dc737f9a9bee68b107ec9e8ad722019f649b34d474b9982c3a3fec7", restic.TreeBlob),
restic.TestParseHandle("2a6f01e5e92d8343c4c6b78b51c5a4dc9c39d42c04e26088c7614b13d8d0559d", restic.TreeBlob),
restic.TestParseHandle("18b51b327df9391732ba7aaf841a4885f350d8a557b2da8352c9acf8898e3f10", restic.DataBlob),
}
sort.Sort(unusedBlobsBySnapshot)
@ -363,13 +367,38 @@ func TestCheckerModifiedData(t *testing.T) {
}
}
func BenchmarkChecker(t *testing.B) {
// loadTreesOnceRepository allows each tree to be loaded only once
type loadTreesOnceRepository struct {
restic.Repository
loadedTrees restic.IDSet
mutex sync.Mutex
DuplicateTree bool
}
func (r *loadTreesOnceRepository) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.loadedTrees.Has(id) {
// additionally store error to ensure that it cannot be swallowed
r.DuplicateTree = true
return nil, errors.Errorf("trying to load tree with id %v twice", id)
}
r.loadedTrees.Insert(id)
return r.Repository.LoadTree(ctx, id)
}
func TestCheckerNoDuplicateTreeDecodes(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData)
defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
checkRepo := &loadTreesOnceRepository{
Repository: repo,
loadedTrees: restic.NewIDSet(),
}
chkr := checker.New(repo)
chkr := checker.New(checkRepo)
hints, errs := chkr.LoadIndex(context.TODO())
if len(errs) > 0 {
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
@ -379,6 +408,198 @@ func BenchmarkChecker(t *testing.B) {
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
}
test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr))
test.Assert(t, !checkRepo.DuplicateTree, "detected duplicate tree loading")
}
// delayRepository delays read of a specific handle.
type delayRepository struct {
restic.Repository
DelayTree restic.ID
UnblockChannel chan struct{}
Unblocker sync.Once
}
func (r *delayRepository) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree, error) {
if id == r.DelayTree {
<-r.UnblockChannel
}
return r.Repository.LoadTree(ctx, id)
}
func (r *delayRepository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, bool) {
if id == r.DelayTree && t == restic.DataBlob {
r.Unblock()
}
return r.Repository.LookupBlobSize(id, t)
}
func (r *delayRepository) Unblock() {
r.Unblocker.Do(func() {
close(r.UnblockChannel)
})
}
func TestCheckerBlobTypeConfusion(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
repo, cleanup := repository.TestRepository(t)
defer cleanup()
damagedNode := &restic.Node{
Name: "damaged",
Type: "file",
Mode: 0644,
Size: 42,
Content: restic.IDs{restic.TestParseID("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")},
}
damagedTree := &restic.Tree{
Nodes: []*restic.Node{damagedNode},
}
id, err := repo.SaveTree(ctx, damagedTree)
test.OK(t, repo.Flush(ctx))
test.OK(t, err)
buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil)
test.OK(t, err)
_, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false)
test.OK(t, err)
malNode := &restic.Node{
Name: "aaaaa",
Type: "file",
Mode: 0644,
Size: uint64(len(buf)),
Content: restic.IDs{id},
}
dirNode := &restic.Node{
Name: "bbbbb",
Type: "dir",
Mode: 0755,
Subtree: &id,
}
rootTree := &restic.Tree{
Nodes: []*restic.Node{malNode, dirNode},
}
rootId, err := repo.SaveTree(ctx, rootTree)
test.OK(t, err)
test.OK(t, repo.Flush(ctx))
test.OK(t, repo.SaveIndex(ctx))
snapshot, err := restic.NewSnapshot([]string{"/damaged"}, []string{"test"}, "foo", time.Now())
test.OK(t, err)
snapshot.Tree = &rootId
snapId, err := repo.SaveJSONUnpacked(ctx, restic.SnapshotFile, snapshot)
test.OK(t, err)
t.Logf("saved snapshot %v", snapId.Str())
delayRepo := &delayRepository{
Repository: repo,
DelayTree: id,
UnblockChannel: make(chan struct{}),
}
chkr := checker.New(delayRepo)
go func() {
<-ctx.Done()
delayRepo.Unblock()
}()
hints, errs := chkr.LoadIndex(ctx)
if len(errs) > 0 {
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
}
if len(hints) > 0 {
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
}
errFound := false
for _, err := range checkStruct(chkr) {
t.Logf("struct error: %v", err)
errFound = true
}
test.OK(t, ctx.Err())
if !errFound {
t.Fatal("no error found, checker is broken")
}
}
func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) {
repodir, cleanup := test.Env(t, checkerTestData)
repo := repository.TestOpenLocal(t, repodir)
chkr := checker.New(repo)
hints, errs := chkr.LoadIndex(context.TODO())
if len(errs) > 0 {
defer cleanup()
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
}
if len(hints) > 0 {
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
}
return chkr, repo, cleanup
}
func BenchmarkChecker(t *testing.B) {
chkr, _, cleanup := loadBenchRepository(t)
defer cleanup()
t.ResetTimer()
for i := 0; i < t.N; i++ {
test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr))
test.OKs(t, checkData(chkr))
}
}
func benchmarkSnapshotScaling(t *testing.B, newSnapshots int) {
chkr, repo, cleanup := loadBenchRepository(t)
defer cleanup()
snID, err := restic.FindSnapshot(repo, "51d249d2")
if err != nil {
t.Fatal(err)
}
var sn2 restic.Snapshot
err = repo.LoadJSONUnpacked(context.TODO(), restic.SnapshotFile, snID, &sn2)
if err != nil {
t.Fatal(err)
}
treeID := sn2.Tree
for i := 0; i < newSnapshots; i++ {
sn, err := restic.NewSnapshot([]string{"test" + strconv.Itoa(i)}, nil, "", time.Now())
if err != nil {
t.Fatal(err)
}
sn.Tree = treeID
_, err = repo.SaveJSONUnpacked(context.TODO(), restic.SnapshotFile, sn)
if err != nil {
t.Fatal(err)
}
}
t.ResetTimer()
for i := 0; i < t.N; i++ {
@ -387,3 +608,13 @@ func BenchmarkChecker(t *testing.B) {
test.OKs(t, checkData(chkr))
}
}
func BenchmarkCheckerSnapshotScaling(b *testing.B) {
counts := []int{50, 100, 200}
for _, count := range counts {
count := count
b.Run(strconv.Itoa(count), func(b *testing.B) {
benchmarkSnapshotScaling(b, count)
})
}
}

View file

@ -200,3 +200,8 @@ func TestParseID(s string) ID {
return id
}
// TestParseHandle parses s as a ID, panics if that fails and creates a BlobHandle with t.
func TestParseHandle(s string, t BlobType) BlobHandle {
return BlobHandle{ID: TestParseID(s), Type: t}
}