pilorama: Fix duplicate directory handling #1234
2 changed files with 144 additions and 14 deletions
|
@ -906,7 +906,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
|
|||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
|
||||
i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1])
|
||||
i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -918,21 +918,23 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
|
|||
|
||||
c := b.Cursor()
|
||||
|
||||
attrKey := internalKey(nil, attr, path[len(path)-1], curNode, 0)
|
||||
attrKey = attrKey[:len(attrKey)-8]
|
||||
childKey, _ := c.Seek(attrKey)
|
||||
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
|
||||
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
|
||||
if latest {
|
||||
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
|
||||
if ts >= maxTimestamp {
|
||||
nodes = append(nodes[:0], child)
|
||||
maxTimestamp = ts
|
||||
for i := range curNodes {
|
||||
attrKey := internalKey(nil, attr, path[len(path)-1], curNodes[i], 0)
|
||||
attrKey = attrKey[:len(attrKey)-8]
|
||||
childKey, _ := c.Seek(attrKey)
|
||||
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
|
||||
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
|
||||
if latest {
|
||||
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
|
||||
if ts >= maxTimestamp {
|
||||
nodes = append(nodes[:0], child)
|
||||
maxTimestamp = ts
|
||||
}
|
||||
} else {
|
||||
nodes = append(nodes, child)
|
||||
}
|
||||
} else {
|
||||
nodes = append(nodes, child)
|
||||
childKey, _ = c.Next()
|
||||
}
|
||||
childKey, _ = c.Next()
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
|
@ -1412,6 +1414,36 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
|
|||
return &res, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
|
||||
c := bTree.Cursor()
|
||||
|
||||
var curNodes []Node
|
||||
nextNodes := []Node{RootID}
|
||||
var attrKey []byte
|
||||
|
||||
for i := range path {
|
||||
curNodes, nextNodes = nextNodes, curNodes[:0]
|
||||
for j := range curNodes {
|
||||
attrKey = internalKey(attrKey, attr, path[i], curNodes[j], 0)
|
||||
attrKey = attrKey[:len(attrKey)-8]
|
||||
|
||||
childKey, value := c.Seek(attrKey)
|
||||
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
|
||||
if len(value) == 1 && value[0] == 1 {
|
||||
nextNodes = append(nextNodes, binary.LittleEndian.Uint64(childKey[len(childKey)-8:]))
|
||||
}
|
||||
childKey, value = c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
if len(nextNodes) == 0 {
|
||||
return i, curNodes, nil
|
||||
}
|
||||
}
|
||||
|
||||
return len(path), nextNodes, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
|
||||
c := bTree.Cursor()
|
||||
|
||||
|
|
98
pkg/local_object_storage/pilorama/split_test.go
Normal file
98
pkg/local_object_storage/pilorama/split_test.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDuplicateDirectory(t *testing.T) {
|
||||
for i := range providers {
|
||||
if providers[i].name == "inmemory" {
|
||||
continue
|
||||
}
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testDuplicateDirectory(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testDuplicateDirectory(t *testing.T, f Forest) {
|
||||
ctx := context.Background()
|
||||
d := CIDDescriptor{CID: cidtest.ID(), Size: 1}
|
||||
treeID := "sometree"
|
||||
|
||||
treeApply := func(t *testing.T, parent, child uint64, filename string, internal bool) {
|
||||
// Nothing magic here, we add items in order and children are unique.
|
||||
// This simplifies function interface a bit.
|
||||
ts := child
|
||||
|
||||
kv := []KeyValue{{Key: AttributeFilename, Value: []byte(filename)}}
|
||||
if !internal {
|
||||
kv = append(kv, KeyValue{Key: "uniqueAttr", Value: []byte{byte(child)}})
|
||||
}
|
||||
|
||||
err := f.TreeApply(ctx, d.CID, treeID, &Move{
|
||||
Parent: parent,
|
||||
Child: child,
|
||||
Meta: Meta{
|
||||
Time: ts,
|
||||
Items: kv,
|
||||
},
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// The following tree is constructed:
|
||||
// 0
|
||||
// [1] |-- dir1 (internal)
|
||||
// [2] |-- value1
|
||||
// [3] |-- dir3 (internal)
|
||||
// [4] |-- value3
|
||||
// [5] |-- dir1 (internal)
|
||||
// [6] |-- value2
|
||||
// [7] |-- dir3 (internal)
|
||||
// [8] |-- value4
|
||||
// [9] |-- dir2 (internal)
|
||||
// [10] |-- value0
|
||||
treeApply(t, RootID, 1, "dir1", true)
|
||||
treeApply(t, 1, 2, "value1", false)
|
||||
treeApply(t, 1, 3, "dir3", true)
|
||||
treeApply(t, 3, 4, "value3", false)
|
||||
treeApply(t, RootID, 5, "dir1", true)
|
||||
treeApply(t, 5, 6, "value2", false)
|
||||
treeApply(t, 5, 7, "dir3", true)
|
||||
treeApply(t, 7, 8, "value4", false)
|
||||
treeApply(t, RootID, 9, "dir2", true)
|
||||
treeApply(t, RootID, 10, "value0", false)
|
||||
|
||||
// The compacted view:
|
||||
// 0
|
||||
// [1,5] |-- dir1 (internal)
|
||||
// [2] |-- value1
|
||||
// [3,7] |-- dir3 (internal)
|
||||
// [4] |-- value3
|
||||
// [8] |-- value4
|
||||
// [6] |-- value2
|
||||
// [9] |-- dir2 (internal)
|
||||
// [10] |-- value0
|
||||
testGetByPath := func(t *testing.T, p string) []byte {
|
||||
pp := strings.Split(p, "/")
|
||||
nodes, err := f.TreeGetByPath(context.Background(), d.CID, treeID, AttributeFilename, pp, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(nodes))
|
||||
|
||||
meta, _, err := f.TreeGetMeta(ctx, d.CID, treeID, nodes[0])
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(pp[len(pp)-1]), meta.GetAttr(AttributeFilename))
|
||||
return meta.GetAttr("uniqueAttr")
|
||||
}
|
||||
|
||||
require.Equal(t, []byte{2}, testGetByPath(t, "dir1/value1"))
|
||||
require.Equal(t, []byte{4}, testGetByPath(t, "dir1/dir3/value3"))
|
||||
require.Equal(t, []byte{8}, testGetByPath(t, "dir1/dir3/value4"))
|
||||
require.Equal(t, []byte{10}, testGetByPath(t, "value0"))
|
||||
}
|
Loading…
Reference in a new issue