forked from TrueCloudLab/restic
Split index/repack functions to different files
This commit is contained in:
parent
bdd085e9f1
commit
34b3e3a095
4 changed files with 173 additions and 158 deletions
|
@ -1,91 +1,14 @@
|
||||||
package repository
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"restic/backend"
|
"restic/backend"
|
||||||
"restic/crypto"
|
|
||||||
"restic/debug"
|
"restic/debug"
|
||||||
"restic/pack"
|
"restic/pack"
|
||||||
"restic/worker"
|
"restic/worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Repack takes a list of packs together with a list of blobs contained in
|
|
||||||
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
|
|
||||||
// into a new pack. Afterwards, the packs are removed. This operation requires
|
|
||||||
// an exclusive lock on the repo.
|
|
||||||
func Repack(repo *Repository, packs, keepBlobs backend.IDSet) (err error) {
|
|
||||||
debug.Log("Repack", "repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
|
||||||
|
|
||||||
buf := make([]byte, 0, maxPackSize)
|
|
||||||
for packID := range packs {
|
|
||||||
// load the complete pack
|
|
||||||
h := backend.Handle{Type: backend.Data, Name: packID.String()}
|
|
||||||
|
|
||||||
l, err := repo.Backend().Load(h, buf[:cap(buf)], 0)
|
|
||||||
if err == io.ErrUnexpectedEOF {
|
|
||||||
err = nil
|
|
||||||
buf = buf[:l]
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("Repack", "pack %v loaded (%d bytes)", packID.Str(), len(buf))
|
|
||||||
|
|
||||||
unpck, err := pack.NewUnpacker(repo.Key(), bytes.NewReader(buf))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("Repack", "processing pack %v, blobs: %v", packID.Str(), len(unpck.Entries))
|
|
||||||
var plaintext []byte
|
|
||||||
for _, entry := range unpck.Entries {
|
|
||||||
if !keepBlobs.Has(entry.ID) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ciphertext := buf[entry.Offset : entry.Offset+entry.Length]
|
|
||||||
|
|
||||||
if cap(plaintext) < len(ciphertext) {
|
|
||||||
plaintext = make([]byte, len(ciphertext))
|
|
||||||
}
|
|
||||||
|
|
||||||
plaintext, err = crypto.Decrypt(repo.Key(), plaintext, ciphertext)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = repo.SaveAndEncrypt(entry.Type, plaintext, &entry.ID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("Repack", " saved blob %v", entry.ID.Str())
|
|
||||||
|
|
||||||
keepBlobs.Delete(entry.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := repo.Flush(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for packID := range packs {
|
|
||||||
err := repo.Backend().Remove(backend.Data, packID.String())
|
|
||||||
if err != nil {
|
|
||||||
debug.Log("Repack", "error removing pack %v: %v", packID.Str(), err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
debug.Log("Repack", "removed pack %v", packID.Str())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const rebuildIndexWorkers = 10
|
const rebuildIndexWorkers = 10
|
||||||
|
|
||||||
type loadBlobsResult struct {
|
type loadBlobsResult struct {
|
|
@ -1,4 +1,4 @@
|
||||||
package repository_test
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@ -110,83 +110,3 @@ func findPacksForBlobs(t *testing.T, repo *repository.Repository, blobs backend.
|
||||||
|
|
||||||
return packs
|
return packs
|
||||||
}
|
}
|
||||||
|
|
||||||
func repack(t *testing.T, repo *repository.Repository, packs, blobs backend.IDSet) {
|
|
||||||
err := repository.Repack(repo, packs, blobs)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func saveIndex(t *testing.T, repo *repository.Repository) {
|
|
||||||
if err := repo.SaveIndex(); err != nil {
|
|
||||||
t.Fatalf("repo.SaveIndex() %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func rebuildIndex(t *testing.T, repo *repository.Repository) {
|
|
||||||
if err := repository.RebuildIndex(repo); err != nil {
|
|
||||||
t.Fatalf("error rebuilding index: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func reloadIndex(t *testing.T, repo *repository.Repository) {
|
|
||||||
repo.SetIndex(repository.NewMasterIndex())
|
|
||||||
if err := repo.LoadIndex(); err != nil {
|
|
||||||
t.Fatalf("error loading new index: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRepack(t *testing.T) {
|
|
||||||
repo, cleanup := repository.TestRepository(t)
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
createRandomBlobs(t, repo, rand.Intn(400), 0.7)
|
|
||||||
|
|
||||||
packsBefore := listPacks(t, repo)
|
|
||||||
|
|
||||||
// Running repack on empty ID sets should not do anything at all.
|
|
||||||
repack(t, repo, nil, nil)
|
|
||||||
|
|
||||||
packsAfter := listPacks(t, repo)
|
|
||||||
|
|
||||||
if !packsAfter.Equals(packsBefore) {
|
|
||||||
t.Fatalf("packs are not equal, Repack modified something. Before:\n %v\nAfter:\n %v",
|
|
||||||
packsBefore, packsAfter)
|
|
||||||
}
|
|
||||||
|
|
||||||
saveIndex(t, repo)
|
|
||||||
|
|
||||||
removeBlobs, keepBlobs := selectBlobs(t, repo, 0.2)
|
|
||||||
|
|
||||||
removePacks := findPacksForBlobs(t, repo, removeBlobs)
|
|
||||||
|
|
||||||
repack(t, repo, removePacks, keepBlobs)
|
|
||||||
rebuildIndex(t, repo)
|
|
||||||
reloadIndex(t, repo)
|
|
||||||
|
|
||||||
packsAfter = listPacks(t, repo)
|
|
||||||
for id := range removePacks {
|
|
||||||
if packsAfter.Has(id) {
|
|
||||||
t.Errorf("pack %v still present although it should have been repacked and removed", id.Str())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idx := repo.Index()
|
|
||||||
for id := range keepBlobs {
|
|
||||||
pb, err := idx.Lookup(id)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unable to find blob %v in repo", id.Str())
|
|
||||||
}
|
|
||||||
|
|
||||||
if removePacks.Has(pb.PackID) {
|
|
||||||
t.Errorf("lookup returned pack ID %v that should've been removed", pb.PackID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for id := range removeBlobs {
|
|
||||||
if _, err := idx.Lookup(id); err == nil {
|
|
||||||
t.Errorf("blob %v still contained in the repo", id.Str())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
84
src/restic/repository/repack.go
Normal file
84
src/restic/repository/repack.go
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"restic/backend"
|
||||||
|
"restic/crypto"
|
||||||
|
"restic/debug"
|
||||||
|
"restic/pack"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Repack takes a list of packs together with a list of blobs contained in
|
||||||
|
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
|
||||||
|
// into a new pack. Afterwards, the packs are removed. This operation requires
|
||||||
|
// an exclusive lock on the repo.
|
||||||
|
func Repack(repo *Repository, packs, keepBlobs backend.IDSet) (err error) {
|
||||||
|
debug.Log("Repack", "repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
||||||
|
|
||||||
|
buf := make([]byte, 0, maxPackSize)
|
||||||
|
for packID := range packs {
|
||||||
|
// load the complete pack
|
||||||
|
h := backend.Handle{Type: backend.Data, Name: packID.String()}
|
||||||
|
|
||||||
|
l, err := repo.Backend().Load(h, buf[:cap(buf)], 0)
|
||||||
|
if err == io.ErrUnexpectedEOF {
|
||||||
|
err = nil
|
||||||
|
buf = buf[:l]
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("Repack", "pack %v loaded (%d bytes)", packID.Str(), len(buf))
|
||||||
|
|
||||||
|
unpck, err := pack.NewUnpacker(repo.Key(), bytes.NewReader(buf))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("Repack", "processing pack %v, blobs: %v", packID.Str(), len(unpck.Entries))
|
||||||
|
var plaintext []byte
|
||||||
|
for _, entry := range unpck.Entries {
|
||||||
|
if !keepBlobs.Has(entry.ID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ciphertext := buf[entry.Offset : entry.Offset+entry.Length]
|
||||||
|
|
||||||
|
if cap(plaintext) < len(ciphertext) {
|
||||||
|
plaintext = make([]byte, len(ciphertext))
|
||||||
|
}
|
||||||
|
|
||||||
|
plaintext, err = crypto.Decrypt(repo.Key(), plaintext, ciphertext)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = repo.SaveAndEncrypt(entry.Type, plaintext, &entry.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("Repack", " saved blob %v", entry.ID.Str())
|
||||||
|
|
||||||
|
keepBlobs.Delete(entry.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := repo.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for packID := range packs {
|
||||||
|
err := repo.Backend().Remove(backend.Data, packID.String())
|
||||||
|
if err != nil {
|
||||||
|
debug.Log("Repack", "error removing pack %v: %v", packID.Str(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
debug.Log("Repack", "removed pack %v", packID.Str())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
88
src/restic/repository/repack_test.go
Normal file
88
src/restic/repository/repack_test.go
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"restic/backend"
|
||||||
|
"restic/repository"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func repack(t *testing.T, repo *repository.Repository, packs, blobs backend.IDSet) {
|
||||||
|
err := repository.Repack(repo, packs, blobs)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveIndex(t *testing.T, repo *repository.Repository) {
|
||||||
|
if err := repo.SaveIndex(); err != nil {
|
||||||
|
t.Fatalf("repo.SaveIndex() %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func rebuildIndex(t *testing.T, repo *repository.Repository) {
|
||||||
|
if err := repository.RebuildIndex(repo); err != nil {
|
||||||
|
t.Fatalf("error rebuilding index: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func reloadIndex(t *testing.T, repo *repository.Repository) {
|
||||||
|
repo.SetIndex(repository.NewMasterIndex())
|
||||||
|
if err := repo.LoadIndex(); err != nil {
|
||||||
|
t.Fatalf("error loading new index: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRepack(t *testing.T) {
|
||||||
|
repo, cleanup := repository.TestRepository(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
createRandomBlobs(t, repo, rand.Intn(400), 0.7)
|
||||||
|
|
||||||
|
packsBefore := listPacks(t, repo)
|
||||||
|
|
||||||
|
// Running repack on empty ID sets should not do anything at all.
|
||||||
|
repack(t, repo, nil, nil)
|
||||||
|
|
||||||
|
packsAfter := listPacks(t, repo)
|
||||||
|
|
||||||
|
if !packsAfter.Equals(packsBefore) {
|
||||||
|
t.Fatalf("packs are not equal, Repack modified something. Before:\n %v\nAfter:\n %v",
|
||||||
|
packsBefore, packsAfter)
|
||||||
|
}
|
||||||
|
|
||||||
|
saveIndex(t, repo)
|
||||||
|
|
||||||
|
removeBlobs, keepBlobs := selectBlobs(t, repo, 0.2)
|
||||||
|
|
||||||
|
removePacks := findPacksForBlobs(t, repo, removeBlobs)
|
||||||
|
|
||||||
|
repack(t, repo, removePacks, keepBlobs)
|
||||||
|
rebuildIndex(t, repo)
|
||||||
|
reloadIndex(t, repo)
|
||||||
|
|
||||||
|
packsAfter = listPacks(t, repo)
|
||||||
|
for id := range removePacks {
|
||||||
|
if packsAfter.Has(id) {
|
||||||
|
t.Errorf("pack %v still present although it should have been repacked and removed", id.Str())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := repo.Index()
|
||||||
|
for id := range keepBlobs {
|
||||||
|
pb, err := idx.Lookup(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unable to find blob %v in repo", id.Str())
|
||||||
|
}
|
||||||
|
|
||||||
|
if removePacks.Has(pb.PackID) {
|
||||||
|
t.Errorf("lookup returned pack ID %v that should've been removed", pb.PackID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for id := range removeBlobs {
|
||||||
|
if _, err := idx.Lookup(id); err == nil {
|
||||||
|
t.Errorf("blob %v still contained in the repo", id.Str())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue