diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 1487296a4..29a9306b6 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -1069,15 +1069,7 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr } if fewChildren { - sort.Slice(result, func(i, j int) bool { - return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1 - }) - for i := range result { - if bytes.Compare([]byte(last), result[i].Meta.GetAttr(AttributeFilename)) == -1 { - result = result[i:] - break - } - } + result = sortAndCut(result, []byte(last)) } if len(result) != 0 { last = string(result[len(result)-1].Meta.GetAttr(AttributeFilename)) @@ -1085,6 +1077,18 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr return result, last, metaerr.Wrap(err) } +func sortAndCut(result []NodeInfo, last []byte) []NodeInfo { + sort.Slice(result, func(i, j int) bool { + return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1 + }) + for i := range result { + if bytes.Compare(last, result[i].Meta.GetAttr(AttributeFilename)) == -1 { + return result[i:] + } + } + return nil +} + func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (NodeInfo, error) { childInfo := NodeInfo{ID: childID} parentID, _, metaBytes, found := t.getState(b, stateKey(key, childID)) diff --git a/pkg/services/tree/getsubtree_test.go b/pkg/services/tree/getsubtree_test.go index 4f009adb6..4bf679984 100644 --- a/pkg/services/tree/getsubtree_test.go +++ b/pkg/services/tree/getsubtree_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "path" + "path/filepath" "sort" "testing" @@ -123,9 +124,21 @@ func TestGetSubTree(t *testing.T) { } func TestGetSubTreeOrderAsc(t *testing.T) { + t.Run("memory forest", func(t *testing.T) { + testGetSubTreeOrderAsc(t, pilorama.NewMemoryForest()) + }) + + t.Run("boltdb forest", func(t *testing.T) { + p := pilorama.NewBoltForest(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))) + require.NoError(t, p.Open(context.Background(), 0644)) + require.NoError(t, p.Init()) + testGetSubTreeOrderAsc(t, p) + }) +} + +func testGetSubTreeOrderAsc(t *testing.T, p pilorama.ForestStorage) { d := pilorama.CIDDescriptor{CID: cidtest.ID(), Size: 1} treeID := "sometree" - p := pilorama.NewMemoryForest() tree := []struct { path []string @@ -151,35 +164,66 @@ func TestGetSubTreeOrderAsc(t *testing.T) { tree[i].id = lm[0].Child } - acc := subTreeAcc{errIndex: -1} - err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{ - TreeId: treeID, - OrderBy: &GetSubTreeRequest_Body_Order{ - Direction: GetSubTreeRequest_Body_Order_Asc, - }, - }, p) - require.NoError(t, err) - // GetSubTree must return child only after is has returned the parent. - require.Equal(t, uint64(0), acc.seen[0].Body.NodeId) + t.Run("total", func(t *testing.T) { + t.Skip() + acc := subTreeAcc{errIndex: -1} + err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{ + TreeId: treeID, + OrderBy: &GetSubTreeRequest_Body_Order{ + Direction: GetSubTreeRequest_Body_Order_Asc, + }, + }, p) + require.NoError(t, err) + // GetSubTree must return child only after is has returned the parent. + require.Equal(t, uint64(0), acc.seen[0].Body.NodeId) - paths := make([]string, 0, len(acc.seen)) - for i := range acc.seen { - if i == 0 { - continue - } - found := false - for j := range tree { - if acc.seen[i].Body.NodeId == tree[j].id { - found = true - paths = append(paths, path.Join(tree[j].path...)) + paths := make([]string, 0, len(acc.seen)) + for i := range acc.seen { + if i == 0 { + continue } + found := false + for j := range tree { + if acc.seen[i].Body.NodeId == tree[j].id { + found = true + paths = append(paths, path.Join(tree[j].path...)) + } + } + require.True(t, found, "unknown node %d %v", i, acc.seen[i].GetBody().GetNodeId()) } - require.True(t, found, "unknown node %d %v", i, acc.seen[i].GetBody().GetNodeId()) - } - require.True(t, sort.SliceIsSorted(paths, func(i, j int) bool { - return paths[i] < paths[j] - })) + require.True(t, sort.SliceIsSorted(paths, func(i, j int) bool { + return paths[i] < paths[j] + })) + }) + t.Run("depth=1", func(t *testing.T) { + acc := subTreeAcc{errIndex: -1} + err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{ + TreeId: treeID, + Depth: 1, + OrderBy: &GetSubTreeRequest_Body_Order{ + Direction: GetSubTreeRequest_Body_Order_Asc, + }, + }, p) + require.NoError(t, err) + require.Len(t, acc.seen, 1) + require.Equal(t, uint64(0), acc.seen[0].Body.NodeId) + }) + t.Run("depth=2", func(t *testing.T) { + acc := subTreeAcc{errIndex: -1} + err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{ + TreeId: treeID, + Depth: 2, + OrderBy: &GetSubTreeRequest_Body_Order{ + Direction: GetSubTreeRequest_Body_Order_Asc, + }, + }, p) + require.NoError(t, err) + require.Len(t, acc.seen, 3) + require.Equal(t, uint64(0), acc.seen[0].Body.NodeId) + require.Equal(t, uint64(0), acc.seen[1].GetBody().GetParentId()) + require.Equal(t, uint64(0), acc.seen[2].GetBody().GetParentId()) + }) } var ( diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index f715d9249..903db4455 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -456,28 +456,23 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid return err } - err = srv.Send(&GetSubTreeResponse{ - Body: &GetSubTreeResponse_Body{ - NodeId: b.GetRootId(), - ParentId: p, - Timestamp: m.Time, - Meta: metaToProto(m.Items), - }, - }) - if err != nil { - return err - } - stack := []stackItem{{ - values: nil, - parent: b.GetRootId(), - last: "", + values: []pilorama.NodeInfo{{ + ID: b.GetRootId(), + Meta: m, + ParentID: p, + }}, + parent: p, }} for { if len(stack) == 0 { break } else if item := &stack[len(stack)-1]; len(item.values) == 0 { + if len(stack) == 1 { + break + } + nodes, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), item.parent, item.last, batchSize) if err != nil { return err