forked from TrueCloudLab/restic
Allow saving duplicate blobs in the repacker
This adds code to the master index to allow saving duplicate blobs within the repacker. In this mode, only the list of currently in flight blobs is consulted, and not the index. This correct because while repacking, a unique list of blobs is saved again to the index.
This commit is contained in:
parent
34c1056efc
commit
f53008d916
4 changed files with 47 additions and 40 deletions
|
@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error {
|
||||||
return errors.New("LoadBlob returned wrong data, len() doesn't match")
|
return errors.New("LoadBlob returned wrong data, len() doesn't match")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = dst.SaveAndEncrypt(blob.Type, buf, &id)
|
_, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,49 +155,55 @@ func (mi *MasterIndex) Current() *Index {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddInFlight add the given ID to the list of in-flight IDs. An error is
|
// AddInFlight add the given ID to the list of in-flight IDs. An error is
|
||||||
// returned when the ID is already in the list.
|
// returned when the ID is already in the list. Setting ignoreDuplicates to
|
||||||
func (mi *MasterIndex) AddInFlight(id backend.ID) error {
|
// true only checks the in flight list, otherwise the index itself is also
|
||||||
// The index + inFlight store must be searched for a matching id in one
|
// tested.
|
||||||
// atomic operation. This requires locking the inFlight store and the
|
func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error {
|
||||||
// index together!
|
// The index + inFlight store must be searched for a matching id in one
|
||||||
mi.inFlight.Lock()
|
// atomic operation. This requires locking the inFlight store and the
|
||||||
defer mi.inFlight.Unlock()
|
// index together!
|
||||||
|
mi.inFlight.Lock()
|
||||||
|
defer mi.inFlight.Unlock()
|
||||||
|
|
||||||
// Note: mi.Has read locks the index again.
|
if !ignoreDuplicates {
|
||||||
mi.idxMutex.RLock()
|
// Note: mi.Has read locks the index again.
|
||||||
defer mi.idxMutex.RUnlock()
|
mi.idxMutex.RLock()
|
||||||
|
defer mi.idxMutex.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
debug.Log("MasterIndex.AddInFlight", "adding %v", id)
|
debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str())
|
||||||
if mi.inFlight.Has(id) {
|
if mi.inFlight.Has(id) {
|
||||||
return fmt.Errorf("%v is already in flight", id)
|
return fmt.Errorf("%v is already in flight", id.Str())
|
||||||
}
|
}
|
||||||
if mi.Has(id) {
|
|
||||||
return fmt.Errorf("%v is already indexed (fully processed)", id)
|
|
||||||
}
|
|
||||||
|
|
||||||
mi.inFlight.Insert(id)
|
if !ignoreDuplicates {
|
||||||
return nil
|
if mi.Has(id) {
|
||||||
|
return fmt.Errorf("%v is already indexed (fully processed)", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mi.inFlight.Insert(id)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
|
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
|
||||||
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
|
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
|
||||||
// The index + inFlight store must be searched for a matching id in one
|
// The index + inFlight store must be searched for a matching id in one
|
||||||
// atomic operation. This requires locking the inFlight store and the
|
// atomic operation. This requires locking the inFlight store and the
|
||||||
// index together!
|
// index together!
|
||||||
mi.inFlight.RLock()
|
mi.inFlight.RLock()
|
||||||
defer mi.inFlight.RUnlock()
|
defer mi.inFlight.RUnlock()
|
||||||
|
|
||||||
// Note: mi.Has read locks the index again.
|
mi.idxMutex.RLock()
|
||||||
mi.idxMutex.RLock()
|
defer mi.idxMutex.RUnlock()
|
||||||
defer mi.idxMutex.RUnlock()
|
|
||||||
|
|
||||||
inFlight := mi.inFlight.Has(id)
|
inFlight := mi.inFlight.Has(id)
|
||||||
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
|
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
|
||||||
|
|
||||||
indexed := mi.Has(id)
|
indexed := mi.Has(id)
|
||||||
debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
|
debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
|
||||||
|
|
||||||
return inFlight
|
return inFlight
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
|
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
|
||||||
|
|
|
@ -219,8 +219,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small
|
// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small
|
||||||
// enough, it will be packed together with other small blobs.
|
// enough, it will be packed together with other small blobs. When
|
||||||
func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) {
|
// ignoreDuplicates is true, blobs already in the index will be saved again.
|
||||||
|
func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) {
|
||||||
if id == nil {
|
if id == nil {
|
||||||
// compute plaintext hash
|
// compute plaintext hash
|
||||||
hashedID := backend.Hash(data)
|
hashedID := backend.Hash(data)
|
||||||
|
@ -241,7 +242,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
|
||||||
|
|
||||||
// add this id to the list of in-flight chunk ids.
|
// add this id to the list of in-flight chunk ids.
|
||||||
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str())
|
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str())
|
||||||
err = r.idx.AddInFlight(*id)
|
err = r.idx.AddInFlight(*id, ignoreDuplicates)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
|
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
|
||||||
return *id, nil
|
return *id, nil
|
||||||
|
@ -284,7 +285,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = r.SaveAndEncrypt(t, buf, id)
|
_, err = r.SaveAndEncrypt(t, buf, id, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -308,7 +309,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = wr.Bytes()
|
buf = wr.Bytes()
|
||||||
return r.SaveAndEncrypt(t, buf, nil)
|
return r.SaveAndEncrypt(t, buf, nil, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
|
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
|
||||||
|
|
|
@ -83,7 +83,7 @@ func TestSave(t *testing.T) {
|
||||||
id := backend.Hash(data)
|
id := backend.Hash(data)
|
||||||
|
|
||||||
// save
|
// save
|
||||||
sid, err := repo.SaveAndEncrypt(pack.Data, data, nil)
|
sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false)
|
||||||
OK(t, err)
|
OK(t, err)
|
||||||
|
|
||||||
Equals(t, id, sid)
|
Equals(t, id, sid)
|
||||||
|
@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz
|
||||||
_, err := io.ReadFull(rand.Reader, buf)
|
_, err := io.ReadFull(rand.Reader, buf)
|
||||||
OK(t, err)
|
OK(t, err)
|
||||||
|
|
||||||
_, err = repo.SaveAndEncrypt(pack.Data, buf, nil)
|
_, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false)
|
||||||
OK(t, err)
|
OK(t, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue