diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 9fc95dbbb..ee28df426 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -906,7 +906,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st b := treeRoot.Bucket(dataBucket) - i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1]) + i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1]) if err != nil { return err } @@ -918,21 +918,23 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st c := b.Cursor() - attrKey := internalKey(nil, attr, path[len(path)-1], curNode, 0) - attrKey = attrKey[:len(attrKey)-8] - childKey, _ := c.Seek(attrKey) - for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { - child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) - if latest { - _, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child)) - if ts >= maxTimestamp { - nodes = append(nodes[:0], child) - maxTimestamp = ts + for i := range curNodes { + attrKey := internalKey(nil, attr, path[len(path)-1], curNodes[i], 0) + attrKey = attrKey[:len(attrKey)-8] + childKey, _ := c.Seek(attrKey) + for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { + child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) + if latest { + _, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child)) + if ts >= maxTimestamp { + nodes = append(nodes[:0], child) + maxTimestamp = ts + } + } else { + nodes = append(nodes, child) } - } else { - nodes = append(nodes, child) + childKey, _ = c.Next() } - childKey, _ = c.Next() } return nil })) @@ -1412,6 +1414,36 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (* return &res, nil } +func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) { + c := bTree.Cursor() + + var curNodes []Node + nextNodes := []Node{RootID} + var attrKey []byte + + for i := range path { + curNodes, nextNodes = nextNodes, curNodes[:0] + for j := range curNodes { + attrKey = internalKey(attrKey, attr, path[i], curNodes[j], 0) + attrKey = attrKey[:len(attrKey)-8] + + childKey, value := c.Seek(attrKey) + for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { + if len(value) == 1 && value[0] == 1 { + nextNodes = append(nextNodes, binary.LittleEndian.Uint64(childKey[len(childKey)-8:])) + } + childKey, value = c.Next() + } + } + + if len(nextNodes) == 0 { + return i, curNodes, nil + } + } + + return len(path), nextNodes, nil +} + func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { c := bTree.Cursor() diff --git a/pkg/local_object_storage/pilorama/split_test.go b/pkg/local_object_storage/pilorama/split_test.go new file mode 100644 index 000000000..3bd581ce6 --- /dev/null +++ b/pkg/local_object_storage/pilorama/split_test.go @@ -0,0 +1,98 @@ +package pilorama + +import ( + "context" + "strings" + "testing" + + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" +) + +func TestDuplicateDirectory(t *testing.T) { + for i := range providers { + if providers[i].name == "inmemory" { + continue + } + t.Run(providers[i].name, func(t *testing.T) { + testDuplicateDirectory(t, providers[i].construct(t)) + }) + } +} + +func testDuplicateDirectory(t *testing.T, f Forest) { + ctx := context.Background() + d := CIDDescriptor{CID: cidtest.ID(), Size: 1} + treeID := "sometree" + + treeApply := func(t *testing.T, parent, child uint64, filename string, internal bool) { + // Nothing magic here, we add items in order and children are unique. + // This simplifies function interface a bit. + ts := child + + kv := []KeyValue{{Key: AttributeFilename, Value: []byte(filename)}} + if !internal { + kv = append(kv, KeyValue{Key: "uniqueAttr", Value: []byte{byte(child)}}) + } + + err := f.TreeApply(ctx, d.CID, treeID, &Move{ + Parent: parent, + Child: child, + Meta: Meta{ + Time: ts, + Items: kv, + }, + }, true) + require.NoError(t, err) + } + + // The following tree is constructed: + // 0 + // [1] |-- dir1 (internal) + // [2] |-- value1 + // [3] |-- dir3 (internal) + // [4] |-- value3 + // [5] |-- dir1 (internal) + // [6] |-- value2 + // [7] |-- dir3 (internal) + // [8] |-- value4 + // [9] |-- dir2 (internal) + // [10] |-- value0 + treeApply(t, RootID, 1, "dir1", true) + treeApply(t, 1, 2, "value1", false) + treeApply(t, 1, 3, "dir3", true) + treeApply(t, 3, 4, "value3", false) + treeApply(t, RootID, 5, "dir1", true) + treeApply(t, 5, 6, "value2", false) + treeApply(t, 5, 7, "dir3", true) + treeApply(t, 7, 8, "value4", false) + treeApply(t, RootID, 9, "dir2", true) + treeApply(t, RootID, 10, "value0", false) + + // The compacted view: + // 0 + // [1,5] |-- dir1 (internal) + // [2] |-- value1 + // [3,7] |-- dir3 (internal) + // [4] |-- value3 + // [8] |-- value4 + // [6] |-- value2 + // [9] |-- dir2 (internal) + // [10] |-- value0 + testGetByPath := func(t *testing.T, p string) []byte { + pp := strings.Split(p, "/") + nodes, err := f.TreeGetByPath(context.Background(), d.CID, treeID, AttributeFilename, pp, false) + require.NoError(t, err) + require.Equal(t, 1, len(nodes)) + + meta, _, err := f.TreeGetMeta(ctx, d.CID, treeID, nodes[0]) + require.NoError(t, err) + require.Equal(t, []byte(pp[len(pp)-1]), meta.GetAttr(AttributeFilename)) + return meta.GetAttr("uniqueAttr") + } + + require.Equal(t, []byte{2}, testGetByPath(t, "dir1/value1")) + require.Equal(t, []byte{4}, testGetByPath(t, "dir1/dir3/value3")) + require.Equal(t, []byte{8}, testGetByPath(t, "dir1/dir3/value4")) + require.Equal(t, []byte{10}, testGetByPath(t, "value0")) +}