From 8cf71b7f1c314fd7da2a182d8092f54b610104a1 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov <evgeniy@nspcc.ru> Date: Fri, 22 Apr 2022 16:22:40 +0300 Subject: [PATCH] [#1324] local_object_storage: Implement tree service backend In this commit we implement algorithm for CRDT trees from https://martin.klepmann.com/papers/move-op.pdf Each tree is identified by the ID of a container it belongs to and the tree name itself. Essentially, it is a sequence of operations which should be applied in chronological order to get a usual tree representation. There are 2 backends for now: bbolt database and in-memory. In-memory backend is here for debugging and will eventually act as a memory-cache for the on-disk database. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru> --- pkg/local_object_storage/pilorama/boltdb.go | 529 ++++++++++++++++++ pkg/local_object_storage/pilorama/forest.go | 114 ++++ .../pilorama/forest_test.go | 432 ++++++++++++++ pkg/local_object_storage/pilorama/inmemory.go | 235 ++++++++ .../pilorama/interface.go | 35 ++ pkg/local_object_storage/pilorama/meta.go | 48 ++ .../pilorama/meta_test.go | 54 ++ pkg/local_object_storage/pilorama/types.go | 52 ++ 8 files changed, 1499 insertions(+) create mode 100644 pkg/local_object_storage/pilorama/boltdb.go create mode 100644 pkg/local_object_storage/pilorama/forest.go create mode 100644 pkg/local_object_storage/pilorama/forest_test.go create mode 100644 pkg/local_object_storage/pilorama/inmemory.go create mode 100644 pkg/local_object_storage/pilorama/interface.go create mode 100644 pkg/local_object_storage/pilorama/meta.go create mode 100644 pkg/local_object_storage/pilorama/meta_test.go create mode 100644 pkg/local_object_storage/pilorama/types.go diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go new file mode 100644 index 000000000..6eb12a6c6 --- /dev/null +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -0,0 +1,529 @@ +package pilorama + +import ( + "bytes" + "encoding/binary" + "math/rand" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/pkg/io" + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + "go.etcd.io/bbolt" +) + +type boltForest struct { + path string + db *bbolt.DB +} + +var ( + dataBucket = []byte{0} + logBucket = []byte{1} +) + +// NewBoltForest returns storage wrapper for storing operations on CRDT trees. +// +// Each tree is stored in a separate bucket by `CID + treeID` key. +// All integers are stored in little-endian unless explicitly specified otherwise. +// +// DB schema (for a single tree): +// timestamp is 8-byte, id is 4-byte. +// +// log storage (logBucket): +// timestamp in big-endian -> log operation +// +// tree storage (dataBucket): +// 't' + node (id) -> timestamp when the node first appeared +// 'p' + node (id) -> parent (id) +// 'm' + node (id) -> serialized meta +// 'c' + parent (id) + child (id) -> 0/1 +func NewBoltForest(path string) ForestStorage { + return &boltForest{path: path} +} + +func (t *boltForest) Init() error { return nil } +func (t *boltForest) Open() error { + if err := os.MkdirAll(filepath.Dir(t.path), os.ModePerm); err != nil { + return err + } + + db, err := bbolt.Open(t.path, os.ModePerm, bbolt.DefaultOptions) + if err != nil { + return err + } + + t.db = db + + return db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(dataBucket) + if err != nil { + return err + } + _, err = tx.CreateBucketIfNotExists(logBucket) + if err != nil { + return err + } + return nil + }) +} +func (t *boltForest) Close() error { return t.db.Close() } + +// TreeMove implements the Forest interface. +func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) { + var lm *LogMove + return lm, t.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + if err != nil { + return err + } + + m.Time = t.getLatestTimestamp(bLog) + if m.Child == RootID { + m.Child = t.findSpareID(bTree) + } + lm, err = t.applyOperation(bLog, bTree, m) + return err + }) +} + +// TreeAddByPath implements the Forest interface. +func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) { + var lm []LogMove + var key [17]byte + + err := t.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + if err != nil { + return err + } + + i, node, err := t.getPathPrefix(bTree, attr, path) + if err != nil { + return err + } + + lm = make([]LogMove, len(path)-i+1) + for j := i; j < len(path); j++ { + lm[j-i].Move = Move{ + Parent: node, + Meta: Meta{ + Time: t.getLatestTimestamp(bLog), + Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}, + }, + Child: t.findSpareID(bTree), + } + + err := t.do(bLog, bTree, key[:], &lm[j-i]) + if err != nil { + return err + } + + node = lm[j-i].Child + } + + lm[len(lm)-1].Move = Move{ + Parent: node, + Meta: Meta{ + Time: t.getLatestTimestamp(bLog), + Items: meta, + }, + Child: t.findSpareID(bTree), + } + return t.do(bLog, bTree, key[:], &lm[len(lm)-1]) + }) + return lm, err +} + +// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than +// all timestamps corresponding to already stored operations. +// FIXME timestamp should be based on a node position in the container. +func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket) uint64 { + c := bLog.Cursor() + key, _ := c.Last() + if len(key) == 0 { + return 1 + } + return binary.BigEndian.Uint64(key) + 1 +} + +// findSpareID returns random unused ID. +func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { + id := uint64(rand.Int63()) + + var key [9]byte + key[0] = 't' + binary.LittleEndian.PutUint64(key[1:], id) + + for { + if bTree.Get(key[:]) == nil { + return id + } + id = uint64(rand.Int63()) + binary.LittleEndian.PutUint64(key[1:], id) + } +} + +// TreeApply implements the Forest interface. +func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error { + return t.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + if err != nil { + return err + } + _, err = t.applyOperation(bLog, bTree, m) + return err + }) +} + +func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) { + treeRoot := bucketName(cid, treeID) + child, err := tx.CreateBucket(treeRoot) + if err != nil && err != bbolt.ErrBucketExists { + return nil, nil, err + } + + var bLog, bData *bbolt.Bucket + if err == nil { + if bLog, err = child.CreateBucket(logBucket); err != nil { + return nil, nil, err + } + if bData, err = child.CreateBucket(dataBucket); err != nil { + return nil, nil, err + } + } else { + child = tx.Bucket(treeRoot) + bLog = child.Bucket(logBucket) + bData = child.Bucket(dataBucket) + } + + return bLog, bData, nil +} + +func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move) (*LogMove, error) { + var lm LogMove + var tmp LogMove + var cKey [17]byte + + c := logBucket.Cursor() + + key, value := c.Last() + + // 1. Undo up until the desired timestamp is here. + for len(key) == 8 && binary.BigEndian.Uint64(key) > m.Time { + if err := t.logFromBytes(&tmp, key, value); err != nil { + return nil, err + } + if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil { + return nil, err + } + key, value = c.Prev() + } + + // 2. Insert the operation. + if len(key) != 8 || binary.BigEndian.Uint64(key) != m.Time { + lm.Move = *m + if err := t.do(logBucket, treeBucket, cKey[:], &lm); err != nil { + return nil, err + } + } + key, value = c.Next() + + // 3. Re-apply all other operations. + for len(key) == 8 { + if err := t.logFromBytes(&tmp, key, value); err != nil { + return nil, err + } + if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { + return nil, err + } + key, value = c.Next() + } + + return &lm, nil +} + +func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error { + shouldPut := !t.isAncestor(b, key, op.Child, op.Parent) && + !(op.Parent != 0 && op.Parent != TrashID && b.Get(timestampKey(key, op.Parent)) == nil) + shouldRemove := op.Parent == TrashID + + currParent := b.Get(parentKey(key, op.Child)) + if currParent != nil { // node is already in tree + op.HasOld = true + op.Old.Parent = binary.LittleEndian.Uint64(currParent) + if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil { + return err + } + } + + binary.BigEndian.PutUint64(key, op.Time) + if err := lb.Put(key[:8], t.logToBytes(op)); err != nil { + return err + } + + if !shouldPut { + return nil + } + + if shouldRemove { + if currParent != nil { + p := binary.LittleEndian.Uint64(currParent) + if err := b.Delete(childrenKey(key, op.Child, p)); err != nil { + return err + } + } + return t.removeNode(b, key, op.Child) + } + + if currParent == nil { + if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil { + return err + } + } else { + if err := b.Delete(childrenKey(key, op.Child, binary.LittleEndian.Uint64(currParent))); err != nil { + return err + } + } + return t.addNode(b, key, op.Child, op.Parent, op.Meta) +} + +// removeNode removes node keys from the tree except the children key or its parent. +func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node Node) error { + if err := b.Delete(parentKey(key, node)); err != nil { + return err + } + if err := b.Delete(metaKey(key, node)); err != nil { + return err + } + return b.Delete(timestampKey(key, node)) +} + +// addNode adds node keys to the tree except the timestamp key. +func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error { + err := b.Put(parentKey(key, child), toUint64(parent)) + if err != nil { + return err + } + err = b.Put(childrenKey(key, child, parent), []byte{1}) + if err != nil { + return err + } + return b.Put(metaKey(key, child), meta.Bytes()) +} + +func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error { + if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil { + return err + } + + if !lm.HasOld { + return t.removeNode(b, key, m.Child) + } + return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta) +} + +func (t *boltForest) isAncestor(b *bbolt.Bucket, key []byte, parent, child Node) bool { + key[0] = 'p' + for c := child; c != parent; { + binary.LittleEndian.PutUint64(key[1:], c) + rawParent := b.Get(key[:9]) + if len(rawParent) != 8 { + return false + } + c = binary.LittleEndian.Uint64(rawParent) + } + return true +} + +// TreeGetByPath implements the Forest interface. +func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { + if len(path) == 0 { + return nil, nil + } + + var nodes []Node + + return nodes, t.db.View(func(tx *bbolt.Tx) error { + treeRoot := tx.Bucket(bucketName(cid, treeID)) + if treeRoot == nil { + return ErrTreeNotFound + } + + b := treeRoot.Bucket(dataBucket) + + i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1]) + if err != nil { + return err + } + if i < len(path)-1 { + return nil + } + + c := b.Cursor() + + var ( + metaKey [9]byte + id [9]byte + childID [9]byte + m Meta + maxTimestamp uint64 + ) + + id[0] = 'c' + metaKey[0] = 'm' + + binary.LittleEndian.PutUint64(id[1:], curNode) + + key, _ := c.Seek(id[:]) + for len(key) == 1+8+8 && bytes.Equal(id[:9], key[:9]) { + child := binary.LittleEndian.Uint64(key[9:]) + copy(metaKey[1:], key[9:17]) + + if m.FromBytes(b.Get(metaKey[:])) == nil && string(m.GetAttr(attr)) == path[len(path)-1] { + if latest { + ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child))) + if ts >= maxTimestamp { + nodes = append(nodes[:0], child) + maxTimestamp = ts + } + } else { + nodes = append(nodes, child) + } + } + key, _ = c.Next() + } + + return nil + }) +} + +// TreeGetMeta implements the forest interface. +func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) { + key := metaKey(make([]byte, 9), nodeID) + + var m Meta + err := t.db.View(func(tx *bbolt.Tx) error { + treeRoot := tx.Bucket(bucketName(cid, treeID)) + if treeRoot == nil { + return ErrTreeNotFound + } + + b := treeRoot.Bucket(dataBucket) + return m.FromBytes(b.Get(key)) + }) + + return m, err +} + +func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { + var key [9]byte + + c := bTree.Cursor() + + var curNode Node + var m Meta + +loop: + for i := range path { + key[0] = 'c' + binary.LittleEndian.PutUint64(key[1:], curNode) + + childKey, _ := c.Seek(key[:]) + for { + if len(childKey) != 17 || binary.LittleEndian.Uint64(childKey[1:]) != curNode { + break + } + + child := binary.LittleEndian.Uint64(childKey[9:]) + if err := m.FromBytes(bTree.Get(metaKey(key[:], child))); err != nil { + return 0, 0, err + } + + for j := range m.Items { + if m.Items[j].Key == attr { + if string(m.Items[j].Value) == path[i] { + curNode = child + continue loop + } + break + } + } + childKey, _ = c.Next() + } + return i, curNode, nil + } + + return len(path), curNode, nil +} + +func (t *boltForest) logFromBytes(lm *LogMove, key []byte, data []byte) error { + r := io.NewBinReaderFromBuf(data) + lm.Child = r.ReadU64LE() + lm.Parent = r.ReadU64LE() + if err := lm.Meta.FromBytes(r.ReadVarBytes()); err != nil { + return err + } + + lm.HasOld = r.ReadBool() + if lm.HasOld { + lm.Old.Parent = r.ReadU64LE() + if err := lm.Old.Meta.FromBytes(r.ReadVarBytes()); err != nil { + return err + } + } + + return r.Err +} + +func (t *boltForest) logToBytes(lm *LogMove) []byte { + w := io.NewBufBinWriter() + w.WriteU64LE(lm.Child) + w.WriteU64LE(lm.Parent) + w.WriteVarBytes(lm.Meta.Bytes()) + w.WriteBool(lm.HasOld) + if lm.HasOld { + w.WriteU64LE(lm.Old.Parent) + w.WriteVarBytes(lm.Old.Meta.Bytes()) + } + return w.Bytes() +} + +func bucketName(cid cidSDK.ID, treeID string) []byte { + return []byte(cid.String() + treeID) +} + +// 't' + node (id) -> timestamp when the node first appeared +func timestampKey(key []byte, child Node) []byte { + key[0] = 't' + binary.LittleEndian.PutUint64(key[1:], child) + return key[:9] +} + +// 'p' + node (id) -> parent (id) +func parentKey(key []byte, child Node) []byte { + key[0] = 'p' + binary.LittleEndian.PutUint64(key[1:], child) + return key[:9] +} + +// 'm' + node (id) -> serialized meta +func metaKey(key []byte, child Node) []byte { + key[0] = 'm' + binary.LittleEndian.PutUint64(key[1:], child) + return key[:9] +} + +// 'c' + parent (id) + child (id) -> 0/1 +func childrenKey(key []byte, child, parent Node) []byte { + key[0] = 'c' + binary.LittleEndian.PutUint64(key[1:], parent) + binary.LittleEndian.PutUint64(key[9:], child) + return key[:17] +} + +func toUint64(x uint64) []byte { + var a [8]byte + binary.LittleEndian.PutUint64(a[:], x) + return a[:] +} diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go new file mode 100644 index 000000000..53d044866 --- /dev/null +++ b/pkg/local_object_storage/pilorama/forest.go @@ -0,0 +1,114 @@ +package pilorama + +import ( + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// memoryForest represents multiple replicating trees sharing a single storage. +type memoryForest struct { + // treeMap maps tree identifier (container ID + name) to the replicated log. + treeMap map[string]*state +} + +var _ Forest = (*memoryForest)(nil) + +// NewMemoryForest creates new empty forest. +// TODO: this function will eventually be removed and is here for debugging. +func NewMemoryForest() ForestStorage { + return &memoryForest{ + treeMap: make(map[string]*state), + } +} + +// TreeMove implements the Forest interface. +func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMove, error) { + fullID := cid.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + s = newState() + f.treeMap[fullID] = s + } + + op.Time = s.timestamp() + if op.Child == RootID { + op.Child = s.findSpareID() + } + + lm := s.do(op) + s.operations = append(s.operations, lm) + return &lm, nil +} + +// TreeAddByPath implements the Forest interface. +func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) { + fullID := cid.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + s = newState() + f.treeMap[fullID] = s + } + + i, node := s.getPathPrefix(attr, path) + lm := make([]LogMove, len(path)-i+1) + for j := i; j < len(path); j++ { + lm[j-i] = s.do(&Move{ + Parent: node, + Meta: Meta{Time: s.timestamp(), Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}}, + Child: s.findSpareID(), + }) + node = lm[j-i].Child + s.operations = append(s.operations, lm[j-i]) + } + lm[len(lm)-1] = s.do(&Move{ + Parent: node, + Meta: Meta{Time: s.timestamp(), Items: m}, + Child: s.findSpareID(), + }) + return lm, nil +} + +// TreeApply implements the Forest interface. +func (f *memoryForest) TreeApply(cid cidSDK.ID, treeID string, op *Move) error { + fullID := cid.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + s = newState() + f.treeMap[fullID] = s + } + + return s.Apply(op) +} + +func (f *memoryForest) Init() error { + return nil +} + +func (f *memoryForest) Open() error { + return nil +} + +func (f *memoryForest) Close() error { + return nil +} + +// TreeGetByPath implements the Forest interface. +func (f *memoryForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { + fullID := cid.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + return nil, ErrTreeNotFound + } + + return s.get(attr, path, latest), nil +} + +// TreeGetMeta implements the Forest interface. +func (f *memoryForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) { + fullID := cid.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + return Meta{}, ErrTreeNotFound + } + + return s.getMeta(nodeID), nil +} diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go new file mode 100644 index 000000000..f3a498fe5 --- /dev/null +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -0,0 +1,432 @@ +package pilorama + +import ( + "errors" + "math/rand" + "os" + "path/filepath" + "testing" + + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" +) + +var providers = []struct { + name string + construct func(t *testing.T) Forest +}{ + {"inmemory", func(t *testing.T) Forest { + f := NewMemoryForest() + require.NoError(t, f.Init()) + require.NoError(t, f.Open()) + t.Cleanup(func() { + require.NoError(t, f.Close()) + }) + + return f + }}, + {"bbolt", func(t *testing.T) Forest { + // Use `os.TempDir` because we construct multiple times in the same test. + tmpDir, err := os.MkdirTemp(os.TempDir(), "*") + require.NoError(t, err) + + f := NewBoltForest(filepath.Join(tmpDir, "test.db")) + require.NoError(t, f.Init()) + require.NoError(t, f.Open()) + t.Cleanup(func() { + require.NoError(t, f.Close()) + require.NoError(t, os.RemoveAll(tmpDir)) + }) + return f + }}, +} + +func testMeta(t *testing.T, f Forest, cid cidSDK.ID, treeID string, nodeID Node, expected Meta) { + actual, err := f.TreeGetMeta(cid, treeID, nodeID) + require.NoError(t, err) + require.Equal(t, expected, actual) +} + +func TestForest_TreeMove(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeMove(t, providers[i].construct(t)) + }) + } +} + +func testForestTreeMove(t *testing.T, s Forest) { + cid := cidtest.ID() + treeID := "version" + + meta := []KeyValue{ + {Key: AttributeVersion, Value: []byte("XXX")}, + {Key: AttributeFilename, Value: []byte("file.txt")}} + lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + require.NoError(t, err) + require.Equal(t, 3, len(lm)) + + nodeID := lm[2].Child + t.Run("same parent, update meta", func(t *testing.T) { + _, err = s.TreeMove(cid, treeID, &Move{ + Parent: lm[1].Child, + Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, + Child: nodeID, + }) + require.NoError(t, err) + + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false) + require.NoError(t, err) + require.ElementsMatch(t, []Node{nodeID}, nodes) + }) + t.Run("different parent", func(t *testing.T) { + _, err = s.TreeMove(cid, treeID, &Move{ + Parent: RootID, + Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, + Child: nodeID, + }) + require.NoError(t, err) + + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false) + require.NoError(t, err) + require.True(t, len(nodes) == 0) + + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false) + require.NoError(t, err) + require.ElementsMatch(t, []Node{nodeID}, nodes) + }) +} + +func TestForest_TreeAdd(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeAdd(t, providers[i].construct(t)) + }) + } +} + +func testForestTreeAdd(t *testing.T, s Forest) { + cid := cidtest.ID() + treeID := "version" + + meta := []KeyValue{ + {Key: AttributeVersion, Value: []byte("XXX")}, + {Key: AttributeFilename, Value: []byte("file.txt")}} + lm, err := s.TreeMove(cid, treeID, &Move{ + Parent: RootID, + Child: RootID, + Meta: Meta{Items: meta}, + }) + require.NoError(t, err) + + testMeta(t, s, cid, treeID, lm.Child, Meta{Time: lm.Time, Items: meta}) + + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false) + require.NoError(t, err) + require.ElementsMatch(t, []Node{lm.Child}, nodes) + + t.Run("other trees are unaffected", func(t *testing.T) { + _, err := s.TreeGetByPath(cid, treeID+"123", AttributeFilename, []string{"file.txt"}, false) + require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err) + + _, err = s.TreeGetMeta(cid, treeID+"123", 0) + require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err) + }) +} + +func TestForest_TreeAddByPath(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeAddByPath(t, providers[i].construct(t)) + }) + } +} + +func testForestTreeAddByPath(t *testing.T, s Forest) { + cid := cidtest.ID() + treeID := "version" + + meta := []KeyValue{ + {Key: AttributeVersion, Value: []byte("XXX")}, + {Key: AttributeFilename, Value: []byte("file.txt")}} + lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + require.NoError(t, err) + require.Equal(t, 3, len(lm)) + testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("path")}}}) + testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: []KeyValue{{AttributeFilename, []byte("to")}}}) + + firstID := lm[2].Child + testMeta(t, s, cid, treeID, firstID, Meta{Time: lm[2].Time, Items: meta}) + + meta[0].Value = []byte("YYY") + lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + require.NoError(t, err) + require.Equal(t, 1, len(lm)) + + secondID := lm[0].Child + testMeta(t, s, cid, treeID, secondID, Meta{Time: lm[0].Time, Items: meta}) + + t.Run("get versions", func(t *testing.T) { + // All versions. + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false) + require.NoError(t, err) + require.ElementsMatch(t, []Node{firstID, secondID}, nodes) + + // Latest version. + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, true) + require.NoError(t, err) + require.Equal(t, []Node{secondID}, nodes) + }) + + meta[0].Value = []byte("ZZZ") + meta[1].Value = []byte("cat.jpg") + lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "dir"}, meta) + require.NoError(t, err) + require.Equal(t, 2, len(lm)) + testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("dir")}}}) + testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: meta}) +} + +func TestForest_Apply(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeApply(t, providers[i].construct) + }) + } +} + +func testForestTreeApply(t *testing.T, constructor func(t *testing.T) Forest) { + cid := cidtest.ID() + treeID := "version" + + testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) { + require.NoError(t, s.TreeApply(cid, treeID, &Move{ + Child: child, + Parent: parent, + Meta: meta, + })) + } + + t.Run("add a child, then insert a parent removal", func(t *testing.T) { + s := constructor(t) + testApply(t, s, 10, 0, Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}}) + + meta := Meta{Time: 3, Items: []KeyValue{{"child", []byte{3}}}} + testApply(t, s, 11, 10, meta) + testMeta(t, s, cid, treeID, 11, meta) + + testApply(t, s, 10, TrashID, Meta{Time: 2, Items: []KeyValue{{"parent", []byte{2}}}}) + testMeta(t, s, cid, treeID, 11, Meta{}) + }) + t.Run("add a child to non-existent parent, then add a parent", func(t *testing.T) { + s := constructor(t) + + testApply(t, s, 11, 10, Meta{Time: 1, Items: []KeyValue{{"child", []byte{3}}}}) + testMeta(t, s, cid, treeID, 11, Meta{}) + + testApply(t, s, 10, 0, Meta{Time: 2, Items: []KeyValue{{"grand", []byte{1}}}}) + testMeta(t, s, cid, treeID, 11, Meta{}) + }) +} + +func TestForest_ApplyRandom(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeApplyRandom(t, providers[i].construct) + }) + } +} + +func testForestTreeApplyRandom(t *testing.T, constructor func(t *testing.T) Forest) { + rand.Seed(42) + + const ( + nodeCount = 4 + opCount = 10 + iterCount = 100 + ) + + cid := cidtest.ID() + treeID := "version" + expected := constructor(t) + + ops := make([]Move, nodeCount+opCount) + for i := 0; i < nodeCount; i++ { + ops[i] = Move{ + Parent: 0, + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: make([]byte, 10)}}, + }, + Child: uint64(i) + 1, + } + rand.Read(ops[i].Meta.Items[0].Value) + } + + for i := nodeCount; i < len(ops); i++ { + ops[i] = Move{ + Parent: rand.Uint64() % (nodeCount + 1), + Meta: Meta{ + Time: Timestamp(i + nodeCount), + Items: []KeyValue{{Value: make([]byte, 10)}}, + }, + Child: rand.Uint64() % (nodeCount + 1), + } + rand.Read(ops[i].Meta.Items[0].Value) + } + for i := range ops { + require.NoError(t, expected.TreeApply(cid, treeID, &ops[i])) + } + + for i := 0; i < iterCount; i++ { + // Shuffle random operations, leave initialization in place. + rand.Shuffle(len(ops)-nodeCount, func(i, j int) { ops[i+nodeCount], ops[j+nodeCount] = ops[j+nodeCount], ops[i+nodeCount] }) + + actual := constructor(t) + for i := range ops { + require.NoError(t, actual.TreeApply(cid, treeID, &ops[i])) + } + for i := uint64(0); i < nodeCount; i++ { + expectedMeta, err := expected.TreeGetMeta(cid, treeID, i) + require.NoError(t, err) + actualMeta, err := actual.TreeGetMeta(cid, treeID, i) + require.NoError(t, err) + require.Equal(t, expectedMeta, actualMeta, "node id: %d", i) + + if _, ok := actual.(*memoryForest); ok { + require.Equal(t, expected, actual, i) + } + } + } +} + +const benchNodeCount = 1000 + +func BenchmarkApplySequential(b *testing.B) { + benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(nodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(nodeCount)), + } + } + return ops + }) +} + +func BenchmarkApplyReorderLast(b *testing.B) { + // Group operations in a blocks of 10, order blocks in increasing timestamp order, + // and operations in a single block in reverse. + const blockSize = 10 + + benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(nodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(nodeCount)), + } + if i != 0 && i%blockSize == 0 { + for j := 0; j < blockSize/2; j++ { + ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] + } + } + } + return ops + }) +} + +func benchmarkApply(b *testing.B, n int, genFunc func(int, int) []Move) { + rand.Seed(42) + + s := newState() + ops := genFunc(n, b.N) + + b.ResetTimer() + b.ReportAllocs() + for i := range ops { + if err := s.Apply(&ops[i]); err != nil { + b.Fatalf("error in `Apply`: %v", err) + } + } +} + +func TestTreeGetByPath(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + testTreeGetByPath(t, providers[i].construct(t)) + }) + } +} + +func testTreeGetByPath(t *testing.T, s Forest) { + cid := cidtest.ID() + treeID := "version" + + // / + // |- a (1) + // |- cat1.jpg, Version=TTT (3) + // |- b (2) + // |- cat1.jpg, Version=XXX (4) + // |- cat1.jpg, Version=YYY (5) + // |- cat2.jpg, Version=ZZZ (6) + testMove(t, s, 0, 1, 0, cid, treeID, "a", "") + testMove(t, s, 1, 2, 0, cid, treeID, "b", "") + testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT") + testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX") + testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY") + testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ") + + if mf, ok := s.(*memoryForest); ok { + single := mf.treeMap[cid.String()+"/"+treeID] + t.Run("test meta", func(t *testing.T) { + for i := 0; i < 6; i++ { + require.Equal(t, uint64(i), single.infoMap[Node(i+1)].Timestamp) + } + }) + } + + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"b", "cat1.jpg"}, false) + require.NoError(t, err) + require.Equal(t, []Node{4, 5}, nodes) + + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat1.jpg"}, false) + require.Equal(t, []Node{3}, nodes) + + t.Run("missing child", func(t *testing.T) { + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat3.jpg"}, false) + require.True(t, len(nodes) == 0) + }) + t.Run("missing parent", func(t *testing.T) { + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"xyz", "cat1.jpg"}, false) + require.True(t, len(nodes) == 0) + }) + t.Run("empty path", func(t *testing.T) { + nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, nil, false) + require.True(t, len(nodes) == 0) + }) +} + +func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) { + require.NoError(t, s.TreeApply(cid, treeID, &Move{ + Parent: parent, + Child: node, + Meta: Meta{ + Time: uint64(ts), + Items: []KeyValue{ + {AttributeFilename, []byte(filename)}, + {AttributeVersion, []byte(version)}, + }, + }, + })) +} diff --git a/pkg/local_object_storage/pilorama/inmemory.go b/pkg/local_object_storage/pilorama/inmemory.go new file mode 100644 index 000000000..91199cc1c --- /dev/null +++ b/pkg/local_object_storage/pilorama/inmemory.go @@ -0,0 +1,235 @@ +package pilorama + +// nodeInfo couples parent and metadata. +type nodeInfo struct { + Parent Node + Meta Meta + Timestamp Timestamp +} + +// state represents state being replicated. +type state struct { + operations []LogMove + tree +} + +// newState constructs new empty tree. +func newState() *state { + return &state{ + tree: *newTree(), + } +} + +// undo un-does op and changes s in-place. +func (s *state) undo(op *LogMove) { + children := s.tree.childMap[op.Parent] + for i := range children { + if children[i] == op.Child { + if len(children) > 1 { + s.tree.childMap[op.Parent] = append(children[:i], children[i+1:]...) + } else { + delete(s.tree.childMap, op.Parent) + } + break + } + } + + if op.HasOld { + s.tree.infoMap[op.Child] = op.Old + oldChildren := s.tree.childMap[op.Old.Parent] + for i := range oldChildren { + if oldChildren[i] == op.Child { + return + } + } + s.tree.childMap[op.Old.Parent] = append(oldChildren, op.Child) + } else { + delete(s.tree.infoMap, op.Child) + } +} + +// Apply puts op in log at a proper position, re-applies all subsequent operations +// from log and changes s in-place. +func (s *state) Apply(op *Move) error { + var index int + for index = len(s.operations); index > 0; index-- { + if s.operations[index-1].Time <= op.Time { + break + } + } + + if index == len(s.operations) { + s.operations = append(s.operations, s.do(op)) + return nil + } + + s.operations = append(s.operations[:index+1], s.operations[index:]...) + for i := len(s.operations) - 1; i > index; i-- { + s.undo(&s.operations[i]) + } + + s.operations[index] = s.do(op) + + for i := index + 1; i < len(s.operations); i++ { + s.operations[i] = s.do(&s.operations[i].Move) + } + return nil +} + +// do performs a single move operation on a tree. +func (s *state) do(op *Move) LogMove { + lm := LogMove{ + Move: Move{ + Parent: op.Parent, + Meta: op.Meta, + Child: op.Child, + }, + } + + _, parentInTree := s.tree.infoMap[op.Parent] + shouldPut := !s.tree.isAncestor(op.Child, op.Parent) && + !(op.Parent != 0 && op.Parent != TrashID && !parentInTree) + shouldRemove := op.Parent == TrashID + + p, ok := s.tree.infoMap[op.Child] + if ok { + lm.HasOld = true + lm.Old = p + } + + if !shouldPut { + return lm + } + + if shouldRemove { + if ok { + s.removeChild(op.Child, p.Parent) + } + delete(s.tree.infoMap, op.Child) + return lm + } + + if !ok { + p.Timestamp = op.Time + } else { + s.removeChild(op.Child, p.Parent) + } + + p.Meta = op.Meta + p.Parent = op.Parent + s.tree.infoMap[op.Child] = p + s.tree.childMap[op.Parent] = append(s.tree.childMap[op.Parent], op.Child) + + return lm +} + +func (s *state) removeChild(child, parent Node) { + oldChildren := s.tree.childMap[parent] + for i := range oldChildren { + if oldChildren[i] == child { + s.tree.childMap[parent] = append(oldChildren[:i], oldChildren[i+1:]...) + break + } + } +} + +func (s *state) timestamp() Timestamp { + if len(s.operations) == 0 { + return 0 + } + return s.operations[len(s.operations)-1].Time + 1 +} + +func (s *state) findSpareID() Node { + id := uint64(1) + for _, ok := s.infoMap[id]; ok; _, ok = s.infoMap[id] { + id++ + } + return id +} + +// tree is a mapping from the child nodes to their parent and metadata. +type tree struct { + infoMap map[Node]nodeInfo + childMap map[Node][]Node +} + +func newTree() *tree { + return &tree{ + childMap: make(map[Node][]Node), + infoMap: make(map[Node]nodeInfo), + } +} + +// isAncestor returns true if parent is an ancestor of a child. +// For convenience, also return true if parent == child. +func (t tree) isAncestor(parent, child Node) bool { + for c := child; c != parent; { + p, ok := t.infoMap[c] + if !ok { + return false + } + c = p.Parent + } + return true +} + +// getPathPrefix descends by path constructed from values of attr until +// there is no node corresponding to a path element. Returns the amount of nodes +// processed and ID of the last node. +func (t tree) getPathPrefix(attr string, path []string) (int, Node) { + var curNode Node + +loop: + for i := range path { + children := t.childMap[curNode] + for j := range children { + f := t.infoMap[children[j]].Meta.GetAttr(attr) + if string(f) == path[i] { + curNode = children[j] + continue loop + } + } + return i, curNode + } + + return len(path), curNode +} + +// get returns list of nodes which have the specified path from root +// descending by values of attr from meta. +func (t tree) get(attr string, path []string, latest bool) []Node { + if len(path) == 0 { + return nil + } + + i, curNode := t.getPathPrefix(attr, path[:len(path)-1]) + if i < len(path)-1 { + return nil + } + + var nodes []Node + var lastTs Timestamp + + children := t.childMap[curNode] + for i := range children { + info := t.infoMap[children[i]] + fileName := string(info.Meta.GetAttr(attr)) + if fileName == path[len(path)-1] { + if latest { + if info.Timestamp >= lastTs { + nodes = append(nodes[:0], children[i]) + } + } else { + nodes = append(nodes, children[i]) + } + } + } + + return nodes +} + +// getMeta returns meta information of node n. +func (t tree) getMeta(n Node) Meta { + return t.infoMap[n].Meta +} diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go new file mode 100644 index 000000000..005ce5ce5 --- /dev/null +++ b/pkg/local_object_storage/pilorama/interface.go @@ -0,0 +1,35 @@ +package pilorama + +import cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + +// Forest represents CRDT tree. +type Forest interface { + // TreeMove moves node in the tree. + // If the parent of the move operation is TrashID, the node is removed. + // If the child of the move operation is RootID, new ID is generated and added to a tree. + TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) + // TreeAddByPath adds new node in the tree using provided path. + // The path is constructed by descending from the root using the values of the attr in meta. + TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) + // TreeApply applies replicated operation from another node. + TreeApply(cid cidSDK.ID, treeID string, m *Move) error + // TreeGetByPath returns all nodes corresponding to the path. + // The path is constructed by descending from the root using the values of the + // AttributeFilename in meta. + // The last argument determines whether only the node with the latest timestamp is returned. + TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) + // TreeGetMeta returns meta information of the node with the specified ID. + TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) +} + +type ForestStorage interface { + Init() error + Open() error + Close() error + Forest +} + +const ( + AttributeFilename = "FileName" + AttributeVersion = "Version" +) diff --git a/pkg/local_object_storage/pilorama/meta.go b/pkg/local_object_storage/pilorama/meta.go new file mode 100644 index 000000000..f5ed21be7 --- /dev/null +++ b/pkg/local_object_storage/pilorama/meta.go @@ -0,0 +1,48 @@ +package pilorama + +import "github.com/nspcc-dev/neo-go/pkg/io" + +func (x *Meta) FromBytes(data []byte) error { + if len(data) == 0 { + x.Items = nil + x.Time = 0 + return nil + } + + r := io.NewBinReaderFromBuf(data) + ts := r.ReadVarUint() + size := r.ReadVarUint() + m := make([]KeyValue, size) + for i := range m { + m[i].Key = r.ReadString() + m[i].Value = r.ReadVarBytes() + } + if r.Err != nil { + return r.Err + } + + x.Time = ts + x.Items = m + return nil +} + +func (x Meta) Bytes() []byte { + w := io.NewBufBinWriter() + w.WriteVarUint(x.Time) + w.WriteVarUint(uint64(len(x.Items))) + for _, e := range x.Items { + w.WriteString(e.Key) + w.WriteVarBytes(e.Value) + } + + return w.Bytes() +} + +func (x Meta) GetAttr(name string) []byte { + for _, kv := range x.Items { + if kv.Key == name { + return kv.Value + } + } + return nil +} diff --git a/pkg/local_object_storage/pilorama/meta_test.go b/pkg/local_object_storage/pilorama/meta_test.go new file mode 100644 index 000000000..2e10649f3 --- /dev/null +++ b/pkg/local_object_storage/pilorama/meta_test.go @@ -0,0 +1,54 @@ +package pilorama + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMeta_Bytes(t *testing.T) { + t.Run("empty", func(t *testing.T) { + var m Meta + require.NoError(t, m.FromBytes(nil)) + require.True(t, len(m.Items) == 0) + require.Equal(t, uint64(0), m.Time) + require.Equal(t, []byte{0, 0}, m.Bytes()) + }) + t.Run("filled", func(t *testing.T) { + expected := Meta{ + Time: 123, + Items: []KeyValue{ + {"abc", []byte{1, 2, 3}}, + {"xyz", []byte{5, 6, 7, 8}}, + }} + + data := expected.Bytes() + + var actual Meta + require.NoError(t, actual.FromBytes(data)) + require.Equal(t, expected, actual) + + t.Run("error", func(t *testing.T) { + require.Error(t, new(Meta).FromBytes(data[:len(data)/2])) + }) + }) +} + +func TestMeta_GetAttr(t *testing.T) { + attr := [][]byte{ + make([]byte, 5), + make([]byte, 10), + } + for i := range attr { + rand.Read(attr[i]) + } + + m := Meta{Items: []KeyValue{{"abc", attr[0]}, {"xyz", attr[1]}}} + require.Equal(t, attr[0], m.GetAttr("abc")) + require.Equal(t, attr[1], m.GetAttr("xyz")) + + require.Nil(t, m.GetAttr("a")) + require.Nil(t, m.GetAttr("xyza")) + require.Nil(t, m.GetAttr("")) +} diff --git a/pkg/local_object_storage/pilorama/types.go b/pkg/local_object_storage/pilorama/types.go new file mode 100644 index 000000000..b73b26e84 --- /dev/null +++ b/pkg/local_object_storage/pilorama/types.go @@ -0,0 +1,52 @@ +package pilorama + +import ( + "errors" + "math" +) + +// Timestamp is an alias for integer timestamp type. +// TODO: remove after the debugging. +type Timestamp = uint64 + +// Node is used to represent nodes. +// TODO: remove after the debugging. +type Node = uint64 + +// Meta represents arbitrary meta information. +// TODO: remove after the debugging or create a proper interface. +type Meta struct { + Time Timestamp + Items []KeyValue +} + +// KeyValue represents a key-value pair. +type KeyValue struct { + Key string + Value []byte +} + +// Move represents a single move operation. +type Move struct { + Parent Node + Meta + // Child represents the ID of a node being moved. If zero, new ID is generated. + Child Node +} + +// LogMove represents log record for a single move operation. +type LogMove struct { + Move + HasOld bool + Old nodeInfo +} + +const ( + // RootID represents the ID of a root node. + RootID = 0 + // TrashID is a parent for all removed nodes. + TrashID = math.MaxUint64 +) + +// ErrTreeNotFound is returned when the requested tree is not found. +var ErrTreeNotFound = errors.New("tree not found")