[#1324] local_object_storage: Implement tree service backend

In this commit we implement algorithm for CRDT trees from
https://martin.klepmann.com/papers/move-op.pdf

Each tree is identified by the ID of a container it belongs to
and the tree name itself. Essentially, it is a sequence of operations
which should be applied in chronological order to get a usual tree
representation.

There are 2 backends for now: bbolt database and in-memory.
In-memory backend is here for debugging and will eventually act
as a memory-cache for the on-disk database.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-04-22 16:22:40 +03:00
parent 3a2c025843
commit cf73feb3f8
8 changed files with 1499 additions and 0 deletions

View file

@ -0,0 +1,529 @@
package pilorama
import (
"bytes"
"encoding/binary"
"math/rand"
"os"
"path/filepath"
"github.com/nspcc-dev/neo-go/pkg/io"
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.etcd.io/bbolt"
)
type boltForest struct {
path string
db *bbolt.DB
}
var (
dataBucket = []byte{0}
logBucket = []byte{1}
)
// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
//
// Each tree is stored in a separate bucket by `CID + treeID` key.
// All integers are stored in little-endian unless explicitly specified otherwise.
//
// DB schema (for a single tree):
// timestamp is 8-byte, id is 4-byte.
//
// log storage (logBucket):
// timestamp in big-endian -> log operation
//
// tree storage (dataBucket):
// 't' + node (id) -> timestamp when the node first appeared
// 'p' + node (id) -> parent (id)
// 'm' + node (id) -> serialized meta
// 'c' + parent (id) + child (id) -> 0/1
func NewBoltForest(path string) ForestStorage {
return &boltForest{path: path}
}
func (t *boltForest) Init() error { return nil }
func (t *boltForest) Open() error {
if err := os.MkdirAll(filepath.Dir(t.path), os.ModePerm); err != nil {
return err
}
db, err := bbolt.Open(t.path, os.ModePerm, bbolt.DefaultOptions)
if err != nil {
return err
}
t.db = db
return db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(dataBucket)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(logBucket)
if err != nil {
return err
}
return nil
})
}
func (t *boltForest) Close() error { return t.db.Close() }
// TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) {
var lm *LogMove
return lm, t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err
}
m.Time = t.getLatestTimestamp(bLog)
if m.Child == RootID {
m.Child = t.findSpareID(bTree)
}
lm, err = t.applyOperation(bLog, bTree, m)
return err
})
}
// TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
var lm []LogMove
var key [17]byte
err := t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err
}
i, node, err := t.getPathPrefix(bTree, attr, path)
if err != nil {
return err
}
lm = make([]LogMove, len(path)-i+1)
for j := i; j < len(path); j++ {
lm[j-i].Move = Move{
Parent: node,
Meta: Meta{
Time: t.getLatestTimestamp(bLog),
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}},
},
Child: t.findSpareID(bTree),
}
err := t.do(bLog, bTree, key[:], &lm[j-i])
if err != nil {
return err
}
node = lm[j-i].Child
}
lm[len(lm)-1].Move = Move{
Parent: node,
Meta: Meta{
Time: t.getLatestTimestamp(bLog),
Items: meta,
},
Child: t.findSpareID(bTree),
}
return t.do(bLog, bTree, key[:], &lm[len(lm)-1])
})
return lm, err
}
// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than
// all timestamps corresponding to already stored operations.
// FIXME timestamp should be based on a node position in the container.
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket) uint64 {
c := bLog.Cursor()
key, _ := c.Last()
if len(key) == 0 {
return 1
}
return binary.BigEndian.Uint64(key) + 1
}
// findSpareID returns random unused ID.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
id := uint64(rand.Int63())
var key [9]byte
key[0] = 't'
binary.LittleEndian.PutUint64(key[1:], id)
for {
if bTree.Get(key[:]) == nil {
return id
}
id = uint64(rand.Int63())
binary.LittleEndian.PutUint64(key[1:], id)
}
}
// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error {
return t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err
}
_, err = t.applyOperation(bLog, bTree, m)
return err
})
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
treeRoot := bucketName(cid, treeID)
child, err := tx.CreateBucket(treeRoot)
if err != nil && err != bbolt.ErrBucketExists {
return nil, nil, err
}
var bLog, bData *bbolt.Bucket
if err == nil {
if bLog, err = child.CreateBucket(logBucket); err != nil {
return nil, nil, err
}
if bData, err = child.CreateBucket(dataBucket); err != nil {
return nil, nil, err
}
} else {
child = tx.Bucket(treeRoot)
bLog = child.Bucket(logBucket)
bData = child.Bucket(dataBucket)
}
return bLog, bData, nil
}
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move) (*LogMove, error) {
var lm LogMove
var tmp LogMove
var cKey [17]byte
c := logBucket.Cursor()
key, value := c.Last()
// 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > m.Time {
if err := t.logFromBytes(&tmp, key, value); err != nil {
return nil, err
}
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
return nil, err
}
key, value = c.Prev()
}
// 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != m.Time {
lm.Move = *m
if err := t.do(logBucket, treeBucket, cKey[:], &lm); err != nil {
return nil, err
}
}
key, value = c.Next()
// 3. Re-apply all other operations.
for len(key) == 8 {
if err := t.logFromBytes(&tmp, key, value); err != nil {
return nil, err
}
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
return nil, err
}
key, value = c.Next()
}
return &lm, nil
}
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
shouldPut := !t.isAncestor(b, key, op.Child, op.Parent) &&
!(op.Parent != 0 && op.Parent != TrashID && b.Get(timestampKey(key, op.Parent)) == nil)
shouldRemove := op.Parent == TrashID
currParent := b.Get(parentKey(key, op.Child))
if currParent != nil { // node is already in tree
op.HasOld = true
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
return err
}
}
binary.BigEndian.PutUint64(key, op.Time)
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil {
return err
}
if !shouldPut {
return nil
}
if shouldRemove {
if currParent != nil {
p := binary.LittleEndian.Uint64(currParent)
if err := b.Delete(childrenKey(key, op.Child, p)); err != nil {
return err
}
}
return t.removeNode(b, key, op.Child)
}
if currParent == nil {
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil {
return err
}
} else {
if err := b.Delete(childrenKey(key, op.Child, binary.LittleEndian.Uint64(currParent))); err != nil {
return err
}
}
return t.addNode(b, key, op.Child, op.Parent, op.Meta)
}
// removeNode removes node keys from the tree except the children key or its parent.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node Node) error {
if err := b.Delete(parentKey(key, node)); err != nil {
return err
}
if err := b.Delete(metaKey(key, node)); err != nil {
return err
}
return b.Delete(timestampKey(key, node))
}
// addNode adds node keys to the tree except the timestamp key.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error {
err := b.Put(parentKey(key, child), toUint64(parent))
if err != nil {
return err
}
err = b.Put(childrenKey(key, child, parent), []byte{1})
if err != nil {
return err
}
return b.Put(metaKey(key, child), meta.Bytes())
}
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
return err
}
if !lm.HasOld {
return t.removeNode(b, key, m.Child)
}
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
}
func (t *boltForest) isAncestor(b *bbolt.Bucket, key []byte, parent, child Node) bool {
key[0] = 'p'
for c := child; c != parent; {
binary.LittleEndian.PutUint64(key[1:], c)
rawParent := b.Get(key[:9])
if len(rawParent) != 8 {
return false
}
c = binary.LittleEndian.Uint64(rawParent)
}
return true
}
// TreeGetByPath implements the Forest interface.
func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
if len(path) == 0 {
return nil, nil
}
var nodes []Node
return nodes, t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot == nil {
return ErrTreeNotFound
}
b := treeRoot.Bucket(dataBucket)
i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1])
if err != nil {
return err
}
if i < len(path)-1 {
return nil
}
c := b.Cursor()
var (
metaKey [9]byte
id [9]byte
childID [9]byte
m Meta
maxTimestamp uint64
)
id[0] = 'c'
metaKey[0] = 'm'
binary.LittleEndian.PutUint64(id[1:], curNode)
key, _ := c.Seek(id[:])
for len(key) == 1+8+8 && bytes.Equal(id[:9], key[:9]) {
child := binary.LittleEndian.Uint64(key[9:])
copy(metaKey[1:], key[9:17])
if m.FromBytes(b.Get(metaKey[:])) == nil && string(m.GetAttr(attr)) == path[len(path)-1] {
if latest {
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child)))
if ts >= maxTimestamp {
nodes = append(nodes[:0], child)
maxTimestamp = ts
}
} else {
nodes = append(nodes, child)
}
}
key, _ = c.Next()
}
return nil
})
}
// TreeGetMeta implements the forest interface.
func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) {
key := metaKey(make([]byte, 9), nodeID)
var m Meta
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot == nil {
return ErrTreeNotFound
}
b := treeRoot.Bucket(dataBucket)
return m.FromBytes(b.Get(key))
})
return m, err
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
var key [9]byte
c := bTree.Cursor()
var curNode Node
var m Meta
loop:
for i := range path {
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], curNode)
childKey, _ := c.Seek(key[:])
for {
if len(childKey) != 17 || binary.LittleEndian.Uint64(childKey[1:]) != curNode {
break
}
child := binary.LittleEndian.Uint64(childKey[9:])
if err := m.FromBytes(bTree.Get(metaKey(key[:], child))); err != nil {
return 0, 0, err
}
for j := range m.Items {
if m.Items[j].Key == attr {
if string(m.Items[j].Value) == path[i] {
curNode = child
continue loop
}
break
}
}
childKey, _ = c.Next()
}
return i, curNode, nil
}
return len(path), curNode, nil
}
func (t *boltForest) logFromBytes(lm *LogMove, key []byte, data []byte) error {
r := io.NewBinReaderFromBuf(data)
lm.Child = r.ReadU64LE()
lm.Parent = r.ReadU64LE()
if err := lm.Meta.FromBytes(r.ReadVarBytes()); err != nil {
return err
}
lm.HasOld = r.ReadBool()
if lm.HasOld {
lm.Old.Parent = r.ReadU64LE()
if err := lm.Old.Meta.FromBytes(r.ReadVarBytes()); err != nil {
return err
}
}
return r.Err
}
func (t *boltForest) logToBytes(lm *LogMove) []byte {
w := io.NewBufBinWriter()
w.WriteU64LE(lm.Child)
w.WriteU64LE(lm.Parent)
w.WriteVarBytes(lm.Meta.Bytes())
w.WriteBool(lm.HasOld)
if lm.HasOld {
w.WriteU64LE(lm.Old.Parent)
w.WriteVarBytes(lm.Old.Meta.Bytes())
}
return w.Bytes()
}
func bucketName(cid cidSDK.ID, treeID string) []byte {
return []byte(cid.String() + treeID)
}
// 't' + node (id) -> timestamp when the node first appeared
func timestampKey(key []byte, child Node) []byte {
key[0] = 't'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
}
// 'p' + node (id) -> parent (id)
func parentKey(key []byte, child Node) []byte {
key[0] = 'p'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
}
// 'm' + node (id) -> serialized meta
func metaKey(key []byte, child Node) []byte {
key[0] = 'm'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
}
// 'c' + parent (id) + child (id) -> 0/1
func childrenKey(key []byte, child, parent Node) []byte {
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], parent)
binary.LittleEndian.PutUint64(key[9:], child)
return key[:17]
}
func toUint64(x uint64) []byte {
var a [8]byte
binary.LittleEndian.PutUint64(a[:], x)
return a[:]
}

