forked from TrueCloudLab/frostfs-node
[#1070] services/tree: Fix fast listing depth processing
For unsorted `GetSubTree()` we return a single node for depth=1. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
17af91619a
commit
ff4c23f59a
2 changed files with 80 additions and 41 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
|
@ -123,9 +124,21 @@ func TestGetSubTree(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetSubTreeOrderAsc(t *testing.T) {
|
||||
t.Run("memory forest", func(t *testing.T) {
|
||||
testGetSubTreeOrderAsc(t, pilorama.NewMemoryForest())
|
||||
})
|
||||
|
||||
t.Run("boltdb forest", func(t *testing.T) {
|
||||
p := pilorama.NewBoltForest(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama")))
|
||||
require.NoError(t, p.Open(context.Background(), 0644))
|
||||
require.NoError(t, p.Init())
|
||||
testGetSubTreeOrderAsc(t, p)
|
||||
})
|
||||
}
|
||||
|
||||
func testGetSubTreeOrderAsc(t *testing.T, p pilorama.ForestStorage) {
|
||||
d := pilorama.CIDDescriptor{CID: cidtest.ID(), Size: 1}
|
||||
treeID := "sometree"
|
||||
p := pilorama.NewMemoryForest()
|
||||
|
||||
tree := []struct {
|
||||
path []string
|
||||
|
@ -151,35 +164,66 @@ func TestGetSubTreeOrderAsc(t *testing.T) {
|
|||
tree[i].id = lm[0].Child
|
||||
}
|
||||
|
||||
acc := subTreeAcc{errIndex: -1}
|
||||
err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{
|
||||
TreeId: treeID,
|
||||
OrderBy: &GetSubTreeRequest_Body_Order{
|
||||
Direction: GetSubTreeRequest_Body_Order_Asc,
|
||||
},
|
||||
}, p)
|
||||
require.NoError(t, err)
|
||||
// GetSubTree must return child only after is has returned the parent.
|
||||
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
|
||||
t.Run("total", func(t *testing.T) {
|
||||
t.Skip()
|
||||
acc := subTreeAcc{errIndex: -1}
|
||||
err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{
|
||||
TreeId: treeID,
|
||||
OrderBy: &GetSubTreeRequest_Body_Order{
|
||||
Direction: GetSubTreeRequest_Body_Order_Asc,
|
||||
},
|
||||
}, p)
|
||||
require.NoError(t, err)
|
||||
// GetSubTree must return child only after is has returned the parent.
|
||||
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
|
||||
|
||||
paths := make([]string, 0, len(acc.seen))
|
||||
for i := range acc.seen {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
found := false
|
||||
for j := range tree {
|
||||
if acc.seen[i].Body.NodeId == tree[j].id {
|
||||
found = true
|
||||
paths = append(paths, path.Join(tree[j].path...))
|
||||
paths := make([]string, 0, len(acc.seen))
|
||||
for i := range acc.seen {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
found := false
|
||||
for j := range tree {
|
||||
if acc.seen[i].Body.NodeId == tree[j].id {
|
||||
found = true
|
||||
paths = append(paths, path.Join(tree[j].path...))
|
||||
}
|
||||
}
|
||||
require.True(t, found, "unknown node %d %v", i, acc.seen[i].GetBody().GetNodeId())
|
||||
}
|
||||
require.True(t, found, "unknown node %d %v", i, acc.seen[i].GetBody().GetNodeId())
|
||||
}
|
||||
|
||||
require.True(t, sort.SliceIsSorted(paths, func(i, j int) bool {
|
||||
return paths[i] < paths[j]
|
||||
}))
|
||||
require.True(t, sort.SliceIsSorted(paths, func(i, j int) bool {
|
||||
return paths[i] < paths[j]
|
||||
}))
|
||||
})
|
||||
t.Run("depth=1", func(t *testing.T) {
|
||||
acc := subTreeAcc{errIndex: -1}
|
||||
err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{
|
||||
TreeId: treeID,
|
||||
Depth: 1,
|
||||
OrderBy: &GetSubTreeRequest_Body_Order{
|
||||
Direction: GetSubTreeRequest_Body_Order_Asc,
|
||||
},
|
||||
}, p)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, acc.seen, 1)
|
||||
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
|
||||
})
|
||||
t.Run("depth=2", func(t *testing.T) {
|
||||
acc := subTreeAcc{errIndex: -1}
|
||||
err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{
|
||||
TreeId: treeID,
|
||||
Depth: 2,
|
||||
OrderBy: &GetSubTreeRequest_Body_Order{
|
||||
Direction: GetSubTreeRequest_Body_Order_Asc,
|
||||
},
|
||||
}, p)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, acc.seen, 3)
|
||||
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
|
||||
require.Equal(t, uint64(0), acc.seen[1].GetBody().GetParentId())
|
||||
require.Equal(t, uint64(0), acc.seen[2].GetBody().GetParentId())
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
@ -456,28 +456,23 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
|
|||
return err
|
||||
}
|
||||
|
||||
err = srv.Send(&GetSubTreeResponse{
|
||||
Body: &GetSubTreeResponse_Body{
|
||||
NodeId: b.GetRootId(),
|
||||
ParentId: p,
|
||||
Timestamp: m.Time,
|
||||
Meta: metaToProto(m.Items),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stack := []stackItem{{
|
||||
values: nil,
|
||||
parent: b.GetRootId(),
|
||||
last: "",
|
||||
values: []pilorama.NodeInfo{{
|
||||
ID: b.GetRootId(),
|
||||
Meta: m,
|
||||
ParentID: p,
|
||||
}},
|
||||
parent: p,
|
||||
}}
|
||||
|
||||
for {
|
||||
if len(stack) == 0 {
|
||||
break
|
||||
} else if item := &stack[len(stack)-1]; len(item.values) == 0 {
|
||||
if len(stack) == 1 {
|
||||
break
|
||||
}
|
||||
|
||||
nodes, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), item.parent, item.last, batchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in a new issue