[#1324] local_object_storage: Implement tree service backend
In this commit we implement algorithm for CRDT trees from https://martin.klepmann.com/papers/move-op.pdf Each tree is identified by the ID of a container it belongs to and the tree name itself. Essentially, it is a sequence of operations which should be applied in chronological order to get a usual tree representation. There are 2 backends for now: bbolt database and in-memory. In-memory backend is here for debugging and will eventually act as a memory-cache for the on-disk database. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
198beae703
commit
8cf71b7f1c
8 changed files with 1499 additions and 0 deletions
529
pkg/local_object_storage/pilorama/boltdb.go
Normal file
529
pkg/local_object_storage/pilorama/boltdb.go
Normal file
|
@ -0,0 +1,529 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type boltForest struct {
|
||||
path string
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
dataBucket = []byte{0}
|
||||
logBucket = []byte{1}
|
||||
)
|
||||
|
||||
// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
|
||||
//
|
||||
// Each tree is stored in a separate bucket by `CID + treeID` key.
|
||||
// All integers are stored in little-endian unless explicitly specified otherwise.
|
||||
//
|
||||
// DB schema (for a single tree):
|
||||
// timestamp is 8-byte, id is 4-byte.
|
||||
//
|
||||
// log storage (logBucket):
|
||||
// timestamp in big-endian -> log operation
|
||||
//
|
||||
// tree storage (dataBucket):
|
||||
// 't' + node (id) -> timestamp when the node first appeared
|
||||
// 'p' + node (id) -> parent (id)
|
||||
// 'm' + node (id) -> serialized meta
|
||||
// 'c' + parent (id) + child (id) -> 0/1
|
||||
func NewBoltForest(path string) ForestStorage {
|
||||
return &boltForest{path: path}
|
||||
}
|
||||
|
||||
func (t *boltForest) Init() error { return nil }
|
||||
func (t *boltForest) Open() error {
|
||||
if err := os.MkdirAll(filepath.Dir(t.path), os.ModePerm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db, err := bbolt.Open(t.path, os.ModePerm, bbolt.DefaultOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.db = db
|
||||
|
||||
return db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(dataBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tx.CreateBucketIfNotExists(logBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
func (t *boltForest) Close() error { return t.db.Close() }
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) {
|
||||
var lm *LogMove
|
||||
return lm, t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Time = t.getLatestTimestamp(bLog)
|
||||
if m.Child == RootID {
|
||||
m.Child = t.findSpareID(bTree)
|
||||
}
|
||||
lm, err = t.applyOperation(bLog, bTree, m)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
|
||||
var lm []LogMove
|
||||
var key [17]byte
|
||||
|
||||
err := t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i, node, err := t.getPathPrefix(bTree, attr, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lm = make([]LogMove, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i].Move = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: t.getLatestTimestamp(bLog),
|
||||
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}},
|
||||
},
|
||||
Child: t.findSpareID(bTree),
|
||||
}
|
||||
|
||||
err := t.do(bLog, bTree, key[:], &lm[j-i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
node = lm[j-i].Child
|
||||
}
|
||||
|
||||
lm[len(lm)-1].Move = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: t.getLatestTimestamp(bLog),
|
||||
Items: meta,
|
||||
},
|
||||
Child: t.findSpareID(bTree),
|
||||
}
|
||||
return t.do(bLog, bTree, key[:], &lm[len(lm)-1])
|
||||
})
|
||||
return lm, err
|
||||
}
|
||||
|
||||
// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than
|
||||
// all timestamps corresponding to already stored operations.
|
||||
// FIXME timestamp should be based on a node position in the container.
|
||||
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket) uint64 {
|
||||
c := bLog.Cursor()
|
||||
key, _ := c.Last()
|
||||
if len(key) == 0 {
|
||||
return 1
|
||||
}
|
||||
return binary.BigEndian.Uint64(key) + 1
|
||||
}
|
||||
|
||||
// findSpareID returns random unused ID.
|
||||
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
||||
id := uint64(rand.Int63())
|
||||
|
||||
var key [9]byte
|
||||
key[0] = 't'
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
|
||||
for {
|
||||
if bTree.Get(key[:]) == nil {
|
||||
return id
|
||||
}
|
||||
id = uint64(rand.Int63())
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
}
|
||||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error {
|
||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = t.applyOperation(bLog, bTree, m)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
|
||||
treeRoot := bucketName(cid, treeID)
|
||||
child, err := tx.CreateBucket(treeRoot)
|
||||
if err != nil && err != bbolt.ErrBucketExists {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var bLog, bData *bbolt.Bucket
|
||||
if err == nil {
|
||||
if bLog, err = child.CreateBucket(logBucket); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if bData, err = child.CreateBucket(dataBucket); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
child = tx.Bucket(treeRoot)
|
||||
bLog = child.Bucket(logBucket)
|
||||
bData = child.Bucket(dataBucket)
|
||||
}
|
||||
|
||||
return bLog, bData, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move) (*LogMove, error) {
|
||||
var lm LogMove
|
||||
var tmp LogMove
|
||||
var cKey [17]byte
|
||||
|
||||
c := logBucket.Cursor()
|
||||
|
||||
key, value := c.Last()
|
||||
|
||||
// 1. Undo up until the desired timestamp is here.
|
||||
for len(key) == 8 && binary.BigEndian.Uint64(key) > m.Time {
|
||||
if err := t.logFromBytes(&tmp, key, value); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key, value = c.Prev()
|
||||
}
|
||||
|
||||
// 2. Insert the operation.
|
||||
if len(key) != 8 || binary.BigEndian.Uint64(key) != m.Time {
|
||||
lm.Move = *m
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], &lm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
key, value = c.Next()
|
||||
|
||||
// 3. Re-apply all other operations.
|
||||
for len(key) == 8 {
|
||||
if err := t.logFromBytes(&tmp, key, value); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key, value = c.Next()
|
||||
}
|
||||
|
||||
return &lm, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
|
||||
shouldPut := !t.isAncestor(b, key, op.Child, op.Parent) &&
|
||||
!(op.Parent != 0 && op.Parent != TrashID && b.Get(timestampKey(key, op.Parent)) == nil)
|
||||
shouldRemove := op.Parent == TrashID
|
||||
|
||||
currParent := b.Get(parentKey(key, op.Child))
|
||||
if currParent != nil { // node is already in tree
|
||||
op.HasOld = true
|
||||
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
|
||||
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
binary.BigEndian.PutUint64(key, op.Time)
|
||||
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !shouldPut {
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldRemove {
|
||||
if currParent != nil {
|
||||
p := binary.LittleEndian.Uint64(currParent)
|
||||
if err := b.Delete(childrenKey(key, op.Child, p)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return t.removeNode(b, key, op.Child)
|
||||
}
|
||||
|
||||
if currParent == nil {
|
||||
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := b.Delete(childrenKey(key, op.Child, binary.LittleEndian.Uint64(currParent))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return t.addNode(b, key, op.Child, op.Parent, op.Meta)
|
||||
}
|
||||
|
||||
// removeNode removes node keys from the tree except the children key or its parent.
|
||||
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node Node) error {
|
||||
if err := b.Delete(parentKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Delete(metaKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Delete(timestampKey(key, node))
|
||||
}
|
||||
|
||||
// addNode adds node keys to the tree except the timestamp key.
|
||||
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error {
|
||||
err := b.Put(parentKey(key, child), toUint64(parent))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(childrenKey(key, child, parent), []byte{1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put(metaKey(key, child), meta.Bytes())
|
||||
}
|
||||
|
||||
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
|
||||
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !lm.HasOld {
|
||||
return t.removeNode(b, key, m.Child)
|
||||
}
|
||||
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
|
||||
}
|
||||
|
||||
func (t *boltForest) isAncestor(b *bbolt.Bucket, key []byte, parent, child Node) bool {
|
||||
key[0] = 'p'
|
||||
for c := child; c != parent; {
|
||||
binary.LittleEndian.PutUint64(key[1:], c)
|
||||
rawParent := b.Get(key[:9])
|
||||
if len(rawParent) != 8 {
|
||||
return false
|
||||
}
|
||||
c = binary.LittleEndian.Uint64(rawParent)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the Forest interface.
|
||||
func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
|
||||
if len(path) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var nodes []Node
|
||||
|
||||
return nodes, t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(bucketName(cid, treeID))
|
||||
if treeRoot == nil {
|
||||
return ErrTreeNotFound
|
||||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
|
||||
i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if i < len(path)-1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
c := b.Cursor()
|
||||
|
||||
var (
|
||||
metaKey [9]byte
|
||||
id [9]byte
|
||||
childID [9]byte
|
||||
m Meta
|
||||
maxTimestamp uint64
|
||||
)
|
||||
|
||||
id[0] = 'c'
|
||||
metaKey[0] = 'm'
|
||||
|
||||
binary.LittleEndian.PutUint64(id[1:], curNode)
|
||||
|
||||
key, _ := c.Seek(id[:])
|
||||
for len(key) == 1+8+8 && bytes.Equal(id[:9], key[:9]) {
|
||||
child := binary.LittleEndian.Uint64(key[9:])
|
||||
copy(metaKey[1:], key[9:17])
|
||||
|
||||
if m.FromBytes(b.Get(metaKey[:])) == nil && string(m.GetAttr(attr)) == path[len(path)-1] {
|
||||
if latest {
|
||||
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child)))
|
||||
if ts >= maxTimestamp {
|
||||
nodes = append(nodes[:0], child)
|
||||
maxTimestamp = ts
|
||||
}
|
||||
} else {
|
||||
nodes = append(nodes, child)
|
||||
}
|
||||
}
|
||||
key, _ = c.Next()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// TreeGetMeta implements the forest interface.
|
||||
func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) {
|
||||
key := metaKey(make([]byte, 9), nodeID)
|
||||
|
||||
var m Meta
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(bucketName(cid, treeID))
|
||||
if treeRoot == nil {
|
||||
return ErrTreeNotFound
|
||||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
return m.FromBytes(b.Get(key))
|
||||
})
|
||||
|
||||
return m, err
|
||||
}
|
||||
|
||||
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
|
||||
var key [9]byte
|
||||
|
||||
c := bTree.Cursor()
|
||||
|
||||
var curNode Node
|
||||
var m Meta
|
||||
|
||||
loop:
|
||||
for i := range path {
|
||||
key[0] = 'c'
|
||||
binary.LittleEndian.PutUint64(key[1:], curNode)
|
||||
|
||||
childKey, _ := c.Seek(key[:])
|
||||
for {
|
||||
if len(childKey) != 17 || binary.LittleEndian.Uint64(childKey[1:]) != curNode {
|
||||
break
|
||||
}
|
||||
|
||||
child := binary.LittleEndian.Uint64(childKey[9:])
|
||||
if err := m.FromBytes(bTree.Get(metaKey(key[:], child))); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
for j := range m.Items {
|
||||
if m.Items[j].Key == attr {
|
||||
if string(m.Items[j].Value) == path[i] {
|
||||
curNode = child
|
||||
continue loop
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
childKey, _ = c.Next()
|
||||
}
|
||||
return i, curNode, nil
|
||||
}
|
||||
|
||||
return len(path), curNode, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) logFromBytes(lm *LogMove, key []byte, data []byte) error {
|
||||
r := io.NewBinReaderFromBuf(data)
|
||||
lm.Child = r.ReadU64LE()
|
||||
lm.Parent = r.ReadU64LE()
|
||||
if err := lm.Meta.FromBytes(r.ReadVarBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lm.HasOld = r.ReadBool()
|
||||
if lm.HasOld {
|
||||
lm.Old.Parent = r.ReadU64LE()
|
||||
if err := lm.Old.Meta.FromBytes(r.ReadVarBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return r.Err
|
||||
}
|
||||
|
||||
func (t *boltForest) logToBytes(lm *LogMove) []byte {
|
||||
w := io.NewBufBinWriter()
|
||||
w.WriteU64LE(lm.Child)
|
||||
w.WriteU64LE(lm.Parent)
|
||||
w.WriteVarBytes(lm.Meta.Bytes())
|
||||
w.WriteBool(lm.HasOld)
|
||||
if lm.HasOld {
|
||||
w.WriteU64LE(lm.Old.Parent)
|
||||
w.WriteVarBytes(lm.Old.Meta.Bytes())
|
||||
}
|
||||
return w.Bytes()
|
||||
}
|
||||
|
||||
func bucketName(cid cidSDK.ID, treeID string) []byte {
|
||||
return []byte(cid.String() + treeID)
|
||||
}
|
||||
|
||||
// 't' + node (id) -> timestamp when the node first appeared
|
||||
func timestampKey(key []byte, child Node) []byte {
|
||||
key[0] = 't'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 'p' + node (id) -> parent (id)
|
||||
func parentKey(key []byte, child Node) []byte {
|
||||
key[0] = 'p'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 'm' + node (id) -> serialized meta
|
||||
func metaKey(key []byte, child Node) []byte {
|
||||
key[0] = 'm'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 'c' + parent (id) + child (id) -> 0/1
|
||||
func childrenKey(key []byte, child, parent Node) []byte {
|
||||
key[0] = 'c'
|
||||
binary.LittleEndian.PutUint64(key[1:], parent)
|
||||
binary.LittleEndian.PutUint64(key[9:], child)
|
||||
return key[:17]
|
||||
}
|
||||
|
||||
func toUint64(x uint64) []byte {
|
||||
var a [8]byte
|
||||
binary.LittleEndian.PutUint64(a[:], x)
|
||||
return a[:]
|
||||
}
|
114
pkg/local_object_storage/pilorama/forest.go
Normal file
114
pkg/local_object_storage/pilorama/forest.go
Normal file
|
@ -0,0 +1,114 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// memoryForest represents multiple replicating trees sharing a single storage.
|
||||
type memoryForest struct {
|
||||
// treeMap maps tree identifier (container ID + name) to the replicated log.
|
||||
treeMap map[string]*state
|
||||
}
|
||||
|
||||
var _ Forest = (*memoryForest)(nil)
|
||||
|
||||
// NewMemoryForest creates new empty forest.
|
||||
// TODO: this function will eventually be removed and is here for debugging.
|
||||
func NewMemoryForest() ForestStorage {
|
||||
return &memoryForest{
|
||||
treeMap: make(map[string]*state),
|
||||
}
|
||||
}
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMove, error) {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
f.treeMap[fullID] = s
|
||||
}
|
||||
|
||||
op.Time = s.timestamp()
|
||||
if op.Child == RootID {
|
||||
op.Child = s.findSpareID()
|
||||
}
|
||||
|
||||
lm := s.do(op)
|
||||
s.operations = append(s.operations, lm)
|
||||
return &lm, nil
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
f.treeMap[fullID] = s
|
||||
}
|
||||
|
||||
i, node := s.getPathPrefix(attr, path)
|
||||
lm := make([]LogMove, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i] = s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{Time: s.timestamp(), Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
node = lm[j-i].Child
|
||||
s.operations = append(s.operations, lm[j-i])
|
||||
}
|
||||
lm[len(lm)-1] = s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{Time: s.timestamp(), Items: m},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
return lm, nil
|
||||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (f *memoryForest) TreeApply(cid cidSDK.ID, treeID string, op *Move) error {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
f.treeMap[fullID] = s
|
||||
}
|
||||
|
||||
return s.Apply(op)
|
||||
}
|
||||
|
||||
func (f *memoryForest) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Open() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the Forest interface.
|
||||
func (f *memoryForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
return nil, ErrTreeNotFound
|
||||
}
|
||||
|
||||
return s.get(attr, path, latest), nil
|
||||
}
|
||||
|
||||
// TreeGetMeta implements the Forest interface.
|
||||
func (f *memoryForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error) {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
return Meta{}, ErrTreeNotFound
|
||||
}
|
||||
|
||||
return s.getMeta(nodeID), nil
|
||||
}
|
432
pkg/local_object_storage/pilorama/forest_test.go
Normal file
432
pkg/local_object_storage/pilorama/forest_test.go
Normal file
|
@ -0,0 +1,432 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var providers = []struct {
|
||||
name string
|
||||
construct func(t *testing.T) Forest
|
||||
}{
|
||||
{"inmemory", func(t *testing.T) Forest {
|
||||
f := NewMemoryForest()
|
||||
require.NoError(t, f.Init())
|
||||
require.NoError(t, f.Open())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
})
|
||||
|
||||
return f
|
||||
}},
|
||||
{"bbolt", func(t *testing.T) Forest {
|
||||
// Use `os.TempDir` because we construct multiple times in the same test.
|
||||
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
|
||||
require.NoError(t, err)
|
||||
|
||||
f := NewBoltForest(filepath.Join(tmpDir, "test.db"))
|
||||
require.NoError(t, f.Init())
|
||||
require.NoError(t, f.Open())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
require.NoError(t, os.RemoveAll(tmpDir))
|
||||
})
|
||||
return f
|
||||
}},
|
||||
}
|
||||
|
||||
func testMeta(t *testing.T, f Forest, cid cidSDK.ID, treeID string, nodeID Node, expected Meta) {
|
||||
actual, err := f.TreeGetMeta(cid, treeID, nodeID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestForest_TreeMove(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeMove(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeMove(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(lm))
|
||||
|
||||
nodeID := lm[2].Child
|
||||
t.Run("same parent, update meta", func(t *testing.T) {
|
||||
_, err = s.TreeMove(cid, treeID, &Move{
|
||||
Parent: lm[1].Child,
|
||||
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
|
||||
Child: nodeID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, []Node{nodeID}, nodes)
|
||||
})
|
||||
t.Run("different parent", func(t *testing.T) {
|
||||
_, err = s.TreeMove(cid, treeID, &Move{
|
||||
Parent: RootID,
|
||||
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
|
||||
Child: nodeID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(nodes) == 0)
|
||||
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, []Node{nodeID}, nodes)
|
||||
})
|
||||
}
|
||||
|
||||
func TestForest_TreeAdd(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeAdd(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeAdd(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
lm, err := s.TreeMove(cid, treeID, &Move{
|
||||
Parent: RootID,
|
||||
Child: RootID,
|
||||
Meta: Meta{Items: meta},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
testMeta(t, s, cid, treeID, lm.Child, Meta{Time: lm.Time, Items: meta})
|
||||
|
||||
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"file.txt"}, false)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, []Node{lm.Child}, nodes)
|
||||
|
||||
t.Run("other trees are unaffected", func(t *testing.T) {
|
||||
_, err := s.TreeGetByPath(cid, treeID+"123", AttributeFilename, []string{"file.txt"}, false)
|
||||
require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err)
|
||||
|
||||
_, err = s.TreeGetMeta(cid, treeID+"123", 0)
|
||||
require.True(t, errors.Is(err, ErrTreeNotFound), "got: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestForest_TreeAddByPath(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeAddByPath(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeAddByPath(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(lm))
|
||||
testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("path")}}})
|
||||
testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: []KeyValue{{AttributeFilename, []byte("to")}}})
|
||||
|
||||
firstID := lm[2].Child
|
||||
testMeta(t, s, cid, treeID, firstID, Meta{Time: lm[2].Time, Items: meta})
|
||||
|
||||
meta[0].Value = []byte("YYY")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(lm))
|
||||
|
||||
secondID := lm[0].Child
|
||||
testMeta(t, s, cid, treeID, secondID, Meta{Time: lm[0].Time, Items: meta})
|
||||
|
||||
t.Run("get versions", func(t *testing.T) {
|
||||
// All versions.
|
||||
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, []Node{firstID, secondID}, nodes)
|
||||
|
||||
// Latest version.
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, true)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []Node{secondID}, nodes)
|
||||
})
|
||||
|
||||
meta[0].Value = []byte("ZZZ")
|
||||
meta[1].Value = []byte("cat.jpg")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "dir"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(lm))
|
||||
testMeta(t, s, cid, treeID, lm[0].Child, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("dir")}}})
|
||||
testMeta(t, s, cid, treeID, lm[1].Child, Meta{Time: lm[1].Time, Items: meta})
|
||||
}
|
||||
|
||||
func TestForest_Apply(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeApply(t, providers[i].construct)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeApply(t *testing.T, constructor func(t *testing.T) Forest) {
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
|
||||
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
Child: child,
|
||||
Parent: parent,
|
||||
Meta: meta,
|
||||
}))
|
||||
}
|
||||
|
||||
t.Run("add a child, then insert a parent removal", func(t *testing.T) {
|
||||
s := constructor(t)
|
||||
testApply(t, s, 10, 0, Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}})
|
||||
|
||||
meta := Meta{Time: 3, Items: []KeyValue{{"child", []byte{3}}}}
|
||||
testApply(t, s, 11, 10, meta)
|
||||
testMeta(t, s, cid, treeID, 11, meta)
|
||||
|
||||
testApply(t, s, 10, TrashID, Meta{Time: 2, Items: []KeyValue{{"parent", []byte{2}}}})
|
||||
testMeta(t, s, cid, treeID, 11, Meta{})
|
||||
})
|
||||
t.Run("add a child to non-existent parent, then add a parent", func(t *testing.T) {
|
||||
s := constructor(t)
|
||||
|
||||
testApply(t, s, 11, 10, Meta{Time: 1, Items: []KeyValue{{"child", []byte{3}}}})
|
||||
testMeta(t, s, cid, treeID, 11, Meta{})
|
||||
|
||||
testApply(t, s, 10, 0, Meta{Time: 2, Items: []KeyValue{{"grand", []byte{1}}}})
|
||||
testMeta(t, s, cid, treeID, 11, Meta{})
|
||||
})
|
||||
}
|
||||
|
||||
func TestForest_ApplyRandom(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeApplyRandom(t, providers[i].construct)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeApplyRandom(t *testing.T, constructor func(t *testing.T) Forest) {
|
||||
rand.Seed(42)
|
||||
|
||||
const (
|
||||
nodeCount = 4
|
||||
opCount = 10
|
||||
iterCount = 100
|
||||
)
|
||||
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
expected := constructor(t)
|
||||
|
||||
ops := make([]Move, nodeCount+opCount)
|
||||
for i := 0; i < nodeCount; i++ {
|
||||
ops[i] = Move{
|
||||
Parent: 0,
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: make([]byte, 10)}},
|
||||
},
|
||||
Child: uint64(i) + 1,
|
||||
}
|
||||
rand.Read(ops[i].Meta.Items[0].Value)
|
||||
}
|
||||
|
||||
for i := nodeCount; i < len(ops); i++ {
|
||||
ops[i] = Move{
|
||||
Parent: rand.Uint64() % (nodeCount + 1),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i + nodeCount),
|
||||
Items: []KeyValue{{Value: make([]byte, 10)}},
|
||||
},
|
||||
Child: rand.Uint64() % (nodeCount + 1),
|
||||
}
|
||||
rand.Read(ops[i].Meta.Items[0].Value)
|
||||
}
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i]))
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops)-nodeCount, func(i, j int) { ops[i+nodeCount], ops[j+nodeCount] = ops[j+nodeCount], ops[i+nodeCount] })
|
||||
|
||||
actual := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i]))
|
||||
}
|
||||
for i := uint64(0); i < nodeCount; i++ {
|
||||
expectedMeta, err := expected.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
actualMeta, err := actual.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
|
||||
|
||||
if _, ok := actual.(*memoryForest); ok {
|
||||
require.Equal(t, expected, actual, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const benchNodeCount = 1000
|
||||
|
||||
func BenchmarkApplySequential(b *testing.B) {
|
||||
benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(nodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(nodeCount)),
|
||||
}
|
||||
}
|
||||
return ops
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkApplyReorderLast(b *testing.B) {
|
||||
// Group operations in a blocks of 10, order blocks in increasing timestamp order,
|
||||
// and operations in a single block in reverse.
|
||||
const blockSize = 10
|
||||
|
||||
benchmarkApply(b, benchNodeCount, func(nodeCount, opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(nodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(nodeCount)),
|
||||
}
|
||||
if i != 0 && i%blockSize == 0 {
|
||||
for j := 0; j < blockSize/2; j++ {
|
||||
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
|
||||
}
|
||||
}
|
||||
}
|
||||
return ops
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkApply(b *testing.B, n int, genFunc func(int, int) []Move) {
|
||||
rand.Seed(42)
|
||||
|
||||
s := newState()
|
||||
ops := genFunc(n, b.N)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := range ops {
|
||||
if err := s.Apply(&ops[i]); err != nil {
|
||||
b.Fatalf("error in `Apply`: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTreeGetByPath(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testTreeGetByPath(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testTreeGetByPath(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
|
||||
// /
|
||||
// |- a (1)
|
||||
// |- cat1.jpg, Version=TTT (3)
|
||||
// |- b (2)
|
||||
// |- cat1.jpg, Version=XXX (4)
|
||||
// |- cat1.jpg, Version=YYY (5)
|
||||
// |- cat2.jpg, Version=ZZZ (6)
|
||||
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
|
||||
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
|
||||
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
|
||||
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
|
||||
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
|
||||
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
|
||||
|
||||
if mf, ok := s.(*memoryForest); ok {
|
||||
single := mf.treeMap[cid.String()+"/"+treeID]
|
||||
t.Run("test meta", func(t *testing.T) {
|
||||
for i := 0; i < 6; i++ {
|
||||
require.Equal(t, uint64(i), single.infoMap[Node(i+1)].Timestamp)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"b", "cat1.jpg"}, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []Node{4, 5}, nodes)
|
||||
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat1.jpg"}, false)
|
||||
require.Equal(t, []Node{3}, nodes)
|
||||
|
||||
t.Run("missing child", func(t *testing.T) {
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"a", "cat3.jpg"}, false)
|
||||
require.True(t, len(nodes) == 0)
|
||||
})
|
||||
t.Run("missing parent", func(t *testing.T) {
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"xyz", "cat1.jpg"}, false)
|
||||
require.True(t, len(nodes) == 0)
|
||||
})
|
||||
t.Run("empty path", func(t *testing.T) {
|
||||
nodes, err = s.TreeGetByPath(cid, treeID, AttributeFilename, nil, false)
|
||||
require.True(t, len(nodes) == 0)
|
||||
})
|
||||
}
|
||||
|
||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
Parent: parent,
|
||||
Child: node,
|
||||
Meta: Meta{
|
||||
Time: uint64(ts),
|
||||
Items: []KeyValue{
|
||||
{AttributeFilename, []byte(filename)},
|
||||
{AttributeVersion, []byte(version)},
|
||||
},
|
||||
},
|
||||
}))
|
||||
}
|
235
pkg/local_object_storage/pilorama/inmemory.go
Normal file
235
pkg/local_object_storage/pilorama/inmemory.go
Normal file
|
@ -0,0 +1,235 @@
|
|||
package pilorama
|
||||
|
||||
// nodeInfo couples parent and metadata.
|
||||
type nodeInfo struct {
|
||||
Parent Node
|
||||
Meta Meta
|
||||
Timestamp Timestamp
|
||||
}
|
||||
|
||||
// state represents state being replicated.
|
||||
type state struct {
|
||||
operations []LogMove
|
||||
tree
|
||||
}
|
||||
|
||||
// newState constructs new empty tree.
|
||||
func newState() *state {
|
||||
return &state{
|
||||
tree: *newTree(),
|
||||
}
|
||||
}
|
||||
|
||||
// undo un-does op and changes s in-place.
|
||||
func (s *state) undo(op *LogMove) {
|
||||
children := s.tree.childMap[op.Parent]
|
||||
for i := range children {
|
||||
if children[i] == op.Child {
|
||||
if len(children) > 1 {
|
||||
s.tree.childMap[op.Parent] = append(children[:i], children[i+1:]...)
|
||||
} else {
|
||||
delete(s.tree.childMap, op.Parent)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if op.HasOld {
|
||||
s.tree.infoMap[op.Child] = op.Old
|
||||
oldChildren := s.tree.childMap[op.Old.Parent]
|
||||
for i := range oldChildren {
|
||||
if oldChildren[i] == op.Child {
|
||||
return
|
||||
}
|
||||
}
|
||||
s.tree.childMap[op.Old.Parent] = append(oldChildren, op.Child)
|
||||
} else {
|
||||
delete(s.tree.infoMap, op.Child)
|
||||
}
|
||||
}
|
||||
|
||||
// Apply puts op in log at a proper position, re-applies all subsequent operations
|
||||
// from log and changes s in-place.
|
||||
func (s *state) Apply(op *Move) error {
|
||||
var index int
|
||||
for index = len(s.operations); index > 0; index-- {
|
||||
if s.operations[index-1].Time <= op.Time {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if index == len(s.operations) {
|
||||
s.operations = append(s.operations, s.do(op))
|
||||
return nil
|
||||
}
|
||||
|
||||
s.operations = append(s.operations[:index+1], s.operations[index:]...)
|
||||
for i := len(s.operations) - 1; i > index; i-- {
|
||||
s.undo(&s.operations[i])
|
||||
}
|
||||
|
||||
s.operations[index] = s.do(op)
|
||||
|
||||
for i := index + 1; i < len(s.operations); i++ {
|
||||
s.operations[i] = s.do(&s.operations[i].Move)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// do performs a single move operation on a tree.
|
||||
func (s *state) do(op *Move) LogMove {
|
||||
lm := LogMove{
|
||||
Move: Move{
|
||||
Parent: op.Parent,
|
||||
Meta: op.Meta,
|
||||
Child: op.Child,
|
||||
},
|
||||
}
|
||||
|
||||
_, parentInTree := s.tree.infoMap[op.Parent]
|
||||
shouldPut := !s.tree.isAncestor(op.Child, op.Parent) &&
|
||||
!(op.Parent != 0 && op.Parent != TrashID && !parentInTree)
|
||||
shouldRemove := op.Parent == TrashID
|
||||
|
||||
p, ok := s.tree.infoMap[op.Child]
|
||||
if ok {
|
||||
lm.HasOld = true
|
||||
lm.Old = p
|
||||
}
|
||||
|
||||
if !shouldPut {
|
||||
return lm
|
||||
}
|
||||
|
||||
if shouldRemove {
|
||||
if ok {
|
||||
s.removeChild(op.Child, p.Parent)
|
||||
}
|
||||
delete(s.tree.infoMap, op.Child)
|
||||
return lm
|
||||
}
|
||||
|
||||
if !ok {
|
||||
p.Timestamp = op.Time
|
||||
} else {
|
||||
s.removeChild(op.Child, p.Parent)
|
||||
}
|
||||
|
||||
p.Meta = op.Meta
|
||||
p.Parent = op.Parent
|
||||
s.tree.infoMap[op.Child] = p
|
||||
s.tree.childMap[op.Parent] = append(s.tree.childMap[op.Parent], op.Child)
|
||||
|
||||
return lm
|
||||
}
|
||||
|
||||
func (s *state) removeChild(child, parent Node) {
|
||||
oldChildren := s.tree.childMap[parent]
|
||||
for i := range oldChildren {
|
||||
if oldChildren[i] == child {
|
||||
s.tree.childMap[parent] = append(oldChildren[:i], oldChildren[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *state) timestamp() Timestamp {
|
||||
if len(s.operations) == 0 {
|
||||
return 0
|
||||
}
|
||||
return s.operations[len(s.operations)-1].Time + 1
|
||||
}
|
||||
|
||||
func (s *state) findSpareID() Node {
|
||||
id := uint64(1)
|
||||
for _, ok := s.infoMap[id]; ok; _, ok = s.infoMap[id] {
|
||||
id++
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// tree is a mapping from the child nodes to their parent and metadata.
|
||||
type tree struct {
|
||||
infoMap map[Node]nodeInfo
|
||||
childMap map[Node][]Node
|
||||
}
|
||||
|
||||
func newTree() *tree {
|
||||
return &tree{
|
||||
childMap: make(map[Node][]Node),
|
||||
infoMap: make(map[Node]nodeInfo),
|
||||
}
|
||||
}
|
||||
|
||||
// isAncestor returns true if parent is an ancestor of a child.
|
||||
// For convenience, also return true if parent == child.
|
||||
func (t tree) isAncestor(parent, child Node) bool {
|
||||
for c := child; c != parent; {
|
||||
p, ok := t.infoMap[c]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
c = p.Parent
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// getPathPrefix descends by path constructed from values of attr until
|
||||
// there is no node corresponding to a path element. Returns the amount of nodes
|
||||
// processed and ID of the last node.
|
||||
func (t tree) getPathPrefix(attr string, path []string) (int, Node) {
|
||||
var curNode Node
|
||||
|
||||
loop:
|
||||
for i := range path {
|
||||
children := t.childMap[curNode]
|
||||
for j := range children {
|
||||
f := t.infoMap[children[j]].Meta.GetAttr(attr)
|
||||
if string(f) == path[i] {
|
||||
curNode = children[j]
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
return i, curNode
|
||||
}
|
||||
|
||||
return len(path), curNode
|
||||
}
|
||||
|
||||
// get returns list of nodes which have the specified path from root
|
||||
// descending by values of attr from meta.
|
||||
func (t tree) get(attr string, path []string, latest bool) []Node {
|
||||
if len(path) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
i, curNode := t.getPathPrefix(attr, path[:len(path)-1])
|
||||
if i < len(path)-1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var nodes []Node
|
||||
var lastTs Timestamp
|
||||
|
||||
children := t.childMap[curNode]
|
||||
for i := range children {
|
||||
info := t.infoMap[children[i]]
|
||||
fileName := string(info.Meta.GetAttr(attr))
|
||||
if fileName == path[len(path)-1] {
|
||||
if latest {
|
||||
if info.Timestamp >= lastTs {
|
||||
nodes = append(nodes[:0], children[i])
|
||||
}
|
||||
} else {
|
||||
nodes = append(nodes, children[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
// getMeta returns meta information of node n.
|
||||
func (t tree) getMeta(n Node) Meta {
|
||||
return t.infoMap[n].Meta
|
||||
}
|
35
pkg/local_object_storage/pilorama/interface.go
Normal file
35
pkg/local_object_storage/pilorama/interface.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package pilorama
|
||||
|
||||
import cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
|
||||
// Forest represents CRDT tree.
|
||||
type Forest interface {
|
||||
// TreeMove moves node in the tree.
|
||||
// If the parent of the move operation is TrashID, the node is removed.
|
||||
// If the child of the move operation is RootID, new ID is generated and added to a tree.
|
||||
TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error)
|
||||
// TreeAddByPath adds new node in the tree using provided path.
|
||||
// The path is constructed by descending from the root using the values of the attr in meta.
|
||||
TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
|
||||
// TreeApply applies replicated operation from another node.
|
||||
TreeApply(cid cidSDK.ID, treeID string, m *Move) error
|
||||
// TreeGetByPath returns all nodes corresponding to the path.
|
||||
// The path is constructed by descending from the root using the values of the
|
||||
// AttributeFilename in meta.
|
||||
// The last argument determines whether only the node with the latest timestamp is returned.
|
||||
TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error)
|
||||
// TreeGetMeta returns meta information of the node with the specified ID.
|
||||
TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, error)
|
||||
}
|
||||
|
||||
type ForestStorage interface {
|
||||
Init() error
|
||||
Open() error
|
||||
Close() error
|
||||
Forest
|
||||
}
|
||||
|
||||
const (
|
||||
AttributeFilename = "FileName"
|
||||
AttributeVersion = "Version"
|
||||
)
|
48
pkg/local_object_storage/pilorama/meta.go
Normal file
48
pkg/local_object_storage/pilorama/meta.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package pilorama
|
||||
|
||||
import "github.com/nspcc-dev/neo-go/pkg/io"
|
||||
|
||||
func (x *Meta) FromBytes(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
x.Items = nil
|
||||
x.Time = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
r := io.NewBinReaderFromBuf(data)
|
||||
ts := r.ReadVarUint()
|
||||
size := r.ReadVarUint()
|
||||
m := make([]KeyValue, size)
|
||||
for i := range m {
|
||||
m[i].Key = r.ReadString()
|
||||
m[i].Value = r.ReadVarBytes()
|
||||
}
|
||||
if r.Err != nil {
|
||||
return r.Err
|
||||
}
|
||||
|
||||
x.Time = ts
|
||||
x.Items = m
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x Meta) Bytes() []byte {
|
||||
w := io.NewBufBinWriter()
|
||||
w.WriteVarUint(x.Time)
|
||||
w.WriteVarUint(uint64(len(x.Items)))
|
||||
for _, e := range x.Items {
|
||||
w.WriteString(e.Key)
|
||||
w.WriteVarBytes(e.Value)
|
||||
}
|
||||
|
||||
return w.Bytes()
|
||||
}
|
||||
|
||||
func (x Meta) GetAttr(name string) []byte {
|
||||
for _, kv := range x.Items {
|
||||
if kv.Key == name {
|
||||
return kv.Value
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
54
pkg/local_object_storage/pilorama/meta_test.go
Normal file
54
pkg/local_object_storage/pilorama/meta_test.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMeta_Bytes(t *testing.T) {
|
||||
t.Run("empty", func(t *testing.T) {
|
||||
var m Meta
|
||||
require.NoError(t, m.FromBytes(nil))
|
||||
require.True(t, len(m.Items) == 0)
|
||||
require.Equal(t, uint64(0), m.Time)
|
||||
require.Equal(t, []byte{0, 0}, m.Bytes())
|
||||
})
|
||||
t.Run("filled", func(t *testing.T) {
|
||||
expected := Meta{
|
||||
Time: 123,
|
||||
Items: []KeyValue{
|
||||
{"abc", []byte{1, 2, 3}},
|
||||
{"xyz", []byte{5, 6, 7, 8}},
|
||||
}}
|
||||
|
||||
data := expected.Bytes()
|
||||
|
||||
var actual Meta
|
||||
require.NoError(t, actual.FromBytes(data))
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
t.Run("error", func(t *testing.T) {
|
||||
require.Error(t, new(Meta).FromBytes(data[:len(data)/2]))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMeta_GetAttr(t *testing.T) {
|
||||
attr := [][]byte{
|
||||
make([]byte, 5),
|
||||
make([]byte, 10),
|
||||
}
|
||||
for i := range attr {
|
||||
rand.Read(attr[i])
|
||||
}
|
||||
|
||||
m := Meta{Items: []KeyValue{{"abc", attr[0]}, {"xyz", attr[1]}}}
|
||||
require.Equal(t, attr[0], m.GetAttr("abc"))
|
||||
require.Equal(t, attr[1], m.GetAttr("xyz"))
|
||||
|
||||
require.Nil(t, m.GetAttr("a"))
|
||||
require.Nil(t, m.GetAttr("xyza"))
|
||||
require.Nil(t, m.GetAttr(""))
|
||||
}
|
52
pkg/local_object_storage/pilorama/types.go
Normal file
52
pkg/local_object_storage/pilorama/types.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
)
|
||||
|
||||
// Timestamp is an alias for integer timestamp type.
|
||||
// TODO: remove after the debugging.
|
||||
type Timestamp = uint64
|
||||
|
||||
// Node is used to represent nodes.
|
||||
// TODO: remove after the debugging.
|
||||
type Node = uint64
|
||||
|
||||
// Meta represents arbitrary meta information.
|
||||
// TODO: remove after the debugging or create a proper interface.
|
||||
type Meta struct {
|
||||
Time Timestamp
|
||||
Items []KeyValue
|
||||
}
|
||||
|
||||
// KeyValue represents a key-value pair.
|
||||
type KeyValue struct {
|
||||
Key string
|
||||
Value []byte
|
||||
}
|
||||
|
||||
// Move represents a single move operation.
|
||||
type Move struct {
|
||||
Parent Node
|
||||
Meta
|
||||
// Child represents the ID of a node being moved. If zero, new ID is generated.
|
||||
Child Node
|
||||
}
|
||||
|
||||
// LogMove represents log record for a single move operation.
|
||||
type LogMove struct {
|
||||
Move
|
||||
HasOld bool
|
||||
Old nodeInfo
|
||||
}
|
||||
|
||||
const (
|
||||
// RootID represents the ID of a root node.
|
||||
RootID = 0
|
||||
// TrashID is a parent for all removed nodes.
|
||||
TrashID = math.MaxUint64
|
||||
)
|
||||
|
||||
// ErrTreeNotFound is returned when the requested tree is not found.
|
||||
var ErrTreeNotFound = errors.New("tree not found")
|
Loading…
Reference in a new issue