repository: optimize MasterIndex.Each

Sending data through a channel at very high frequency is extremely
inefficient. Thus use simple callbacks instead of channels.

> name                old time/op  new time/op  delta
> MasterIndexEach-16   6.68s ±24%   0.96s ± 2%  -85.64%  (p=0.008 n=5+5)
This commit is contained in:
Michael Eischer 2022-08-19 20:04:39 +02:00
parent 825b95e313
commit 1ebd57247a
14 changed files with 68 additions and 91 deletions

View file

@ -467,7 +467,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
// remember which packs were found in the index // remember which packs were found in the index
indexPackIDs := make(map[string]struct{}) indexPackIDs := make(map[string]struct{})
for pb := range f.repo.Index().Each(wctx) { f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String() idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries // keep entry in packIDs as Each() returns individual index entries
matchingID := false matchingID := false
@ -485,7 +485,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
f.blobIDs[pb.ID.String()] = struct{}{} f.blobIDs[pb.ID.String()] = struct{}{}
indexPackIDs[idStr] = struct{}{} indexPackIDs[idStr] = struct{}{}
} }
} })
for id := range indexPackIDs { for id := range indexPackIDs {
delete(packIDs, id) delete(packIDs, id)

View file

@ -64,9 +64,9 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error {
if err != nil { if err != nil {
return err return err
} }
for blobs := range idx.Each(opts.ctx) { idx.Each(opts.ctx, func(blobs restic.PackedBlob) {
Printf("%v %v\n", blobs.Type, blobs.ID) Printf("%v %v\n", blobs.Type, blobs.ID)
} })
return nil return nil
}) })
default: default:

View file

@ -279,12 +279,12 @@ func planPrune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, i
if len(plan.repackPacks) != 0 { if len(plan.repackPacks) != 0 {
// when repacking, we do not want to keep blobs which are // when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs // already contained in kept packs, so delete them from keepBlobs
for blob := range repo.Index().Each(ctx) { repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
continue return
} }
keepBlobs.Delete(blob.BlobHandle) keepBlobs.Delete(blob.BlobHandle)
} })
} else { } else {
// keepBlobs is only needed if packs are repacked // keepBlobs is only needed if packs are repacked
keepBlobs = nil keepBlobs = nil
@ -299,7 +299,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
duplicateBlobs := make(map[restic.BlobHandle]uint8) duplicateBlobs := make(map[restic.BlobHandle]uint8)
// iterate over all blobs in index to find out which blobs are duplicates // iterate over all blobs in index to find out which blobs are duplicates
for blob := range idx.Each(ctx) { idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle bh := blob.BlobHandle
size := uint64(blob.Length) size := uint64(blob.Length)
switch { switch {
@ -325,7 +325,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
stats.size.unused += size stats.size.unused += size
stats.blobs.unused++ stats.blobs.unused++
} }
} })
// Check if all used blobs have been found in index // Check if all used blobs have been found in index
if len(usedBlobs) != 0 { if len(usedBlobs) != 0 {
@ -346,7 +346,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
} }
// iterate over all blobs in index to generate packInfo // iterate over all blobs in index to generate packInfo
for blob := range idx.Each(ctx) { idx.Each(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID] ip := indexPack[blob.PackID]
// Set blob type if not yet set // Set blob type if not yet set
@ -376,7 +376,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
} }
// update indexPack // update indexPack
indexPack[blob.PackID] = ip indexPack[blob.PackID] = ip
} })
// if duplicate blobs exist, those will be set to either "used" or "unused": // if duplicate blobs exist, those will be set to either "used" or "unused":
// - mark only one occurence of duplicate blobs as used // - mark only one occurence of duplicate blobs as used
@ -384,11 +384,11 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// - if there are no used blobs in a pack, possibly mark duplicates as "unused" // - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if len(duplicateBlobs) > 0 { if len(duplicateBlobs) > 0 {
// iterate again over all blobs in index (this is pretty cheap, all in-mem) // iterate again over all blobs in index (this is pretty cheap, all in-mem)
for blob := range idx.Each(ctx) { idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle bh := blob.BlobHandle
count, isDuplicate := duplicateBlobs[bh] count, isDuplicate := duplicateBlobs[bh]
if !isDuplicate { if !isDuplicate {
continue return
} }
ip := indexPack[blob.PackID] ip := indexPack[blob.PackID]
@ -412,7 +412,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
} }
// update indexPack // update indexPack
indexPack[blob.PackID] = ip indexPack[blob.PackID] = ip
} })
} }
return keepBlobs, indexPack, nil return keepBlobs, indexPack, nil

