checker: Unify blobs, processed trees and referenced blobs map

The blobRefs map and the processedTrees IDSet are merged to reduce the
memory usage. The blobRefs map now uses separate flags to track blob
usage as data or tree blob. This prevents skipping of trees whose
content is identical to an already processed data blob. A third flag
tracks whether a blob exists or not, which removes the need for the
blobs IDSet.
This commit is contained in:
Michael Eischer 2019-07-13 18:34:55 +02:00
parent 35d8413639
commit 36c69e3ca7

View file

@ -22,10 +22,10 @@ import (
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
packs restic.IDSet
blobs restic.IDSet
blobRefs struct {
sync.Mutex
M map[restic.ID]bool
// see flags below
M map[restic.ID]blobStatus
}
masterIndex *repository.MasterIndex
@ -33,16 +33,23 @@ type Checker struct {
repo restic.Repository
}
type blobStatus uint8
const (
blobExists blobStatus = 1 << iota
blobReferenced
treeProcessed
)
// New returns a new checker which runs on repo.
func New(repo restic.Repository) *Checker {
c := &Checker{
packs: restic.NewIDSet(),
blobs: restic.NewIDSet(),
masterIndex: repository.NewMasterIndex(),
repo: repo,
}
c.blobRefs.M = make(map[restic.ID]bool)
c.blobRefs.M = make(map[restic.ID]blobStatus)
return c
}
@ -156,7 +163,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
cnt := 0
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = blobExists
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
@ -441,10 +448,6 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
return
}
c.blobRefs.Lock()
c.blobRefs.M[job.ID] = true
c.blobRefs.Unlock()
debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error)
var errs []error
@ -469,7 +472,7 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
}
}
func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
defer func() {
debug.Log("closing output channels")
close(loaderChan)
@ -483,7 +486,6 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
job treeJob
nextTreeID restic.ID
outstandingLoadTreeJobs = 0
processedTrees = restic.NewIDSet()
)
outCh = nil
@ -492,9 +494,16 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
for {
if loadCh == nil && len(backlog) > 0 {
nextTreeID, backlog = backlog[0], backlog[1:]
if processedTrees.Has(nextTreeID) {
// use a separate flag for processed trees to ensure that check still processes trees
// even when a file references a tree blob
c.blobRefs.Lock()
blobFlags := c.blobRefs.M[nextTreeID]
c.blobRefs.Unlock()
if (blobFlags & treeProcessed) != 0 {
continue
}
loadCh = loaderChan
}
@ -510,7 +519,9 @@ func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- rest
case loadCh <- nextTreeID:
outstandingLoadTreeJobs++
loadCh = nil
processedTrees.Insert(nextTreeID)
c.blobRefs.Lock()
c.blobRefs.M[nextTreeID] |= treeProcessed | blobReferenced
c.blobRefs.Unlock()
case j, ok := <-inCh:
if !ok {
@ -591,7 +602,7 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg)
}
filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2)
c.filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2)
wg.Wait()
}
@ -646,15 +657,13 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
for _, blobID := range blobs {
c.blobRefs.Lock()
c.blobRefs.M[blobID] = true
debug.Log("blob %v is referenced", blobID)
c.blobRefs.Unlock()
if !c.blobs.Has(blobID) {
if (c.blobRefs.M[blobID] & blobExists) == 0 {
debug.Log("tree %v references blob %v which isn't contained in index", id, blobID)
errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
}
c.blobRefs.M[blobID] |= blobReferenced
debug.Log("blob %v is referenced", blobID)
c.blobRefs.Unlock()
}
return errs
@ -665,9 +674,9 @@ func (c *Checker) UnusedBlobs() (blobs restic.IDs) {
c.blobRefs.Lock()
defer c.blobRefs.Unlock()
debug.Log("checking %d blobs", len(c.blobs))
for id := range c.blobs {
if !c.blobRefs.M[id] {
debug.Log("checking %d blobs", len(c.blobRefs.M))
for id, flags := range c.blobRefs.M {
if (flags & blobReferenced) == 0 {
debug.Log("blob %v not referenced", id)
blobs = append(blobs, id)
}