Merge pull request #406 from restic/fix-405

Test and fix for #405
This commit is contained in:
Alexander Neumann 2016-02-02 20:53:52 +01:00
commit c0bbb7254d
7 changed files with 195 additions and 100 deletions

View file

@ -32,6 +32,10 @@ var archiverAllowAllFiles = func(string, os.FileInfo) bool { return true }
// Archiver is used to backup a set of directories.
type Archiver struct {
repo *repository.Repository
knownBlobs struct {
backend.IDSet
sync.Mutex
}
blobToken chan struct{}
@ -45,6 +49,12 @@ func NewArchiver(repo *repository.Repository) *Archiver {
arch := &Archiver{
repo: repo,
blobToken: make(chan struct{}, maxConcurrentBlobs),
knownBlobs: struct {
backend.IDSet
sync.Mutex
}{
IDSet: backend.NewIDSet(),
},
}
for i := 0; i < maxConcurrentBlobs; i++ {
@ -57,17 +67,37 @@ func NewArchiver(repo *repository.Repository) *Archiver {
return arch
}
// isKnownBlob returns true iff the blob is not yet in the list of known blobs.
// When the blob is not known, false is returned and the blob is added to the
// list. This means that the caller false is returned to is responsible to save
// the blob to the backend.
func (arch *Archiver) isKnownBlob(id backend.ID) bool {
arch.knownBlobs.Lock()
defer arch.knownBlobs.Unlock()
if arch.knownBlobs.Has(id) {
return true
}
arch.knownBlobs.Insert(id)
_, err := arch.repo.Index().Lookup(id)
if err == nil {
return true
}
return false
}
// Save stores a blob read from rd in the repository.
func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str())
// test if this blob is already known
if arch.repo.Index().Has(id) {
debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str())
if arch.isKnownBlob(id) {
debug.Log("Archiver.Save", "blob %v is known\n", id.Str())
return nil
}
// otherwise save blob
err := arch.repo.SaveFrom(t, &id, length, rd)
if err != nil {
debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err)
@ -88,7 +118,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
// check if tree has been saved before
id := backend.Hash(data)
if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) {
if arch.isKnownBlob(id) {
return id, nil
}

View file

@ -0,0 +1,150 @@
package restic_test
import (
"bytes"
"crypto/rand"
"errors"
"io"
mrand "math/rand"
"sync"
"testing"
"time"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/pack"
"github.com/restic/restic/repository"
)
const parallelSaves = 50
const testSaveIndexTime = 100 * time.Millisecond
const testTimeout = 2 * time.Second
var DupID backend.ID
func randomID() backend.ID {
if mrand.Float32() < 0.5 {
return DupID
}
id := backend.ID{}
_, err := io.ReadFull(rand.Reader, id[:])
if err != nil {
panic(err)
}
return id
}
// forgetfulBackend returns a backend that forgets everything.
func forgetfulBackend() backend.Backend {
be := &backend.MockBackend{}
be.TestFn = func(t backend.Type, name string) (bool, error) {
return false, nil
}
be.LoadFn = func(h backend.Handle, p []byte, off int64) (int, error) {
return 0, errors.New("not found")
}
be.SaveFn = func(h backend.Handle, p []byte) error {
return nil
}
be.StatFn = func(h backend.Handle) (backend.BlobInfo, error) {
return backend.BlobInfo{}, errors.New("not found")
}
be.RemoveFn = func(t backend.Type, name string) error {
return nil
}
be.ListFn = func(t backend.Type, done <-chan struct{}) <-chan string {
ch := make(chan string)
close(ch)
return ch
}
be.DeleteFn = func() error {
return nil
}
return be
}
func testArchiverDuplication(t *testing.T) {
_, err := io.ReadFull(rand.Reader, DupID[:])
if err != nil {
t.Fatal(err)
}
repo := repository.New(forgetfulBackend())
err = repo.Init("foo")
if err != nil {
t.Fatal(err)
}
arch := restic.NewArchiver(repo)
wg := &sync.WaitGroup{}
done := make(chan struct{})
for i := 0; i < parallelSaves; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
}
id := randomID()
if repo.Index().Has(id) {
continue
}
buf := make([]byte, 50)
err := arch.Save(pack.Data, id, uint(len(buf)), bytes.NewReader(buf))
if err != nil {
t.Fatal(err)
}
}
}()
}
saveIndex := func() {
defer wg.Done()
ticker := time.NewTicker(testSaveIndexTime)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
err := repo.SaveFullIndex()
if err != nil {
t.Fatal(err)
}
}
}
}
wg.Add(1)
go saveIndex()
<-time.After(testTimeout)
close(done)
wg.Wait()
}
func TestArchiverDuplication(t *testing.T) {
for i := 0; i < 5; i++ {
testArchiverDuplication(t)
}
}