View file

@ -0,0 +1,114 @@
package pilorama
import (
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
)
// memoryForest represents multiple replicating trees sharing a single storage.
type memoryForest struct {
// treeMap maps tree identifier (container ID + name) to the replicated log.
treeMap map[string]*state
}
var _ Forest = (*memoryForest)(nil)
// NewMemoryForest creates new empty forest.
// TODO: this function will eventually be removed and is here for debugging.
func NewMemoryForest() ForestStorage {
return &memoryForest{
treeMap: make(map[string]*state),
}
}
// TreeMove implements the Forest interface.
func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMove, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newState()
f.treeMap[fullID] = s
}
op.Time = s.timestamp()
if op.Child == RootID {
op.Child = s.findSpareID()
}
lm := s.do(op)
s.operations = append(s.operations, lm)
return &lm, nil
}
// TreeAddByPath implements the Forest interface.
func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newState()
f.treeMap[fullID] = s
}
i, node := s.getPathPrefix(attr, path)
lm := make([]LogMove, len(path)-i+1)
for j := i; j < len(path); j++ {
lm[j-i] = s.do(&Move{
Parent: node,
Meta: Meta{Time: s.timestamp(), Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
Child: s.findSpareID(),
})
node = lm[j-i].Child
s.operations = append(s.operations, lm[j-i])
}
lm[len(lm)-1] = s.do(&Move{
Parent: node,
Meta: Meta{Time: s.timestamp(), Items: m},
Child: s.findSpareID(),
})
return lm, nil
}
// TreeApply implements the Forest interface.
func (f *memoryForest) TreeApply(cid cidSDK.ID, treeID string, op *Move) error {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newState()
f.treeMap[fullID] = s
}
return s.Apply(op)
}
func (f *memoryForest) Init() error {
return nil
}
func (f *memoryForest) Open() error {
return nil
}
func (f *memoryForest) Close() error {
return nil
}
// TreeGetByPath implements the Forest interface.
func (f *memoryForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
return nil, ErrTreeNotFound
}
return s.get(attr, path, latest), nil
}
// TreeGetMeta implements the Forest interface.
func (f *memoryForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
return Meta{}, ErrTreeNotFound
}
return s.getMeta(nodeID), nil
}

