Rework index decode and handling old format

Alexander Neumann 2015-08-08 12:22:17 +02:00
parent 356bb62243
commit eb73182fcf
8 changed files with 180 additions and 48 deletions
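In short, the decoders now return only (*Index, error); the list of superseded indexes lives on the Index and is read via Supersedes(), index loading goes through LoadIndexWithDecoder, and old-format indexes can be rewritten with ConvertIndex/ConvertIndexes. A minimal call-site sketch (package name and import paths assumed from the restic tree of the time):

    package example

    import (
        "io"

        "github.com/restic/restic/backend"
        "github.com/restic/restic/repository"
    )

    // decodeIndex sketches the reworked decoder API: DecodeIndex now returns only
    // (*Index, error), and the IDs of superseded indexes are read from the decoded
    // Index via Supersedes() instead of a separate return value.
    func decodeIndex(rd io.Reader) (*repository.Index, backend.IDs, error) {
        idx, err := repository.DecodeIndex(rd)
        if err != nil {
            // repository.ErrOldIndexFormat is returned for data in the old format;
            // callers reload with DecodeOldIndex (see LoadIndex further down).
            return nil, nil, err
        }

        return idx, idx.Supersedes(), nil
    }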

View file

@@ -3,6 +3,7 @@ package checker
 import (
     "errors"
     "fmt"
+    "os"
     "sync"

     "github.com/restic/restic"
@@ -58,15 +59,26 @@ func (c *Checker) LoadIndex() error {
     indexCh := make(chan indexRes)

-    worker := func(id string, done <-chan struct{}) error {
+    worker := func(id backend.ID, done <-chan struct{}) error {
         debug.Log("LoadIndex", "worker got index %v", id)
-        idx, err := repository.LoadIndex(c.repo, id)
+        idx, err := repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
+        if err == repository.ErrOldIndexFormat {
+            debug.Log("LoadIndex", "old index format found, converting")
+            fmt.Fprintf(os.Stderr, "convert index %v to new format\n", id.Str())
+            id, err = repository.ConvertIndex(c.repo, id)
+            if err != nil {
+                return err
+            }
+
+            idx, err = repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
+        }
+
         if err != nil {
             return err
         }

         select {
-        case indexCh <- indexRes{Index: idx, ID: id}:
+        case indexCh <- indexRes{Index: idx, ID: id.String()}:
         case <-done:
         }
@@ -77,7 +89,8 @@ func (c *Checker) LoadIndex() error {
     go func() {
         defer close(indexCh)
         debug.Log("LoadIndex", "start loading indexes in parallel")
-        perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism, worker)
+        perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism,
+            repository.ParallelWorkFuncParseID(worker))
         debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
     }()
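ParallelWorkFuncParseID itself is not shown in this diff; presumably it adapts the backend.ID-based worker to the string-based signature FilesInParallel expects, roughly along these lines (a sketch with illustrative names, not the actual implementation):

    package example

    import "github.com/restic/restic/backend"

    // parallelIDWorkFunc is the worker shape the checker now uses: it receives an
    // already-parsed backend.ID instead of the raw file name.
    type parallelIDWorkFunc func(id backend.ID, done <-chan struct{}) error

    // parseIDAdapter wraps an ID-based worker so it can be passed where a
    // string-based worker is expected, parsing the ID first. This mirrors what
    // repository.ParallelWorkFuncParseID is assumed to do.
    func parseIDAdapter(f parallelIDWorkFunc) func(string, <-chan struct{}) error {
        return func(s string, done <-chan struct{}) error {
            id, err := backend.ParseID(s)
            if err != nil {
                return err
            }

            return f(id, done)
        }
    }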

View file

@@ -91,8 +91,8 @@ func TestUnreferencedPack(t *testing.T) {
     WithTestEnvironment(t, checkerTestData, func(repodir string) {
         repo := OpenLocalRepo(t, repodir)

-        // index 8eb5 only references pack 60e0
-        indexID := "8eb5b61062bf8e959f244fba0c971108bc8d4d2a4b236f71a704998e28cc5cf6"
+        // index 3f1a only references pack 60e0
+        indexID := "3f1abfcb79c6f7d0a3be517d2c83c8562fba64ef2c8e9a3544b4edaf8b5e3b44"
         packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"
         OK(t, repo.Backend().Remove(backend.Index, indexID))

Binary file not shown.

View file

@@ -17,6 +17,8 @@ import (
 type Index struct {
     m    sync.Mutex
     pack map[backend.ID]indexEntry
+
+    supersedes backend.IDs
 }

 type indexEntry struct {
@@ -139,6 +141,11 @@ func (idx *Index) Merge(other *Index) {
     debug.Log("Index.Merge", "done merging index")
 }

+// Supersedes returns the list of indexes this index supersedes, if any.
+func (idx *Index) Supersedes() backend.IDs {
+    return idx.supersedes
+}
+
 // PackedBlob is a blob already saved within a pack.
 type PackedBlob struct {
     pack.Blob
@@ -257,22 +264,20 @@ func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON,
 }

 type jsonIndex struct {
-    Supersedes []backend.ID `json:"supersedes,omitempty"`
-    Packs      []*packJSON  `json:"packs"`
+    Supersedes backend.IDs `json:"supersedes,omitempty"`
+    Packs      []*packJSON `json:"packs"`
 }

 type jsonOldIndex []*packJSON

 // encode writes the JSON serialization of the index filtered by selectFn to enc.
-func (idx *Index) encode(w io.Writer, supersedes []backend.ID, selectFn func(indexEntry) bool) error {
-    list, err := idx.generatePackList(func(entry indexEntry) bool {
-        return !entry.old
-    })
+func (idx *Index) encode(w io.Writer, supersedes backend.IDs, selectFn func(indexEntry) bool) error {
+    list, err := idx.generatePackList(selectFn)
     if err != nil {
         return err
     }

-    debug.Log("Index.Encode", "done")
+    debug.Log("Index.Encode", "done, %d entries selected", len(list))

     enc := json.NewEncoder(w)
     idxJSON := jsonIndex{
@@ -290,7 +295,7 @@ func (idx *Index) Encode(w io.Writer) error {
     idx.m.Lock()
     defer idx.m.Unlock()

-    return idx.encode(w, nil, func(e indexEntry) bool { return !e.old })
+    return idx.encode(w, idx.supersedes, func(e indexEntry) bool { return !e.old })
 }

 // Dump writes the pretty-printed JSON representation of the index to w.
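Because Encode now writes idx.supersedes and DecodeIndex stores what it read, the supersedes list is available on any index loaded in the new format. A small sketch of inspecting it, using only functions that appear in this commit:

    package example

    import (
        "fmt"

        "github.com/restic/restic/backend"
        "github.com/restic/restic/repository"
    )

    // printSupersedes loads one index in the new format and reports which older
    // index files it replaces. An empty list means this index does not replace
    // any older index files.
    func printSupersedes(repo *repository.Repository, id backend.ID) error {
        idx, err := repository.LoadIndexWithDecoder(repo, id.String(), repository.DecodeIndex)
        if err != nil {
            return err
        }

        for _, oldID := range idx.Supersedes() {
            fmt.Printf("index %v supersedes %v\n", id.Str(), oldID.Str())
        }

        return nil
    }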
@@ -333,47 +338,48 @@ func isErrOldIndex(err error) bool {
 var ErrOldIndexFormat = errors.New("index has old format")

 // DecodeIndex loads and unserializes an index from rd.
-func DecodeIndex(rd io.Reader) (*Index, backend.IDs, error) {
+func DecodeIndex(rd io.Reader) (idx *Index, err error) {
     debug.Log("Index.DecodeIndex", "Start decoding index")
     idxJSON := jsonIndex{}

     dec := json.NewDecoder(rd)
-    err := dec.Decode(&idxJSON)
+    err = dec.Decode(&idxJSON)
     if err != nil {
-        debug.Log("Index.DecodeIndex", "Error %#v", err)
+        debug.Log("Index.DecodeIndex", "Error %v", err)

         if isErrOldIndex(err) {
             debug.Log("Index.DecodeIndex", "index is probably old format, trying that")
             err = ErrOldIndexFormat
         }

-        return nil, nil, err
+        return nil, err
     }

-    idx := NewIndex()
+    idx = NewIndex()
     for _, pack := range idxJSON.Packs {
         for _, blob := range pack.Blobs {
             idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true)
         }
     }
+    idx.supersedes = idxJSON.Supersedes

     debug.Log("Index.DecodeIndex", "done")
-    return idx, idxJSON.Supersedes, err
+    return idx, err
 }

 // DecodeOldIndex loads and unserializes an index in the old format from rd.
-func DecodeOldIndex(rd io.Reader) (*Index, backend.IDs, error) {
+func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
     debug.Log("Index.DecodeOldIndex", "Start decoding old index")
     list := []*packJSON{}

     dec := json.NewDecoder(rd)
-    err := dec.Decode(&list)
+    err = dec.Decode(&list)
     if err != nil {
         debug.Log("Index.DecodeOldIndex", "Error %#v", err)
-        return nil, nil, err
+        return nil, err
     }

-    idx := NewIndex()
+    idx = NewIndex()
     for _, pack := range list {
         for _, blob := range pack.Blobs {
             idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true)
@@ -381,5 +387,76 @@ func DecodeOldIndex(rd io.Reader) (*Index, backend.IDs, error) {
     }

     debug.Log("Index.DecodeOldIndex", "done")
-    return idx, backend.IDs{}, err
+    return idx, err
+}
+
+// ConvertIndexes loads all indexes from the repo and converts them to the new
+// format (if necessary). When the conversion is successful, the old indexes
+// are removed.
+func ConvertIndexes(repo *Repository) error {
+    debug.Log("ConvertIndexes", "start")
+    done := make(chan struct{})
+    defer close(done)
+
+    for id := range repo.List(backend.Index, done) {
+        debug.Log("ConvertIndexes", "checking index %v", id.Str())
+
+        newID, err := ConvertIndex(repo, id)
+        if err != nil {
+            debug.Log("ConvertIndexes", "Converting index %v returns error: %v", id.Str(), err)
+            return err
+        }
+
+        if id != newID {
+            debug.Log("ConvertIndexes", "index %v converted to new format as %v", id.Str(), newID.Str())
+        }
+    }
+
+    debug.Log("ConvertIndexes", "done")
+    return nil
+}
+
+// ConvertIndex loads the given index from the repo and converts it to the new
+// format (if necessary). When the conversion is successful, the old index
+// is removed. Returned is either the old id (if no conversion was needed) or
+// the new id.
+func ConvertIndex(repo *Repository, id backend.ID) (backend.ID, error) {
+    debug.Log("ConvertIndex", "checking index %v", id.Str())
+
+    idx, err := LoadIndexWithDecoder(repo, id.String(), DecodeOldIndex)
+    if err != nil {
+        debug.Log("ConvertIndex", "LoadIndexWithDecoder(%v) returned error: %v", id.Str(), err)
+        return id, err
+    }
+
+    blob, err := repo.CreateEncryptedBlob(backend.Index)
+    if err != nil {
+        return id, err
+    }
+
+    idx.supersedes = backend.IDs{id}
+
+    // select all blobs for export
+    err = idx.encode(blob, idx.supersedes, func(e indexEntry) bool { return true })
+    if err != nil {
+        debug.Log("ConvertIndex", "oldIdx.Encode() returned error: %v", err)
+        return id, err
+    }
+
+    err = blob.Close()
+    if err != nil {
+        debug.Log("ConvertIndex", "blob.Close() returned error: %v", err)
+        return id, err
+    }
+
+    newID := blob.ID()
+    debug.Log("ConvertIndex", "index %v converted to new format as %v", id.Str(), newID.Str())
+
+    err = repo.be.Remove(backend.Index, id.String())
+    if err != nil {
+        debug.Log("ConvertIndex", "backend.Remove(%v) returned error: %v", id.Str(), err)
+        return id, err
+    }
+
+    return newID, nil
 }
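A sketch of driving the conversion for a single index, following the contract in the doc comment above: the old ID comes back unchanged when no conversion was needed, otherwise the new ID is returned and the old file has been removed.

    package example

    import (
        "fmt"

        "github.com/restic/restic/backend"
        "github.com/restic/restic/repository"
    )

    // convertOne rewrites a single old-format index and reports whether a new
    // index file was written.
    func convertOne(repo *repository.Repository, id backend.ID) error {
        newID, err := repository.ConvertIndex(repo, id)
        if err != nil {
            return err
        }

        if newID != id {
            fmt.Printf("index %v rewritten as %v\n", id.Str(), newID.Str())
        }

        return nil
    }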

View file

@@ -3,7 +3,6 @@ package repository_test
 import (
     "bytes"
     "crypto/rand"
-    "fmt"
     "io"
     "path/filepath"
     "testing"
@@ -60,7 +59,7 @@ func TestIndexSerialize(t *testing.T) {
     err := idx.Encode(wr)
     OK(t, err)

-    idx2, _, err := repository.DecodeIndex(wr)
+    idx2, err := repository.DecodeIndex(wr)
     OK(t, err)
     Assert(t, idx2 != nil,
         "nil returned for decoded index")
@@ -115,7 +114,7 @@ func TestIndexSerialize(t *testing.T) {
     err = idx2.Encode(wr3)
     OK(t, err)

-    idx3, _, err := repository.DecodeIndex(wr3)
+    idx3, err := repository.DecodeIndex(wr3)
     OK(t, err)
     Assert(t, idx3 != nil,
         "nil returned for decoded index")
@@ -246,7 +245,7 @@ var exampleTests = []struct {
 func TestIndexUnserialize(t *testing.T) {
     oldIdx := backend.IDs{ParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")}

-    idx, supersedes, err := repository.DecodeIndex(bytes.NewReader(docExample))
+    idx, err := repository.DecodeIndex(bytes.NewReader(docExample))
     OK(t, err)

     for _, test := range exampleTests {
@@ -259,11 +258,11 @@ func TestIndexUnserialize(t *testing.T) {
         Equals(t, test.length, length)
     }

-    Equals(t, oldIdx, supersedes)
+    Equals(t, oldIdx, idx.Supersedes())
 }

 func TestIndexUnserializeOld(t *testing.T) {
-    idx, supersedes, err := repository.DecodeOldIndex(bytes.NewReader(docOldExample))
+    idx, err := repository.DecodeOldIndex(bytes.NewReader(docOldExample))
     OK(t, err)

     for _, test := range exampleTests {
@@ -276,8 +275,57 @@ func TestIndexUnserializeOld(t *testing.T) {
         Equals(t, test.length, length)
     }

-    Assert(t, len(supersedes) == 0,
-        "expected %v supersedes, got %v", 0, len(supersedes))
+    Equals(t, 0, len(idx.Supersedes()))
+}
+
+var oldIndexTestRepo = filepath.Join("testdata", "old-index-repo.tar.gz")
+
+func TestConvertIndex(t *testing.T) {
+    WithTestEnvironment(t, oldIndexTestRepo, func(repodir string) {
+        repo := OpenLocalRepo(t, repodir)
+
+        old := make(map[backend.ID]*repository.Index)
+        for id := range repo.List(backend.Index, nil) {
+            idx, err := repository.LoadIndex(repo, id.String())
+            OK(t, err)
+            old[id] = idx
+        }
+
+        OK(t, repository.ConvertIndexes(repo))
+
+        for id := range repo.List(backend.Index, nil) {
+            idx, err := repository.LoadIndexWithDecoder(repo, id.String(), repository.DecodeIndex)
+            OK(t, err)
+
+            Assert(t, len(idx.Supersedes()) == 1,
+                "Expected index %v to supersede exactly one index, got %v", id, idx.Supersedes())
+
+            oldIndexID := idx.Supersedes()[0]
+
+            oldIndex, ok := old[oldIndexID]
+            Assert(t, ok,
+                "Index %v supersedes %v, but that wasn't found in the old index map", id.Str(), oldIndexID.Str())
+
+            Assert(t, idx.Count(pack.Data) == oldIndex.Count(pack.Data),
+                "Index %v count blobs %v: %v != %v", id.Str(), pack.Data, idx.Count(pack.Data), oldIndex.Count(pack.Data))
+            Assert(t, idx.Count(pack.Tree) == oldIndex.Count(pack.Tree),
+                "Index %v count blobs %v: %v != %v", id.Str(), pack.Tree, idx.Count(pack.Tree), oldIndex.Count(pack.Tree))
+
+            for packedBlob := range idx.Each(nil) {
+                packID, tpe, offset, length, err := oldIndex.Lookup(packedBlob.ID)
+                OK(t, err)
+
+                Assert(t, *packID == packedBlob.PackID,
+                    "Check blob %v: pack ID %v != %v", packedBlob.ID, packID, packedBlob.PackID)
+                Assert(t, tpe == packedBlob.Type,
+                    "Check blob %v: type %v != %v", packedBlob.ID, tpe, packedBlob.Type)
+                Assert(t, offset == packedBlob.Offset,
+                    "Check blob %v: offset %v != %v", packedBlob.ID, offset, packedBlob.Offset)
+                Assert(t, length == packedBlob.Length,
+                    "Check blob %v: length %v != %v", packedBlob.ID, length, packedBlob.Length)
+            }
+        }
+    })
 }

 func TestStoreOverwritesPreliminaryEntry(t *testing.T) {

View file

@@ -579,14 +579,14 @@ func (r *Repository) LoadIndex() error {
 // LoadIndex loads the index id from backend and returns it.
 func LoadIndex(repo *Repository, id string) (*Index, error) {
-    idx, err := loadIndex(repo, id, false)
+    idx, err := LoadIndexWithDecoder(repo, id, DecodeIndex)
     if err == nil {
         return idx, nil
     }

     if err == ErrOldIndexFormat {
-        fmt.Fprintf(os.Stderr, "index %v has old format\n", id)
-        return loadIndex(repo, id, true)
+        fmt.Fprintf(os.Stderr, "index %v has old format\n", id[:10])
+        return LoadIndexWithDecoder(repo, id, DecodeOldIndex)
     }

     return nil, err
@@ -632,8 +632,9 @@ func (r *Repository) GetDecryptReader(t backend.Type, id string) (io.ReadCloser,
     return newDecryptReadCloser(r.key, rd)
 }

-func loadIndex(repo *Repository, id string, oldFormat bool) (*Index, error) {
-    debug.Log("loadIndex", "Loading index %v", id[:8])
+// LoadIndexWithDecoder loads the index and decodes it with fn.
+func LoadIndexWithDecoder(repo *Repository, id string, fn func(io.Reader) (*Index, error)) (*Index, error) {
+    debug.Log("LoadIndexWithDecoder", "Loading index %v", id[:8])

     rd, err := repo.GetDecryptReader(backend.Index, id)
     if err != nil {
@@ -641,16 +642,9 @@ func loadIndex(repo *Repository, id string, oldFormat bool) (*Index, error) {
     }
     defer rd.Close()

-    var idx *Index
-    if !oldFormat {
-        idx, _, err = DecodeIndex(rd)
-    } else {
-        idx, _, err = DecodeOldIndex(rd)
-    }
-
+    idx, err := fn(rd)
     if err != nil {
-        debug.Log("loadIndex", "error while decoding index %v: %v", id, err)
+        debug.Log("LoadIndexWithDecoder", "error while decoding index %v: %v", id, err)
         return nil, err
     }
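The decoder argument keeps the format decision at the call site; the fallback that LoadIndex performs above can be written as a standalone helper using the same pattern (a sketch, not additional API):

    package example

    import "github.com/restic/restic/repository"

    // loadAnyIndex tries the new index format first and falls back to the old
    // decoder when LoadIndexWithDecoder reports ErrOldIndexFormat, mirroring
    // repository.LoadIndex above.
    func loadAnyIndex(repo *repository.Repository, id string) (*repository.Index, error) {
        idx, err := repository.LoadIndexWithDecoder(repo, id, repository.DecodeIndex)
        if err == repository.ErrOldIndexFormat {
            return repository.LoadIndexWithDecoder(repo, id, repository.DecodeOldIndex)
        }

        return idx, err
    }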

View file

@@ -199,7 +199,7 @@ func TestLoadJSONUnpacked(t *testing.T) {
 var repoFixture = filepath.Join("testdata", "test-repo.tar.gz")

-func TestLoadIndex(t *testing.T) {
+func TestRepositoryLoadIndex(t *testing.T) {
     WithTestEnvironment(t, repoFixture, func(repodir string) {
         repo := OpenLocalRepo(t, repodir)
         OK(t, repo.LoadIndex())

Binary file not shown.