Merge pull request #3787 from MichaelEischer/refactor-repository

repository: (Mostly) index-related cleanups
This commit is contained in:
MichaelEischer 2022-07-02 18:54:04 +02:00 committed by GitHub
commit fdc53a9d32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 146 additions and 250 deletions

View file

@ -596,10 +596,8 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB
func writeIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) (restic.IDSet, error) { func writeIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) (restic.IDSet, error) {
Verbosef("rebuilding index\n") Verbosef("rebuilding index\n")
idx := (repo.Index()).(*repository.MasterIndex) bar := newProgressMax(!gopts.Quiet, 0, "packs processed")
packcount := uint64(len(idx.Packs(removePacks))) obsoleteIndexes, err := repo.Index().Save(gopts.ctx, repo, removePacks, extraObsolete, bar)
bar := newProgressMax(!gopts.Quiet, packcount, "packs processed")
obsoleteIndexes, err := idx.Save(gopts.ctx, repo, removePacks, extraObsolete, bar)
bar.Done() bar.Done()
return obsoleteIndexes, err return obsoleteIndexes, err
} }

View file

@ -508,7 +508,6 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
test.OK(t, err) test.OK(t, err)
test.OK(t, repo.Flush(ctx)) test.OK(t, repo.Flush(ctx))
test.OK(t, repo.SaveIndex(ctx))
snapshot, err := restic.NewSnapshot([]string{"/damaged"}, []string{"test"}, "foo", time.Now()) snapshot, err := restic.NewSnapshot([]string{"/damaged"}, []string{"test"}, "foo", time.Now())
test.OK(t, err) test.OK(t, err)

View file