View file

@ -0,0 +1,432 @@
package pilorama
import (
"errors"
"math/rand"
"os"
"path/filepath"
"testing"
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
var providers = []struct {
name string
construct func(t *testing.T) Forest
}{
{"inmemory", func(t *testing.T) Forest {
f := NewMemoryForest()
require.NoError(t, f.Init())
require.NoError(t, f.Open())
t.Cleanup(func() {
require.NoError(t, f.Close())
})
return f
}},
{"bbolt", func(t *testing.T) Forest {
// Use `os.TempDir` because we construct multiple times in the same test.
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
require.NoError(t, err)
f := NewBoltForest(filepath.Join(tmpDir, "test.db"))
require.NoError(t, f.Init())
require.NoError(t, f.Open())
t.Cleanup(func() {
require.NoError(t, f.Close())
require.NoError(t, os.RemoveAll(tmpDir))
})
return f
}},
}
func testMeta(t *testing.T, f Forest, cid cidSDK.ID, treeID string, nodeID Node, expected Meta) {
actual, err := f.TreeGetMeta(cid, treeID, nodeID)
require.NoError(t, err)
require.Equal(t, expected, actual)
}
func TestForest_TreeMove(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeMove(t, providers[i].construct(t))
})
}
}
func testForestTreeMove(t *testing.T, s Forest) {
cid := cidtest.ID()
treeID := "version"
meta := []KeyValue{
{Key: AttributeVersion, Value: []byte("XXX")},
{Key: AttributeFilename, Value: []byte("file.txt")}}
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
require.NoError(t, err)
require.Equal(t, 3, len(lm))
nodeID := lm[2].Child
t.Run("same parent, update meta", func(t *testing.T) {
_, err = s.TreeMove(cid, treeID, &Move{
Parent: lm[1].Child,
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
Child: nodeID,
})
require.NoError(t, err)
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
require.NoError(t, err)
require.ElementsMatch(t, []Node{nodeID}, nodes)
})
t.Run("different parent", func(t *testing.T) {
_, err = s.TreeMove(cid, treeID, &Move{
Parent: RootID,
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
Child: nodeID,
})
require.NoError(t, err)
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
require.NoError(t, err)
require.True(t, len(nodes) == 0)
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false)
require.NoError(t, err)
require.ElementsMatch(t, []Node{nodeID}, nodes)
})
}
func TestForest_TreeAdd(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeAdd(t, providers[i].construct(t))
})
}
}
func testForestTreeAdd(t *testing.T, s Forest) {
cid := cidtest.ID()
treeID := "version"
meta := []KeyValue{
{Key: AttributeVersion, Value: []byte("XXX")},
{Key: AttributeFilename, Value: []byte("file.txt")}}
lm, err := s.TreeMove(cid, treeID, &Move{
Parent: RootID,
Child: RootID,
Meta: Meta{Items: meta},
})
require.NoError(t, err)
testMeta(t, s, cid, treeID, lm.Child, Meta{Time: lm.Time, Items: meta})
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false)
require.NoError(t, err)
require.ElementsMatch(t, []Node{lm.Child}, nodes)
t.Run("other trees are unaffected", func(t *testing.T) {
_, err := s.TreeGetByPath(cid, treeID+"123", AttributeFilename, []string{"file.txt"}, false)
require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err)
_, err = s.TreeGetMeta(cid, treeID+"123", 0)
require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err)
})
}
func TestForest_TreeAddByPath(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeAddByPath(t, providers[i].construct(t))
})
}
}
func testForestTreeAddByPath(t *testing.T, s Forest) {
cid := cidtest.ID()
treeID := "version"
meta := []KeyValue{
{Key: AttributeVersion, Value: []byte("XXX")},
{Key: AttributeFilename, Value: []byte("file.txt")}}
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
require.NoError(t, err)
require.Equal(t, 3, len(lm))
testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("path")}}})
testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: []KeyValue{{AttributeFilename, []byte("to")}}})
firstID := lm[2].Child
testMeta(t, s, cid, treeID, firstID, Meta{Time: lm[2].Time, Items: meta})
meta[0].Value = []byte("YYY")
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
require.NoError(t, err)
require.Equal(t, 1, len(lm))
secondID := lm[0].Child
testMeta(t, s, cid, treeID, secondID, Meta{Time: lm[0].Time, Items: meta})
t.Run("get versions", func(t *testing.T) {
// All versions.
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
require.NoError(t, err)
require.ElementsMatch(t, []Node{firstID, secondID}, nodes)
// Latest version.
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, true)
require.NoError(t, err)
require.Equal(t, []Node{secondID}, nodes)
})
meta[0].Value = []byte("ZZZ")
meta[1].Value = []byte("cat.jpg")
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "dir"}, meta)
require.NoError(t, err)
require.Equal(t, 2, len(lm))
testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("dir")}}})
testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: meta})
}
func TestForest_Apply(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeApply(t, providers[i].construct)
})
}
}
func testForestTreeApply(t *testing.T, constructor func(t *testing.T) Forest) {
cid := cidtest.ID()
treeID := "version"
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
require.NoError(t, s.TreeApply(cid, treeID, &Move{
Child: child,
Parent: parent,
Meta: meta,
}))
}
t.Run("add a child, then insert a parent removal", func(t *testing.T) {
s := constructor(t)
testApply(t, s, 10, 0, Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}})
meta := Meta{Time: 3, Items: []KeyValue{{"child", []byte{3}}}}
testApply(t, s, 11, 10, meta)
testMeta(t, s, cid, treeID, 11, meta)
testApply(t, s, 10, TrashID, Meta{Time: 2, Items: []KeyValue{{"parent", []byte{2}}}})
testMeta(t, s, cid, treeID, 11, Meta{})
})
t.Run("add a child to non-existent parent, then add a parent", func(t *testing.T) {
s := constructor(t)
testApply(t, s, 11, 10, Meta{Time: 1, Items: []KeyValue{{"child", []byte{3}}}})
testMeta(t, s, cid, treeID, 11, Meta{})
testApply(t, s, 10, 0, Meta{Time: 2, Items: []KeyValue{{"grand", []byte{1}}}})
testMeta(t, s, cid, treeID, 11, Meta{})
})
}
func TestForest_ApplyRandom(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeApplyRandom(t, providers[i].construct)
})
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t *testing.T) Forest) {
rand.Seed(42)
const (
nodeCount = 4
opCount = 10
iterCount = 100
)
cid := cidtest.ID()
treeID := "version"
expected := constructor(t)
ops := make([]Move, nodeCount+opCount)
for i := 0; i < nodeCount; i++ {
ops[i] = Move{
Parent: 0,
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: make([]byte, 10)}},
},
Child: uint64(i) + 1,
}
rand.Read(ops[i].Meta.Items[0].Value)
}
for i := nodeCount; i < len(ops); i++ {
ops[i] = Move{
Parent: rand.Uint64() % (nodeCount + 1),
Meta: Meta{
Time: Timestamp(i + nodeCount),
Items: []KeyValue{{Value: make([]byte, 10)}},
},
Child: rand.Uint64() % (nodeCount + 1),
}
rand.Read(ops[i].Meta.Items[0].Value)
}
for i := range ops {
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i]))
}
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops)-nodeCount, func(i, j int) { ops[i+nodeCount], ops[j+nodeCount] = ops[j+nodeCount], ops[i+nodeCount] })
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i]))
}
for i := uint64(0); i < nodeCount; i++ {
expectedMeta, err := expected.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
actualMeta, err := actual.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
if _, ok := actual.(*memoryForest); ok {
require.Equal(t, expected, actual, i)
}
}
}
}
const benchNodeCount = 1000
func BenchmarkApplySequential(b *testing.B) {
benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(nodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(nodeCount)),
}
}
return ops
})
}
func BenchmarkApplyReorderLast(b *testing.B) {
// Group operations in a blocks of 10, order blocks in increasing timestamp order,
// and operations in a single block in reverse.
const blockSize = 10
benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(nodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(nodeCount)),
}
if i != 0 && i%blockSize == 0 {
for j := 0; j < blockSize/2; j++ {
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
}
}
}
return ops
})
}
func benchmarkApply(b *testing.B, n int, genFunc func(int, int) []Move) {
rand.Seed(42)
s := newState()
ops := genFunc(n, b.N)
b.ResetTimer()
b.ReportAllocs()
for i := range ops {
if err := s.Apply(&ops[i]); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
}
func TestTreeGetByPath(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testTreeGetByPath(t, providers[i].construct(t))
})
}
}
func testTreeGetByPath(t *testing.T, s Forest) {
cid := cidtest.ID()
treeID := "version"
// /
// |- a (1)
// |- cat1.jpg, Version=TTT (3)
// |- b (2)
// |- cat1.jpg, Version=XXX (4)
// |- cat1.jpg, Version=YYY (5)
// |- cat2.jpg, Version=ZZZ (6)
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
if mf, ok := s.(*memoryForest); ok {
single := mf.treeMap[cid.String()+"/"+treeID]
t.Run("test meta", func(t *testing.T) {
for i := 0; i < 6; i++ {
require.Equal(t, uint64(i), single.infoMap[Node(i+1)].Timestamp)
}
})
}
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"b", "cat1.jpg"}, false)
require.NoError(t, err)
require.Equal(t, []Node{4, 5}, nodes)
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat1.jpg"}, false)
require.Equal(t, []Node{3}, nodes)
t.Run("missing child", func(t *testing.T) {
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat3.jpg"}, false)
require.True(t, len(nodes) == 0)
})
t.Run("missing parent", func(t *testing.T) {
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"xyz", "cat1.jpg"}, false)
require.True(t, len(nodes) == 0)
})
t.Run("empty path", func(t *testing.T) {
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, nil, false)
require.True(t, len(nodes) == 0)
})
}
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
require.NoError(t, s.TreeApply(cid, treeID, &Move{
Parent: parent,
Child: node,
Meta: Meta{
Time: uint64(ts),
Items: []KeyValue{
{AttributeFilename, []byte(filename)},
{AttributeVersion, []byte(version)},
},
},
}))
}

