Merge pull request #308 from episource/fix/restic_cr292_unreferenced_pack

fix: restic#292 Prevent concurrent processing of the same blob
This commit is contained in:
Alexander Neumann 2015-09-27 17:18:28 +02:00
commit 321c2e6a47
3 changed files with 104 additions and 6 deletions
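For readers skimming the diff: the problem behind restic#292 (and the branch name fix/restic_cr292_unreferenced_pack) is that two goroutines could both decide to save the same blob, duplicating work and leaving an unreferenced pack behind. The fix is a check-and-reserve pattern: the index mutex guards a combined lookup-and-insert, so only the first goroutine to claim a blob id continues. A minimal standalone sketch of that pattern — the reservations type and all names below are illustrative, not restic's actual API:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reservations is an illustrative stand-in for restic's Index: a set of
// already-claimed blob ids guarded by a mutex.
type reservations struct {
	m   sync.Mutex
	ids map[string]bool
}

// reserve mirrors the idea behind Index.StoreInProgress: the check and the
// insert happen under one lock, so exactly one caller can claim a given id.
func (r *reservations) reserve(id string) error {
	r.m.Lock()
	defer r.m.Unlock()
	if r.ids[id] {
		return fmt.Errorf("id %v already reserved", id)
	}
	r.ids[id] = true
	return nil
}

func main() {
	res := &reservations{ids: make(map[string]bool)}

	// Ten goroutines race to save the "same blob"; exactly one wins.
	var wg sync.WaitGroup
	var winners int32
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if res.reserve("blob-1") == nil {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", winners) // always prints: winners: 1
}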

repository/index.go

@@ -2,6 +2,7 @@ package repository
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -43,7 +44,8 @@ func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset
 	}
 }

-// Store remembers the id and pack in the index.
+// Store remembers the id and pack in the index. An existing entry will be
+// silently overwritten.
 func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
 	idx.m.Lock()
 	defer idx.m.Unlock()
@@ -54,6 +56,26 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset
 	idx.store(t, id, pack, offset, length, false)
 }

+// StoreInProgress adds a preliminary index entry for a blob that is about to
+// be saved. The entry must be updated using Store once the blob has been
+// written to a pack. Adding a preliminary entry fails if there is already an
+// entry associated with the same id.
+func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	if _, hasID := idx.pack[id]; hasID {
+		errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t)
+		debug.Log("Index.StoreInProgress", errorMsg)
+		return errors.New(errorMsg)
+	}
+
+	idx.store(t, id, nil, 0, 0, false)
+	debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)",
+		id.Str(), t)
+	return nil
+}
+
 // Remove removes the pack ID from the index.
 func (idx *Index) Remove(packID backend.ID) {
 	idx.m.Lock()

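The new entry lifecycle is reserve-then-finalize: StoreInProgress claims the id, failing if any entry (preliminary or final) already exists, and a later Store call silently overwrites the stub with the real pack location. A short usage sketch built only from the calls in this diff; blobID, packID, offset and length stand in for values produced while packing:

// Sketch of the reserve-then-finalize lifecycle of an index entry.
func saveOnce(idx *repository.Index, blobID, packID backend.ID, offset, length uint) {
	// Reserve: fails if the index already holds any entry for blobID.
	if err := idx.StoreInProgress(pack.Data, blobID); err != nil {
		return // another goroutine is already saving this blob
	}

	// ... write the blob to a pack here ...

	// Finalize: Store silently overwrites the preliminary entry.
	idx.Store(pack.Data, blobID, &packID, offset, length)
}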
repository/index_test.go

@@ -223,3 +223,68 @@ func TestIndexUnserialize(t *testing.T) {
 		Equals(t, test.length, length)
 	}
 }
+
+func TestStoreOverwritesPreliminaryEntry(t *testing.T) {
+	idx := repository.NewIndex()
+
+	blobID := randomID()
+	dataType := pack.Data
+	idx.StoreInProgress(dataType, blobID)
+
+	packID := randomID()
+	offset := uint(0)
+	length := uint(100)
+	idx.Store(dataType, blobID, &packID, offset, length)
+
+	actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
+	OK(t, err)
+	Equals(t, packID, *actPackID)
+	Equals(t, dataType, actType)
+	Equals(t, offset, actOffset)
+	Equals(t, length, actLength)
+}
+
+func TestStoreInProgressAddsPreliminaryEntry(t *testing.T) {
+	idx := repository.NewIndex()
+
+	blobID := randomID()
+	dataType := pack.Data
+
+	err := idx.StoreInProgress(dataType, blobID)
+	OK(t, err)
+
+	actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
+	OK(t, err)
+	Assert(t, actPackID == nil,
+		"preliminary index entry illegally associated with a pack id")
+	Equals(t, uint(0), actOffset)
+	Equals(t, uint(0), actLength)
+	Equals(t, dataType, actType)
+}
+
+func TestStoreInProgressRefusesToOverwriteExistingFinalEntry(t *testing.T) {
+	idx := repository.NewIndex()
+
+	blobID := randomID()
+	dataType := pack.Data
+	packID := randomID()
+	offset := uint(0)
+	length := uint(100)
+	idx.Store(dataType, blobID, &packID, offset, length)
+
+	err := idx.StoreInProgress(dataType, blobID)
+	Assert(t, err != nil,
+		"index.StoreInProgress did not refuse to overwrite an existing final entry")
+}
+
+func TestStoreInProgressRefusesToOverwriteExistingPreliminaryEntry(t *testing.T) {
+	idx := repository.NewIndex()
+
+	blobID := randomID()
+	dataType := pack.Data
+
+	_ = idx.StoreInProgress(dataType, blobID)
+	err := idx.StoreInProgress(dataType, blobID)
+	Assert(t, err != nil,
+		"index.StoreInProgress did not refuse to overwrite an existing preliminary entry")
+}
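The tests above pin down the sequential contract; the race-prevention guarantee itself could be exercised in the same style with a concurrent test. A hedged sketch, assuming this file's existing helpers (randomID, Equals) plus sync and sync/atomic imports:

func TestStoreInProgressAllowsExactlyOneWriter(t *testing.T) {
	idx := repository.NewIndex()
	blobID := randomID()

	// Eight goroutines race to reserve the same blob id.
	var wg sync.WaitGroup
	var winners int32
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if idx.StoreInProgress(pack.Data, blobID) == nil {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()

	// Exactly one goroutine may have claimed the id.
	Equals(t, int32(1), winners)
}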

repository/repository.go

@@ -303,20 +303,31 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
 		return backend.ID{}, err
 	}

+	// add this id to the index, although we don't know yet in which pack it
+	// will be saved; the entry will be updated when the pack is written.
+	// Note: the current id needs to be added to the index before searching
+	// for a suitable packer: there's a small chance that more than one
+	// goroutine handles the same blob concurrently. Because idx.StoreInProgress
+	// locks the index and returns an error if a matching index entry already
+	// exists, updating the index first ensures that only one of those
+	// goroutines will continue. See issue restic#292.
+	debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str(), t)
+	err = r.idx.StoreInProgress(t, *id)
+	if err != nil {
+		debug.Log("Repo.Save", "another goroutine is already saving %v (%v)", id.Str(), t)
+		return backend.ID{}, nil
+	}
+
 	// find suitable packer and add blob
 	packer, err := r.findPacker(uint(len(ciphertext)))
 	if err != nil {
+		r.idx.Remove(*id)
 		return backend.ID{}, err
 	}

 	// save ciphertext
 	packer.Add(t, *id, bytes.NewReader(ciphertext))

-	// add this id to the index, although we don't know yet in which pack it
-	// will be saved, the entry will be updated when the pack is written.
-	r.idx.Store(t, *id, nil, 0, 0)
-	debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str(), t)
-
 	// if the pack is not full enough and there are less than maxPackers
 	// packers, put back to the list
 	if packer.Size() < minPackSize && r.countPacker() < maxPackers {
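One subtlety in the new error path: once the preliminary entry is stored, any failure before the final Store must roll it back, otherwise the id stays reserved forever and no later attempt could ever save the blob. That is what the added r.idx.Remove(*id) does when findPacker fails. Reduced to its skeleton — writeToSomePack is a hypothetical helper standing in for the packer steps, not restic's actual code:

// Sketch of the reserve/rollback/finalize contract around SaveAndEncrypt.
func saveBlob(idx *repository.Index, t pack.BlobType, id backend.ID) error {
	if err := idx.StoreInProgress(t, id); err != nil {
		return nil // another goroutine already owns this blob
	}

	packID, offset, length, err := writeToSomePack(id) // hypothetical helper
	if err != nil {
		idx.Remove(id) // roll back the reservation so a retry can succeed
		return err
	}

	idx.Store(t, id, &packID, offset, length) // finalize the entry
	return nil
}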