index: Add Each() to MasterIndex

This commit is contained in:
Alexander Neumann 2017-06-18 14:45:02 +02:00
parent f31e993f09
commit f676c0c41b
6 changed files with 47 additions and 11 deletions

View file

@ -142,7 +142,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
debug.Log("process blobs") debug.Log("process blobs")
cnt := 0 cnt := 0
for blob := range res.Index.Each(done) { for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID) c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID) c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = 0 c.blobRefs.M[blob.ID] = 0

View file

@ -28,7 +28,7 @@ type BlobSizeCache struct {
func NewBlobSizeCache(midx *repository.MasterIndex) *BlobSizeCache { func NewBlobSizeCache(midx *repository.MasterIndex) *BlobSizeCache {
m := make(map[restic.ID]uint, 1000) m := make(map[restic.ID]uint, 1000)
for _, idx := range midx.All() { for _, idx := range midx.All() {
for pb := range idx.Each(nil) { for pb := range idx.Each(context.TODO()) {
m[pb.ID] = pb.Length m[pb.ID] = pb.Length
} }
} }

View file

@ -58,4 +58,9 @@ type Index interface {
Has(ID, BlobType) bool Has(ID, BlobType) bool
Lookup(ID, BlobType) ([]PackedBlob, error) Lookup(ID, BlobType) ([]PackedBlob, error)
Count(BlobType) uint Count(BlobType) uint
// Each returns a channel that yields all blobs known to the index. When
// the context is cancelled, the background goroutine terminates. This
// blocks any modification of the index.
Each(ctx context.Context) <-chan PackedBlob
} }

View file

@ -206,10 +206,10 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
return nil return nil
} }
// Each returns a channel that yields all blobs known to the index. If done is // Each returns a channel that yields all blobs known to the index. When the
// closed, the background goroutine terminates. This blocks any modification of // context is cancelled, the background goroutine terminates. This blocks any
// the index. // modification of the index.
func (idx *Index) Each(done chan struct{}) <-chan restic.PackedBlob { func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
idx.m.Lock() idx.m.Lock()
ch := make(chan restic.PackedBlob) ch := make(chan restic.PackedBlob)
@ -223,7 +223,7 @@ func (idx *Index) Each(done chan struct{}) <-chan restic.PackedBlob {
for h, packs := range idx.pack { for h, packs := range idx.pack {
for _, blob := range packs { for _, blob := range packs {
select { select {
case <-done: case <-ctx.Done():
return return
case ch <- restic.PackedBlob{ case ch <- restic.PackedBlob{
Blob: restic.Blob{ Blob: restic.Blob{

View file

@ -1,6 +1,7 @@
package repository package repository
import ( import (
"context"
"restic" "restic"
"sync" "sync"
@ -188,6 +189,35 @@ func (mi *MasterIndex) All() []*Index {
return mi.idx return mi.idx
} }
// Each returns a channel that yields all blobs known to the index. When the
// context is cancelled, the background goroutine terminates. This blocks any
// modification of the index.
func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
mi.idxMutex.RLock()
ch := make(chan restic.PackedBlob)
go func() {
defer mi.idxMutex.RUnlock()
defer func() {
close(ch)
}()
for _, idx := range mi.idx {
idxCh := idx.Each(ctx)
for pb := range idxCh {
select {
case <-ctx.Done():
return
case ch <- pb:
}
}
}
}()
return ch
}
// RebuildIndex combines all known indexes to a new index, leaving out any // RebuildIndex combines all known indexes to a new index, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs // packs whose ID is contained in packBlacklist. The new index contains the IDs
// of all known indexes in the "supersedes" field. // of all known indexes in the "supersedes" field.
@ -198,13 +228,14 @@ func (mi *MasterIndex) RebuildIndex(packBlacklist restic.IDSet) (*Index, error)
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist) debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
newIndex := NewIndex() newIndex := NewIndex()
done := make(chan struct{})
defer close(done) ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
for i, idx := range mi.idx { for i, idx := range mi.idx {
debug.Log("adding index %d", i) debug.Log("adding index %d", i)
for pb := range idx.Each(done) { for pb := range idx.Each(ctx) {
if packBlacklist.Has(pb.PackID) { if packBlacklist.Has(pb.PackID) {
continue continue
} }

View file

@ -373,7 +373,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
idx, err := repository.LoadIndex(context.TODO(), repo, id) idx, err := repository.LoadIndex(context.TODO(), repo, id)
OK(t, err) OK(t, err)
for pb := range idx.Each(nil) { for pb := range idx.Each(context.TODO()) {
if _, ok := packEntries[pb.PackID]; !ok { if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{}) packEntries[pb.PackID] = make(map[restic.ID]struct{})
} }