View file

@ -0,0 +1,235 @@
package pilorama
// nodeInfo couples parent and metadata.
type nodeInfo struct {
Parent Node
Meta Meta
Timestamp Timestamp
}
// state represents state being replicated.
type state struct {
operations []LogMove
tree
}
// newState constructs new empty tree.
func newState() *state {
return &state{
tree: *newTree(),
}
}
// undo un-does op and changes s in-place.
func (s *state) undo(op *LogMove) {
children := s.tree.childMap[op.Parent]
for i := range children {
if children[i] == op.Child {
if len(children) > 1 {
s.tree.childMap[op.Parent] = append(children[:i], children[i+1:]...)
} else {
delete(s.tree.childMap, op.Parent)
}
break
}
}
if op.HasOld {
s.tree.infoMap[op.Child] = op.Old
oldChildren := s.tree.childMap[op.Old.Parent]
for i := range oldChildren {
if oldChildren[i] == op.Child {
return
}
}
s.tree.childMap[op.Old.Parent] = append(oldChildren, op.Child)
} else {
delete(s.tree.infoMap, op.Child)
}
}
// Apply puts op in log at a proper position, re-applies all subsequent operations
// from log and changes s in-place.
func (s *state) Apply(op *Move) error {
var index int
for index = len(s.operations); index > 0; index-- {
if s.operations[index-1].Time <= op.Time {
break
}
}
if index == len(s.operations) {
s.operations = append(s.operations, s.do(op))
return nil
}
s.operations = append(s.operations[:index+1], s.operations[index:]...)
for i := len(s.operations) - 1; i > index; i-- {
s.undo(&s.operations[i])
}
s.operations[index] = s.do(op)
for i := index + 1; i < len(s.operations); i++ {
s.operations[i] = s.do(&s.operations[i].Move)
}
return nil
}
// do performs a single move operation on a tree.
func (s *state) do(op *Move) LogMove {
lm := LogMove{
Move: Move{
Parent: op.Parent,
Meta: op.Meta,
Child: op.Child,
},
}
_, parentInTree := s.tree.infoMap[op.Parent]
shouldPut := !s.tree.isAncestor(op.Child, op.Parent) &&
!(op.Parent != 0 && op.Parent != TrashID && !parentInTree)
shouldRemove := op.Parent == TrashID
p, ok := s.tree.infoMap[op.Child]
if ok {
lm.HasOld = true
lm.Old = p
}
if !shouldPut {
return lm
}
if shouldRemove {
if ok {
s.removeChild(op.Child, p.Parent)
}
delete(s.tree.infoMap, op.Child)
return lm
}
if !ok {
p.Timestamp = op.Time
} else {
s.removeChild(op.Child, p.Parent)
}
p.Meta = op.Meta
p.Parent = op.Parent
s.tree.infoMap[op.Child] = p
s.tree.childMap[op.Parent] = append(s.tree.childMap[op.Parent], op.Child)
return lm
}
func (s *state) removeChild(child, parent Node) {
oldChildren := s.tree.childMap[parent]
for i := range oldChildren {
if oldChildren[i] == child {
s.tree.childMap[parent] = append(oldChildren[:i], oldChildren[i+1:]...)
break
}
}
}
func (s *state) timestamp() Timestamp {
if len(s.operations) == 0 {
return 0
}
return s.operations[len(s.operations)-1].Time + 1
}
func (s *state) findSpareID() Node {
id := uint64(1)
for _, ok := s.infoMap[id]; ok; _, ok = s.infoMap[id] {
id++
}
return id
}
// tree is a mapping from the child nodes to their parent and metadata.
type tree struct {
infoMap map[Node]nodeInfo
childMap map[Node][]Node
}
func newTree() *tree {
return &tree{
childMap: make(map[Node][]Node),
infoMap: make(map[Node]nodeInfo),
}
}
// isAncestor returns true if parent is an ancestor of a child.
// For convenience, also return true if parent == child.
func (t tree) isAncestor(parent, child Node) bool {
for c := child; c != parent; {
p, ok := t.infoMap[c]
if !ok {
return false
}
c = p.Parent
}
return true
}
// getPathPrefix descends by path constructed from values of attr until
// there is no node corresponding to a path element. Returns the amount of nodes
// processed and ID of the last node.
func (t tree) getPathPrefix(attr string, path []string) (int, Node) {
var curNode Node
loop:
for i := range path {
children := t.childMap[curNode]
for j := range children {
f := t.infoMap[children[j]].Meta.GetAttr(attr)
if string(f) == path[i] {
curNode = children[j]
continue loop
}
}
return i, curNode
}
return len(path), curNode
}
// get returns list of nodes which have the specified path from root
// descending by values of attr from meta.
func (t tree) get(attr string, path []string, latest bool) []Node {
if len(path) == 0 {
return nil
}
i, curNode := t.getPathPrefix(attr, path[:len(path)-1])
if i < len(path)-1 {
return nil
}
var nodes []Node
var lastTs Timestamp
children := t.childMap[curNode]
for i := range children {
info := t.infoMap[children[i]]
fileName := string(info.Meta.GetAttr(attr))
if fileName == path[len(path)-1] {
if latest {
if info.Timestamp >= lastTs {
nodes = append(nodes[:0], children[i])
}
} else {
nodes = append(nodes, children[i])
}
}
}
return nodes
}
// getMeta returns meta information of node n.
func (t tree) getMeta(n Node) Meta {
return t.infoMap[n].Meta
}

