Add and use MasterIndex

This commit is contained in:
Alexander Neumann 2015-10-12 22:34:12 +02:00
parent 64fa89d406
commit 86fcd170f6
10 changed files with 347 additions and 108 deletions

View file

@ -87,7 +87,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
// check if tree has been saved before
id := backend.Hash(data)
if arch.repo.Index().Has(id) {
if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) {
return id, nil
}
@ -651,13 +651,13 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
}
// save index
indexID, err := arch.repo.SaveIndex()
err = arch.repo.SaveIndex()
if err != nil {
debug.Log("Archiver.Snapshot", "error saving index: %v", err)
return nil, backend.ID{}, err
}
debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str())
debug.Log("Archiver.Snapshot", "saved indexes")
return sn, id, nil
}

View file

@ -209,15 +209,17 @@ func BenchmarkLoadTree(t *testing.B) {
list := make([]backend.ID, 0, 10)
done := make(chan struct{})
for blob := range repo.Index().Each(done) {
if blob.Type != pack.Tree {
continue
}
for _, idx := range repo.Index().All() {
for blob := range idx.Each(done) {
if blob.Type != pack.Tree {
continue
}
list = append(list, blob.ID)
if len(list) == cap(list) {
close(done)
break
list = append(list, blob.ID)
if len(list) == cap(list) {
close(done)
break
}
}
}

View file

@ -27,7 +27,7 @@ type Checker struct {
indexes map[backend.ID]*repository.Index
orphanedPacks backend.IDs
masterIndex *repository.Index
masterIndex *repository.MasterIndex
repo *repository.Repository
}
@ -37,7 +37,7 @@ func New(repo *repository.Repository) *Checker {
c := &Checker{
packs: make(map[backend.ID]struct{}),
blobs: make(map[backend.ID]struct{}),
masterIndex: repository.NewIndex(),
masterIndex: repository.NewMasterIndex(),
indexes: make(map[backend.ID]*repository.Index),
repo: repo,
}
@ -105,7 +105,7 @@ func (c *Checker) LoadIndex() error {
}
c.indexes[id] = res.Index
c.masterIndex.Merge(res.Index)
c.masterIndex.Insert(res.Index)
debug.Log("LoadIndex", "process blobs")
cnt := 0

View file

@ -73,12 +73,14 @@ func printTrees(repo *repository.Repository, wr io.Writer) error {
trees := []backend.ID{}
for blob := range repo.Index().Each(done) {
if blob.Type != pack.Tree {
continue
}
for _, idx := range repo.Index().All() {
for blob := range idx.Each(nil) {
if blob.Type != pack.Tree {
continue
}
trees = append(trees, blob.ID)
trees = append(trees, blob.ID)
}
}
for _, id := range trees {

View file

@ -49,8 +49,10 @@ func (cmd CmdList) Execute(args []string) error {
return err
}
for blob := range repo.Index().Each(nil) {
cmd.global.Printf("%s\n", blob.ID)
for _, idx := range repo.Index().All() {
for blob := range idx.Each(nil) {
cmd.global.Printf("%s\n", blob.ID)
}
}
return nil

View file

@ -18,6 +18,7 @@ type Index struct {
m sync.Mutex
pack map[backend.ID]indexEntry
final bool // set to true for all indexes read from the backend ("finalized")
supersedes backend.IDs
}
@ -26,7 +27,6 @@ type indexEntry struct {
packID *backend.ID
offset uint
length uint
old bool
}
// NewIndex returns a new index.
@ -36,26 +36,38 @@ func NewIndex() *Index {
}
}
func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint, old bool) {
func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
idx.pack[id] = indexEntry{
tpe: t,
packID: pack,
offset: offset,
length: length,
old: old,
}
}
// Final returns true iff the index is already written to the repository, it is
// finalized.
func (idx *Index) Final() bool {
idx.m.Lock()
defer idx.m.Unlock()
return idx.final
}
// Store remembers the id and pack in the index. An existing entry will be
// silently overwritten.
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
pack.Str(), id.Str(), t, offset, length)
idx.store(t, id, pack, offset, length, false)
idx.store(t, id, pack, offset, length)
}
// StoreInProgress adds a preliminary index entry for a blob that is about to be
@ -66,13 +78,17 @@ func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
if _, hasID := idx.pack[id]; hasID {
errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t)
debug.Log("Index.StoreInProgress", errorMsg)
return errors.New(errorMsg)
}
idx.store(t, id, nil, 0, 0, false)
idx.store(t, id, nil, 0, 0)
debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)",
id.Str(), t)
return nil
@ -83,6 +99,10 @@ func (idx *Index) Remove(packID backend.ID) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("remove item from finalized index")
}
debug.Log("Index.Remove", "id %v removed", packID.Str())
if _, ok := idx.pack[packID]; ok {
@ -270,32 +290,41 @@ type jsonIndex struct {
type jsonOldIndex []*packJSON
// encode writes the JSON serialization of the index filtered by selectFn to enc.
func (idx *Index) encode(w io.Writer, supersedes backend.IDs, selectFn func(indexEntry) bool) error {
list, err := idx.generatePackList(selectFn)
if err != nil {
return err
}
debug.Log("Index.Encode", "done, %d entries selected", len(list))
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Supersedes: supersedes,
Packs: list,
}
return enc.Encode(idxJSON)
}
// Encode writes the JSON serialization of the index to the writer w. This
// serialization only contains new blobs added via idx.Store(), not old ones
// introduced via DecodeIndex().
// Encode writes the JSON serialization of the index to the writer w.
func (idx *Index) Encode(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
return idx.encode(w, idx.supersedes, func(e indexEntry) bool { return !e.old })
return idx.encode(w)
}
// encode writes the JSON serialization of the index to the writer w.
func (idx *Index) encode(w io.Writer) error {
debug.Log("Index.encode", "encoding index")
list, err := idx.generatePackList(nil)
if err != nil {
return err
}
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Supersedes: idx.supersedes,
Packs: list,
}
return enc.Encode(idxJSON)
}
// Finalize sets the index to final and writes the JSON serialization to w.
func (idx *Index) Finalize(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
idx.final = true
return idx.encode(w)
}
// Dump writes the pretty-printed JSON representation of the index to w.
@ -358,10 +387,11 @@ func DecodeIndex(rd io.Reader) (idx *Index, err error) {
idx = NewIndex()
for _, pack := range idxJSON.Packs {
for _, blob := range pack.Blobs {
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true)
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
}
}
idx.supersedes = idxJSON.Supersedes
idx.final = true
debug.Log("Index.DecodeIndex", "done")
return idx, err
@ -382,7 +412,7 @@ func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
idx = NewIndex()
for _, pack := range list {
for _, blob := range pack.Blobs {
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true)
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
}
}
@ -436,8 +466,7 @@ func ConvertIndex(repo *Repository, id backend.ID) (backend.ID, error) {
idx.supersedes = backend.IDs{id}
// select all blobs for export
err = idx.encode(blob, idx.supersedes, func(e indexEntry) bool { return true })
err = idx.Encode(blob)
if err != nil {
debug.Log("ConvertIndex", "oldIdx.Encode() returned error: %v", err)
return id, err

View file

@ -86,7 +86,7 @@ func TestIndexSerialize(t *testing.T) {
Equals(t, testBlob.length, length)
}
// add more blobs to idx2
// add more blobs to idx
newtests := []testEntry{}
for i := 0; i < 10; i++ {
packID := randomID()
@ -95,7 +95,7 @@ func TestIndexSerialize(t *testing.T) {
for j := 0; j < 10; j++ {
id := randomID()
length := uint(i*100 + j)
idx2.Store(pack.Data, id, &packID, pos, length)
idx.Store(pack.Data, id, &packID, pos, length)
newtests = append(newtests, testEntry{
id: id,
@ -109,22 +109,20 @@ func TestIndexSerialize(t *testing.T) {
}
}
// serialize idx2, unserialize to idx3
// serialize idx, unserialize to idx3
wr3 := bytes.NewBuffer(nil)
err = idx2.Encode(wr3)
err = idx.Finalize(wr3)
OK(t, err)
Assert(t, idx.Final(),
"index not final after encoding")
idx3, err := repository.DecodeIndex(wr3)
OK(t, err)
Assert(t, idx3 != nil,
"nil returned for decoded index")
// all old blobs must not be present in the index
for _, testBlob := range tests {
_, _, _, _, err := idx3.Lookup(testBlob.id)
Assert(t, err != nil,
"found old id %v in serialized index", testBlob.id.Str())
}
Assert(t, idx3.Final(),
"decoded index is not final")
// all new blobs must be in the index
for _, testBlob := range newtests {
@ -333,7 +331,8 @@ func TestStoreOverwritesPreliminaryEntry(t *testing.T) {
blobID := randomID()
dataType := pack.Data
idx.StoreInProgress(dataType, blobID)
err := idx.StoreInProgress(dataType, blobID)
OK(t, err)
packID := randomID()
offset := uint(0)

200
repository/master_index.go Normal file
View file

@ -0,0 +1,200 @@
package repository
import (
"fmt"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
idxMutex sync.RWMutex
inFlight struct {
backend.IDSet
sync.RWMutex
}
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
return &MasterIndex{
inFlight: struct {
backend.IDSet
sync.RWMutex
}{
IDSet: backend.NewIDSet(),
},
}
}
// Lookup queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
debug.Log("MasterIndex.Lookup", "looking up id %v", id.Str())
for _, idx := range mi.idx {
packID, tpe, offset, length, err = idx.Lookup(id)
if err == nil {
debug.Log("MasterIndex.Lookup",
"found id %v in pack %v at offset %d with length %d",
id.Str(), packID.Str(), offset, length)
return
}
}
debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str())
return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(id backend.ID) (uint, error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
length, err := idx.LookupSize(id)
if err == nil {
return length, nil
}
}
return 0, fmt.Errorf("id %v not found in any index", id)
}
// Has queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Has(id backend.ID) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if idx.Has(id) {
return true
}
}
return false
}
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t pack.BlobType) (n uint) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
var sum uint
for _, idx := range mi.idx {
sum += idx.Count(t)
}
return sum
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// Remove deletes an index from the MasterIndex.
func (mi *MasterIndex) Remove(index *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
for i, idx := range mi.idx {
if idx == index {
mi.idx = append(mi.idx[:i], mi.idx[i+1:]...)
return
}
}
}
// Current returns an index that is not yet finalized, so that new entries can
// still be added. If all indexes are finalized, a new index is created and
// returned.
func (mi *MasterIndex) Current() *Index {
mi.idxMutex.RLock()
for _, idx := range mi.idx {
if !idx.Final() {
mi.idxMutex.RUnlock()
return idx
}
}
mi.idxMutex.RUnlock()
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
newIdx := NewIndex()
mi.idx = append(mi.idx, newIdx)
return newIdx
}
// AddInFlight add the given IDs to the list of in-flight IDs.
func (mi *MasterIndex) AddInFlight(IDs ...backend.ID) {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
ids := backend.IDs(IDs)
debug.Log("MasterIndex.AddInFlight", "adding %v", ids)
for _, id := range ids {
mi.inFlight.Insert(id)
}
}
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
mi.inFlight.RLock()
defer mi.inFlight.RUnlock()
inFlight := mi.inFlight.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
return inFlight
}
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str())
mi.inFlight.Delete(id)
}
// NotFinalIndexes returns all indexes that have not yet been saved.
func (mi *MasterIndex) NotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
list = append(list, idx)
}
}
debug.Log("MasterIndex.NotFinalIndexes", "saving %d indexes", len(list))
return list
}
// All returns all indexes.
func (mi *MasterIndex) All() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
return mi.idx
}