View file

@ -66,11 +66,11 @@ func runRecover(gopts GlobalOptions) error {
// tree. If it is not referenced, we have a root tree. // tree. If it is not referenced, we have a root tree.
trees := make(map[restic.ID]bool) trees := make(map[restic.ID]bool)
for blob := range repo.Index().Each(gopts.ctx) { repo.Index().Each(gopts.ctx, func(blob restic.PackedBlob) {
if blob.Type == restic.TreeBlob { if blob.Type == restic.TreeBlob {
trees[blob.Blob.ID] = false trees[blob.Blob.ID] = false
} }
} })
Verbosef("load %d trees\n", len(trees)) Verbosef("load %d trees\n", len(trees))
bar := newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded") bar := newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded")

View file

@ -444,11 +444,11 @@ func removePacksExcept(gopts GlobalOptions, t *testing.T, keep restic.IDSet, rem
rtest.OK(t, r.LoadIndex(gopts.ctx)) rtest.OK(t, r.LoadIndex(gopts.ctx))
treePacks := restic.NewIDSet() treePacks := restic.NewIDSet()
for pb := range r.Index().Each(context.TODO()) { r.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob { if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID) treePacks.Insert(pb.PackID)
} }
} })
// remove all packs containing data blobs // remove all packs containing data blobs
rtest.OK(t, r.List(gopts.ctx, restic.PackFile, func(id restic.ID, size int64) error { rtest.OK(t, r.List(gopts.ctx, restic.PackFile, func(id restic.ID, size int64) error {
@ -506,11 +506,11 @@ func TestBackupTreeLoadError(t *testing.T) {
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, r.LoadIndex(env.gopts.ctx)) rtest.OK(t, r.LoadIndex(env.gopts.ctx))
treePacks := restic.NewIDSet() treePacks := restic.NewIDSet()
for pb := range r.Index().Each(context.TODO()) { r.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob { if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID) treePacks.Insert(pb.PackID)
} }
} })
testRunBackup(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts) testRunBackup(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts)
testRunCheck(t, env.gopts) testRunCheck(t, env.gopts)

View file

@ -124,14 +124,14 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
debug.Log("process blobs") debug.Log("process blobs")
cnt := 0 cnt := 0
for blob := range index.Each(ctx) { index.Each(ctx, func(blob restic.PackedBlob) {
cnt++ cnt++
if _, ok := packToIndex[blob.PackID]; !ok { if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet() packToIndex[blob.PackID] = restic.NewIDSet()
} }
packToIndex[blob.PackID].Insert(id) packToIndex[blob.PackID].Insert(id)
} })
debug.Log("%d blobs processed", cnt) debug.Log("%d blobs processed", cnt)
return nil return nil
@ -458,13 +458,13 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
for blob := range c.repo.Index().Each(ctx) { c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) { if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h) debug.Log("blob %v not referenced", h)
blobs = append(blobs, h) blobs = append(blobs, h)
} }
} })
return blobs return blobs
} }

View file

@ -370,7 +370,7 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 { func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 {
packSize := make(map[restic.ID]int64) packSize := make(map[restic.ID]int64)
for blob := range mi.Each(ctx) { mi.Each(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID] size, ok := packSize[blob.PackID]
if !ok { if !ok {
size = headerSize size = headerSize
@ -379,7 +379,7 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
size += int64(blob.Length) size += int64(blob.Length)
} }
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob)) packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
} })
return packSize return packSize
} }

View file

