From d1e4431514a66eb802d974b9bdd9ec0f91e239aa Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 21 Nov 2014 21:21:44 +0100 Subject: [PATCH] Refactor StorageMap to BlobList --- Makefile | 1 - archiver.go | 33 +++++------ backend/id.go | 5 ++ bloblist.go | 99 +++++++++++++++++++++++++++++++++ bloblist_test.go | 137 ++++++++++++++++++++++++++++++++++++++++++++++ contenthandler.go | 48 +++++++--------- restorer.go | 4 +- snapshot.go | 16 +++--- storagemap.go | 51 ----------------- tree.go | 2 +- 10 files changed, 285 insertions(+), 111 deletions(-) create mode 100644 bloblist.go create mode 100644 bloblist_test.go delete mode 100644 storagemap.go diff --git a/Makefile b/Makefile index be9c4d90b..c58519465 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,6 @@ .PHONY: clean all test test: - go test -race ./... for dir in cmd/* ; do \ (cd "$$dir"; go build -race) \ done diff --git a/archiver.go b/archiver.go index 4d4ac07f4..226ea5ecc 100644 --- a/archiver.go +++ b/archiver.go @@ -20,8 +20,7 @@ type Archiver struct { key *Key ch *ContentHandler - m sync.Mutex - smap *StorageMap // blobs used for the current snapshot + bl *BlobList // blobs used for the current snapshot fileToken chan struct{} @@ -63,7 +62,7 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { // do nothing arch.ScannerUpdate = func(Stats) {} - arch.smap = NewStorageMap() + arch.bl = NewBlobList() arch.ch, err = NewContentHandler(be, key) if err != nil { return nil, err @@ -86,30 +85,26 @@ func (arch *Archiver) saveUpdate(stats Stats) { } } -func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) { +func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) { blob, err := arch.ch.Save(t, data) if err != nil { - return nil, err + return Blob{}, err } // store blob in storage map for current snapshot - arch.m.Lock() - defer arch.m.Unlock() - arch.smap.Insert(blob) + arch.bl.Insert(blob) return blob, nil } -func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) { +func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (Blob, error) { blob, err := arch.ch.SaveJSON(t, item) if err != nil { - return nil, err + return Blob{}, err } // store blob in storage map for current snapshot - arch.m.Lock() - defer arch.m.Unlock() - arch.smap.Insert(blob) + arch.bl.Insert(blob) return blob, nil } @@ -168,9 +163,7 @@ func (arch *Archiver) SaveFile(node *Node) error { node.Content = make([]backend.ID, len(blobs)) for i, blob := range blobs { node.Content[i] = blob.ID - arch.m.Lock() - arch.smap.Insert(blob) - arch.m.Unlock() + arch.bl.Insert(blob) } return err @@ -258,14 +251,14 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { return &Tree{node}, nil } -func (arch *Archiver) saveTree(t *Tree) (*Blob, error) { +func (arch *Archiver) saveTree(t *Tree) (Blob, error) { var wg sync.WaitGroup for _, node := range *t { if node.Tree != nil && node.Subtree == nil { b, err := arch.saveTree(node.Tree) if err != nil { - return nil, err + return Blob{}, err } node.Subtree = b.ID arch.saveUpdate(Stats{Directories: 1}) @@ -294,7 +287,7 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) { blob, err := arch.SaveJSON(backend.Tree, t) if err != nil { - return nil, err + return Blob{}, err } return blob, nil @@ -311,7 +304,7 @@ func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, erro sn.Content = blob.ID // save snapshot - sn.StorageMap = arch.smap + sn.BlobList = arch.bl blob, err = arch.SaveJSON(backend.Snapshot, sn) if err != nil { return nil, nil, err diff --git a/backend/id.go b/backend/id.go index ef9067040..0c15cad26 100644 --- a/backend/id.go +++ b/backend/id.go @@ -47,6 +47,11 @@ func (id ID) EqualString(other string) (bool, error) { return id.Equal(ID(s)), nil } +// Compare compares this ID to another one, returning -1, 0, or 1. +func (id ID) Compare(other ID) int { + return bytes.Compare(other, id) +} + func (id ID) MarshalJSON() ([]byte, error) { return json.Marshal(id.String()) } diff --git a/bloblist.go b/bloblist.go new file mode 100644 index 000000000..48820e95f --- /dev/null +++ b/bloblist.go @@ -0,0 +1,99 @@ +package khepri + +import ( + "bytes" + "encoding/json" + "errors" + "sort" + "sync" +) + +type BlobList struct { + list []Blob + m sync.Mutex +} + +var ErrBlobNotFound = errors.New("Blob not found") + +func NewBlobList() *BlobList { + return &BlobList{ + list: []Blob{}, + } +} + +func (bl *BlobList) find(blob Blob) (int, Blob, error) { + pos := sort.Search(len(bl.list), func(i int) bool { + return blob.ID.Compare(bl.list[i].ID) >= 0 + }) + + if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 { + return pos, bl.list[pos], nil + } + + return pos, Blob{}, ErrBlobNotFound +} + +func (bl *BlobList) Find(blob Blob) (Blob, error) { + bl.m.Lock() + defer bl.m.Unlock() + + _, blob, err := bl.find(blob) + return blob, err +} + +func (bl *BlobList) Merge(other *BlobList) { + bl.m.Lock() + defer bl.m.Unlock() + other.m.Lock() + defer other.m.Unlock() + + for _, blob := range other.list { + bl.insert(blob) + } +} + +func (bl *BlobList) insert(blob Blob) { + pos, _, err := bl.find(blob) + if err == nil { + // already present + return + } + + // insert blob + // https://code.google.com/p/go-wiki/wiki/bliceTricks + bl.list = append(bl.list, Blob{}) + copy(bl.list[pos+1:], bl.list[pos:]) + bl.list[pos] = blob +} + +func (bl *BlobList) Insert(blob Blob) { + bl.m.Lock() + defer bl.m.Unlock() + + bl.insert(blob) +} + +func (bl BlobList) MarshalJSON() ([]byte, error) { + return json.Marshal(bl.list) +} + +func (bl *BlobList) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &bl.list) +} + +// Compare compares two blobs by comparing the ID and the size. It returns -1, +// 0, or 1. +func (blob Blob) Compare(other Blob) int { + if res := bytes.Compare(other.ID, blob.ID); res != 0 { + return res + } + + if blob.Size < other.Size { + return -1 + } + if blob.Size > other.Size { + return 1 + } + + return 0 +} diff --git a/bloblist_test.go b/bloblist_test.go new file mode 100644 index 000000000..3add85cf9 --- /dev/null +++ b/bloblist_test.go @@ -0,0 +1,137 @@ +package khepri_test + +import ( + "crypto/rand" + "encoding/json" + "flag" + "io" + mrand "math/rand" + "sync" + "testing" + "time" + + "github.com/fd0/khepri" +) + +const backendIDSize = 8 + +var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against") + +func randomID() []byte { + buf := make([]byte, backendIDSize) + _, err := io.ReadFull(rand.Reader, buf) + if err != nil { + panic(err) + } + return buf +} + +func newBlob() khepri.Blob { + return khepri.Blob{ID: randomID(), Size: uint64(mrand.Uint32())} +} + +// Test basic functionality +func TestBlobList(t *testing.T) { + bl := khepri.NewBlobList() + + b := newBlob() + bl.Insert(b) + + for i := 0; i < 1000; i++ { + bl.Insert(newBlob()) + } + + b2, err := bl.Find(khepri.Blob{ID: b.ID}) + ok(t, err) + assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) + + bl2 := khepri.NewBlobList() + for i := 0; i < 1000; i++ { + bl.Insert(newBlob()) + } + + b2, err = bl2.Find(b) + assert(t, err != nil, "found ID in khepri that was never inserted: %v", b2) + + bl2.Merge(bl) + + b2, err = bl2.Find(b) + + if err != nil { + t.Fatal(err) + } + + if b.Compare(b2) != 0 { + t.Fatalf("items are not equal: want %v, got %v", b, b2) + } +} + +// Test JSON encode/decode +func TestBlobListJSON(t *testing.T) { + bl := khepri.NewBlobList() + b := khepri.Blob{ID: []byte{1, 2, 3, 4}} + bl.Insert(b) + + b2, err := bl.Find(b) + ok(t, err) + assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) + + buf, err := json.Marshal(bl) + ok(t, err) + + bl2 := khepri.BlobList{} + json.Unmarshal(buf, &bl2) + + b2, err = bl2.Find(b) + ok(t, err) + assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) + + buf, err = json.Marshal(bl2) + ok(t, err) +} + +// random insert/find access by several goroutines +func TestBlobListRandom(t *testing.T) { + var wg sync.WaitGroup + + worker := func(bl *khepri.BlobList) { + defer wg.Done() + + b := newBlob() + bl.Insert(b) + + for i := 0; i < 200; i++ { + bl.Insert(newBlob()) + } + + d := time.Duration(mrand.Intn(10)*100) * time.Millisecond + time.Sleep(d) + + for i := 0; i < 100; i++ { + b2, err := bl.Find(b) + if err != nil { + t.Fatal(err) + } + + if b.Compare(b2) != 0 { + t.Fatalf("items are not equal: want %v, got %v", b, b2) + } + } + + bl2 := khepri.NewBlobList() + for i := 0; i < 200; i++ { + bl2.Insert(newBlob()) + } + + bl2.Merge(bl) + } + + bl := khepri.NewBlobList() + + for i := 0; uint(i) < *maxWorkers; i++ { + wg.Add(1) + go worker(bl) + } + + wg.Wait() +} diff --git a/contenthandler.go b/contenthandler.go index eaf9b8bd5..625261993 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -3,7 +3,7 @@ package khepri import ( "encoding/json" "errors" - "sync" + "fmt" "github.com/fd0/khepri/backend" ) @@ -12,16 +12,15 @@ type ContentHandler struct { be backend.Server key *Key - m sync.Mutex - content *StorageMap + bl *BlobList } // NewContentHandler creates a new content handler. func NewContentHandler(be backend.Server, key *Key) (*ContentHandler, error) { ch := &ContentHandler{ - be: be, - key: key, - content: NewStorageMap(), + be: be, + key: key, + bl: NewBlobList(), } return ch, nil @@ -34,9 +33,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) { return nil, err } - ch.m.Lock() - defer ch.m.Unlock() - ch.content.Merge(sn.StorageMap) + ch.bl.Merge(sn.BlobList) + return sn, nil } @@ -50,9 +48,7 @@ func (ch *ContentHandler) LoadAllSnapshots() error { return } - ch.m.Lock() - defer ch.m.Unlock() - ch.content.Merge(sn.StorageMap) + ch.bl.Merge(sn.BlobList) }) if err != nil { return err @@ -63,20 +59,18 @@ func (ch *ContentHandler) LoadAllSnapshots() error { // Save encrypts data and stores it to the backend as type t. If the data was // already saved before, the blob is returned. -func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) { +func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) { // compute plaintext hash id := backend.Hash(data) // test if the hash is already in the backend - ch.m.Lock() - defer ch.m.Unlock() - blob := ch.content.Find(id) - if blob != nil { + blob, err := ch.bl.Find(Blob{ID: id}) + if err == nil { return blob, nil } // else create a new blob - blob = &Blob{ + blob = Blob{ ID: id, Size: uint64(len(data)), } @@ -84,30 +78,30 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) { // encrypt blob ciphertext, err := ch.key.Encrypt(data) if err != nil { - return nil, err + return Blob{}, err } // save blob sid, err := ch.be.Create(t, ciphertext) if err != nil { - return nil, err + return Blob{}, err } blob.Storage = sid blob.StorageSize = uint64(len(ciphertext)) // insert blob into the storage map - ch.content.Insert(blob) + ch.bl.Insert(blob) return blob, nil } // SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t. -func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, error) { +func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (Blob, error) { // convert to json data, err := json.Marshal(item) if err != nil { - return nil, err + return Blob{}, err } // compress and save data @@ -133,11 +127,9 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) { } // lookup storage hash - ch.m.Lock() - defer ch.m.Unlock() - blob := ch.content.Find(id) - if blob == nil { - return nil, errors.New("Storage ID not found") + blob, err := ch.bl.Find(Blob{ID: id}) + if err != nil { + return nil, fmt.Errorf("Storage ID %s not found", id) } // load data diff --git a/restorer.go b/restorer.go index 8e94f5411..c1717db3f 100644 --- a/restorer.go +++ b/restorer.go @@ -30,12 +30,12 @@ func NewRestorer(be backend.Server, key *Key, snid backend.ID) (*Restorer, error var err error r.ch, err = NewContentHandler(be, key) if err != nil { - return nil, err + return nil, arrar.Annotate(err, "create contenthandler for restorer") } r.sn, err = r.ch.LoadSnapshot(snid) if err != nil { - return nil, err + return nil, arrar.Annotate(err, "load snapshot for restorer") } // abort on all errors diff --git a/snapshot.go b/snapshot.go index fa7d10e1d..c0cc6c2bb 100644 --- a/snapshot.go +++ b/snapshot.go @@ -11,14 +11,14 @@ import ( ) type Snapshot struct { - Time time.Time `json:"time"` - Content backend.ID `json:"content"` - StorageMap *StorageMap `json:"map"` - Dir string `json:"dir"` - Hostname string `json:"hostname,omitempty"` - Username string `json:"username,omitempty"` - UID string `json:"uid,omitempty"` - GID string `json:"gid,omitempty"` + Time time.Time `json:"time"` + Content backend.ID `json:"content"` + BlobList *BlobList `json:"blobs"` + Dir string `json:"dir"` + Hostname string `json:"hostname,omitempty"` + Username string `json:"username,omitempty"` + UID string `json:"uid,omitempty"` + GID string `json:"gid,omitempty"` id backend.ID // plaintext ID, used during restore } diff --git a/storagemap.go b/storagemap.go deleted file mode 100644 index db77952e2..000000000 --- a/storagemap.go +++ /dev/null @@ -1,51 +0,0 @@ -package khepri - -import ( - "bytes" - "sort" - - "github.com/fd0/khepri/backend" -) - -type StorageMap Blobs - -func NewStorageMap() *StorageMap { - return &StorageMap{} -} - -func (m StorageMap) find(id backend.ID) (int, *Blob) { - i := sort.Search(len(m), func(i int) bool { - return bytes.Compare(m[i].ID, id) >= 0 - }) - - if i < len(m) && bytes.Equal(m[i].ID, id) { - return i, m[i] - } - - return i, nil -} - -func (m StorageMap) Find(id backend.ID) *Blob { - _, blob := m.find(id) - return blob -} - -func (m *StorageMap) Insert(blob *Blob) { - pos, b := m.find(blob.ID) - if b != nil { - // already present - return - } - - // insert blob - // https://code.google.com/p/go-wiki/wiki/SliceTricks - *m = append(*m, nil) - copy((*m)[pos+1:], (*m)[pos:]) - (*m)[pos] = blob -} - -func (m *StorageMap) Merge(sm *StorageMap) { - for _, blob := range *sm { - m.Insert(blob) - } -} diff --git a/tree.go b/tree.go index 778388122..0aa41a44d 100644 --- a/tree.go +++ b/tree.go @@ -46,7 +46,7 @@ type Blob struct { StorageSize uint64 `json:"ssize,omitempty"` // encrypted Size } -type Blobs []*Blob +type Blobs []Blob func (n Node) String() string { switch n.Type {