View file

@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error {
return errors.New("LoadBlob returned wrong data, len() doesn't match")
}
_, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true)
_, err = dst.SaveAndEncrypt(blob.Type, buf, &id)
if err != nil {
return err
}

View file

@ -13,23 +13,11 @@ import (
type MasterIndex struct {
idx []*Index
idxMutex sync.RWMutex
inFlight struct {
backend.IDSet
sync.RWMutex
}
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
return &MasterIndex{
inFlight: struct {
backend.IDSet
sync.RWMutex
}{
IDSet: backend.NewIDSet(),
},
}
return &MasterIndex{}
}
// Lookup queries all known Indexes for the ID and returns the first match.
@ -154,68 +142,6 @@ func (mi *MasterIndex) Current() *Index {
return newIdx
}
// AddInFlight add the given ID to the list of in-flight IDs. An error is
// returned when the ID is already in the list. Setting ignoreDuplicates to
// true only checks the in flight list, otherwise the index itself is also
// tested.
func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error {
// The index + inFlight store must be searched for a matching id in one
// atomic operation. This requires locking the inFlight store and the
// index together!
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
if !ignoreDuplicates {
// Note: mi.Has read locks the index again.
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
}
debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str())
if mi.inFlight.Has(id) {
return fmt.Errorf("%v is already in flight", id.Str())
}
if !ignoreDuplicates {
if mi.Has(id) {
return fmt.Errorf("%v is already indexed (fully processed)", id)
}
}
mi.inFlight.Insert(id)
return nil
}
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
// The index + inFlight store must be searched for a matching id in one
// atomic operation. This requires locking the inFlight store and the
// index together!
mi.inFlight.RLock()
defer mi.inFlight.RUnlock()
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
inFlight := mi.inFlight.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
indexed := mi.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
return inFlight
}
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str())
mi.inFlight.Delete(id)
}
// NotFinalIndexes returns all indexes that have not yet been saved.
func (mi *MasterIndex) NotFinalIndexes() []*Index {
mi.idxMutex.Lock()

View file

@ -84,7 +84,6 @@ func (r *Repository) savePacker(p *pack.Packer) error {
Offset: b.Offset,
Length: uint(b.Length),
})
r.idx.RemoveFromInFlight(b.ID)
}
return nil

View file

@ -167,10 +167,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) {
return r.idx.LookupSize(id)
}
// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small
// enough, it will be packed together with other small blobs. When
// ignoreDuplicates is true, blobs already in the index will be saved again.
func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) {
// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data
// is small enough, it will be packed together with other small blobs.
func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) {
if id == nil {
// compute plaintext hash
hashedID := backend.Hash(data)
@ -189,18 +188,9 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
return backend.ID{}, err
}
// add this id to the list of in-flight chunk ids.
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str())
err = r.idx.AddInFlight(*id, ignoreDuplicates)
if err != nil {
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
return *id, nil
}
// find suitable packer and add blob
packer, err := r.findPacker(uint(len(ciphertext)))
if err != nil {
r.idx.RemoveFromInFlight(*id)
return backend.ID{}, err
}
@ -234,7 +224,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i
return err
}
_, err = r.SaveAndEncrypt(t, buf, id, false)
_, err = r.SaveAndEncrypt(t, buf, id)
if err != nil {
return err
}
@ -258,7 +248,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er
}
buf = wr.Bytes()
return r.SaveAndEncrypt(t, buf, nil, false)
return r.SaveAndEncrypt(t, buf, nil)
}
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the

View file

@ -83,7 +83,7 @@ func TestSave(t *testing.T) {
id := backend.Hash(data)
// save
sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false)
sid, err := repo.SaveAndEncrypt(pack.Data, data, nil)
OK(t, err)
Equals(t, id, sid)
@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz
_, err := io.ReadFull(rand.Reader, buf)
OK(t, err)
_, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false)
_, err = repo.SaveAndEncrypt(pack.Data, buf, nil)
OK(t, err)
}
}