@ -46,8 +46,6 @@ type Index struct {
byType [restic.NumBlobTypes]indexMap byType [restic.NumBlobTypes]indexMap
packs restic.IDs packs restic.IDs
mixedPacks restic.IDSet mixedPacks restic.IDSet
// only used by Store, StorePacks does not check for already saved packIDs
packIDToIndex map[restic.ID]int
final bool // set to true for all indexes read from the backend ("finalized") final bool // set to true for all indexes read from the backend ("finalized")
ids restic.IDs // set to the IDs of the contained finalized indexes ids restic.IDs // set to the IDs of the contained finalized indexes
@ -58,7 +56,6 @@ type Index struct {
// NewIndex returns a new index. // NewIndex returns a new index.
func NewIndex() *Index { func NewIndex() *Index {
return &Index{ return &Index{
packIDToIndex: make(map[restic.ID]int),
mixedPacks: restic.NewIDSet(), mixedPacks: restic.NewIDSet(),
created: time.Now(), created: time.Now(),
} }
@ -131,27 +128,6 @@ var IndexFull = func(idx *Index, compress bool) bool {
} }
// Store remembers the id and pack in the index.
func (idx *Index) Store(pb restic.PackedBlob) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("%v", pb)
// get packIndex and save if new packID
packIndex, ok := idx.packIDToIndex[pb.PackID]
if !ok {
packIndex = idx.addToPacks(pb.PackID)
idx.packIDToIndex[pb.PackID] = packIndex
}
idx.store(packIndex, pb.Blob)
}
// StorePack remembers the ids of all blobs of a given pack // StorePack remembers the ids of all blobs of a given pack
// in the index // in the index
func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) { func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) {
@ -197,24 +173,6 @@ func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic
return pbs return pbs
} }
// ListPack returns a list of blobs contained in a pack.
func (idx *Index) ListPack(id restic.ID) (pbs []restic.PackedBlob) {
idx.m.Lock()
defer idx.m.Unlock()
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
if idx.packs[e.packIndex] == id {
pbs = append(pbs, idx.toPackedBlob(e, restic.BlobType(typ)))
}
return true
})
}
return pbs
}
// Has returns true iff the id is listed in the index. // Has returns true iff the id is listed in the index.
func (idx *Index) Has(bh restic.BlobHandle) bool { func (idx *Index) Has(bh restic.BlobHandle) bool {
idx.m.Lock() idx.m.Lock()
@ -353,15 +311,6 @@ func (idx *Index) Packs() restic.IDSet {
return packs return packs
} }
// Count returns the number of blobs of type t in the index.
func (idx *Index) Count(t restic.BlobType) (n uint) {
debug.Log("counting blobs of type %v", t)
idx.m.Lock()
defer idx.m.Unlock()
return idx.byType[t].len()
}
type packJSON struct { type packJSON struct {
ID restic.ID `json:"id"` ID restic.ID `json:"id"`
Blobs []blobJSON `json:"blobs"` Blobs []blobJSON `json:"blobs"`
@ -430,13 +379,6 @@ func (idx *Index) Encode(w io.Writer) error {
idx.m.Lock() idx.m.Lock()
defer idx.m.Unlock() defer idx.m.Unlock()
return idx.encode(w)
}
// encode writes the JSON serialization of the index to the writer w.
func (idx *Index) encode(w io.Writer) error {
debug.Log("encoding index")
list, err := idx.generatePackList() list, err := idx.generatePackList()
if err != nil { if err != nil {
return err return err
@ -457,8 +399,6 @@ func (idx *Index) Finalize() {
defer idx.m.Unlock() defer idx.m.Unlock()
idx.final = true idx.final = true
// clear packIDToIndex as no more elements will be added
idx.packIDToIndex = nil
} }
// IDs returns the IDs of the index, if available. If the index is not yet // IDs returns the IDs of the index, if available. If the index is not yet

View file

@ -2,6 +2,7 @@ package repository_test
import ( import (
"bytes" "bytes"
"context"
"math/rand" "math/rand"
"sync" "sync"
"testing" "testing"
@ -19,6 +20,7 @@ func TestIndexSerialize(t *testing.T) {
// create 50 packs with 20 blobs each // create 50 packs with 20 blobs each
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
packID := restic.NewRandomID() packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0) pos := uint(0)
for j := 0; j < 20; j++ { for j := 0; j < 20; j++ {
@ -37,10 +39,11 @@ func TestIndexSerialize(t *testing.T) {
}, },
PackID: packID, PackID: packID,
} }
idx.Store(pb) blobs = append(blobs, pb.Blob)
tests = append(tests, pb) tests = append(tests, pb)
pos += length pos += length
} }
idx.StorePack(packID, blobs)
} }
wr := bytes.NewBuffer(nil) wr := bytes.NewBuffer(nil)
@ -83,6 +86,7 @@ func TestIndexSerialize(t *testing.T) {
newtests := []restic.PackedBlob{} newtests := []restic.PackedBlob{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
packID := restic.NewRandomID() packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0) pos := uint(0)
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
@ -95,10 +99,11 @@ func TestIndexSerialize(t *testing.T) {
}, },
PackID: packID, PackID: packID,
} }
idx.Store(pb) blobs = append(blobs, pb.Blob)
newtests = append(newtests, pb) newtests = append(newtests, pb)
pos += length pos += length
} }
idx.StorePack(packID, blobs)
} }
// finalize; serialize idx, unserialize to idx3 // finalize; serialize idx, unserialize to idx3
@ -141,24 +146,23 @@ func TestIndexSize(t *testing.T) {
idx := repository.NewIndex() idx := repository.NewIndex()
packs := 200 packs := 200
blobs := 100 blobCount := 100
for i := 0; i < packs; i++ { for i := 0; i < packs; i++ {
packID := restic.NewRandomID() packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0) pos := uint(0)
for j := 0; j < blobs; j++ { for j := 0; j < blobCount; j++ {
length := uint(i*100 + j) length := uint(i*100 + j)
idx.Store(restic.PackedBlob{ blobs = append(blobs, restic.Blob{
Blob: restic.Blob{
BlobHandle: restic.NewRandomBlobHandle(), BlobHandle: restic.NewRandomBlobHandle(),
Offset: pos, Offset: pos,
Length: length, Length: length,
},
PackID: packID,
}) })
pos += length pos += length
} }
idx.StorePack(packID, blobs)
} }
wr := bytes.NewBuffer(nil) wr := bytes.NewBuffer(nil)
@ -166,7 +170,7 @@ func TestIndexSize(t *testing.T) {
err := idx.Encode(wr) err := idx.Encode(wr)
rtest.OK(t, err) rtest.OK(t, err)
t.Logf("Index file size for %d blobs in %d packs is %d", blobs*packs, packs, wr.Len()) t.Logf("Index file size for %d blobs in %d packs is %d", blobCount*packs, packs, wr.Len())
} }
// example index serialization from doc/Design.rst // example index serialization from doc/Design.rst
@ -333,7 +337,7 @@ func TestIndexUnserialize(t *testing.T) {
rtest.Equals(t, oldIdx, idx.Supersedes()) rtest.Equals(t, oldIdx, idx.Supersedes())
blobs := idx.ListPack(exampleLookupTest.packID) blobs := listPack(idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) { if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs)) t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
} }
@ -350,6 +354,15 @@ func TestIndexUnserialize(t *testing.T) {
} }
} }
func listPack(idx *repository.Index, id restic.ID) (pbs []restic.PackedBlob) {
for pb := range idx.Each(context.TODO()) {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
}
return pbs
}
var ( var (
benchmarkIndexJSON []byte benchmarkIndexJSON []byte
benchmarkIndexJSONOnce sync.Once benchmarkIndexJSONOnce sync.Once
@ -419,13 +432,12 @@ func TestIndexPacks(t *testing.T) {
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
packID := restic.NewRandomID() packID := restic.NewRandomID()
idx.Store(restic.PackedBlob{ idx.StorePack(packID, []restic.Blob{
Blob: restic.Blob{ {
BlobHandle: restic.NewRandomBlobHandle(), BlobHandle: restic.NewRandomBlobHandle(),
Offset: 0, Offset: 0,
Length: 23, Length: 23,
}, },
PackID: packID,
}) })
packs.Insert(packID) packs.Insert(packID)
@ -529,6 +541,7 @@ func TestIndexHas(t *testing.T) {
// create 50 packs with 20 blobs each // create 50 packs with 20 blobs each
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
packID := restic.NewRandomID() packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0) pos := uint(0)
for j := 0; j < 20; j++ { for j := 0; j < 20; j++ {
@ -547,10 +560,11 @@ func TestIndexHas(t *testing.T) {
}, },
PackID: packID, PackID: packID,
} }
idx.Store(pb) blobs = append(blobs, pb.Blob)
tests = append(tests, pb) tests = append(tests, pb)
pos += length pos += length
} }
idx.StorePack(packID, blobs)
} }
for _, testBlob := range tests { for _, testBlob := range tests {

View file

@ -1,6 +1,7 @@
package repository package repository
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"sync" "sync"
@ -157,19 +158,6 @@ func (mi *MasterIndex) Packs(packBlacklist restic.IDSet) restic.IDSet {
return packs return packs
} }
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
var sum uint
for _, idx := range mi.idx {
sum += idx.Count(t)
}
return sum
}
// Insert adds a new index to the MasterIndex. // Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) { func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock() mi.idxMutex.Lock()
@ -200,9 +188,9 @@ func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
mi.idx = append(mi.idx, newIdx) mi.idx = append(mi.idx, newIdx)
} }
// FinalizeNotFinalIndexes finalizes all indexes that // finalizeNotFinalIndexes finalizes all indexes that
// have not yet been saved and returns that list // have not yet been saved and returns that list
func (mi *MasterIndex) FinalizeNotFinalIndexes() []*Index { func (mi *MasterIndex) finalizeNotFinalIndexes() []*Index {
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
@ -219,8 +207,8 @@ func (mi *MasterIndex) FinalizeNotFinalIndexes() []*Index {
return list return list
} }
// FinalizeFullIndexes finalizes all indexes that are full and returns that list. // finalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) FinalizeFullIndexes() []*Index { func (mi *MasterIndex) finalizeFullIndexes() []*Index {
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
@ -229,7 +217,6 @@ func (mi *MasterIndex) FinalizeFullIndexes() []*Index {
debug.Log("checking %d indexes", len(mi.idx)) debug.Log("checking %d indexes", len(mi.idx))
for _, idx := range mi.idx { for _, idx := range mi.idx {
if idx.Final() { if idx.Final() {
debug.Log("index %p is final", idx)
continue continue
} }
@ -246,14 +233,6 @@ func (mi *MasterIndex) FinalizeFullIndexes() []*Index {
return list return list
} }
// All returns all indexes.
func (mi *MasterIndex) All() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
return mi.idx
}
// Each returns a channel that yields all blobs known to the index. When the // Each returns a channel that yields all blobs known to the index. When the
// context is cancelled, the background goroutine terminates. This blocks any // context is cancelled, the background goroutine terminates. This blocks any
// modification of the index. // modification of the index.
@ -264,13 +243,10 @@ func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
go func() { go func() {
defer mi.idxMutex.RUnlock() defer mi.idxMutex.RUnlock()
defer func() { defer close(ch)
close(ch)
}()
for _, idx := range mi.idx { for _, idx := range mi.idx {
idxCh := idx.Each(ctx) for pb := range idx.Each(ctx) {
for pb := range idxCh {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -299,7 +275,9 @@ func (mi *MasterIndex) MergeFinalIndexes() error {
idx := mi.idx[i] idx := mi.idx[i]
// clear reference in masterindex as it may become stale // clear reference in masterindex as it may become stale
mi.idx[i] = nil mi.idx[i] = nil
if !idx.Final() { // do not merge indexes that have no id set
ids, _ := idx.IDs()
if !idx.Final() || len(ids) == 0 {
newIdx = append(newIdx, idx) newIdx = append(newIdx, idx)
} else { } else {
err := mi.idx[0].merge(idx) err := mi.idx[0].merge(idx)
@ -320,7 +298,9 @@ const saveIndexParallelism = 4
// The new index contains the IDs of all known indexes in the "supersedes" // The new index contains the IDs of all known indexes in the "supersedes"
// field. The IDs are also returned in the IDSet obsolete. // field. The IDs are also returned in the IDSet obsolete.
// After calling this function, you should remove the obsolete index files. // After calling this function, you should remove the obsolete index files.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) {
p.SetMax(uint64(len(mi.Packs(packBlacklist))))
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
@ -405,6 +385,50 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
return obsolete, err return obsolete, err
} }
// SaveIndex saves an index in the repository.
func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := index.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := index.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// saveIndex saves all indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := SaveIndex(ctx, r, idx)
if err != nil {
return err
}
debug.Log("Saved index %d as %v", i, sid)
}
return mi.MergeFinalIndexes()
}
// SaveIndex saves all new indexes in the backend.
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...)
}
// ListPacks returns the blobs of the specified pack files grouped by pack file. // ListPacks returns the blobs of the specified pack files grouped by pack file.
func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs { func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
out := make(chan restic.PackBlobs) out := make(chan restic.PackBlobs)

View file

@ -57,12 +57,12 @@ func TestMasterIndex(t *testing.T) {
} }
idx1 := repository.NewIndex() idx1 := repository.NewIndex()
idx1.Store(blob1) idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx1.Store(blob12a) idx1.StorePack(blob12a.PackID, []restic.Blob{blob12a.Blob})
idx2 := repository.NewIndex() idx2 := repository.NewIndex()
idx2.Store(blob2) idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
idx2.Store(blob12b) idx2.StorePack(blob12b.PackID, []restic.Blob{blob12b.Blob})
mIdx := repository.NewMasterIndex() mIdx := repository.NewMasterIndex()
mIdx.Insert(idx1) mIdx.Insert(idx1)
@ -122,12 +122,6 @@ func TestMasterIndex(t *testing.T) {
rtest.Assert(t, blobs == nil, "Expected no blobs when fetching with a random id") rtest.Assert(t, blobs == nil, "Expected no blobs when fetching with a random id")
_, found = mIdx.LookupSize(restic.NewRandomBlobHandle()) _, found = mIdx.LookupSize(restic.NewRandomBlobHandle())
rtest.Assert(t, !found, "Expected no blobs when fetching with a random id") rtest.Assert(t, !found, "Expected no blobs when fetching with a random id")
// Test Count
num := mIdx.Count(restic.DataBlob)
rtest.Equals(t, uint(2), num)
num = mIdx.Count(restic.TreeBlob)
rtest.Equals(t, uint(2), num)
} }
func TestMasterMergeFinalIndexes(t *testing.T) { func TestMasterMergeFinalIndexes(t *testing.T) {
@ -154,25 +148,18 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
} }
idx1 := repository.NewIndex() idx1 := repository.NewIndex()
idx1.Store(blob1) idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx2 := repository.NewIndex() idx2 := repository.NewIndex()
idx2.Store(blob2) idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
mIdx := repository.NewMasterIndex() mIdx := repository.NewMasterIndex()
mIdx.Insert(idx1) mIdx.Insert(idx1)
mIdx.Insert(idx2) mIdx.Insert(idx2)
finalIndexes := mIdx.FinalizeNotFinalIndexes() finalIndexes, idxCount := repository.TestMergeIndex(t, mIdx)
rtest.Equals(t, []*repository.Index{idx1, idx2}, finalIndexes) rtest.Equals(t, []*repository.Index{idx1, idx2}, finalIndexes)
rtest.Equals(t, 1, idxCount)
err := mIdx.MergeFinalIndexes()
if err != nil {
t.Fatal(err)
}
allIndexes := mIdx.All()
rtest.Equals(t, 1, len(allIndexes))
blobCount := 0 blobCount := 0
for range mIdx.Each(context.TODO()) { for range mIdx.Each(context.TODO()) {
@ -191,20 +178,13 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
// merge another index containing identical blobs // merge another index containing identical blobs
idx3 := repository.NewIndex() idx3 := repository.NewIndex()
idx3.Store(blob1) idx3.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx3.Store(blob2) idx3.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
mIdx.Insert(idx3) mIdx.Insert(idx3)
finalIndexes = mIdx.FinalizeNotFinalIndexes() finalIndexes, idxCount = repository.TestMergeIndex(t, mIdx)
rtest.Equals(t, []*repository.Index{idx3}, finalIndexes) rtest.Equals(t, []*repository.Index{idx3}, finalIndexes)
rtest.Equals(t, 1, idxCount)
err = mIdx.MergeFinalIndexes()
if err != nil {
t.Fatal(err)
}
allIndexes = mIdx.All()
rtest.Equals(t, 1, len(allIndexes))
// Index should have same entries as before! // Index should have same entries as before!
blobs = mIdx.Lookup(bhInIdx1) blobs = mIdx.Lookup(bhInIdx1)
@ -229,11 +209,7 @@ func createRandomMasterIndex(t testing.TB, rng *rand.Rand, num, size int) (*repo
idx1, lookupBh := createRandomIndex(rng, size) idx1, lookupBh := createRandomIndex(rng, size)
mIdx.Insert(idx1) mIdx.Insert(idx1)
mIdx.FinalizeNotFinalIndexes() repository.TestMergeIndex(t, mIdx)
err := mIdx.MergeFinalIndexes()
if err != nil {
t.Fatal(err)
}
return mIdx, lookupBh return mIdx, lookupBh
} }
@ -291,14 +267,12 @@ func BenchmarkMasterIndexLookupMultipleIndexUnknown(b *testing.B) {
} }
func BenchmarkMasterIndexLookupParallel(b *testing.B) { func BenchmarkMasterIndexLookupParallel(b *testing.B) {
mIdx := repository.NewMasterIndex()
for _, numindices := range []int{25, 50, 100} { for _, numindices := range []int{25, 50, 100} {
var lookupBh restic.BlobHandle var lookupBh restic.BlobHandle
b.StopTimer() b.StopTimer()
rng := rand.New(rand.NewSource(0)) rng := rand.New(rand.NewSource(0))
mIdx, lookupBh = createRandomMasterIndex(b, rng, numindices, 10000) mIdx, lookupBh := createRandomMasterIndex(b, rng, numindices, 10000)
b.StartTimer() b.StartTimer()
name := fmt.Sprintf("known,indices=%d", numindices) name := fmt.Sprintf("known,indices=%d", numindices)
@ -361,7 +335,7 @@ func testIndexSave(t *testing.T, version uint) {
t.Fatal(err) t.Fatal(err)
} }
obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil, nil) obsoletes, err := repo.Index().Save(context.TODO(), repo, nil, nil, nil)
if err != nil { if err != nil {
t.Fatalf("unable to save new index: %v", err) t.Fatalf("unable to save new index: %v", err)
} }

View file

@ -154,7 +154,7 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
if r.noAutoIndexUpdate { if r.noAutoIndexUpdate {
return nil return nil
} }
return r.SaveFullIndex(ctx) return r.idx.SaveFullIndex(ctx, r)
} }
// countPacker returns the number of open (unfinished) packers. // countPacker returns the number of open (unfinished) packers.

View file

@ -155,8 +155,8 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest
} }
} }
func saveIndex(t *testing.T, repo restic.Repository) { func flush(t *testing.T, repo restic.Repository) {
if err := repo.SaveIndex(context.TODO()); err != nil { if err := repo.Flush(context.TODO()); err != nil {
t.Fatalf("repo.SaveIndex() %v", err) t.Fatalf("repo.SaveIndex() %v", err)
} }
} }
@ -192,9 +192,7 @@ func rebuildIndex(t *testing.T, repo restic.Repository) {
t.Fatal(err) t.Fatal(err)
} }
_, err = (repo.Index()).(*repository.MasterIndex). _, err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), nil, nil)
Save(context.TODO(), repo, restic.NewIDSet(), nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -237,7 +235,7 @@ func testRepack(t *testing.T, version uint) {
packsBefore, packsAfter) packsBefore, packsAfter)
} }
saveIndex(t, repo) flush(t, repo)
removeBlobs, keepBlobs := selectBlobs(t, repo, 0.2) removeBlobs, keepBlobs := selectBlobs(t, repo, 0.2)
@ -297,7 +295,7 @@ func testRepackCopy(t *testing.T, version uint) {
t.Logf("rand seed is %v", seed) t.Logf("rand seed is %v", seed)
createRandomBlobs(t, repo, 100, 0.7) createRandomBlobs(t, repo, 100, 0.7)
saveIndex(t, repo) flush(t, repo)
_, keepBlobs := selectBlobs(t, repo, 0.2) _, keepBlobs := selectBlobs(t, repo, 0.2)
copyPacks := findPacksForBlobs(t, repo, keepBlobs) copyPacks := findPacksForBlobs(t, repo, keepBlobs)

View file

@ -519,7 +519,7 @@ func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, p []by
// Flush saves all remaining packs and the index // Flush saves all remaining packs and the index
func (r *Repository) Flush(ctx context.Context) error { func (r *Repository) Flush(ctx context.Context) error {
if err := r.FlushPacks(ctx); err != nil { if err := r.flushPacks(ctx); err != nil {
return err return err
} }
@ -527,11 +527,11 @@ func (r *Repository) Flush(ctx context.Context) error {
if r.noAutoIndexUpdate { if r.noAutoIndexUpdate {
return nil return nil
} }
return r.SaveIndex(ctx) return r.idx.SaveIndex(ctx, r)
} }
// FlushPacks saves all remaining packs. // flushPacks saves all remaining packs.
func (r *Repository) FlushPacks(ctx context.Context) error { func (r *Repository) flushPacks(ctx context.Context) error {
pms := []struct { pms := []struct {
t restic.BlobType t restic.BlobType
pm *packerManager pm *packerManager
@ -573,44 +573,6 @@ func (r *Repository) SetIndex(i restic.MasterIndex) error {
return r.PrepareCache() return r.PrepareCache()
} }
// SaveIndex saves an index in the repository.
func SaveIndex(ctx context.Context, repo restic.Repository, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := index.Encode(buf)
if err != nil {
return restic.ID{}, err
}
return repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
}
// saveIndex saves all indexes in the backend.
func (r *Repository) saveIndex(ctx context.Context, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := SaveIndex(ctx, r, idx)
if err != nil {
return err
}
debug.Log("Saved index %d as %v", i, sid)
}
return r.idx.MergeFinalIndexes()
}
// SaveIndex saves all new indexes in the backend.
func (r *Repository) SaveIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.FinalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (r *Repository) SaveFullIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...)
}
// LoadIndex loads all index files from the backend in parallel and stores them // LoadIndex loads all index files from the backend in parallel and stores them
// in the master index. The first error that occurred is returned. // in the master index. The first error that occurred is returned.
func (r *Repository) LoadIndex(ctx context.Context) error { func (r *Repository) LoadIndex(ctx context.Context) error {
@ -620,12 +582,6 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
_, err = idx.IDs()
if err != nil {
return err
}
r.idx.Insert(idx) r.idx.Insert(idx)
return nil return nil
}) })
@ -687,7 +643,6 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
return nil return nil
}) })
idx := NewIndex()
// a worker receives an pack ID from ch, reads the pack contents, and adds them to idx // a worker receives an pack ID from ch, reads the pack contents, and adds them to idx
worker := func() error { worker := func() error {
for fi := range ch { for fi := range ch {
@ -698,7 +653,7 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
invalid = append(invalid, fi.ID) invalid = append(invalid, fi.ID)
m.Unlock() m.Unlock()
} }
idx.StorePack(fi.ID, entries) r.idx.StorePack(fi.ID, entries)
p.Add(1) p.Add(1)
} }
@ -715,9 +670,6 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
return invalid, errors.Fatal(err.Error()) return invalid, errors.Fatal(err.Error())
} }
// Add idx to MasterIndex
r.idx.Insert(idx)
return invalid, nil return invalid, nil
} }
@ -919,11 +871,6 @@ func (r *Repository) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, e
return id, err return id, err
} }
// Loader allows loading data from a backend.
type Loader interface {
Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
}
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to