@ -217,34 +217,22 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
return nil return nil
} }
// Each returns a channel that yields all blobs known to the index. When the // Each passes all blobs known to the index to the callback fn. This blocks any
// context is cancelled, the background goroutine terminates. This blocks any
// modification of the index. // modification of the index.
func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob { func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
idx.m.Lock() idx.m.Lock()
ch := make(chan restic.PackedBlob)
go func() {
defer idx.m.Unlock() defer idx.m.Unlock()
defer func() {
close(ch)
}()
for typ := range idx.byType { for typ := range idx.byType {
m := &idx.byType[typ] m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool { m.foreach(func(e *indexEntry) bool {
select { if ctx.Err() != nil {
case <-ctx.Done():
return false return false
case ch <- idx.toPackedBlob(e, restic.BlobType(typ)):
return true
} }
fn(idx.toPackedBlob(e, restic.BlobType(typ)))
return true
}) })
} }
}()
return ch
} }
type EachByPackResult struct { type EachByPackResult struct {

View file

@ -355,11 +355,11 @@ func TestIndexUnserialize(t *testing.T) {
} }
func listPack(idx *repository.Index, id restic.ID) (pbs []restic.PackedBlob) { func listPack(idx *repository.Index, id restic.ID) (pbs []restic.PackedBlob) {
for pb := range idx.Each(context.TODO()) { idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) { if pb.PackID.Equal(id) {
pbs = append(pbs, pb) pbs = append(pbs, pb)
} }
} })
return pbs return pbs
} }

View file

@ -234,30 +234,15 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
return list return list
} }
// Each returns a channel that yields all blobs known to the index. When the // Each runs fn on all blobs known to the index. When the context is cancelled,
// context is cancelled, the background goroutine terminates. This blocks any // the index iteration return immediately. This blocks any modification of the index.
// modification of the index. func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) {
func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
mi.idxMutex.RLock() mi.idxMutex.RLock()
ch := make(chan restic.PackedBlob)
go func() {
defer mi.idxMutex.RUnlock() defer mi.idxMutex.RUnlock()
defer close(ch)
for _, idx := range mi.idx { for _, idx := range mi.idx {
for pb := range idx.Each(ctx) { idx.Each(ctx, fn)
select {
case <-ctx.Done():
return
case ch <- pb:
} }
}
}
}()
return ch
} }
// MergeFinalIndexes merges all final indexes together. // MergeFinalIndexes merges all final indexes together.
@ -450,11 +435,11 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 { if len(packBlob) == 0 {
continue continue
} }
for pb := range mi.Each(ctx) { mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i { if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob) packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
} }
} })
// pass on packs // pass on packs
for packID, pbs := range packBlob { for packID, pbs := range packBlob {

View file

@ -163,9 +163,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, 1, idxCount) rtest.Equals(t, 1, idxCount)
blobCount := 0 blobCount := 0
for range mIdx.Each(context.TODO()) { mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++ blobCount++
} })
rtest.Equals(t, 2, blobCount) rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1) blobs := mIdx.Lookup(bhInIdx1)
@ -195,9 +195,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs) rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0 blobCount = 0
for range mIdx.Each(context.TODO()) { mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++ blobCount++
} })
rtest.Equals(t, 2, blobCount) rtest.Equals(t, 2, blobCount)
} }
@ -316,9 +316,9 @@ func BenchmarkMasterIndexEach(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
entries := 0 entries := 0
for _ = range mIdx.Each(context.TODO()) { mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++ entries++
} })
} }
} }

View file

@ -595,10 +595,15 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
// sanity check // sanity check
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
for blob := range r.idx.Each(ctx) {
invalidIndex := false
r.idx.Each(ctx, func(blob restic.PackedBlob) {
if blob.IsCompressed() { if blob.IsCompressed() {
return errors.Fatal("index uses feature not supported by repository version 1") invalidIndex = true
} }
})
if invalidIndex {
return errors.Fatal("index uses feature not supported by repository version 1")
} }
} }

View file

@ -362,13 +362,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id) idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err) rtest.OK(t, err)
for pb := range idx.Each(context.TODO()) { idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := packEntries[pb.PackID]; !ok { if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{}) packEntries[pb.PackID] = make(map[restic.ID]struct{})
} }
packEntries[pb.PackID][id] = struct{}{} packEntries[pb.PackID][id] = struct{}{}
} })
return nil return nil
}) })
if err != nil { if err != nil {

View file

@ -83,10 +83,9 @@ type MasterIndex interface {
Has(BlobHandle) bool Has(BlobHandle) bool
Lookup(BlobHandle) []PackedBlob Lookup(BlobHandle) []PackedBlob
// Each returns a channel that yields all blobs known to the index. When // Each runs fn on all blobs known to the index. When the context is cancelled,
// the context is cancelled, the background goroutine terminates. This // the index iteration return immediately. This blocks any modification of the index.
// blocks any modification of the index. Each(ctx context.Context, fn func(PackedBlob))
Each(ctx context.Context) <-chan PackedBlob
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error) Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error)