View file

@ -24,7 +24,7 @@ type Repository struct {
Config Config
key *crypto.Key
keyName string
idx *Index
idx *MasterIndex
pm sync.Mutex
packs []*pack.Packer
@ -34,7 +34,7 @@ type Repository struct {
func New(be backend.Backend) *Repository {
return &Repository{
be: be,
idx: NewIndex(),
idx: NewMasterIndex(),
}
}
@ -204,7 +204,7 @@ func (r *Repository) LoadJSONPack(t pack.BlobType, id backend.ID, item interface
// LookupBlobSize returns the size of blob id.
func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) {
return r.Index().LookupSize(id)
return r.idx.LookupSize(id)
}
const minPackSize = 4 * chunker.MiB
@ -269,7 +269,8 @@ func (r *Repository) savePacker(p *pack.Packer) error {
// update blobs in the index
for _, b := range p.Blobs() {
debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
r.idx.Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
r.idx.Current().Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
r.idx.RemoveFromInFlight(b.ID)
}
return nil
@ -304,16 +305,15 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
return backend.ID{}, err
}
// add this id to the index, although we don't know yet in which pack it
// will be saved; the entry will be updated when the pack is written.
// Note: the current id needs to be added to the index before searching
// for a suitable packer: There's a little chance that more than one
// goroutine handles the same blob concurrently. Due to idx.StoreInProgress
// locking the index and raising an error if a matching index entry
// already exists, updating the index first ensures that only one of
// those goroutines will continue. See issue restic#292.
debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t)
err = r.idx.StoreInProgress(t, *id)
// check if this id is already been saved by another goroutine
if r.idx.IsInFlight(*id) {
debug.Log("Repo.Save", "blob %v is already being saved", id.Str())
return *id, nil
}
// add this id to the list of in-flight chunk ids.
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str())
r.idx.AddInFlight(*id)
if err != nil {
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
return *id, nil
@ -322,12 +322,15 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
// find suitable packer and add blob
packer, err := r.findPacker(uint(len(ciphertext)))
if err != nil {
r.idx.Remove(*id)
r.idx.RemoveFromInFlight(*id)
return backend.ID{}, err
}
// save ciphertext
packer.Add(t, *id, bytes.NewReader(ciphertext))
_, err = packer.Add(t, *id, bytes.NewReader(ciphertext))
if err != nil {
return backend.ID{}, err
}
// if the pack is not full enough and there are less than maxPackers
// packers, put back to the list
@ -446,13 +449,13 @@ func (r *Repository) Backend() backend.Backend {
return r.be
}
// Index returns the currently loaded Index.
func (r *Repository) Index() *Index {
// Index returns the currently used MasterIndex.
func (r *Repository) Index() *MasterIndex {
return r.idx
}
// SetIndex instructs the repository to use the given index.
func (r *Repository) SetIndex(i *Index) {
func (r *Repository) SetIndex(i *MasterIndex) {
r.idx = i
}
@ -510,37 +513,38 @@ func (bw *BlobWriter) ID() backend.ID {
return bw.id
}
// SaveIndex saves all new packs in the index in the backend, returned is the
// storage ID.
func (r *Repository) SaveIndex() (backend.ID, error) {
debug.Log("Repo.SaveIndex", "Saving index")
// SaveIndex saves all new indexes in the backend.
func (r *Repository) SaveIndex() error {
for i, idx := range r.idx.NotFinalIndexes() {
debug.Log("Repo.SaveIndex", "Saving index %d", i)
blob, err := r.CreateEncryptedBlob(backend.Index)
if err != nil {
return backend.ID{}, err
blob, err := r.CreateEncryptedBlob(backend.Index)
if err != nil {
return err
}
err = idx.Encode(blob)
if err != nil {
return err
}
err = blob.Close()
if err != nil {
return err
}
sid := blob.ID()
debug.Log("Repo.SaveIndex", "Saved index %d as %v", i, sid.Str())
}
err = r.idx.Encode(blob)
if err != nil {
return backend.ID{}, err
}
err = blob.Close()
if err != nil {
return backend.ID{}, err
}
sid := blob.ID()
debug.Log("Repo.SaveIndex", "Saved index as %v", sid.Str())
return sid, nil
return nil
}
const loadIndexParallelism = 20
// LoadIndex loads all index files from the backend in parallel and merges them
// with the current index. The first error that occurred is returned.
// LoadIndex loads all index files from the backend in parallel and stores them
// in the master index. The first error that occurred is returned.
func (r *Repository) LoadIndex() error {
debug.Log("Repo.LoadIndex", "Loading index")
@ -567,7 +571,7 @@ func (r *Repository) LoadIndex() error {
}()
for idx := range indexes {
r.idx.Merge(idx)
r.idx.Insert(idx)
}
if err := <-errCh; err != nil {

View file

@ -88,6 +88,7 @@ func TestSave(t *testing.T) {
Equals(t, id, sid)
OK(t, repo.Flush())
// OK(t, repo.SaveIndex())
// read back
buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size))
@ -214,7 +215,7 @@ func BenchmarkLoadIndex(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
repo.SetIndex(repository.NewIndex())
repo.SetIndex(repository.NewMasterIndex())
OK(b, repo.LoadIndex())
}
})