View file

@ -0,0 +1,35 @@
package pilorama
import cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
// Forest represents CRDT tree.
type Forest interface {
// TreeMove moves node in the tree.
// If the parent of the move operation is TrashID, the node is removed.
// If the child of the move operation is RootID, new ID is generated and added to a tree.
TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error)
// TreeAddByPath adds new node in the tree using provided path.
// The path is constructed by descending from the root using the values of the attr in meta.
TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
// TreeApply applies replicated operation from another node.
TreeApply(cid cidSDK.ID, treeID string, m *Move) error
// TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the
// AttributeFilename in meta.
// The last argument determines whether only the node with the latest timestamp is returned.
TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error)
// TreeGetMeta returns meta information of the node with the specified ID.
TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error)
}
type ForestStorage interface {
Init() error
Open() error
Close() error
Forest
}
const (
AttributeFilename = "FileName"
AttributeVersion = "Version"
)

View file

@ -0,0 +1,48 @@
package pilorama
import "github.com/nspcc-dev/neo-go/pkg/io"
func (x *Meta) FromBytes(data []byte) error {
if len(data) == 0 {
x.Items = nil
x.Time = 0
return nil
}
r := io.NewBinReaderFromBuf(data)
ts := r.ReadVarUint()
size := r.ReadVarUint()
m := make([]KeyValue, size)
for i := range m {
m[i].Key = r.ReadString()
m[i].Value = r.ReadVarBytes()
}
if r.Err != nil {
return r.Err
}
x.Time = ts
x.Items = m
return nil
}
func (x Meta) Bytes() []byte {
w := io.NewBufBinWriter()
w.WriteVarUint(x.Time)
w.WriteVarUint(uint64(len(x.Items)))
for _, e := range x.Items {
w.WriteString(e.Key)
w.WriteVarBytes(e.Value)
}
return w.Bytes()
}
func (x Meta) GetAttr(name string) []byte {
for _, kv := range x.Items {
if kv.Key == name {
return kv.Value
}
}
return nil
}