View file

@ -362,20 +362,19 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
idx := repository.NewIndex() idx := repository.NewIndex()
for i := 0; i < 5000; i++ { for i := 0; i < 5000; i++ {
idx.Store(restic.PackedBlob{ idx.StorePack(restic.NewRandomID(), []restic.Blob{
Blob: restic.Blob{ {
BlobHandle: restic.NewRandomBlobHandle(), BlobHandle: restic.NewRandomBlobHandle(),
Length: 1234, Length: 1234,
Offset: 1235, Offset: 1235,
}, },
PackID: restic.NewRandomID(),
}) })
} }
id, err := repository.SaveIndex(context.TODO(), repo, idx) id, err := repository.SaveIndex(context.TODO(), repo, idx)
rtest.OK(b, err) rtest.OK(b, err)
b.Logf("index saved as %v (%v entries)", id.Str(), idx.Count(restic.DataBlob)) b.Logf("index saved as %v", id.Str())
fi, err := repo.Backend().Stat(context.TODO(), restic.Handle{Type: restic.IndexFile, Name: id.String()}) fi, err := repo.Backend().Stat(context.TODO(), restic.Handle{Type: restic.IndexFile, Name: id.String()})
rtest.OK(b, err) rtest.OK(b, err)
b.Logf("filesize is %v", fi.Size) b.Logf("filesize is %v", fi.Size)
@ -414,25 +413,15 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
repository.IndexFull = func(*repository.Index, bool) bool { return true } repository.IndexFull = func(*repository.Index, bool) bool { return true }
// add 15 packs // add a few rounds of packs
for j := 0; j < 5; j++ { for j := 0; j < 5; j++ {
// add 3 packs, write intermediate index // add some packs, write intermediate index
for i := 0; i < 3; i++ { saveRandomDataBlobs(t, repo, 20, 1<<15)
saveRandomDataBlobs(t, repo, 5, 1<<15) rtest.OK(t, repo.Flush(context.TODO()))
rtest.OK(t, repo.FlushPacks(context.Background()))
}
rtest.OK(t, repo.SaveFullIndex(context.TODO()))
}
// add another 5 packs
for i := 0; i < 5; i++ {
saveRandomDataBlobs(t, repo, 5, 1<<15)
rtest.OK(t, repo.FlushPacks(context.Background()))
} }
// save final index // save final index
rtest.OK(t, repo.SaveIndex(context.TODO())) rtest.OK(t, repo.Flush(context.TODO()))
packEntries := make(map[restic.ID]map[restic.ID]struct{}) packEntries := make(map[restic.ID]map[restic.ID]struct{})

