Merge final indexes together for faster index access

This commit is contained in:
Alexander Weiss 2020-07-04 07:06:14 +02:00 committed by Michael Eischer
parent 3b7a3711e6
commit e388d962a5
5 changed files with 90 additions and 20 deletions

View file

@ -190,6 +190,8 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
} }
} }
c.masterIndex.MergeFinalIndexes()
err = c.repo.SetIndex(c.masterIndex) err = c.repo.SetIndex(c.masterIndex)
if err != nil { if err != nil {
debug.Log("SetIndex returned error: %v", err) debug.Log("SetIndex returned error: %v", err)

View file

@ -50,7 +50,7 @@ type Index struct {
packIDToIndex map[restic.ID]int packIDToIndex map[restic.ID]int
final bool // set to true for all indexes read from the backend ("finalized") final bool // set to true for all indexes read from the backend ("finalized")
id restic.ID // set to the ID of the index when it's finalized ids restic.IDs // set to the IDs of the contained finalized indexes
supersedes restic.IDs supersedes restic.IDs
created time.Time created time.Time
} }
@ -393,17 +393,17 @@ func (idx *Index) Finalize() {
idx.packIDToIndex = nil idx.packIDToIndex = nil
} }
// ID returns the ID of the index, if available. If the index is not yet // IDs returns the IDs of the index, if available. If the index is not yet
// finalized, an error is returned. // finalized, an error is returned.
func (idx *Index) ID() (restic.ID, error) { func (idx *Index) IDs() (restic.IDs, error) {
idx.m.Lock() idx.m.Lock()
defer idx.m.Unlock() defer idx.m.Unlock()
if !idx.final { if !idx.final {
return restic.ID{}, errors.New("index not finalized") return nil, errors.New("index not finalized")
} }
return idx.id, nil return idx.ids, nil
} }
// SetID sets the ID the index has been written to. This requires that // SetID sets the ID the index has been written to. This requires that
@ -416,12 +416,12 @@ func (idx *Index) SetID(id restic.ID) error {
return errors.New("index is not final") return errors.New("index is not final")
} }
if !idx.id.IsNull() { if len(idx.ids) > 0 {
return errors.New("ID already set") return errors.New("ID already set")
} }
debug.Log("ID set to %v", id) debug.Log("ID set to %v", id)
idx.id = id idx.ids = append(idx.ids, id)
return nil return nil
} }
@ -462,6 +462,38 @@ func (idx *Index) TreePacks() restic.IDs {
return idx.treePacks return idx.treePacks
} }
// merge merges indexes, i.e. idx.merge(idx2) copies the contents of idx2 into
// idx: all index entries of every blob type, the pack list, the tree packs,
// the index IDs and the superseded IDs.
// idx2 is not changed by this method.
//
// An error is returned if idx2 is idx itself or if idx2 is not final. Both
// checks happen before anything is written, so idx is left untouched whenever
// an error is returned.
func (idx *Index) merge(idx2 *Index) error {
	// Both idx.m and idx2.m are locked below. For idx == idx2 the second Lock
	// would wait on the first one forever (assuming m is a non-reentrant
	// mutex such as sync.Mutex), so reject a self-merge up front.
	if idx == idx2 {
		return errors.New("cannot merge an index into itself")
	}

	idx.m.Lock()
	defer idx.m.Unlock()
	idx2.m.Lock()
	defer idx2.m.Unlock()

	if !idx2.final {
		return errors.New("index to merge is not final!")
	}

	// Entries reference their pack by position in the packs slice. The packs
	// of idx2 are appended behind the existing packs of idx further down, so
	// each copied entry has to be shifted by the current number of packs.
	packlen := len(idx.packs)

	// copy all index entries of idx2 to idx
	for typ := range idx2.byType {
		m2 := &idx2.byType[typ]
		m := &idx.byType[typ]
		m2.foreach(func(entry *indexEntry) bool {
			// packIndex is changed as idx2.packs is appended to idx.packs, see below
			m.add(entry.id, entry.packIndex+packlen, entry.offset, entry.length)
			return true
		})
	}

	idx.packs = append(idx.packs, idx2.packs...)
	idx.treePacks = append(idx.treePacks, idx2.treePacks...)
	idx.ids = append(idx.ids, idx2.ids...)
	idx.supersedes = append(idx.supersedes, idx2.supersedes...)

	return nil
}
// isErrOldIndex returns true if the error may be caused by an old index // isErrOldIndex returns true if the error may be caused by an old index
// format. // format.
func isErrOldIndex(err error) bool { func isErrOldIndex(err error) bool {
@ -581,7 +613,7 @@ func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, buf []byt
return nil, buf[:0], err return nil, buf[:0], err
} }
idx.id = id idx.ids = append(idx.ids, id)
return idx, buf, nil return idx, buf, nil
} }

View file

@ -135,10 +135,9 @@ func TestIndexSerialize(t *testing.T) {
id := restic.NewRandomID() id := restic.NewRandomID()
rtest.OK(t, idx.SetID(id)) rtest.OK(t, idx.SetID(id))
id2, err := idx.ID() ids, err := idx.IDs()
rtest.OK(t, err) rtest.OK(t, err)
rtest.Assert(t, id2.Equal(id), rtest.Equals(t, restic.IDs{id}, ids)
"wrong ID returned: want %v, got %v", id, id2)
idx3, err := repository.DecodeIndex(wr3.Bytes()) idx3, err := repository.DecodeIndex(wr3.Bytes())
rtest.OK(t, err) rtest.OK(t, err)

View file

@ -18,7 +18,12 @@ type MasterIndex struct {
// NewMasterIndex creates a new master index. // NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex { func NewMasterIndex() *MasterIndex {
return &MasterIndex{pendingBlobs: restic.NewBlobSet()} // Always add an empty final index, such that MergeFinalIndexes can merge into this.
// Note that removing this index could lead to a race condition in the rare
// situation that only two indexes exist which are saved and merged concurrently.
idx := []*Index{NewIndex()}
idx[0].Finalize()
return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()}
} }
// Lookup queries all known Indexes for the ID and returns the first match. // Lookup queries all known Indexes for the ID and returns the first match.
@ -237,6 +242,31 @@ func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
return ch return ch
} }
// MergeFinalIndexes merges all final indexes together.
// After calling, there will be only one big final index in MasterIndex
// containing all final index contents.
// Indexes that are not final are left untouched.
// This merging can only be called after all index files are loaded - as
// removing of superseded index contents is only possible for unmerged indexes.
//
// If the master index holds no index at all, the call is a no-op. An index
// whose merge fails is not dropped: the failure is logged and the index stays
// in the list as a separate entry.
func (mi *MasterIndex) MergeFinalIndexes() {
	mi.idxMutex.Lock()
	defer mi.idxMutex.Unlock()

	// Without a first index there is nothing to merge into, and slicing
	// mi.idx[:1] below would be out of range.
	if len(mi.idx) == 0 {
		return
	}

	// The first index is always final and the one to merge into
	// newIdx shares the backing array of mi.idx. It never holds more than i
	// elements when iteration i appends to it, so append only overwrites slots
	// that have already been read.
	newIdx := mi.idx[:1]
	for i := 1; i < len(mi.idx); i++ {
		idx := mi.idx[i]
		// clear reference in masterindex as it may become stale
		mi.idx[i] = nil

		if idx.Final() {
			err := mi.idx[0].merge(idx)
			if err == nil {
				continue
			}
			// Do not lose the index contents: fall through and keep the
			// index unmerged instead of silently discarding it.
			debug.Log("merging index failed, keeping it unmerged: %v", err)
		}
		newIdx = append(newIdx, idx)
	}
	mi.idx = newIdx
}
// RebuildIndex combines all known indexes to a new index, leaving out any // RebuildIndex combines all known indexes to a new index, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs // packs whose ID is contained in packBlacklist. The new index contains the IDs
// of all known indexes in the "supersedes" field. // of all known indexes in the "supersedes" field.
@ -267,15 +297,15 @@ func (mi *MasterIndex) RebuildIndex(packBlacklist restic.IDSet) (*Index, error)
continue continue
} }
id, err := idx.ID() ids, err := idx.IDs()
if err != nil { if err != nil {
debug.Log("index %d does not have an ID: %v", err) debug.Log("index %d does not have an ID: %v", err)
return nil, err return nil, err
} }
debug.Log("adding index id %v to supersedes field", id) debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(id) err = newIndex.AddToSupersedes(ids...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -361,13 +361,15 @@ func (r *Repository) SetIndex(i restic.Index) error {
ids := restic.NewIDSet() ids := restic.NewIDSet()
for _, idx := range r.idx.All() { for _, idx := range r.idx.All() {
id, err := idx.ID() indexIDs, err := idx.IDs()
if err != nil { if err != nil {
debug.Log("not using index, ID() returned error %v", err) debug.Log("not using index, ID() returned error %v", err)
continue continue
} }
for _, id := range indexIDs {
ids.Insert(id) ids.Insert(id)
} }
}
return r.PrepareCache(ids) return r.PrepareCache(ids)
} }
@ -396,6 +398,7 @@ func (r *Repository) saveIndex(ctx context.Context, indexes ...*Index) error {
debug.Log("Saved index %d as %v", i, sid) debug.Log("Saved index %d as %v", i, sid)
} }
r.idx.MergeFinalIndexes()
return nil return nil
} }
@ -479,12 +482,16 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
validIndex := restic.NewIDSet() validIndex := restic.NewIDSet()
wg.Go(func() error { wg.Go(func() error {
for idx := range indexCh { for idx := range indexCh {
id, err := idx.ID() ids, err := idx.IDs()
if err == nil { if err == nil {
for _, id := range ids {
validIndex.Insert(id) validIndex.Insert(id)
} }
}
r.idx.Insert(idx) r.idx.Insert(idx)
} }
r.idx.MergeFinalIndexes()
return nil return nil
}) })