View file

@ -0,0 +1,54 @@
package pilorama
import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
func TestMeta_Bytes(t *testing.T) {
t.Run("empty", func(t *testing.T) {
var m Meta
require.NoError(t, m.FromBytes(nil))
require.True(t, len(m.Items) == 0)
require.Equal(t, uint64(0), m.Time)
require.Equal(t, []byte{0, 0}, m.Bytes())
})
t.Run("filled", func(t *testing.T) {
expected := Meta{
Time: 123,
Items: []KeyValue{
{"abc", []byte{1, 2, 3}},
{"xyz", []byte{5, 6, 7, 8}},
}}
data := expected.Bytes()
var actual Meta
require.NoError(t, actual.FromBytes(data))
require.Equal(t, expected, actual)
t.Run("error", func(t *testing.T) {
require.Error(t, new(Meta).FromBytes(data[:len(data)/2]))
})
})
}
func TestMeta_GetAttr(t *testing.T) {
attr := [][]byte{
make([]byte, 5),
make([]byte, 10),
}
for i := range attr {
rand.Read(attr[i])
}
m := Meta{Items: []KeyValue{{"abc", attr[0]}, {"xyz", attr[1]}}}
require.Equal(t, attr[0], m.GetAttr("abc"))
require.Equal(t, attr[1], m.GetAttr("xyz"))
require.Nil(t, m.GetAttr("a"))
require.Nil(t, m.GetAttr("xyza"))
require.Nil(t, m.GetAttr(""))
}

View file

@ -0,0 +1,52 @@
package pilorama
import (
"errors"
"math"
)
// Timestamp is an alias for integer timestamp type.
// TODO: remove after the debugging.
type Timestamp = uint64
// Node is used to represent nodes.
// TODO: remove after the debugging.
type Node = uint64
// Meta represents arbitrary meta information.
// TODO: remove after the debugging or create a proper interface.
type Meta struct {
Time Timestamp
Items []KeyValue
}
// KeyValue represents a key-value pair.
type KeyValue struct {
Key string
Value []byte
}
// Move represents a single move operation.
type Move struct {
Parent Node
Meta
// Child represents the ID of a node being moved. If zero, new ID is generated.
Child Node
}
// LogMove represents log record for a single move operation.
type LogMove struct {
Move
HasOld bool
Old nodeInfo
}
const (
// RootID represents the ID of a root node.
RootID = 0
// TrashID is a parent for all removed nodes.
TrashID = math.MaxUint64
)
// ErrTreeNotFound is returned when the requested tree is not found.
var ErrTreeNotFound = errors.New("tree not found")