View file

@ -132,3 +132,13 @@ func BenchmarkAllVersions(b *testing.B, bench VersionedBenchmark) {
}) })
} }
} }
func TestMergeIndex(t testing.TB, mi *MasterIndex) ([]*Index, int) {
finalIndexes := mi.finalizeNotFinalIndexes()
for _, idx := range finalIndexes {
test.OK(t, idx.SetID(restic.NewRandomID()))
}
test.OK(t, mi.MergeFinalIndexes())
return finalIndexes, len(mi.idx)
}

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/ui/progress"
) )
// Repository stores data in a backend. It provides high-level functions and // Repository stores data in a backend. It provides high-level functions and
@ -15,17 +16,13 @@ type Repository interface {
Key() *crypto.Key Key() *crypto.Key
SetIndex(MasterIndex) error
Index() MasterIndex Index() MasterIndex
SaveFullIndex(context.Context) error
SaveIndex(context.Context) error
LoadIndex(context.Context) error LoadIndex(context.Context) error
SetIndex(MasterIndex) error
LookupBlobSize(ID, BlobType) (uint, bool)
Config() Config Config() Config
LookupBlobSize(ID, BlobType) (uint, bool)
// List calls the function fn for each file of type t in the repository. // List calls the function fn for each file of type t in the repository.
// When an error is returned by fn, processing stops and List() returns the // When an error is returned by fn, processing stops and List() returns the
// error. // error.
@ -65,6 +62,11 @@ type LoadJSONUnpackeder interface {
LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error
} }
// SaverUnpacked allows saving a blob not stored in a pack file
type SaverUnpacked interface {
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
}
type PackBlobs struct { type PackBlobs struct {
PackID ID PackID ID
Blobs []Blob Blobs []Blob
@ -74,11 +76,12 @@ type PackBlobs struct {
type MasterIndex interface { type MasterIndex interface {
Has(BlobHandle) bool Has(BlobHandle) bool
Lookup(BlobHandle) []PackedBlob Lookup(BlobHandle) []PackedBlob
Count(BlobType) uint
// Each returns a channel that yields all blobs known to the index. When // Each returns a channel that yields all blobs known to the index. When
// the context is cancelled, the background goroutine terminates. This // the context is cancelled, the background goroutine terminates. This
// blocks any modification of the index. // blocks any modification of the index.
Each(ctx context.Context) <-chan PackedBlob Each(ctx context.Context) <-chan PackedBlob
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error)
} }