diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 18becc4455..3aba953e08 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -38,6 +38,7 @@ var ( // 'p' + node (id) -> parent (id) // 'm' + node (id) -> serialized meta // 'c' + parent (id) + child (id) -> 0/1 +// 'i' + 0 + attrKey + 0 + attrValue + 0 + parent (id) + node (id) -> 0/1 (1 for automatically created nodes) func NewBoltForest(path string) ForestStorage { return &boltForest{path: path} } @@ -89,6 +90,10 @@ func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, // TreeAddByPath implements the Forest interface. func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) { + if !isAttributeInternal(attr) { + return nil, ErrNotPathAttribute + } + var lm []LogMove var key [17]byte @@ -273,7 +278,7 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo return err } } - return t.removeNode(b, key, op.Child) + return t.removeNode(b, key, op.Child, op.Parent) } if currParent == nil { @@ -281,18 +286,43 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo return err } } else { - if err := b.Delete(childrenKey(key, op.Child, binary.LittleEndian.Uint64(currParent))); err != nil { + parent := binary.LittleEndian.Uint64(currParent) + if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil { return err } + var meta Meta + var k = metaKey(key, op.Child) + if err := meta.FromBytes(b.Get(k)); err == nil { + for i := range meta.Items { + if isAttributeInternal(meta.Items[i].Key) { + err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)) + if err != nil { + return err + } + } + } + } } return t.addNode(b, key, op.Child, op.Parent, op.Meta) } // removeNode removes node keys from the tree except the children key or its parent. -func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node Node) error { +func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error { if err := b.Delete(parentKey(key, node)); err != nil { return err } + var meta Meta + var k = metaKey(key, node) + if err := meta.FromBytes(b.Get(k)); err == nil { + for i := range meta.Items { + if isAttributeInternal(meta.Items[i].Key) { + err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node)) + if err != nil { + return err + } + } + } + } if err := b.Delete(metaKey(key, node)); err != nil { return err } @@ -309,7 +339,27 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me if err != nil { return err } - return b.Put(metaKey(key, child), meta.Bytes()) + err = b.Put(metaKey(key, child), meta.Bytes()) + if err != nil { + return err + } + + for i := range meta.Items { + if !isAttributeInternal(meta.Items[i].Key) { + continue + } + + key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, child) + if len(meta.Items) == 1 { + err = b.Put(key, []byte{1}) + } else { + err = b.Put(key, []byte{0}) + } + if err != nil { + return err + } + } + return nil } func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error { @@ -318,7 +368,7 @@ func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) err } if !lm.HasOld { - return t.removeNode(b, key, m.Child) + return t.removeNode(b, key, m.Child, m.Parent) } return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta) } @@ -338,6 +388,10 @@ func (t *boltForest) isAncestor(b *bbolt.Bucket, key []byte, parent, child Node) // TreeGetByPath implements the Forest interface. func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { + if !isAttributeInternal(attr) { + return nil, ErrNotPathAttribute + } + if len(path) == 0 { return nil, nil } @@ -360,40 +414,29 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa return nil } - c := b.Cursor() - var ( - metaKey [9]byte - id [9]byte childID [9]byte - m Meta maxTimestamp uint64 ) - id[0] = 'c' - metaKey[0] = 'm' + c := b.Cursor() - binary.LittleEndian.PutUint64(id[1:], curNode) - - key, _ := c.Seek(id[:]) - for len(key) == 1+8+8 && bytes.Equal(id[:9], key[:9]) { - child := binary.LittleEndian.Uint64(key[9:]) - copy(metaKey[1:], key[9:17]) - - if m.FromBytes(b.Get(metaKey[:])) == nil && string(m.GetAttr(attr)) == path[len(path)-1] { - if latest { - ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child))) - if ts >= maxTimestamp { - nodes = append(nodes[:0], child) - maxTimestamp = ts - } - } else { - nodes = append(nodes, child) + attrKey := internalKey(nil, attr, path[len(path)-1], curNode, 0) + attrKey = attrKey[:len(attrKey)-8] + childKey, _ := c.Seek(attrKey) + for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { + child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) + if latest { + ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child))) + if ts >= maxTimestamp { + nodes = append(nodes[:0], child) + maxTimestamp = ts } + } else { + nodes = append(nodes, child) } - key, _ = c.Next() + childKey, _ = c.Next() } - return nil }) } @@ -470,36 +513,25 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) ( } func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { - var key [9]byte - c := bTree.Cursor() var curNode Node - var m Meta + var attrKey []byte loop: for i := range path { - key[0] = 'c' - binary.LittleEndian.PutUint64(key[1:], curNode) + attrKey = internalKey(attrKey, attr, path[i], curNode, 0) + attrKey = attrKey[:len(attrKey)-8] - childKey, _ := c.Seek(key[:]) - for { - if len(childKey) != 17 || binary.LittleEndian.Uint64(childKey[1:]) != curNode { - break - } - - child := binary.LittleEndian.Uint64(childKey[9:]) - if err := m.FromBytes(bTree.Get(metaKey(key[:], child))); err != nil { - return 0, 0, err - } - - // Internal nodes have exactly one attribute. - if len(m.Items) == 1 && m.Items[0].Key == attr && string(m.Items[0].Value) == path[i] { - curNode = child + childKey, value := c.Seek(attrKey) + for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { + if len(value) == 1 && value[0] == 1 { + curNode = binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) continue loop } - childKey, _ = c.Next() + childKey, value = c.Next() } + return i, curNode, nil } @@ -581,6 +613,33 @@ func childrenKey(key []byte, child, parent Node) []byte { return key[:17] } +// 'i' + attribute name (string) + attribute value (string) + parent (id) + node (id) -> 0/1 +func internalKey(key []byte, k, v string, parent, node Node) []byte { + size := 1 /* prefix */ + 2*2 /* len */ + 2*8 /* nodes */ + len(k) + len(v) + if cap(key) < size { + key = make([]byte, 0, size) + } + + key = key[:0] + key = append(key, 'i') + + l := len(k) + key = append(key, byte(l), byte(l>>8)) + key = append(key, k...) + + l = len(v) + key = append(key, byte(l), byte(l>>8)) + key = append(key, v...) + + var raw [8]byte + binary.LittleEndian.PutUint64(raw[:], parent) + key = append(key, raw[:]...) + + binary.LittleEndian.PutUint64(raw[:], node) + key = append(key, raw[:]...) + return key +} + func toUint64(x uint64) []byte { var a [8]byte binary.LittleEndian.PutUint64(a[:], x) diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 174bb529e1..21d143aa9c 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -43,6 +43,10 @@ func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMov // TreeAddByPath implements the Forest interface. func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) { + if !isAttributeInternal(attr) { + return nil, ErrNotPathAttribute + } + fullID := cid.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { @@ -98,6 +102,10 @@ func (f *memoryForest) Close() error { // TreeGetByPath implements the Forest interface. func (f *memoryForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { + if !isAttributeInternal(attr) { + return nil, ErrNotPathAttribute + } + fullID := cid.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 27f7128efa..322d5aab38 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -4,6 +4,7 @@ import ( "math/rand" "os" "path/filepath" + "strconv" "testing" cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -206,6 +207,12 @@ func testForestTreeAddByPath(t *testing.T, s Forest) { meta := []KeyValue{ {Key: AttributeVersion, Value: []byte("XXX")}, {Key: AttributeFilename, Value: []byte("file.txt")}} + + t.Run("invalid attribute", func(t *testing.T) { + _, err := s.TreeAddByPath(cid, treeID, AttributeVersion, []string{"yyy"}, meta) + require.ErrorIs(t, err, ErrNotPathAttribute) + }) + lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) require.NoError(t, err) require.Equal(t, 3, len(lm)) @@ -408,24 +415,33 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore ops[i] = Move{ Parent: 0, Meta: Meta{ - Time: Timestamp(i), - Items: []KeyValue{{Value: make([]byte, 10)}}, + Time: Timestamp(i), + Items: []KeyValue{ + {Key: AttributeFilename, Value: []byte(strconv.Itoa(i))}, + {Value: make([]byte, 10)}, + }, }, Child: uint64(i) + 1, } - rand.Read(ops[i].Meta.Items[0].Value) + rand.Read(ops[i].Meta.Items[1].Value) } for i := nodeCount; i < len(ops); i++ { ops[i] = Move{ Parent: rand.Uint64() % (nodeCount + 1), Meta: Meta{ - Time: Timestamp(i + nodeCount), - Items: []KeyValue{{Value: make([]byte, 10)}}, + Time: Timestamp(i + nodeCount), + Items: []KeyValue{ + {Key: AttributeFilename, Value: []byte(strconv.Itoa(i))}, + {Value: make([]byte, 10)}, + }, }, Child: rand.Uint64() % (nodeCount + 1), } - rand.Read(ops[i].Meta.Items[0].Value) + if rand.Uint32()%5 == 0 { + ops[i].Parent = TrashID + } + rand.Read(ops[i].Meta.Items[1].Value) } for i := range ops { require.NoError(t, expected.TreeApply(cid, treeID, &ops[i])) @@ -558,6 +574,11 @@ func testTreeGetByPath(t *testing.T, s Forest) { }) } + t.Run("invalid attribute", func(t *testing.T) { + _, err := s.TreeGetByPath(cid, treeID, AttributeVersion, []string{"", "TTT"}, false) + require.ErrorIs(t, err, ErrNotPathAttribute) + }) + nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"b", "cat1.jpg"}, false) require.NoError(t, err) require.Equal(t, []Node{4, 5}, nodes) diff --git a/pkg/local_object_storage/pilorama/types.go b/pkg/local_object_storage/pilorama/types.go index b73b26e840..013b7b5ffd 100644 --- a/pkg/local_object_storage/pilorama/types.go +++ b/pkg/local_object_storage/pilorama/types.go @@ -48,5 +48,16 @@ const ( TrashID = math.MaxUint64 ) -// ErrTreeNotFound is returned when the requested tree is not found. -var ErrTreeNotFound = errors.New("tree not found") +var ( + // ErrTreeNotFound is returned when the requested tree is not found. + ErrTreeNotFound = errors.New("tree not found") + // ErrNotPathAttribute is returned when the path is trying to be constructed with a non-internal + // attribute. Currently the only attribute allowed is AttributeFilename. + ErrNotPathAttribute = errors.New("attribute can't be used in path construction") +) + +// isAttributeInternal returns true iff key can be used in `*ByPath` methods. +// For such attributes an additional index is maintained in the database. +func isAttributeInternal(key string) bool { + return key == AttributeFilename +}