pilorama: Fix duplicate directory handling #1234

Merged
fyrchik merged 2 commits from fyrchik/frostfs-node:fix-duplicate-directory into master 2024-07-10 06:11:42 +00:00
2 changed files with 144 additions and 14 deletions

View file

@ -906,7 +906,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
b := treeRoot.Bucket(dataBucket)
i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1])
i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
if err != nil {
return err
}
@ -918,21 +918,23 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
c := b.Cursor()
attrKey := internalKey(nil, attr, path[len(path)-1], curNode, 0)
attrKey = attrKey[:len(attrKey)-8]
childKey, _ := c.Seek(attrKey)
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
if latest {
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
if ts >= maxTimestamp {
nodes = append(nodes[:0], child)
maxTimestamp = ts
for i := range curNodes {
attrKey := internalKey(nil, attr, path[len(path)-1], curNodes[i], 0)
attrKey = attrKey[:len(attrKey)-8]
childKey, _ := c.Seek(attrKey)
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
if latest {
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
if ts >= maxTimestamp {
nodes = append(nodes[:0], child)
maxTimestamp = ts
}
} else {
nodes = append(nodes, child)
}
} else {
nodes = append(nodes, child)
childKey, _ = c.Next()
}
childKey, _ = c.Next()
}
return nil
}))
@ -1412,6 +1414,36 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
return &res, nil
}
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
c := bTree.Cursor()
var curNodes []Node
nextNodes := []Node{RootID}
var attrKey []byte
for i := range path {
curNodes, nextNodes = nextNodes, curNodes[:0]
for j := range curNodes {
attrKey = internalKey(attrKey, attr, path[i], curNodes[j], 0)
attrKey = attrKey[:len(attrKey)-8]
childKey, value := c.Seek(attrKey)
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
if len(value) == 1 && value[0] == 1 {
nextNodes = append(nextNodes, binary.LittleEndian.Uint64(childKey[len(childKey)-8:]))
}
childKey, value = c.Next()
}
}
if len(nextNodes) == 0 {
return i, curNodes, nil
}
}
return len(path), nextNodes, nil
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
c := bTree.Cursor()

View file

@ -0,0 +1,98 @@
package pilorama
import (
"context"
"strings"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestDuplicateDirectory(t *testing.T) {
for i := range providers {
if providers[i].name == "inmemory" {
continue
}
t.Run(providers[i].name, func(t *testing.T) {
testDuplicateDirectory(t, providers[i].construct(t))
})
}
}
func testDuplicateDirectory(t *testing.T, f Forest) {
ctx := context.Background()
d := CIDDescriptor{CID: cidtest.ID(), Size: 1}
treeID := "sometree"
treeApply := func(t *testing.T, parent, child uint64, filename string, internal bool) {
// Nothing magic here, we add items in order and children are unique.
// This simplifies function interface a bit.
ts := child
kv := []KeyValue{{Key: AttributeFilename, Value: []byte(filename)}}
if !internal {
kv = append(kv, KeyValue{Key: "uniqueAttr", Value: []byte{byte(child)}})
}
err := f.TreeApply(ctx, d.CID, treeID, &Move{
Parent: parent,
Child: child,
Meta: Meta{
Time: ts,
Items: kv,
},
}, true)
require.NoError(t, err)
}
// The following tree is constructed:
// 0
// [1] |-- dir1 (internal)
// [2] |-- value1
// [3] |-- dir3 (internal)
// [4] |-- value3
// [5] |-- dir1 (internal)
// [6] |-- value2
// [7] |-- dir3 (internal)
// [8] |-- value4
// [9] |-- dir2 (internal)
// [10] |-- value0
treeApply(t, RootID, 1, "dir1", true)
treeApply(t, 1, 2, "value1", false)
treeApply(t, 1, 3, "dir3", true)
treeApply(t, 3, 4, "value3", false)
treeApply(t, RootID, 5, "dir1", true)
treeApply(t, 5, 6, "value2", false)
treeApply(t, 5, 7, "dir3", true)
treeApply(t, 7, 8, "value4", false)
treeApply(t, RootID, 9, "dir2", true)
treeApply(t, RootID, 10, "value0", false)
// The compacted view:
// 0
// [1,5] |-- dir1 (internal)
// [2] |-- value1
// [3,7] |-- dir3 (internal)
// [4] |-- value3
// [8] |-- value4
// [6] |-- value2
// [9] |-- dir2 (internal)
// [10] |-- value0
testGetByPath := func(t *testing.T, p string) []byte {
pp := strings.Split(p, "/")
nodes, err := f.TreeGetByPath(context.Background(), d.CID, treeID, AttributeFilename, pp, false)
require.NoError(t, err)
require.Equal(t, 1, len(nodes))
meta, _, err := f.TreeGetMeta(ctx, d.CID, treeID, nodes[0])
require.NoError(t, err)
require.Equal(t, []byte(pp[len(pp)-1]), meta.GetAttr(AttributeFilename))
return meta.GetAttr("uniqueAttr")
}
require.Equal(t, []byte{2}, testGetByPath(t, "dir1/value1"))
require.Equal(t, []byte{4}, testGetByPath(t, "dir1/dir3/value3"))
require.Equal(t, []byte{8}, testGetByPath(t, "dir1/dir3/value4"))
require.Equal(t, []byte{10}, testGetByPath(t, "value0"))
}