From f5f6e9cf37ef1d2b00dfa577abcf9166b8c0fd7e Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 1 Feb 2016 23:26:57 +0100 Subject: [PATCH 1/3] Add test to reproduce #405 --- repository/master_index_test.go | 146 ++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 repository/master_index_test.go diff --git a/repository/master_index_test.go b/repository/master_index_test.go new file mode 100644 index 000000000..923eef338 --- /dev/null +++ b/repository/master_index_test.go @@ -0,0 +1,146 @@ +package repository + +import ( + "bytes" + "crypto/rand" + "errors" + "io" + mrand "math/rand" + "sync" + "testing" + "time" + + "github.com/restic/restic/backend" + "github.com/restic/restic/pack" +) + +const parallelSaves = 20 +const saveIndexTime = 100 * time.Millisecond +const testTimeout = 1 * time.Second + +var DupID backend.ID + +func randomID() backend.ID { + if mrand.Float32() < 0.5 { + return DupID + } + + id := backend.ID{} + _, err := io.ReadFull(rand.Reader, id[:]) + if err != nil { + panic(err) + } + return id +} + +// forgetfulBackend returns a backend that forgets everything. +func forgetfulBackend() backend.Backend { + be := &backend.MockBackend{} + + be.TestFn = func(t backend.Type, name string) (bool, error) { + return false, nil + } + + be.LoadFn = func(h backend.Handle, p []byte, off int64) (int, error) { + return 0, errors.New("not found") + } + + be.SaveFn = func(h backend.Handle, p []byte) error { + return nil + } + + be.StatFn = func(h backend.Handle) (backend.BlobInfo, error) { + return backend.BlobInfo{}, errors.New("not found") + } + + be.RemoveFn = func(t backend.Type, name string) error { + return nil + } + + be.ListFn = func(t backend.Type, done <-chan struct{}) <-chan string { + ch := make(chan string) + close(ch) + return ch + } + + be.DeleteFn = func() error { + return nil + } + + return be +} + +func testMasterIndex(t *testing.T) { + _, err := io.ReadFull(rand.Reader, DupID[:]) + if err != nil { + t.Fatal(err) + } + + repo := New(forgetfulBackend()) + err = repo.Init("foo") + if err != nil { + t.Fatal(err) + } + + wg := &sync.WaitGroup{} + done := make(chan struct{}) + for i := 0; i < parallelSaves; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + } + + id := randomID() + + if repo.Index().Has(id) { + continue + } + + buf := make([]byte, 50) + + err := repo.SaveFrom(pack.Data, &id, uint(len(buf)), bytes.NewReader(buf)) + if err != nil { + t.Fatal(err) + } + } + }() + } + + saveIndexes := func() { + defer wg.Done() + + ticker := time.NewTicker(saveIndexTime) + defer ticker.Stop() + + for { + select { + case <-done: + return + case <-ticker.C: + err := repo.SaveFullIndex() + if err != nil { + t.Fatal(err) + } + } + } + } + + wg.Add(1) + go saveIndexes() + + <-time.After(testTimeout) + close(done) + + wg.Wait() +} + +func TestMasterIndex(t *testing.T) { + for i := 0; i < 5; i++ { + testMasterIndex(t) + } +} From 382c766983adb8d7d7fc1e283cc841a2097817ba Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 1 Feb 2016 23:48:36 +0100 Subject: [PATCH 2/3] Move test for #405: Test Archiver instead of Repo --- ...ex_test.go => archiver_duplication_test.go | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) rename repository/master_index_test.go => archiver_duplication_test.go (78%) diff --git a/repository/master_index_test.go b/archiver_duplication_test.go similarity index 78% rename from repository/master_index_test.go rename to archiver_duplication_test.go index 923eef338..f3e4c5475 100644 --- a/repository/master_index_test.go +++ b/archiver_duplication_test.go @@ -1,4 +1,4 @@ -package repository +package restic_test import ( "bytes" @@ -10,13 +10,15 @@ import ( "testing" "time" + "github.com/restic/restic" "github.com/restic/restic/backend" "github.com/restic/restic/pack" + "github.com/restic/restic/repository" ) -const parallelSaves = 20 -const saveIndexTime = 100 * time.Millisecond -const testTimeout = 1 * time.Second +const parallelSaves = 50 +const testSaveIndexTime = 100 * time.Millisecond +const testTimeout = 2 * time.Second var DupID backend.ID @@ -70,18 +72,20 @@ func forgetfulBackend() backend.Backend { return be } -func testMasterIndex(t *testing.T) { +func testArchiverDuplication(t *testing.T) { _, err := io.ReadFull(rand.Reader, DupID[:]) if err != nil { t.Fatal(err) } - repo := New(forgetfulBackend()) + repo := repository.New(forgetfulBackend()) err = repo.Init("foo") if err != nil { t.Fatal(err) } + arch := restic.NewArchiver(repo) + wg := &sync.WaitGroup{} done := make(chan struct{}) for i := 0; i < parallelSaves; i++ { @@ -103,7 +107,7 @@ func testMasterIndex(t *testing.T) { buf := make([]byte, 50) - err := repo.SaveFrom(pack.Data, &id, uint(len(buf)), bytes.NewReader(buf)) + err := arch.Save(pack.Data, id, uint(len(buf)), bytes.NewReader(buf)) if err != nil { t.Fatal(err) } @@ -111,10 +115,10 @@ func testMasterIndex(t *testing.T) { }() } - saveIndexes := func() { + saveIndex := func() { defer wg.Done() - ticker := time.NewTicker(saveIndexTime) + ticker := time.NewTicker(testSaveIndexTime) defer ticker.Stop() for { @@ -131,7 +135,7 @@ func testMasterIndex(t *testing.T) { } wg.Add(1) - go saveIndexes() + go saveIndex() <-time.After(testTimeout) close(done) @@ -139,8 +143,8 @@ func testMasterIndex(t *testing.T) { wg.Wait() } -func TestMasterIndex(t *testing.T) { +func TestArchiverDuplication(t *testing.T) { for i := 0; i < 5; i++ { - testMasterIndex(t) + testArchiverDuplication(t) } } From 4f1f03cdb92ddeff84fef6ee8ee974f9392d9f90 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 1 Feb 2016 23:50:56 +0100 Subject: [PATCH 3/3] Move testing for known blobs to Archiver This removes the list of in-flight blobs from the master index and instead keeps a list of "known" blobs in the Archiver. "known" here means: either already processed, or included in an index. This property is tested atomically, when the blob is not in the list of "known" blobs, it is added to the list and the caller is responsible to make this happen (i.e. save the blob). --- archiver.go | 42 ++++++++++++++++--- checker/repacker.go | 2 +- repository/master_index.go | 76 +---------------------------------- repository/packer_manager.go | 1 - repository/repository.go | 20 +++------ repository/repository_test.go | 4 +- 6 files changed, 45 insertions(+), 100 deletions(-) diff --git a/archiver.go b/archiver.go index 529dd7cdb..23376e656 100644 --- a/archiver.go +++ b/archiver.go @@ -31,7 +31,11 @@ var archiverAllowAllFiles = func(string, os.FileInfo) bool { return true } // Archiver is used to backup a set of directories. type Archiver struct { - repo *repository.Repository + repo *repository.Repository + knownBlobs struct { + backend.IDSet + sync.Mutex + } blobToken chan struct{} @@ -45,6 +49,12 @@ func NewArchiver(repo *repository.Repository) *Archiver { arch := &Archiver{ repo: repo, blobToken: make(chan struct{}, maxConcurrentBlobs), + knownBlobs: struct { + backend.IDSet + sync.Mutex + }{ + IDSet: backend.NewIDSet(), + }, } for i := 0; i < maxConcurrentBlobs; i++ { @@ -57,17 +67,37 @@ func NewArchiver(repo *repository.Repository) *Archiver { return arch } +// isKnownBlob returns true iff the blob is not yet in the list of known blobs. +// When the blob is not known, false is returned and the blob is added to the +// list. This means that the caller false is returned to is responsible to save +// the blob to the backend. +func (arch *Archiver) isKnownBlob(id backend.ID) bool { + arch.knownBlobs.Lock() + defer arch.knownBlobs.Unlock() + + if arch.knownBlobs.Has(id) { + return true + } + + arch.knownBlobs.Insert(id) + + _, err := arch.repo.Index().Lookup(id) + if err == nil { + return true + } + + return false +} + // Save stores a blob read from rd in the repository. func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) - // test if this blob is already known - if arch.repo.Index().Has(id) { - debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str()) + if arch.isKnownBlob(id) { + debug.Log("Archiver.Save", "blob %v is known\n", id.Str()) return nil } - // otherwise save blob err := arch.repo.SaveFrom(t, &id, length, rd) if err != nil { debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err) @@ -88,7 +118,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // check if tree has been saved before id := backend.Hash(data) - if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) { + if arch.isKnownBlob(id) { return id, nil } diff --git a/checker/repacker.go b/checker/repacker.go index e56522714..f3b158d0c 100644 --- a/checker/repacker.go +++ b/checker/repacker.go @@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error { return errors.New("LoadBlob returned wrong data, len() doesn't match") } - _, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true) + _, err = dst.SaveAndEncrypt(blob.Type, buf, &id) if err != nil { return err } diff --git a/repository/master_index.go b/repository/master_index.go index 37d3729a8..432754c53 100644 --- a/repository/master_index.go +++ b/repository/master_index.go @@ -13,23 +13,11 @@ import ( type MasterIndex struct { idx []*Index idxMutex sync.RWMutex - - inFlight struct { - backend.IDSet - sync.RWMutex - } } // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { - return &MasterIndex{ - inFlight: struct { - backend.IDSet - sync.RWMutex - }{ - IDSet: backend.NewIDSet(), - }, - } + return &MasterIndex{} } // Lookup queries all known Indexes for the ID and returns the first match. @@ -154,68 +142,6 @@ func (mi *MasterIndex) Current() *Index { return newIdx } -// AddInFlight add the given ID to the list of in-flight IDs. An error is -// returned when the ID is already in the list. Setting ignoreDuplicates to -// true only checks the in flight list, otherwise the index itself is also -// tested. -func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - if !ignoreDuplicates { - // Note: mi.Has read locks the index again. - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - } - - debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str()) - if mi.inFlight.Has(id) { - return fmt.Errorf("%v is already in flight", id.Str()) - } - - if !ignoreDuplicates { - if mi.Has(id) { - return fmt.Errorf("%v is already indexed (fully processed)", id) - } - } - - mi.inFlight.Insert(id) - return nil -} - -// IsInFlight returns true iff the id is contained in the list of in-flight IDs. -func (mi *MasterIndex) IsInFlight(id backend.ID) bool { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.RLock() - defer mi.inFlight.RUnlock() - - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - - inFlight := mi.inFlight.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) - - indexed := mi.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) - - return inFlight -} - -// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs. -func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) { - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str()) - - mi.inFlight.Delete(id) -} - // NotFinalIndexes returns all indexes that have not yet been saved. func (mi *MasterIndex) NotFinalIndexes() []*Index { mi.idxMutex.Lock() diff --git a/repository/packer_manager.go b/repository/packer_manager.go index 42ffe96cb..4e9ea4bc0 100644 --- a/repository/packer_manager.go +++ b/repository/packer_manager.go @@ -84,7 +84,6 @@ func (r *Repository) savePacker(p *pack.Packer) error { Offset: b.Offset, Length: uint(b.Length), }) - r.idx.RemoveFromInFlight(b.ID) } return nil diff --git a/repository/repository.go b/repository/repository.go index 88f289338..27fe5d9c6 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -167,10 +167,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { return r.idx.LookupSize(id) } -// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small -// enough, it will be packed together with other small blobs. When -// ignoreDuplicates is true, blobs already in the index will be saved again. -func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) { +// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data +// is small enough, it will be packed together with other small blobs. +func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) { if id == nil { // compute plaintext hash hashedID := backend.Hash(data) @@ -189,18 +188,9 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } - // add this id to the list of in-flight chunk ids. - debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str()) - err = r.idx.AddInFlight(*id, ignoreDuplicates) - if err != nil { - debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) - return *id, nil - } - // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { - r.idx.RemoveFromInFlight(*id) return backend.ID{}, err } @@ -234,7 +224,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i return err } - _, err = r.SaveAndEncrypt(t, buf, id, false) + _, err = r.SaveAndEncrypt(t, buf, id) if err != nil { return err } @@ -258,7 +248,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er } buf = wr.Bytes() - return r.SaveAndEncrypt(t, buf, nil, false) + return r.SaveAndEncrypt(t, buf, nil) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the diff --git a/repository/repository_test.go b/repository/repository_test.go index 4e3659af2..81378742b 100644 --- a/repository/repository_test.go +++ b/repository/repository_test.go @@ -83,7 +83,7 @@ func TestSave(t *testing.T) { id := backend.Hash(data) // save - sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false) + sid, err := repo.SaveAndEncrypt(pack.Data, data, nil) OK(t, err) Equals(t, id, sid) @@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz _, err := io.ReadFull(rand.Reader, buf) OK(t, err) - _, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false) + _, err = repo.SaveAndEncrypt(pack.Data, buf, nil) OK(t, err) } }