Prevent concurrent processing of same blob

... by first adding a preliminary index entry and making this fail if
an index entry for the same blob already exists.

A preliminary index entry is characterized by not yet being associated
with a pack. Until now, these entries where added to the index just
like final index entries using index.Store, which silently overwrites
existing index entries.

This commit adds a new method index.StoreInProgress which refuses to
overwrite existing index entries and allows for creating preliminary
index entries only. The existing method index.Store has not been
changed and continues to silently overwrite existing index entries.
This distinction is important, as otherwise, it would be impossible to
update a preliminary index entry after the blob has been written to a
pack.

Resolves: restic#292
This commit is contained in:
Philipp Serr 2015-09-23 22:27:48 +02:00
parent 4fb46faae7
commit 7b11660f4f
3 changed files with 104 additions and 6 deletions

View file

@ -2,6 +2,7 @@ package repository
import (
"encoding/json"
"errors"
"fmt"
"io"
"sync"
@ -43,7 +44,8 @@ func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset
}
}
// Store remembers the id and pack in the index.
// Store remembers the id and pack in the index. An existing entry will be
// silently overwritten.
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
idx.m.Lock()
defer idx.m.Unlock()
@ -54,6 +56,26 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset
idx.store(t, id, pack, offset, length, false)
}
// StoreInProgress adds a preliminary index entry for a blob that is about to be
// saved. The entry must be updated using Store once the the blob has been
// written to a pack. Adding an preliminary index fails if there's an existing
// entry associated with the same id.
func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if _, hasID := idx.pack[id]; hasID {
errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t)
debug.Log("Index.StoreInProgress", errorMsg)
return errors.New(errorMsg)
}
idx.store(t, id, nil, 0, 0, false)
debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)",
id.Str(), t)
return nil
}
// Remove removes the pack ID from the index.
func (idx *Index) Remove(packID backend.ID) {
idx.m.Lock()

View file

@ -223,3 +223,68 @@ func TestIndexUnserialize(t *testing.T) {
Equals(t, test.length, length)
}
}
func TestStoreOverwritesPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
idx.StoreInProgress(dataType, blobID)
packID := randomID()
offset := uint(0)
length := uint(100)
idx.Store(dataType, blobID, &packID, offset, length)
actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
OK(t, err)
Equals(t, packID, *actPackID)
Equals(t, dataType, actType)
Equals(t, offset, actOffset)
Equals(t, length, actLength)
}
func TestStoreInProgressAddsPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
err := idx.StoreInProgress(dataType, blobID)
OK(t, err)
actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
OK(t, err)
Assert(t, actPackID == nil,
"Preliminary index entry illegaly associated with a pack id.")
Equals(t, uint(0), actOffset)
Equals(t, uint(0), actLength)
Equals(t, dataType, actType)
}
func TestStoreInProgressRefusesToOverwriteExistingFinalEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
packID := randomID()
offset := uint(0)
length := uint(100)
idx.Store(dataType, blobID, &packID, offset, length)
err := idx.StoreInProgress(dataType, blobID)
Assert(t, err != nil,
"index.StoreInProgress did not refuse to overwrite existing entry")
}
func TestStoreInProgressRefusesToOverwriteExistingPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
_ = idx.StoreInProgress(dataType, blobID)
err := idx.StoreInProgress(dataType, blobID)
Assert(t, err != nil,
"index.StoreInProgress did not refuse to overwrite existing entry")
}

View file

@ -303,20 +303,31 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
return backend.ID{}, err
}
// add this id to the index, although we don't know yet in which pack it
// will be saved; the entry will be updated when the pack is written.
// Note: the current id needs to be added to the index before searching
// for a suitable packer: There's a little chance that more than one
// goroutine handles the same blob concurrently. Due to idx.StoreInProgress
// locking the index and raising an error if a matching index entry
// already exists, updating the index first ensures that only one of
// those goroutines will continue. See issue restic#292.
debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t)
err = r.idx.StoreInProgress(t, *id)
if err != nil {
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
return backend.ID{}, nil
}
// find suitable packer and add blob
packer, err := r.findPacker(uint(len(ciphertext)))
if err != nil {
r.idx.Remove(*id)
return backend.ID{}, err
}
// save ciphertext
packer.Add(t, *id, bytes.NewReader(ciphertext))
// add this id to the index, although we don't know yet in which pack it
// will be saved, the entry will be updated when the pack is written.
r.idx.Store(t, *id, nil, 0, 0)
debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t)
// if the pack is not full enough and there are less than maxPackers
// packers, put back to the list
if packer.Size() < minPackSize && r.countPacker() < maxPackers {