forked from TrueCloudLab/restic
Merge pull request #324 from restic/fix-index-grow
Fix really large index and memory exhaustion
This commit is contained in:
commit
566fb22bcf
18 changed files with 581 additions and 95 deletions
|
@ -39,6 +39,27 @@ func (s IDSet) List() IDs {
|
|||
return list
|
||||
}
|
||||
|
||||
// Equals returns true iff s equals other.
|
||||
func (s IDSet) Equals(other IDSet) bool {
|
||||
if len(s) != len(other) {
|
||||
return false
|
||||
}
|
||||
|
||||
for id := range s {
|
||||
if _, ok := other[id]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
for id := range other {
|
||||
if _, ok := s[id]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s IDSet) String() string {
|
||||
str := s.List().String()
|
||||
if len(str) < 2 {
|
||||
|
|
|
@ -223,9 +223,11 @@ func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.
|
|||
b.open[filename(b.p, t, name)] = append(open, f)
|
||||
b.mu.Unlock()
|
||||
|
||||
_, err = f.Seek(int64(offset), 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if offset > 0 {
|
||||
_, err = f.Seek(int64(offset), 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if length == 0 {
|
||||
|
|
|
@ -336,9 +336,11 @@ func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.R
|
|||
return nil, err
|
||||
}
|
||||
|
||||
_, err = f.Seek(int64(offset), 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if offset > 0 {
|
||||
_, err = f.Seek(int64(offset), 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if length == 0 {
|
||||
|
|
|
@ -49,8 +49,18 @@ func New(repo *repository.Repository) *Checker {
|
|||
|
||||
const defaultParallelism = 40
|
||||
|
||||
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
||||
type ErrDuplicatePacks struct {
|
||||
PackID backend.ID
|
||||
Indexes backend.IDSet
|
||||
}
|
||||
|
||||
func (e ErrDuplicatePacks) Error() string {
|
||||
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
|
||||
}
|
||||
|
||||
// LoadIndex loads all index files.
|
||||
func (c *Checker) LoadIndex() error {
|
||||
func (c *Checker) LoadIndex() (hints []error, errs []error) {
|
||||
debug.Log("LoadIndex", "Start")
|
||||
type indexRes struct {
|
||||
Index *repository.Index
|
||||
|
@ -97,14 +107,22 @@ func (c *Checker) LoadIndex() error {
|
|||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
if perr != nil {
|
||||
errs = append(errs, perr)
|
||||
return hints, errs
|
||||
}
|
||||
|
||||
packToIndex := make(map[backend.ID]backend.IDSet)
|
||||
|
||||
for res := range indexCh {
|
||||
debug.Log("LoadIndex", "process index %v", res.ID)
|
||||
id, err := backend.ParseID(res.ID)
|
||||
idxID, err := backend.ParseID(res.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
errs = append(errs, fmt.Errorf("unable to parse as index ID: %v", res.ID))
|
||||
continue
|
||||
}
|
||||
|
||||
c.indexes[id] = res.Index
|
||||
c.indexes[idxID] = res.Index
|
||||
c.masterIndex.Insert(res.Index)
|
||||
|
||||
debug.Log("LoadIndex", "process blobs")
|
||||
|
@ -114,6 +132,11 @@ func (c *Checker) LoadIndex() error {
|
|||
c.blobs[blob.ID] = struct{}{}
|
||||
c.blobRefs.M[blob.ID] = 0
|
||||
cnt++
|
||||
|
||||
if _, ok := packToIndex[blob.PackID]; !ok {
|
||||
packToIndex[blob.PackID] = backend.NewIDSet()
|
||||
}
|
||||
packToIndex[blob.PackID].Insert(idxID)
|
||||
}
|
||||
|
||||
debug.Log("LoadIndex", "%d blobs processed", cnt)
|
||||
|
@ -121,9 +144,20 @@ func (c *Checker) LoadIndex() error {
|
|||
|
||||
debug.Log("LoadIndex", "done, error %v", perr)
|
||||
|
||||
debug.Log("LoadIndex", "checking for duplicate packs")
|
||||
for packID := range c.packs {
|
||||
debug.Log("LoadIndex", " check pack %v: contained in %d indexes", packID.Str(), len(packToIndex[packID]))
|
||||
if len(packToIndex[packID]) > 1 {
|
||||
hints = append(hints, ErrDuplicatePacks{
|
||||
PackID: packID,
|
||||
Indexes: packToIndex[packID],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
c.repo.SetIndex(c.masterIndex)
|
||||
|
||||
return perr
|
||||
return hints, errs
|
||||
}
|
||||
|
||||
// PackError describes an error with a specific pack.
|
||||
|
|
|
@ -59,7 +59,15 @@ func TestCheckRepo(t *testing.T) {
|
|||
repo := OpenLocalRepo(t, repodir)
|
||||
|
||||
chkr := checker.New(repo)
|
||||
OK(t, chkr.LoadIndex())
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
OKs(t, checkPacks(chkr))
|
||||
OKs(t, checkStruct(chkr))
|
||||
})
|
||||
|
@ -73,8 +81,16 @@ func TestMissingPack(t *testing.T) {
|
|||
OK(t, repo.Backend().Remove(backend.Data, packID))
|
||||
|
||||
chkr := checker.New(repo)
|
||||
OK(t, chkr.LoadIndex())
|
||||
errs := checkPacks(chkr)
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
errs = checkPacks(chkr)
|
||||
|
||||
Assert(t, len(errs) == 1,
|
||||
"expected exactly one error, got %v", len(errs))
|
||||
|
@ -97,8 +113,16 @@ func TestUnreferencedPack(t *testing.T) {
|
|||
OK(t, repo.Backend().Remove(backend.Index, indexID))
|
||||
|
||||
chkr := checker.New(repo)
|
||||
OK(t, chkr.LoadIndex())
|
||||
errs := checkPacks(chkr)
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
errs = checkPacks(chkr)
|
||||
|
||||
Assert(t, len(errs) == 1,
|
||||
"expected exactly one error, got %v", len(errs))
|
||||
|
@ -130,7 +154,15 @@ func TestUnreferencedBlobs(t *testing.T) {
|
|||
sort.Sort(unusedBlobsBySnapshot)
|
||||
|
||||
chkr := checker.New(repo)
|
||||
OK(t, chkr.LoadIndex())
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
OKs(t, checkPacks(chkr))
|
||||
OKs(t, checkStruct(chkr))
|
||||
|
||||
|
@ -140,3 +172,35 @@ func TestUnreferencedBlobs(t *testing.T) {
|
|||
Equals(t, unusedBlobsBySnapshot, blobs)
|
||||
})
|
||||
}
|
||||
|
||||
var checkerDuplicateIndexTestData = filepath.Join("testdata", "duplicate-packs-in-index-test-repo.tar.gz")
|
||||
|
||||
func TestDuplicatePacksInIndex(t *testing.T) {
|
||||
WithTestEnvironment(t, checkerDuplicateIndexTestData, func(repodir string) {
|
||||
repo := OpenLocalRepo(t, repodir)
|
||||
|
||||
chkr := checker.New(repo)
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(hints) == 0 {
|
||||
t.Fatalf("did not get expected checker hints for duplicate packs in indexes")
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, hint := range hints {
|
||||
if _, ok := hint.(checker.ErrDuplicatePacks); ok {
|
||||
found = true
|
||||
} else {
|
||||
t.Errorf("got unexpected hint: %v", hint)
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatalf("did not find hint ErrDuplicatePacks")
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
t.Errorf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
|
BIN
checker/testdata/duplicate-packs-in-index-test-repo.tar.gz
vendored
Normal file
BIN
checker/testdata/duplicate-packs-in-index-test-repo.tar.gz
vendored
Normal file
Binary file not shown.
|
@ -50,8 +50,25 @@ func (cmd CmdCheck) Execute(args []string) error {
|
|||
chkr := checker.New(repo)
|
||||
|
||||
cmd.global.Verbosef("Load indexes\n")
|
||||
if err = chkr.LoadIndex(); err != nil {
|
||||
return err
|
||||
hints, errs := chkr.LoadIndex()
|
||||
|
||||
dupFound := false
|
||||
for _, hint := range hints {
|
||||
cmd.global.Printf("%v\n", hint)
|
||||
if _, ok := hint.(checker.ErrDuplicatePacks); ok {
|
||||
dupFound = true
|
||||
}
|
||||
}
|
||||
|
||||
if dupFound {
|
||||
cmd.global.Printf("\nrun `restic rebuild-index' to correct this\n")
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
cmd.global.Warnf("error: %v\n", err)
|
||||
}
|
||||
return fmt.Errorf("LoadIndex returned errors")
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
|
|
|
@ -32,7 +32,7 @@ func init() {
|
|||
}
|
||||
|
||||
func (cmd CmdDump) Usage() string {
|
||||
return "[index|snapshots|trees|all]"
|
||||
return "[indexes|snapshots|trees|all]"
|
||||
}
|
||||
|
||||
func prettyPrintJSON(wr io.Writer, item interface{}) error {
|
||||
|
|
188
cmd/restic/cmd_rebuild_index.go
Normal file
188
cmd/restic/cmd_rebuild_index.go
Normal file
|
@ -0,0 +1,188 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/restic/restic/backend"
|
||||
"github.com/restic/restic/debug"
|
||||
"github.com/restic/restic/pack"
|
||||
"github.com/restic/restic/repository"
|
||||
)
|
||||
|
||||
type CmdRebuildIndex struct {
|
||||
global *GlobalOptions
|
||||
|
||||
repo *repository.Repository
|
||||
}
|
||||
|
||||
func init() {
|
||||
_, err := parser.AddCommand("rebuild-index",
|
||||
"rebuild the index",
|
||||
"The rebuild-index command builds a new index",
|
||||
&CmdRebuildIndex{global: &globalOpts})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd CmdRebuildIndex) storeIndex(index *repository.Index) (*repository.Index, error) {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "saving index")
|
||||
|
||||
cmd.global.Printf(" saving new index\n")
|
||||
id, err := repository.SaveIndex(cmd.repo, index)
|
||||
if err != nil {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "index saved as %v", id.Str())
|
||||
index = repository.NewIndex()
|
||||
|
||||
return index, nil
|
||||
}
|
||||
|
||||
func (cmd CmdRebuildIndex) RebuildIndex() error {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "start")
|
||||
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
indexIDs := backend.NewIDSet()
|
||||
for id := range cmd.repo.List(backend.Index, done) {
|
||||
indexIDs.Insert(id)
|
||||
}
|
||||
|
||||
cmd.global.Printf("rebuilding index from %d indexes\n", len(indexIDs))
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "found %v indexes", len(indexIDs))
|
||||
|
||||
combinedIndex := repository.NewIndex()
|
||||
packsDone := backend.NewIDSet()
|
||||
|
||||
i := 0
|
||||
for indexID := range indexIDs {
|
||||
cmd.global.Printf(" loading index %v\n", i)
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "load index %v", indexID.Str())
|
||||
idx, err := repository.LoadIndex(cmd.repo, indexID.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "adding blobs from index %v", indexID.Str())
|
||||
|
||||
for packedBlob := range idx.Each(done) {
|
||||
combinedIndex.Store(packedBlob.Type, packedBlob.ID, packedBlob.PackID, packedBlob.Offset, packedBlob.Length)
|
||||
packsDone.Insert(packedBlob.PackID)
|
||||
}
|
||||
|
||||
combinedIndex.AddToSupersedes(indexID)
|
||||
|
||||
if repository.IndexFull(combinedIndex) {
|
||||
combinedIndex, err = cmd.storeIndex(combinedIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
var err error
|
||||
if combinedIndex.Length() > 0 {
|
||||
combinedIndex, err = cmd.storeIndex(combinedIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cmd.global.Printf("removing %d old indexes\n", len(indexIDs))
|
||||
for id := range indexIDs {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "remove index %v", id.Str())
|
||||
|
||||
err := cmd.repo.Backend().Remove(backend.Index, id.String())
|
||||
if err != nil {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "error removing index %v: %v", id.Str(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cmd.global.Printf("checking for additional packs\n")
|
||||
newPacks := 0
|
||||
for packID := range cmd.repo.List(backend.Data, done) {
|
||||
if packsDone.Has(packID) {
|
||||
continue
|
||||
}
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "pack %v not indexed", packID.Str())
|
||||
newPacks++
|
||||
|
||||
rd, err := cmd.repo.Backend().GetReader(backend.Data, packID.String(), 0, 0)
|
||||
if err != nil {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "GetReader returned error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var readSeeker io.ReadSeeker
|
||||
if r, ok := rd.(io.ReadSeeker); ok {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "reader is seekable")
|
||||
readSeeker = r
|
||||
} else {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "reader is not seekable, loading contents to ram")
|
||||
buf, err := ioutil.ReadAll(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readSeeker = bytes.NewReader(buf)
|
||||
}
|
||||
|
||||
up, err := pack.NewUnpacker(cmd.repo.Key(), readSeeker)
|
||||
if err != nil {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "error while unpacking pack %v", packID.Str())
|
||||
return err
|
||||
}
|
||||
|
||||
for _, blob := range up.Entries {
|
||||
debug.Log("RebuildIndex.RebuildIndex", "pack %v: blob %v", packID.Str(), blob)
|
||||
combinedIndex.Store(blob.Type, blob.ID, packID, blob.Offset, blob.Length)
|
||||
}
|
||||
|
||||
if repository.IndexFull(combinedIndex) {
|
||||
combinedIndex, err = cmd.storeIndex(combinedIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if combinedIndex.Length() > 0 {
|
||||
combinedIndex, err = cmd.storeIndex(combinedIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cmd.global.Printf("added %d packs to the index\n", newPacks)
|
||||
|
||||
debug.Log("RebuildIndex.RebuildIndex", "done")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd CmdRebuildIndex) Execute(args []string) error {
|
||||
repo, err := cmd.global.OpenRepository()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.repo = repo
|
||||
|
||||
lock, err := lockRepoExclusive(repo)
|
||||
defer unlockRepo(lock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cmd.RebuildIndex()
|
||||
}
|
|
@ -90,6 +90,19 @@ func cmdCheck(t testing.TB, global GlobalOptions) {
|
|||
OK(t, cmd.Execute(nil))
|
||||
}
|
||||
|
||||
func cmdCheckOutput(t testing.TB, global GlobalOptions) string {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
global.stdout = buf
|
||||
cmd := &CmdCheck{global: &global, ReadData: true}
|
||||
OK(t, cmd.Execute(nil))
|
||||
return string(buf.Bytes())
|
||||
}
|
||||
|
||||
func cmdRebuildIndex(t testing.TB, global GlobalOptions) {
|
||||
cmd := &CmdRebuildIndex{global: &global}
|
||||
OK(t, cmd.Execute(nil))
|
||||
}
|
||||
|
||||
func cmdLs(t testing.TB, global GlobalOptions, snapshotID string) []string {
|
||||
var buf bytes.Buffer
|
||||
global.stdout = &buf
|
||||
|
@ -646,3 +659,26 @@ func TestFind(t *testing.T) {
|
|||
Assert(t, len(results) < 2, "less than two file found in repo (%v)", datafile)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRebuildIndex(t *testing.T) {
|
||||
withTestEnvironment(t, func(env *testEnvironment, global GlobalOptions) {
|
||||
datafile := filepath.Join("..", "..", "checker", "testdata", "duplicate-packs-in-index-test-repo.tar.gz")
|
||||
SetupTarTestFixture(t, env.base, datafile)
|
||||
|
||||
out := cmdCheckOutput(t, global)
|
||||
if !strings.Contains(out, "contained in several indexes") {
|
||||
t.Fatalf("did not find checker hint for packs in several indexes")
|
||||
}
|
||||
|
||||
if !strings.Contains(out, "restic rebuild-index") {
|
||||
t.Fatalf("did not find hint for rebuild-index comman")
|
||||
}
|
||||
|
||||
cmdRebuildIndex(t, global)
|
||||
|
||||
out = cmdCheckOutput(t, global)
|
||||
if len(out) != 0 {
|
||||
t.Fatalf("expected no output from the checker, got: %v", out)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -121,7 +121,7 @@ func getPosition() string {
|
|||
|
||||
goroutine := goroutineNum()
|
||||
|
||||
return fmt.Sprintf("%3d %s:%3d", goroutine, filepath.Base(file), line)
|
||||
return fmt.Sprintf("%3d %s:%d", goroutine, filepath.Base(file), line)
|
||||
}
|
||||
|
||||
var maxTagLen = 10
|
||||
|
|
42
pack/pack.go
42
pack/pack.go
|
@ -233,7 +233,7 @@ type Unpacker struct {
|
|||
|
||||
// NewUnpacker returns a pointer to Unpacker which can be used to read
|
||||
// individual Blobs from a pack.
|
||||
func NewUnpacker(k *crypto.Key, entries []Blob, rd io.ReadSeeker) (*Unpacker, error) {
|
||||
func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) {
|
||||
var err error
|
||||
ls := binary.Size(uint32(0))
|
||||
|
||||
|
@ -261,28 +261,28 @@ func NewUnpacker(k *crypto.Key, entries []Blob, rd io.ReadSeeker) (*Unpacker, er
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if entries == nil {
|
||||
pos := uint(0)
|
||||
for {
|
||||
e := headerEntry{}
|
||||
err = binary.Read(hrd, binary.LittleEndian, &e)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
var entries []Blob
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries = append(entries, Blob{
|
||||
Type: e.Type,
|
||||
Length: uint(e.Length),
|
||||
ID: e.ID,
|
||||
Offset: pos,
|
||||
})
|
||||
|
||||
pos += uint(e.Length)
|
||||
pos := uint(0)
|
||||
for {
|
||||
e := headerEntry{}
|
||||
err = binary.Read(hrd, binary.LittleEndian, &e)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries = append(entries, Blob{
|
||||
Type: e.Type,
|
||||
Length: uint(e.Length),
|
||||
ID: e.ID,
|
||||
Offset: pos,
|
||||
})
|
||||
|
||||
pos += uint(e.Length)
|
||||
}
|
||||
|
||||
p := &Unpacker{
|
||||
|
|
|
@ -67,7 +67,7 @@ func TestCreatePack(t *testing.T) {
|
|||
|
||||
// read and parse it again
|
||||
rd := bytes.NewReader(file.Bytes())
|
||||
np, err := pack.NewUnpacker(k, nil, rd)
|
||||
np, err := pack.NewUnpacker(k, rd)
|
||||
OK(t, err)
|
||||
Equals(t, len(np.Entries), len(bufs))
|
||||
|
||||
|
@ -85,7 +85,7 @@ func TestCreatePack(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
var blobTypeJson = []struct {
|
||||
var blobTypeJSON = []struct {
|
||||
t pack.BlobType
|
||||
res string
|
||||
}{
|
||||
|
@ -94,7 +94,7 @@ var blobTypeJson = []struct {
|
|||
}
|
||||
|
||||
func TestBlobTypeJSON(t *testing.T) {
|
||||
for _, test := range blobTypeJson {
|
||||
for _, test := range blobTypeJSON {
|
||||
// test serialize
|
||||
buf, err := json.Marshal(test.t)
|
||||
OK(t, err)
|
||||
|
|
|
@ -26,7 +26,7 @@ type Index struct {
|
|||
|
||||
type indexEntry struct {
|
||||
tpe pack.BlobType
|
||||
packID *backend.ID
|
||||
packID backend.ID
|
||||
offset uint
|
||||
length uint
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func NewIndex() *Index {
|
|||
}
|
||||
}
|
||||
|
||||
func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
|
||||
func (idx *Index) store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
|
||||
idx.pack[id] = indexEntry{
|
||||
tpe: t,
|
||||
packID: pack,
|
||||
|
@ -64,8 +64,8 @@ const (
|
|||
indexMaxAge = 15 * time.Minute
|
||||
)
|
||||
|
||||
// Full returns true iff the index is "full enough" to be saved as a preliminary index.
|
||||
func (idx *Index) Full() bool {
|
||||
// IndexFull returns true iff the index is "full enough" to be saved as a preliminary index.
|
||||
var IndexFull = func(idx *Index) bool {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
|
@ -95,7 +95,7 @@ func (idx *Index) Full() bool {
|
|||
|
||||
// Store remembers the id and pack in the index. An existing entry will be
|
||||
// silently overwritten.
|
||||
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
|
||||
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
|
@ -110,7 +110,7 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset
|
|||
}
|
||||
|
||||
// Lookup returns the pack for the id.
|
||||
func (idx *Index) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
|
||||
func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
|
@ -121,7 +121,7 @@ func (idx *Index) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType,
|
|||
}
|
||||
|
||||
debug.Log("Index.Lookup", "id %v not found", id.Str())
|
||||
return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in index", id)
|
||||
return backend.ID{}, pack.Data, 0, 0, fmt.Errorf("id %v not found in index", id)
|
||||
}
|
||||
|
||||
// Has returns true iff the id is listed in the index.
|
||||
|
@ -165,6 +165,20 @@ func (idx *Index) Supersedes() backend.IDs {
|
|||
return idx.supersedes
|
||||
}
|
||||
|
||||
// AddToSupersedes adds the ids to the list of indexes superseded by this
|
||||
// index. If the index has already been finalized, an error is returned.
|
||||
func (idx *Index) AddToSupersedes(ids ...backend.ID) error {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
if idx.final {
|
||||
return errors.New("index already finalized")
|
||||
}
|
||||
|
||||
idx.supersedes = append(idx.supersedes, ids...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PackedBlob is a blob already saved within a pack.
|
||||
type PackedBlob struct {
|
||||
pack.Blob
|
||||
|
@ -196,7 +210,7 @@ func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
|
|||
Type: blob.tpe,
|
||||
Length: blob.length,
|
||||
},
|
||||
PackID: *blob.packID,
|
||||
PackID: blob.packID,
|
||||
}:
|
||||
}
|
||||
}
|
||||
|
@ -205,6 +219,19 @@ func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
|
|||
return ch
|
||||
}
|
||||
|
||||
// Packs returns all packs in this index
|
||||
func (idx *Index) Packs() backend.IDSet {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
packs := backend.NewIDSet()
|
||||
for _, entry := range idx.pack {
|
||||
packs.Insert(entry.packID)
|
||||
}
|
||||
|
||||
return packs
|
||||
}
|
||||
|
||||
// Count returns the number of blobs of type t in the index.
|
||||
func (idx *Index) Count(t pack.BlobType) (n uint) {
|
||||
debug.Log("Index.Count", "counting blobs of type %v", t)
|
||||
|
@ -221,6 +248,15 @@ func (idx *Index) Count(t pack.BlobType) (n uint) {
|
|||
return
|
||||
}
|
||||
|
||||
// Length returns the number of entries in the Index.
|
||||
func (idx *Index) Length() uint {
|
||||
debug.Log("Index.Count", "counting blobs")
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
return uint(len(idx.pack))
|
||||
}
|
||||
|
||||
type packJSON struct {
|
||||
ID backend.ID `json:"id"`
|
||||
Blobs []blobJSON `json:"blobs"`
|
||||
|
@ -233,20 +269,14 @@ type blobJSON struct {
|
|||
Length uint `json:"length"`
|
||||
}
|
||||
|
||||
// generatePackList returns a list of packs containing only the index entries
|
||||
// that selsectFn returned true for. If selectFn is nil, the list contains all
|
||||
// blobs in the index.
|
||||
func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON, error) {
|
||||
// generatePackList returns a list of packs.
|
||||
func (idx *Index) generatePackList() ([]*packJSON, error) {
|
||||
list := []*packJSON{}
|
||||
packs := make(map[backend.ID]*packJSON)
|
||||
|
||||
for id, blob := range idx.pack {
|
||||
if blob.packID == nil {
|
||||
panic("nil pack id")
|
||||
}
|
||||
|
||||
if selectFn != nil && !selectFn(blob) {
|
||||
continue
|
||||
if blob.packID.IsNull() {
|
||||
panic("null pack id")
|
||||
}
|
||||
|
||||
debug.Log("Index.generatePackList", "handle blob %v", id.Str())
|
||||
|
@ -258,10 +288,10 @@ func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON,
|
|||
}
|
||||
|
||||
// see if pack is already in map
|
||||
p, ok := packs[*blob.packID]
|
||||
p, ok := packs[blob.packID]
|
||||
if !ok {
|
||||
// else create new pack
|
||||
p = &packJSON{ID: *blob.packID}
|
||||
p = &packJSON{ID: blob.packID}
|
||||
|
||||
// and append it to the list and map
|
||||
list = append(list, p)
|
||||
|
@ -302,7 +332,7 @@ func (idx *Index) Encode(w io.Writer) error {
|
|||
func (idx *Index) encode(w io.Writer) error {
|
||||
debug.Log("Index.encode", "encoding index")
|
||||
|
||||
list, err := idx.generatePackList(nil)
|
||||
list, err := idx.generatePackList()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -332,7 +362,7 @@ func (idx *Index) Dump(w io.Writer) error {
|
|||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
list, err := idx.generatePackList(nil)
|
||||
list, err := idx.generatePackList()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -386,7 +416,7 @@ func DecodeIndex(rd io.Reader) (idx *Index, err error) {
|
|||
idx = NewIndex()
|
||||
for _, pack := range idxJSON.Packs {
|
||||
for _, blob := range pack.Blobs {
|
||||
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
|
||||
idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
|
||||
}
|
||||
}
|
||||
idx.supersedes = idxJSON.Supersedes
|
||||
|
@ -411,7 +441,7 @@ func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
|
|||
idx = NewIndex()
|
||||
for _, pack := range list {
|
||||
for _, blob := range pack.Blobs {
|
||||
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
|
||||
idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ func TestIndexSerialize(t *testing.T) {
|
|||
for j := 0; j < 20; j++ {
|
||||
id := randomID()
|
||||
length := uint(i*100 + j)
|
||||
idx.Store(pack.Data, id, &packID, pos, length)
|
||||
idx.Store(pack.Data, id, packID, pos, length)
|
||||
|
||||
tests = append(tests, testEntry{
|
||||
id: id,
|
||||
|
@ -72,7 +72,7 @@ func TestIndexSerialize(t *testing.T) {
|
|||
packID, tpe, offset, length, err := idx.Lookup(testBlob.id)
|
||||
OK(t, err)
|
||||
|
||||
Equals(t, testBlob.pack, *packID)
|
||||
Equals(t, testBlob.pack, packID)
|
||||
Equals(t, testBlob.tpe, tpe)
|
||||
Equals(t, testBlob.offset, offset)
|
||||
Equals(t, testBlob.length, length)
|
||||
|
@ -80,7 +80,7 @@ func TestIndexSerialize(t *testing.T) {
|
|||
packID, tpe, offset, length, err = idx2.Lookup(testBlob.id)
|
||||
OK(t, err)
|
||||
|
||||
Equals(t, testBlob.pack, *packID)
|
||||
Equals(t, testBlob.pack, packID)
|
||||
Equals(t, testBlob.tpe, tpe)
|
||||
Equals(t, testBlob.offset, offset)
|
||||
Equals(t, testBlob.length, length)
|
||||
|
@ -95,7 +95,7 @@ func TestIndexSerialize(t *testing.T) {
|
|||
for j := 0; j < 10; j++ {
|
||||
id := randomID()
|
||||
length := uint(i*100 + j)
|
||||
idx.Store(pack.Data, id, &packID, pos, length)
|
||||
idx.Store(pack.Data, id, packID, pos, length)
|
||||
|
||||
newtests = append(newtests, testEntry{
|
||||
id: id,
|
||||
|
@ -129,7 +129,7 @@ func TestIndexSerialize(t *testing.T) {
|
|||
packID, tpe, offset, length, err := idx3.Lookup(testBlob.id)
|
||||
OK(t, err)
|
||||
|
||||
Equals(t, testBlob.pack, *packID)
|
||||
Equals(t, testBlob.pack, packID)
|
||||
Equals(t, testBlob.tpe, tpe)
|
||||
Equals(t, testBlob.offset, offset)
|
||||
Equals(t, testBlob.length, length)
|
||||
|
@ -148,7 +148,7 @@ func TestIndexSize(t *testing.T) {
|
|||
for j := 0; j < blobs; j++ {
|
||||
id := randomID()
|
||||
length := uint(i*100 + j)
|
||||
idx.Store(pack.Data, id, &packID, pos, length)
|
||||
idx.Store(pack.Data, id, packID, pos, length)
|
||||
|
||||
pos += length
|
||||
}
|
||||
|
@ -250,7 +250,7 @@ func TestIndexUnserialize(t *testing.T) {
|
|||
packID, tpe, offset, length, err := idx.Lookup(test.id)
|
||||
OK(t, err)
|
||||
|
||||
Equals(t, test.packID, *packID)
|
||||
Equals(t, test.packID, packID)
|
||||
Equals(t, test.tpe, tpe)
|
||||
Equals(t, test.offset, offset)
|
||||
Equals(t, test.length, length)
|
||||
|
@ -267,7 +267,7 @@ func TestIndexUnserializeOld(t *testing.T) {
|
|||
packID, tpe, offset, length, err := idx.Lookup(test.id)
|
||||
OK(t, err)
|
||||
|
||||
Equals(t, test.packID, *packID)
|
||||
Equals(t, test.packID, packID)
|
||||
Equals(t, test.tpe, tpe)
|
||||
Equals(t, test.offset, offset)
|
||||
Equals(t, test.length, length)
|
||||
|
@ -313,7 +313,7 @@ func TestConvertIndex(t *testing.T) {
|
|||
packID, tpe, offset, length, err := oldIndex.Lookup(packedBlob.ID)
|
||||
OK(t, err)
|
||||
|
||||
Assert(t, *packID == packedBlob.PackID,
|
||||
Assert(t, packID == packedBlob.PackID,
|
||||
"Check blob %v: pack ID %v != %v", packedBlob.ID, packID, packedBlob.PackID)
|
||||
Assert(t, tpe == packedBlob.Type,
|
||||
"Check blob %v: Type %v != %v", packedBlob.ID, tpe, packedBlob.Type)
|
||||
|
@ -325,3 +325,18 @@ func TestConvertIndex(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestIndexPacks(t *testing.T) {
|
||||
idx := repository.NewIndex()
|
||||
packs := backend.NewIDSet()
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
packID := randomID()
|
||||
idx.Store(pack.Data, randomID(), packID, 0, 23)
|
||||
|
||||
packs.Insert(packID)
|
||||
}
|
||||
|
||||
idxPacks := idx.Packs()
|
||||
Assert(t, packs.Equals(idxPacks), "packs in index do not match packs added to index")
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ func NewMasterIndex() *MasterIndex {
|
|||
}
|
||||
|
||||
// Lookup queries all known Indexes for the ID and returns the first match.
|
||||
func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
|
||||
func (mi *MasterIndex) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) {
|
||||
mi.idxMutex.RLock()
|
||||
defer mi.idxMutex.RUnlock()
|
||||
|
||||
|
@ -50,7 +50,7 @@ func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobT
|
|||
}
|
||||
|
||||
debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str())
|
||||
return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
|
||||
return backend.ID{}, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
|
||||
}
|
||||
|
||||
// LookupSize queries all known Indexes for the ID and returns the first match.
|
||||
|
@ -206,7 +206,7 @@ func (mi *MasterIndex) FullIndexes() []*Index {
|
|||
continue
|
||||
}
|
||||
|
||||
if idx.Full() {
|
||||
if IndexFull(idx) {
|
||||
debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
|
||||
list = append(list, idx)
|
||||
} else {
|
||||
|
|
|
@ -269,7 +269,7 @@ func (r *Repository) savePacker(p *pack.Packer) error {
|
|||
// update blobs in the index
|
||||
for _, b := range p.Blobs() {
|
||||
debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
|
||||
r.idx.Current().Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
|
||||
r.idx.Current().Store(b.Type, b.ID, sid, b.Offset, uint(b.Length))
|
||||
r.idx.RemoveFromInFlight(b.ID)
|
||||
}
|
||||
|
||||
|
@ -507,28 +507,37 @@ func (bw *BlobWriter) ID() backend.ID {
|
|||
return bw.id
|
||||
}
|
||||
|
||||
// SaveIndex saves an index to repo's backend.
|
||||
func SaveIndex(repo *Repository, index *Index) (backend.ID, error) {
|
||||
blob, err := repo.CreateEncryptedBlob(backend.Index)
|
||||
if err != nil {
|
||||
return backend.ID{}, err
|
||||
}
|
||||
|
||||
err = index.Finalize(blob)
|
||||
if err != nil {
|
||||
return backend.ID{}, err
|
||||
}
|
||||
|
||||
err = blob.Close()
|
||||
if err != nil {
|
||||
return backend.ID{}, err
|
||||
}
|
||||
|
||||
sid := blob.ID()
|
||||
return sid, nil
|
||||
}
|
||||
|
||||
// saveIndex saves all indexes in the backend.
|
||||
func (r *Repository) saveIndex(indexes ...*Index) error {
|
||||
for i, idx := range indexes {
|
||||
debug.Log("Repo.SaveIndex", "Saving index %d", i)
|
||||
|
||||
blob, err := r.CreateEncryptedBlob(backend.Index)
|
||||
sid, err := SaveIndex(r, idx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = idx.Encode(blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = blob.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sid := blob.ID()
|
||||
|
||||
debug.Log("Repo.SaveIndex", "Saved index %d as %v", i, sid.Str())
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"io"
|
||||
mrand "math/rand"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
|
@ -220,3 +221,70 @@ func BenchmarkLoadIndex(b *testing.B) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
// saveRandomDataBlobs generates random data blobs and saves them to the repository.
|
||||
func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, sizeMax int) {
|
||||
for i := 0; i < num; i++ {
|
||||
size := mrand.Int() % sizeMax
|
||||
|
||||
buf := make([]byte, size)
|
||||
_, err := io.ReadFull(rand.Reader, buf)
|
||||
OK(t, err)
|
||||
|
||||
_, err = repo.SaveAndEncrypt(pack.Data, buf, nil)
|
||||
OK(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepositoryIncrementalIndex(t *testing.T) {
|
||||
repo := SetupRepo()
|
||||
defer TeardownRepo(repo)
|
||||
|
||||
repository.IndexFull = func(*repository.Index) bool { return true }
|
||||
|
||||
// add 15 packs
|
||||
for j := 0; j < 5; j++ {
|
||||
// add 3 packs, write intermediate index
|
||||
for i := 0; i < 3; i++ {
|
||||
saveRandomDataBlobs(t, repo, 5, 1<<15)
|
||||
OK(t, repo.Flush())
|
||||
}
|
||||
|
||||
OK(t, repo.SaveFullIndex())
|
||||
}
|
||||
|
||||
// add another 5 packs
|
||||
for i := 0; i < 5; i++ {
|
||||
saveRandomDataBlobs(t, repo, 5, 1<<15)
|
||||
OK(t, repo.Flush())
|
||||
}
|
||||
|
||||
// save final index
|
||||
OK(t, repo.SaveIndex())
|
||||
|
||||
type packEntry struct {
|
||||
id backend.ID
|
||||
indexes []*repository.Index
|
||||
}
|
||||
|
||||
packEntries := make(map[backend.ID]map[backend.ID]struct{})
|
||||
|
||||
for id := range repo.List(backend.Index, nil) {
|
||||
idx, err := repository.LoadIndex(repo, id.String())
|
||||
OK(t, err)
|
||||
|
||||
for pb := range idx.Each(nil) {
|
||||
if _, ok := packEntries[pb.PackID]; !ok {
|
||||
packEntries[pb.PackID] = make(map[backend.ID]struct{})
|
||||
}
|
||||
|
||||
packEntries[pb.PackID][id] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
for packID, ids := range packEntries {
|
||||
if len(ids) > 1 {
|
||||
t.Errorf("pack %v listed in %d indexes\n", packID, len(ids))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue