restic/internal/repository/master_index.go

357 lines
8.4 KiB
Go
Raw Normal View History

2015-10-12 20:34:12 +00:00
package repository
import (
2017-06-18 12:45:02 +00:00
"context"
2015-10-12 20:34:12 +00:00
"sync"
2017-07-23 12:21:03 +00:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
2020-11-12 01:49:53 +00:00
"golang.org/x/sync/errgroup"
2015-10-12 20:34:12 +00:00
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
pendingBlobs restic.BlobSet
idxMutex sync.RWMutex
2015-10-12 20:34:12 +00:00
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
// Always add an empty final index, such that MergeFinalIndexes can merge into this.
// Note that removing this index could lead to a race condition in the rare
// sitation that only two indexes exist which are saved and merged concurrently.
idx := []*Index{NewIndex()}
idx[0].Finalize()
return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()}
2015-10-12 20:34:12 +00:00
}
// Lookup queries all known Indexes for the ID and returns all matches.
func (mi *MasterIndex) Lookup(id restic.ID, tpe restic.BlobType) (blobs []restic.PackedBlob) {
2015-10-12 20:34:12 +00:00
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
blobs = idx.Lookup(id, tpe, blobs)
2015-10-12 20:34:12 +00:00
}
return blobs
2015-10-12 20:34:12 +00:00
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(id restic.ID, tpe restic.BlobType) (uint, bool) {
2015-10-12 20:34:12 +00:00
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if size, found := idx.LookupSize(id, tpe); found {
return size, found
2015-10-12 20:34:12 +00:00
}
}
return 0, false
2015-10-12 20:34:12 +00:00
}
// AddPending adds a given blob to list of pending Blobs
// Before doing so it checks if this blob is already known.
// Returns true if adding was successful and false if the blob
// was already known
func (mi *MasterIndex) addPending(id restic.ID, tpe restic.BlobType) bool {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// Check if blob is pending or in index
if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) {
return false
}
for _, idx := range mi.idx {
if idx.Has(id, tpe) {
return false
}
}
// really not known -> insert
mi.pendingBlobs.Insert(restic.BlobHandle{ID: id, Type: tpe})
return true
}
2015-10-12 20:34:12 +00:00
// Has queries all known Indexes for the ID and returns the first match.
// Also returns true if the ID is pending.
2016-08-31 18:58:57 +00:00
func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
2015-10-12 20:34:12 +00:00
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// also return true if blob is pending
if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) {
return true
}
2015-10-12 20:34:12 +00:00
for _, idx := range mi.idx {
if idx.Has(id, tpe) {
2015-10-12 20:34:12 +00:00
return true
}
}
return false
}
// Packs returns all packs that are covered by the index.
func (mi *MasterIndex) Packs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
packs.Merge(idx.Packs())
}
return packs
}
2015-10-12 20:34:12 +00:00
// Count returns the number of blobs of type t in the index.
2016-08-31 18:58:57 +00:00
func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
2015-10-12 20:34:12 +00:00
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
var sum uint
for _, idx := range mi.idx {
sum += idx.Count(t)
}
return sum
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// StorePack remembers the id and pack in the index.
func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
2017-10-07 12:11:42 +00:00
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// delete blobs from pending
for _, blob := range blobs {
mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID})
}
2015-10-12 20:34:12 +00:00
for _, idx := range mi.idx {
if !idx.Final() {
idx.StorePack(id, blobs)
return
2015-10-12 20:34:12 +00:00
}
}
newIdx := NewIndex()
newIdx.StorePack(id, blobs)
2015-10-12 20:34:12 +00:00
mi.idx = append(mi.idx, newIdx)
}
// FinalizeNotFinalIndexes finalizes all indexes that
// have not yet been saved and returns that list
func (mi *MasterIndex) FinalizeNotFinalIndexes() []*Index {
2015-10-12 20:34:12 +00:00
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
idx.Finalize()
2015-10-12 20:34:12 +00:00
list = append(list, idx)
}
}
2016-09-27 20:35:08 +00:00
debug.Log("return %d indexes", len(list))
2015-10-12 21:59:17 +00:00
return list
}
// FinalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) FinalizeFullIndexes() []*Index {
2015-10-12 21:59:17 +00:00
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
2016-09-27 20:35:08 +00:00
debug.Log("checking %d indexes", len(mi.idx))
2015-10-12 21:59:17 +00:00
for _, idx := range mi.idx {
if idx.Final() {
2016-09-27 20:35:08 +00:00
debug.Log("index %p is final", idx)
2015-10-12 21:59:17 +00:00
continue
}
if IndexFull(idx) {
2016-09-27 20:35:08 +00:00
debug.Log("index %p is full", idx)
idx.Finalize()
2015-10-12 21:59:17 +00:00
list = append(list, idx)
} else {
2016-09-27 20:35:08 +00:00
debug.Log("index %p not full", idx)
2015-10-12 21:59:17 +00:00
}
}
2016-09-27 20:35:08 +00:00
debug.Log("return %d indexes", len(list))
2015-10-12 20:34:12 +00:00
return list
}
// All returns all indexes.
func (mi *MasterIndex) All() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
return mi.idx
}
2015-11-02 18:28:30 +00:00
2017-06-18 12:45:02 +00:00
// Each returns a channel that yields all blobs known to the index. When the
// context is cancelled, the background goroutine terminates. This blocks any
// modification of the index.
func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
mi.idxMutex.RLock()
ch := make(chan restic.PackedBlob)
go func() {
defer mi.idxMutex.RUnlock()
defer func() {
close(ch)
}()
for _, idx := range mi.idx {
idxCh := idx.Each(ctx)
for pb := range idxCh {
select {
case <-ctx.Done():
return
case ch <- pb:
}
}
}
}()
return ch
}
// MergeFinalIndexes merges all final indexes together.
// After calling, there will be only one big final index in MasterIndex
// containing all final index contents.
// Indexes that are not final are left untouched.
// This merging can only be called after all index files are loaded - as
// removing of superseded index contents is only possible for unmerged indexes.
func (mi *MasterIndex) MergeFinalIndexes() {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// The first index is always final and the one to merge into
newIdx := mi.idx[:1]
for i := 1; i < len(mi.idx); i++ {
idx := mi.idx[i]
// clear reference in masterindex as it may become stale
mi.idx[i] = nil
if !idx.Final() {
newIdx = append(newIdx, idx)
} else {
mi.idx[0].merge(idx)
}
}
mi.idx = newIdx
}
2020-11-12 01:49:53 +00:00
const saveIndexParallelism = 4
// Save saves all known indexes to index files, leaving out any
2020-11-12 01:49:53 +00:00
// packs whose ID is contained in packBlacklist from finalized indexes.
// The new index contains the IDs of all known indexes in the "supersedes"
// field. The IDs are also returned in the IDSet obsolete.
// After calling this function, you should remove the obsolete index files.
2020-10-18 07:24:34 +00:00
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) {
2015-11-02 18:28:30 +00:00
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
2016-09-27 20:35:08 +00:00
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
2015-11-02 18:28:30 +00:00
newIndex := NewIndex()
obsolete = restic.NewIDSet()
2017-06-18 12:45:02 +00:00
2020-11-12 01:49:53 +00:00
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
2015-11-02 18:28:30 +00:00
2020-11-12 01:49:53 +00:00
ch := make(chan *Index)
wg.Go(func() error {
defer close(ch)
for i, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return err
}
2015-11-02 18:28:30 +00:00
2020-11-12 01:49:53 +00:00
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(ids...))
} else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.packID, pbs.blobs)
p.Add(1)
if IndexFull(newIndex) {
select {
case ch <- newIndex:
case <-ctx.Done():
return nil
}
newIndex = NewIndex()
}
}
2015-11-02 18:28:30 +00:00
}
2020-11-12 01:49:53 +00:00
err = newIndex.AddToSupersedes(extraObsolete...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(extraObsolete...))
2015-11-02 18:28:30 +00:00
2020-11-12 01:49:53 +00:00
select {
case ch <- newIndex:
case <-ctx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := SaveIndex(ctx, repo, idx); err != nil {
return err
}
2015-11-02 18:28:30 +00:00
}
2020-11-12 01:49:53 +00:00
return nil
2015-11-02 18:28:30 +00:00
}
2020-10-18 07:24:34 +00:00
2020-11-12 01:49:53 +00:00
// run workers on ch
wg.Go(func() error {
return RunWorkers(saveIndexParallelism, worker)
})
2020-10-18 07:24:34 +00:00
2020-11-12 01:49:53 +00:00
err = wg.Wait()
2015-11-02 18:28:30 +00:00
2020-11-12 01:49:53 +00:00
return obsolete, err
2015-11-02 18:28:30 +00:00
}