From b836da1980d635d30bc4051b2045606aaa67cf1b Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 26 Apr 2015 17:10:31 +0200 Subject: [PATCH] Add pack index --- server/index.go | 210 ++++++++++++++++++++++++++++++++++++++++ server/index_test.go | 225 +++++++++++++++++++++++++++++++++++++++++++ test/helpers.go | 2 +- 3 files changed, 436 insertions(+), 1 deletion(-) create mode 100644 server/index.go create mode 100644 server/index_test.go diff --git a/server/index.go b/server/index.go new file mode 100644 index 000000000..8ffefd984 --- /dev/null +++ b/server/index.go @@ -0,0 +1,210 @@ +package server + +import ( + "encoding/json" + "errors" + "io" + "sync" + + "github.com/restic/restic/backend" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" +) + +// Index holds a lookup table for id -> pack. +type Index struct { + m sync.Mutex + pack map[string]indexEntry +} + +type indexEntry struct { + tpe pack.BlobType + packID backend.ID + offset uint + length uint + old bool +} + +// NewIndex returns a new index. +func NewIndex() *Index { + return &Index{ + pack: make(map[string]indexEntry), + } +} + +func (idx *Index) store(t pack.BlobType, id, pack backend.ID, offset, length uint, old bool) { + idx.pack[id.String()] = indexEntry{ + tpe: t, + packID: pack, + offset: offset, + length: length, + old: old, + } +} + +// Store remembers the id and pack in the index. +func (idx *Index) Store(t pack.BlobType, id, pack backend.ID, offset, length uint) { + idx.m.Lock() + defer idx.m.Unlock() + + debug.Log("Index.Store", "pack %v contains id %v, offset %v, length %v", + pack.Str(), id.Str(), offset, length) + + idx.store(t, id, pack, offset, length, false) +} + +// Remove removes the pack ID from the index. +func (idx *Index) Remove(packID backend.ID) { + idx.m.Lock() + defer idx.m.Unlock() + + debug.Log("Index.Remove", "id %v removed", packID.Str()) + + s := packID.String() + if _, ok := idx.pack[s]; ok { + delete(idx.pack, s) + } +} + +// Lookup returns the pack for the id. +func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) { + idx.m.Lock() + defer idx.m.Unlock() + + if p, ok := idx.pack[id.String()]; ok { + debug.Log("Index.Lookup", "id %v found in pack %v at %d, length %d", + id.Str(), p.packID.Str(), p.offset, p.length) + return p.packID, p.tpe, p.offset, p.length, nil + } + + debug.Log("Index.Lookup", "id %v not found", id.Str()) + return nil, pack.Data, 0, 0, errors.New("id not found") +} + +// Has returns true iff the id is listed in the index. +func (idx *Index) Has(id backend.ID) bool { + _, _, _, _, err := idx.Lookup(id) + if err == nil { + return true + } + + return false +} + +// Each returns a channel that yields all blobs known to the index. If done is +// closed, the background goroutine terminates. This blocks any modification of +// the index. +func (idx *Index) Each(done chan struct{}) <-chan pack.Blob { + idx.m.Lock() + + ch := make(chan pack.Blob) + + go func() { + defer idx.m.Unlock() + defer func() { + close(ch) + }() + + for ids, blob := range idx.pack { + id, err := backend.ParseID(ids) + if err != nil { + // ignore invalid IDs + continue + } + + select { + case <-done: + return + case ch <- pack.Blob{ + ID: id, + Offset: blob.offset, + Type: blob.tpe, + Length: uint32(blob.length), + }: + } + } + }() + + return ch +} + +type packJSON struct { + ID string `json:"id"` + Blobs []blobJSON `json:"blobs"` +} + +type blobJSON struct { + ID string `json:"id"` + Type pack.BlobType `json:"type"` + Offset uint `json:"offset"` + Length uint `json:"length"` +} + +// Encode writes the JSON serialization of the index to the writer w. This +// serialization only contains new blobs added via idx.Store(), not old ones +// introduced via DecodeIndex(). +func (idx *Index) Encode(w io.Writer) error { + idx.m.Lock() + defer idx.m.Unlock() + + list := []*packJSON{} + packs := make(map[string]*packJSON) + + for id, blob := range idx.pack { + if blob.old { + continue + } + + // see if pack is already in map + p, ok := packs[blob.packID.String()] + if !ok { + // else create new pack + p = &packJSON{ID: blob.packID.String()} + + // and append it to the list and map + list = append(list, p) + packs[p.ID] = p + } + + // add blob + p.Blobs = append(p.Blobs, blobJSON{ + ID: id, + Type: blob.tpe, + Offset: blob.offset, + Length: blob.length, + }) + } + + enc := json.NewEncoder(w) + return enc.Encode(list) +} + +// DecodeIndex loads and unserializes an index from rd. +func DecodeIndex(rd io.Reader) (*Index, error) { + list := []*packJSON{} + + dec := json.NewDecoder(rd) + err := dec.Decode(&list) + if err != nil { + return nil, err + } + + idx := NewIndex() + for _, pack := range list { + packID, err := backend.ParseID(pack.ID) + if err != nil { + return nil, err + } + + for _, blob := range pack.Blobs { + blobID, err := backend.ParseID(blob.ID) + if err != nil { + return nil, err + } + + idx.store(blob.Type, blobID, packID, blob.Offset, blob.Length, true) + } + } + + return idx, err +} diff --git a/server/index_test.go b/server/index_test.go new file mode 100644 index 000000000..149fdc468 --- /dev/null +++ b/server/index_test.go @@ -0,0 +1,225 @@ +package server_test + +import ( + "bytes" + "crypto/rand" + "io" + "testing" + + "github.com/restic/restic/backend" + "github.com/restic/restic/pack" + "github.com/restic/restic/server" + . "github.com/restic/restic/test" +) + +func randomID() backend.ID { + buf := make([]byte, backend.IDSize) + _, err := io.ReadFull(rand.Reader, buf) + if err != nil { + panic(err) + } + return buf +} + +func TestIndexSerialize(t *testing.T) { + type testEntry struct { + id backend.ID + pack backend.ID + tpe pack.BlobType + offset, length uint + } + tests := []testEntry{} + + idx := server.NewIndex() + + // create 50 packs with 20 blobs each + for i := 0; i < 50; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < 20; j++ { + id := randomID() + length := uint(i*100 + j) + idx.Store(pack.Data, id, packID, pos, length) + + tests = append(tests, testEntry{ + id: id, + pack: packID, + tpe: pack.Data, + offset: pos, + length: length, + }) + + pos += length + } + } + + wr := bytes.NewBuffer(nil) + err := idx.Encode(wr) + OK(t, err) + + idx2, err := server.DecodeIndex(wr) + OK(t, err) + Assert(t, idx2 != nil, + "nil returned for decoded index") + + wr2 := bytes.NewBuffer(nil) + err = idx2.Encode(wr2) + OK(t, err) + + for _, testBlob := range tests { + packID, tpe, offset, length, err := idx.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + + packID, tpe, offset, length, err = idx2.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + } + + // add more blobs to idx2 + newtests := []testEntry{} + for i := 0; i < 10; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < 10; j++ { + id := randomID() + length := uint(i*100 + j) + idx2.Store(pack.Data, id, packID, pos, length) + + newtests = append(newtests, testEntry{ + id: id, + pack: packID, + tpe: pack.Data, + offset: pos, + length: length, + }) + + pos += length + } + } + + // serialize idx2, unserialize to idx3 + wr3 := bytes.NewBuffer(nil) + err = idx2.Encode(wr3) + OK(t, err) + + idx3, err := server.DecodeIndex(wr3) + OK(t, err) + Assert(t, idx3 != nil, + "nil returned for decoded index") + + // all old blobs must not be present in the index + for _, testBlob := range tests { + _, _, _, _, err := idx3.Lookup(testBlob.id) + Assert(t, err != nil, + "found old id %v in serialized index", testBlob.id.Str()) + } + + // all new blobs must be in the index + for _, testBlob := range newtests { + packID, tpe, offset, length, err := idx3.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + } +} + +func TestIndexSize(t *testing.T) { + idx := server.NewIndex() + + packs := 200 + blobs := 100 + for i := 0; i < packs; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < blobs; j++ { + id := randomID() + length := uint(i*100 + j) + idx.Store(pack.Data, id, packID, pos, length) + + pos += length + } + } + + wr := bytes.NewBuffer(nil) + + err := idx.Encode(wr) + OK(t, err) + + t.Logf("Index file size for %d blobs in %d packs is %d", blobs*packs, packs, wr.Len()) +} + +// example index serialization from doc/Design.md +var docExample = []byte(` +[ { + "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", + "blobs": [ + { + "id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", + "type": "data", + "offset": 0, + "length": 25 + },{ + "id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", + "type": "tree", + "offset": 38, + "length": 100 + }, + { + "id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66", + "type": "data", + "offset": 150, + "length": 123 + } + ] +} ] +`) + +var exampleTests = []struct { + id, packID backend.ID + tpe pack.BlobType + offset, length uint +}{ + { + ParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Data, 0, 25, + }, { + ParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Tree, 38, 100, + }, { + ParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Data, 150, 123, + }, +} + +func TestIndexUnserialize(t *testing.T) { + idx, err := server.DecodeIndex(bytes.NewReader(docExample)) + OK(t, err) + + for _, test := range exampleTests { + packID, tpe, offset, length, err := idx.Lookup(test.id) + OK(t, err) + + Equals(t, test.packID, packID) + Equals(t, test.tpe, tpe) + Equals(t, test.offset, offset) + Equals(t, test.length, length) + } +} diff --git a/test/helpers.go b/test/helpers.go index d7fe0eb16..a3e5a743c 100644 --- a/test/helpers.go +++ b/test/helpers.go @@ -39,7 +39,7 @@ func Equals(tb testing.TB, exp, act interface{}) { } } -func Str2ID(s string) backend.ID { +func ParseID(s string) backend.ID { id, err := backend.ParseID(s) if err != nil { panic(err)