From 5108d91bc7af270eb34491edc232663a053cc17c Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 12 Jul 2015 16:42:22 +0200
Subject: [PATCH] checker: check trees and blobs in parallel

---
 checker/checker.go | 362 +++++++++++++++++++++++++++++++++++----------
 tree.go            |  11 ++
 2 files changed, 296 insertions(+), 77 deletions(-)

diff --git a/checker/checker.go b/checker/checker.go
index d3d1b5016..0238a53b3 100644
--- a/checker/checker.go
+++ b/checker/checker.go
@@ -44,8 +44,11 @@ func map2id(id mapID) backend.ID {
 type Checker struct {
 	packs    map[mapID]struct{}
 	blobs    map[mapID]struct{}
-	blobRefs map[mapID]uint
-	indexes  map[mapID]*repository.Index
+	blobRefs struct {
+		sync.Mutex
+		M map[mapID]uint
+	}
+	indexes map[mapID]*repository.Index
 
 	masterIndex *repository.Index
 
@@ -54,17 +57,20 @@ type Checker struct {
 
 // New returns a new checker which runs on repo.
 func New(repo *repository.Repository) *Checker {
-	return &Checker{
-		blobRefs:    make(map[mapID]uint),
+	c := &Checker{
 		packs:       make(map[mapID]struct{}),
 		blobs:       make(map[mapID]struct{}),
 		masterIndex: repository.NewIndex(),
 		indexes:     make(map[mapID]*repository.Index),
 		repo:        repo,
 	}
+
+	c.blobRefs.M = make(map[mapID]uint)
+
+	return c
 }
 
-const defaultParallelism = 20
+const defaultParallelism = 40
 
 // LoadIndex loads all index files.
 func (c *Checker) LoadIndex() error {
@@ -117,7 +123,7 @@ func (c *Checker) LoadIndex() error {
 		for blob := range res.Index.Each(done) {
 			c.packs[id2map(blob.PackID)] = struct{}{}
 			c.blobs[id2map(blob.ID)] = struct{}{}
-			c.blobRefs[id2map(blob.ID)] = 0
+			c.blobRefs.M[id2map(blob.ID)] = 0
 			cnt++
 		}
 
@@ -219,7 +225,7 @@ func (c *Checker) Packs(errChan chan<- error, done <-chan struct{}) {
 	debug.Log("Checker.Packs", "workers terminated")
 
 	for id := range c.repo.List(backend.Data, done) {
-		debug.Log("Checker.Packs", "check data blob %v", id)
+		debug.Log("Checker.Packs", "check data blob %v", id.Str())
 		if _, ok := seenPacks[id2map(id)]; !ok {
 			select {
 			case <-done:
@@ -267,92 +273,279 @@ func loadTreeFromSnapshot(repo *repository.Repository, id backend.ID) (backend.I
 	return sn.Tree, nil
 }
 
-// Structure checks that for all snapshots all referenced blobs are available
-// in the index. errChan is closed after all trees have been traversed.
-func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
-	defer close(errChan)
+// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
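+// Snapshot files are read concurrently via repository.FilesInParallel; load
+// errors for individual snapshots are collected and returned alongside the
+// IDs instead of aborting the whole run.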
+func loadSnapshotTreeIDs(repo *repository.Repository) (backend.IDs, []error) {
+	var trees struct {
+		IDs backend.IDs
+		sync.Mutex
+	}
 
-	var todo backend.IDs
+	var errs struct {
+		errs []error
+		sync.Mutex
+	}
 
-	for id := range c.repo.List(backend.Snapshot, done) {
-		debug.Log("Checker.Snaphots", "check snapshot %v", id.Str())
-
-		treeID, err := loadTreeFromSnapshot(c.repo, id)
+	snapshotWorker := func(strID string, done <-chan struct{}) error {
+		id, err := backend.ParseID(strID)
 		if err != nil {
-			select {
-			case <-done:
-				return
-			case errChan <- err:
-			}
-			continue
+			return err
+		}
+
+		debug.Log("Checker.Snaphots", "load snapshot %v", id.Str())
+
+		treeID, err := loadTreeFromSnapshot(repo, id)
+		if err != nil {
+			errs.Lock()
+			errs.errs = append(errs.errs, err)
+			errs.Unlock()
+			return nil
 		}
 
 		debug.Log("Checker.Snaphots", "snapshot %v has tree %v", id.Str(), treeID.Str())
-		todo = append(todo, treeID)
+		trees.Lock()
+		trees.IDs = append(trees.IDs, treeID)
+		trees.Unlock()
+
+		return nil
 	}
 
-	for _, err := range c.trees(todo) {
+	err := repository.FilesInParallel(repo.Backend(), backend.Snapshot, defaultParallelism, snapshotWorker)
+	if err != nil {
+		errs.errs = append(errs.errs, err)
+	}
+
+	return trees.IDs, errs.errs
+}
+
+// TreeError is returned when loading a tree from the repository fails.
+type TreeError struct {
+	ID     backend.ID
+	Errors []error
+}
+
+func (e TreeError) Error() string {
+	return fmt.Sprintf("%v: %d errors", e.ID.String(), len(e.Errors))
+}
+
+type treeJob struct {
+	backend.ID
+	error
+	*restic.Tree
+}
+
+// loadTreeWorker loads trees from repo and sends them to out.
+func loadTreeWorker(repo *repository.Repository,
+	in <-chan backend.ID, out chan<- treeJob,
+	done <-chan struct{}, wg *sync.WaitGroup) {
+
+	defer func() {
+		debug.Log("checker.loadTreeWorker", "exiting")
+		wg.Done()
+	}()
+
+	var (
+		inCh  = in
+		outCh = out
+		job   treeJob
+	)
+
+	outCh = nil
+	for {
+		select {
+		case <-done:
+			return
+
+		case treeID, ok := <-inCh:
+			if !ok {
+				return
+			}
+			debug.Log("checker.loadTreeWorker", "load tree %v", treeID.Str())
+
+			tree, err := restic.LoadTree(repo, treeID)
+			debug.Log("checker.loadTreeWorker", "load tree %v (%v) returned err %v", tree, treeID.Str(), err)
+			job = treeJob{ID: treeID, error: err, Tree: tree}
+			outCh = out
+			inCh = nil
+
+		case outCh <- job:
+			debug.Log("checker.loadTreeWorker", "sent tree %v", job.ID.Str())
+			outCh = nil
+			inCh = in
+		}
+	}
+}
+
+// checkTreeWorker checks the trees received and sends detected errors to out.
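+// Every tree's reference count in c.blobRefs.M is incremented under the lock;
+// a tree whose count was already non-zero has been handled before and is
+// skipped, so shared subtrees are checked only once.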
+func (c *Checker) checkTreeWorker(in <-chan treeJob, out chan<- TreeError, done <-chan struct{}, wg *sync.WaitGroup) {
+	defer func() {
+		debug.Log("checker.checkTreeWorker", "exiting")
+		wg.Done()
+	}()
+
+	var (
+		inCh      = in
+		outCh     = out
+		treeError TreeError
+	)
+
+	outCh = nil
+	for {
+		select {
+		case <-done:
+			return
+
+		case job, ok := <-inCh:
+			if !ok {
+				return
+			}
+
+			id := id2map(job.ID)
+			alreadyChecked := false
+			c.blobRefs.Lock()
+			if c.blobRefs.M[id] > 0 {
+				alreadyChecked = true
+			}
+			c.blobRefs.M[id]++
+			debug.Log("checker.checkTreeWorker", "tree %v refcount %d", job.ID.Str(), c.blobRefs.M[id])
+			c.blobRefs.Unlock()
+
+			if alreadyChecked {
+				continue
+			}
+
+			debug.Log("checker.checkTreeWorker", "check tree %v", job.ID.Str())
+
+			errs := c.checkTree(job.ID, job.Tree)
+			if len(errs) > 0 {
+				debug.Log("checker.checkTreeWorker", "checked tree %v: %v errors", job.ID.Str(), len(errs))
+				treeError = TreeError{ID: job.ID, Errors: errs}
+				outCh = out
+				inCh = nil
+			}
+
+		case outCh <- treeError:
+			debug.Log("checker.checkTreeWorker", "tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
+			outCh = nil
+			inCh = in
+		}
+	}
+}
+
+// filterTrees schedules the trees in backlog on loaderChan, receives loaded
+// trees on in, appends their subtrees to the backlog and forwards every tree
+// exactly once to out. It closes loaderChan and out when the backlog is empty
+// and no load jobs are outstanding.
+func filterTrees(backlog backend.IDs, loaderChan chan<- backend.ID, in <-chan treeJob, out chan<- treeJob, done <-chan struct{}) {
+	defer func() {
+		debug.Log("checker.filterTrees", "closing output channels")
+		close(loaderChan)
+		close(out)
+	}()
+
+	var (
+		inCh                    = in
+		outCh                   = out
+		loadCh                  = loaderChan
+		job                     treeJob
+		nextTreeID              backend.ID
+		outstandingLoadTreeJobs = 0
+	)
+
+	outCh = nil
+	loadCh = nil
+
+	for {
+		if loadCh == nil && len(backlog) > 0 {
+			loadCh = loaderChan
+			nextTreeID, backlog = backlog[0], backlog[1:]
+		}
+
+		if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
+			debug.Log("checker.filterTrees", "backlog is empty, all channels nil, exiting")
+			return
+		}
+
+		select {
+		case <-done:
+			return
+
+		case loadCh <- nextTreeID:
+			outstandingLoadTreeJobs++
+			loadCh = nil
+
+		case j, ok := <-inCh:
+			if !ok {
+				debug.Log("checker.filterTrees", "input channel closed")
+				inCh = nil
+				in = nil
+				continue
+			}
+
+			outstandingLoadTreeJobs--
+			debug.Log("checker.filterTrees", "input job tree %v", j.ID.Str())
+
+			backlog = append(backlog, j.Tree.Subtrees()...)
+
+			job = j
+			outCh = out
+			inCh = nil
+
+		case outCh <- job:
+			outCh = nil
+			inCh = in
+		}
+	}
+}
+
+// Structure checks that for all snapshots all referenced data blobs and
+// subtrees are available in the index. errChan is closed after all trees have
+// been traversed.
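+// Internally it runs a small pipeline: filterTrees hands tree IDs to a pool
+// of loadTreeWorker goroutines and routes the loaded trees to checkTreeWorker
+// goroutines, which update reference counts and report trees with missing
+// blobs.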
+func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
+	defer close(errChan)
+
+	trees, errs := loadSnapshotTreeIDs(c.repo)
+	debug.Log("checker.Structure", "need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
+
+	for _, err := range errs {
 		select {
 		case <-done:
 			return
 		case errChan <- err:
 		}
 	}
-}
 
-func (c *Checker) trees(treeIDs backend.IDs) (errs []error) {
-	treesChecked := make(map[mapID]struct{})
+	treeIDChan := make(chan backend.ID)
+	treeJobChan1 := make(chan treeJob)
+	treeJobChan2 := make(chan treeJob)
+	treeErrChan := make(chan TreeError)
 
-	for len(treeIDs) > 0 {
-		id := treeIDs[0]
-		treeIDs = treeIDs[1:]
-
-		c.blobRefs[id2map(id)]++
-		debug.Log("Checker.trees", "tree %v refcount %d", id.Str(), c.blobRefs[id2map(id)])
-
-		if _, ok := treesChecked[id2map(id)]; ok {
-			debug.Log("Checker.trees", "tree %v already checked", id.Str())
-			continue
-		}
-
-		debug.Log("Checker.trees", "check tree %v", id.Str())
-
-		if _, ok := c.blobs[id2map(id)]; !ok {
-			errs = append(errs, Error{TreeID: id, Err: errors.New("not found in index")})
-			continue
-		}
-
-		blobs, subtrees, treeErrors := c.tree(id)
-		if treeErrors != nil {
-			debug.Log("Checker.trees", "error checking tree %v: %v", id.Str(), treeErrors)
-			errs = append(errs, treeErrors...)
-			continue
-		}
-
-		for _, blobID := range blobs {
-			c.blobRefs[id2map(blobID)]++
-			debug.Log("Checker.trees", "blob %v refcount %d", blobID.Str(), c.blobRefs[id2map(blobID)])
-
-			if _, ok := c.blobs[id2map(blobID)]; !ok {
-				debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
-
-				errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
-			}
-		}
-
-		treeIDs = append(treeIDs, subtrees...)
-
-		treesChecked[id2map(id)] = struct{}{}
+	var wg sync.WaitGroup
+	for i := 0; i < defaultParallelism; i++ {
+		wg.Add(2)
+		go loadTreeWorker(c.repo, treeIDChan, treeJobChan1, done, &wg)
+		go c.checkTreeWorker(treeJobChan2, treeErrChan, done, &wg)
 	}
 
-	return errs
+	filterTrees(trees, treeIDChan, treeJobChan1, treeJobChan2, done)
+
+	wg.Wait()
 }
 
-func (c *Checker) tree(id backend.ID) (blobs backend.IDs, subtrees backend.IDs, errs []error) {
-	tree, err := restic.LoadTree(c.repo, id)
-	if err != nil {
-		return nil, nil, []error{Error{TreeID: id, Err: err}}
-	}
+func (c *Checker) checkTree(id backend.ID, tree *restic.Tree) (errs []error) {
+	debug.Log("Checker.checkTree", "checking tree %v", id.Str())
+
+	// if _, ok := c.blobs[id2map(id)]; !ok {
+	// 	errs = append(errs, Error{TreeID: id, Err: errors.New("not found in index")})
+	// }
+
+	// blobs, subtrees, treeErrors := c.tree(id)
+	// if treeErrors != nil {
+	// 	debug.Log("Checker.trees", "error checking tree %v: %v", id.Str(), treeErrors)
+	// 	errs = append(errs, treeErrors...)
+	// 	continue
+	// }
+
+	// treeIDs = append(treeIDs, subtrees...)
+
+	// treesChecked[id2map(id)] = struct{}{}
+
+	var blobs []backend.ID
 
 	for i, node := range tree.Nodes {
 		switch node.Type {
@@ -363,19 +556,34 @@ func (c *Checker) tree(id backend.ID) (blobs backend.IDs, subtrees backend.IDs,
 				errs = append(errs, Error{TreeID: id, Err: fmt.Errorf("node %d is dir but has no subtree", i)})
 				continue
 			}
-
-			subtrees = append(subtrees, node.Subtree)
 		}
 	}
 
-	return blobs, subtrees, errs
+	for _, blobID := range blobs {
+		mid := id2map(blobID)
+		c.blobRefs.Lock()
+		c.blobRefs.M[mid]++
+		debug.Log("Checker.checkTree", "blob %v refcount %d", blobID.Str(), c.blobRefs.M[mid])
+		c.blobRefs.Unlock()
+
+		if _, ok := c.blobs[id2map(blobID)]; !ok {
+			debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
+
+			errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
+		}
+	}
+
+	return errs
 }
 
 // UnusedBlobs returns all blobs that have never been referenced.
 func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
+	c.blobRefs.Lock()
+	defer c.blobRefs.Unlock()
+
 	debug.Log("Checker.UnusedBlobs", "checking %d blobs", len(c.blobs))
 	for id := range c.blobs {
-		if c.blobRefs[id] == 0 {
+		if c.blobRefs.M[id] == 0 {
 			debug.Log("Checker.UnusedBlobs", "blob %v not not referenced", map2id(id).Str())
 			blobs = append(blobs, map2id(id))
 		}
diff --git a/tree.go b/tree.go
index 8afa6cc64..8b34f2bd6 100644
--- a/tree.go
+++ b/tree.go
@@ -89,3 +89,14 @@ func (t Tree) Find(name string) (*Node, error) {
 	_, node, err := t.binarySearch(name)
 	return node, err
 }
+
+// Subtrees returns a slice of all subtree IDs of the tree.
+func (t Tree) Subtrees() (trees backend.IDs) {
+	for _, node := range t.Nodes {
+		if node.Type == "dir" && node.Subtree != nil {
+			trees = append(trees, node.Subtree)
+		}
+	}
+
+	return trees
+}
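
Usage sketch (not part of the patch): how the parallel checker above might be driven by a caller. The package clause, import paths and the helper name checkStructure are illustrative assumptions; only checker.New, LoadIndex and Structure are taken from the code in this patch.

	package example

	import (
		"github.com/restic/restic/checker"
		"github.com/restic/restic/repository"
	)

	// checkStructure runs the parallel structure check on repo and collects
	// every error the checker reports on errChan.
	func checkStructure(repo *repository.Repository) []error {
		chkr := checker.New(repo)
		if err := chkr.LoadIndex(); err != nil {
			return []error{err}
		}

		// closing done tells the workers to stop early if we return
		done := make(chan struct{})
		defer close(done)

		errChan := make(chan error)
		go chkr.Structure(errChan, done) // Structure closes errChan when finished

		var errs []error
		for err := range errChan {
			errs = append(errs, err)
		}

		return errs
	}

Packs can be driven the same way with its own error channel, and UnusedBlobs can be called once Structure has returned, when all reference counts have been recorded.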