[#1251] pilorama: Allow traversing multiple branches in parallel

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-07-10 09:30:01 +03:00
parent e5767c9002
commit 3940bc17c1
18 changed files with 335 additions and 141 deletions

View file

@ -66,7 +66,7 @@ func move(cmd *cobra.Command, _ []string) {
Body: &tree.GetSubTreeRequest_Body{
ContainerId: rawCID,
TreeId: tid,
RootId: nid,
RootId: []uint64{nid},
Depth: 1,
BearerToken: bt,
},

View file

@ -68,7 +68,7 @@ func getSubTree(cmd *cobra.Command, _ []string) {
Body: &tree.GetSubTreeRequest_Body{
ContainerId: rawCID,
TreeId: tid,
RootId: rid,
RootId: []uint64{rid},
Depth: depth,
BearerToken: bt,
},
@ -83,10 +83,15 @@ func getSubTree(cmd *cobra.Command, _ []string) {
for ; err == nil; subtreeResp, err = resp.Recv() {
b := subtreeResp.GetBody()
if len(b.GetNodeId()) == 1 {
cmd.Printf("Node ID: %d\n", b.GetNodeId())
cmd.Println("\tParent ID: ", b.GetParentId())
cmd.Println("\tTimestamp: ", b.GetTimestamp())
} else {
cmd.Printf("Node IDs: %v\n", b.GetNodeId())
cmd.Println("\tParent IDs: ", b.GetParentId())
cmd.Println("\tTimestamps: ", b.GetTimestamp())
}
if meta := b.GetMeta(); len(meta) > 0 {
cmd.Println("\tMeta pairs: ")

View file

@ -210,18 +210,17 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.MultiNode, last *string, count int) ([]pilorama.MultiNodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
var err error
var nodes []pilorama.NodeInfo
var nodes []pilorama.MultiNodeInfo
var cursor *string
for _, sh := range e.sortShards(cid) {
nodes, cursor, err = sh.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)

View file

@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"sync"
@ -990,23 +991,26 @@ func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID stri
return m, parentID, metaerr.Wrap(err)
}
func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeID Node, threshold int) bool {
func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeIDs MultiNode, threshold int) bool {
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
count := 0
for _, nodeID := range nodeIDs {
binary.LittleEndian.PutUint64(key[1:], nodeID)
c := b.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
if count++; count > threshold {
return false
}
}
}
return true
}
// TreeSortedByFilename implements the Forest interface.
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error) {
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeIDs MultiNode, last *string, count int) ([]MultiNodeInfo, *string, error) {
var (
startedAt = time.Now()
success = false
@ -1019,7 +1023,6 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
@ -1030,6 +1033,9 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
if t.mode.NoMetabase() {
return nil, last, ErrDegradedMode
}
if len(nodeIDs) == 0 {
return nil, last, errors.New("empty node list")
}
h := newHeap(last, count)
key := make([]byte, 9)
@ -1048,21 +1054,23 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
// If the node is a leaf, we could scan all filenames in the tree.
// To prevent this we first count the number of children: if it is less than
// the number of nodes we need to return, fallback to TreeGetChildren() implementation.
if fewChildren = t.hasFewChildren(b, nodeID, count); fewChildren {
if fewChildren = t.hasFewChildren(b, nodeIDs, count); fewChildren {
var err error
result, err = t.getChildren(b, nodeID)
result, err = t.getChildren(b, nodeIDs)
return err
}
t.fillSortedChildren(b, nodeID, h)
t.fillSortedChildren(b, nodeIDs, h)
for info, ok := h.pop(); ok; info, ok = h.pop() {
childInfo, err := t.getChildInfo(b, key, info.id)
for _, id := range info.id {
childInfo, err := t.getChildInfo(b, key, id)
if err != nil {
return err
}
result = append(result, childInfo)
}
}
return nil
})
@ -1074,11 +1082,15 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
if fewChildren {
result = sortAndCut(result, last)
}
if len(result) != 0 {
s := string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
res := mergeNodeInfos(result)
if len(res) > count {
res = res[:count]
}
if len(res) != 0 {
s := string(findAttr(res[len(res)-1].Meta, AttributeFilename))
last = &s
}
return result, last, metaerr.Wrap(err)
return res, last, metaerr.Wrap(err)
}
func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
@ -1109,37 +1121,56 @@ func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (No
return childInfo, nil
}
func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeID Node, h *fixedHeap) {
func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap) {
c := b.Cursor()
prefix := internalKeyPrefix(nil, AttributeFilename)
length := uint16(0)
count := 0
var nodes []uint64
var lastFilename *string
for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() {
if len(k) < len(prefix)+2+16 {
continue
}
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if parentID != nodeID {
if !slices.Contains(nodeIDs, parentID) {
continue
}
actualLength := binary.LittleEndian.Uint16(k[len(prefix):])
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
filename := string(k[len(prefix)+2 : len(k)-16])
processed := h.push(childID, filename)
if lastFilename == nil {
lastFilename = &filename
nodes = append(nodes, childID)
} else if *lastFilename == filename {
nodes = append(nodes, childID)
} else {
processed := h.push(nodes, *lastFilename)
nodes = MultiNode{childID}
lastFilename = &filename
if actualLength != length {
length = actualLength
count = 1
} else if processed {
if count++; count > h.count {
lastFilename = nil
nodes = nil
length = actualLength + 1
c.Seek(append(prefix, byte(length), byte(length>>8)))
c.Prev() // c.Next() will be performed by for loop
}
}
}
}
if len(nodes) != 0 && lastFilename != nil {
h.push(nodes, *lastFilename)
}
}
// TreeGetChildren implements the Forest interface.
@ -1179,17 +1210,18 @@ func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID
b := treeRoot.Bucket(dataBucket)
var err error
result, err = t.getChildren(b, nodeID)
result, err = t.getChildren(b, []Node{nodeID})
return err
})
success = err == nil
return result, metaerr.Wrap(err)
}
func (t *boltForest) getChildren(b *bbolt.Bucket, nodeID Node) ([]NodeInfo, error) {
func (t *boltForest) getChildren(b *bbolt.Bucket, nodeIDs MultiNode) ([]NodeInfo, error) {
var result []NodeInfo
key := make([]byte, 9)
for _, nodeID := range nodeIDs {
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
@ -1202,6 +1234,7 @@ func (t *boltForest) getChildren(b *bbolt.Bucket, nodeID Node) ([]NodeInfo, erro
}
result = append(result, childInfo)
}
}
return result, nil
}

View file

@ -156,7 +156,7 @@ func (f *memoryForest) TreeGetMeta(_ context.Context, cid cid.ID, treeID string,
}
// TreeSortedByFilename implements the Forest interface.
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start *string, count int) ([]NodeInfo, *string, error) {
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeIDs MultiNode, start *string, count int) ([]MultiNodeInfo, *string, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
@ -166,8 +166,10 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
return nil, start, nil
}
var res []NodeInfo
for _, nodeID := range nodeIDs {
children := s.tree.getChildren(nodeID)
res := make([]NodeInfo, 0, len(children))
for _, childID := range children {
var found bool
for _, kv := range s.infoMap[childID].Meta.Items {
@ -185,21 +187,24 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
ParentID: s.infoMap[childID].Parent,
})
}
}
if len(res) == 0 {
return res, start, nil
return nil, start, nil
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range res {
if start == nil || string(res[i].Meta.GetAttr(AttributeFilename)) > *start {
r := mergeNodeInfos(res)
for i := range r {
if start == nil || string(findAttr(r[i].Meta, AttributeFilename)) > *start {
finish := i + count
if len(res) < finish {
finish = len(res)
}
last := string(res[finish-1].Meta.GetAttr(AttributeFilename))
return res[i:finish], &last, nil
last := string(findAttr(r[finish-1].Meta, AttributeFilename))
return r[i:finish], &last, nil
}
}
last := string(res[len(res)-1].Meta.GetAttr(AttributeFilename))

View file

@ -215,7 +215,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
b.Run(providers[i].name+",root", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, nil, 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, MultiNode{RootID}, nil, 100)
if err != nil || len(res) != 100 {
b.Fatalf("err %v, count %d", err, len(res))
}
@ -223,7 +223,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
})
b.Run(providers[i].name+",leaf", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, nil, 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, MultiNode{1}, nil, 100)
if err != nil || len(res) != 0 {
b.FailNow()
}
@ -266,9 +266,9 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
treeAdd(t, i+1, strconv.Itoa(i+1))
}
var result []NodeInfo
var result []MultiNodeInfo
treeAppend := func(t *testing.T, last *string, count int) *string {
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, RootID, last, count)
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, MultiNode{RootID}, last, count)
require.NoError(t, err)
result = append(result, res...)
return cursor
@ -282,11 +282,11 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
require.Len(t, result, count)
for i := range result {
require.Equal(t, RootID+uint64(i+1), result[i].ID)
require.Equal(t, MultiNode{RootID + uint64(i+1)}, result[i].Children)
if i == 0 {
require.Equal(t, "", string(result[i].Meta.GetAttr(AttributeFilename)))
require.Equal(t, "", string(findAttr(result[i].Meta, AttributeFilename)))
} else {
require.Equal(t, strconv.Itoa(RootID+i+1), string(result[i].Meta.GetAttr(AttributeFilename)))
require.Equal(t, strconv.Itoa(RootID+i+1), string(findAttr(result[i].Meta, AttributeFilename)))
}
}
}
@ -318,12 +318,12 @@ func testForestTreeSortedByFilename(t *testing.T, s ForestStorage) {
require.NoError(t, err)
}
expectAttributes := func(t *testing.T, attr string, expected []string, res []NodeInfo) {
expectAttributes := func(t *testing.T, attr string, expected []string, res []MultiNodeInfo) {
require.Equal(t, len(expected), len(res))
actual := make([]string, len(res))
for i := range actual {
actual[i] = string(res[i].Meta.GetAttr(attr))
actual[i] = string(findAttr(res[i].Meta, attr))
}
require.Equal(t, expected, actual)
}
@ -345,40 +345,40 @@ func testForestTreeSortedByFilename(t *testing.T, s ForestStorage) {
treeAddByPath(t, items[i])
}
getChildren := func(t *testing.T, id Node) []NodeInfo {
getChildren := func(t *testing.T, id MultiNode) []MultiNodeInfo {
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, nil, len(items))
require.NoError(t, err)
return res
}
res := getChildren(t, RootID)
res := getChildren(t, MultiNode{RootID})
expectAttributes(t, AttributeFilename, []string{"a", "b", "c"}, res)
expectAttributes(t, controlAttr, []string{"", "", "c"}, res)
{
ra := getChildren(t, res[0].ID)
ra := getChildren(t, res[0].Children)
expectAttributes(t, AttributeFilename, []string{"bbb"}, ra)
expectAttributes(t, controlAttr, []string{""}, ra)
rabbb := getChildren(t, ra[0].ID)
rabbb := getChildren(t, ra[0].Children)
expectAttributes(t, AttributeFilename, []string{"ccc", "xxx", "z"}, rabbb)
expectAttributes(t, controlAttr, []string{"a/bbb/ccc", "a/bbb/xxx", "a/bbb/z"}, rabbb)
}
{
rb := getChildren(t, res[1].ID)
rb := getChildren(t, res[1].Children)
expectAttributes(t, AttributeFilename, []string{"bbb", "xxx"}, rb)
expectAttributes(t, controlAttr, []string{"", ""}, rb)
rbbbb := getChildren(t, rb[0].ID)
rbbbb := getChildren(t, rb[0].Children)
expectAttributes(t, AttributeFilename, []string{"ccc"}, rbbbb)
expectAttributes(t, controlAttr, []string{"b/bbb/ccc"}, rbbbb)
rbxxx := getChildren(t, rb[1].ID)
rbxxx := getChildren(t, rb[1].Children)
expectAttributes(t, AttributeFilename, []string{"z"}, rbxxx)
expectAttributes(t, controlAttr, []string{"b/xxx/z"}, rbxxx)
}
{
rc := getChildren(t, res[2].ID)
rc := getChildren(t, res[2].Children)
require.Len(t, rc, 0)
}
}

View file

@ -5,7 +5,7 @@ import (
)
type heapInfo struct {
id Node
id MultiNode
filename string
}
@ -44,7 +44,7 @@ func newHeap(start *string, count int) *fixedHeap {
}
}
func (h *fixedHeap) push(id Node, filename string) bool {
func (h *fixedHeap) push(id MultiNode, filename string) bool {
if h.start != nil && filename <= *h.start {
return false
}

View file

@ -35,7 +35,7 @@ type Forest interface {
TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error)
// TreeSortedByFilename returns children of the node with the specified ID. The nodes are sorted by the filename attribute..
// Should return ErrTreeNotFound if the tree is not found, and empty result if the node is not in the tree.
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error)
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID MultiNode, last *string, count int) ([]MultiNodeInfo, *string, error)
// TreeGetOpLog returns first log operation stored at or above the height.
// In case no such operation is found, empty Move and nil error should be returned.
TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error)

View file

@ -21,7 +21,11 @@ func (x Meta) Bytes() []byte {
}
func (x Meta) GetAttr(name string) []byte {
for _, kv := range x.Items {
return findAttr(x.Items, name)
}
func findAttr(ms []KeyValue, name string) []byte {
for _, kv := range ms {
if kv.Key == name {
return kv.Value
}

View file

@ -0,0 +1,49 @@
package pilorama
import "bytes"
// MultiNode represents a group of internal nodes accessible by the same path, but having different id.
type MultiNode []Node
// MultiNodeInfo represents a group of internal nodes accessible by the same path, but having different id.
type MultiNodeInfo struct {
Children MultiNode
Parents MultiNode
Timestamps []uint64
Meta []KeyValue
}
func (r *MultiNodeInfo) Add(info NodeInfo) bool {
if !isInternal(info.Meta.Items) || !isInternal(r.Meta) ||
!bytes.Equal(r.Meta[0].Value, info.Meta.Items[0].Value) {
return false
}
r.Children = append(r.Children, info.ID)
r.Parents = append(r.Parents, info.ParentID)
r.Timestamps = append(r.Timestamps, info.Meta.Time)
return true
}
func (n NodeInfo) ToMultiNode() MultiNodeInfo {
return MultiNodeInfo{
Children: MultiNode{n.ID},
Parents: MultiNode{n.ParentID},
Timestamps: []uint64{n.Meta.Time},
Meta: n.Meta.Items,
}
}
func isInternal(m []KeyValue) bool {
return len(m) == 1 && m[0].Key == AttributeFilename
}
func mergeNodeInfos(ns []NodeInfo) []MultiNodeInfo {
var r []MultiNodeInfo
for _, info := range ns {
if len(r) == 0 || !r[len(r)-1].Add(info) {
r = append(r, info.ToMultiNode())
}
}
return r
}

View file

@ -95,4 +95,61 @@ func testDuplicateDirectory(t *testing.T, f Forest) {
require.Equal(t, []byte{4}, testGetByPath(t, "dir1/dir3/value3"))
require.Equal(t, []byte{8}, testGetByPath(t, "dir1/dir3/value4"))
require.Equal(t, []byte{10}, testGetByPath(t, "value0"))
testSortedByFilename := func(t *testing.T, root MultiNode, last *string, batchSize int) ([]MultiNodeInfo, *string) {
res, last, err := f.TreeSortedByFilename(context.Background(), d.CID, treeID, root, last, batchSize)
require.NoError(t, err)
return res, last
}
t.Run("test sorted listing, full children branch", func(t *testing.T) {
t.Run("big batch size", func(t *testing.T) {
res, _ := testSortedByFilename(t, MultiNode{RootID}, nil, 10)
require.Equal(t, 3, len(res))
require.Equal(t, MultiNode{1, 5}, res[0].Children)
require.Equal(t, MultiNode{9}, res[1].Children)
require.Equal(t, MultiNode{10}, res[2].Children)
t.Run("multi-root", func(t *testing.T) {
res, _ := testSortedByFilename(t, MultiNode{1, 5}, nil, 10)
require.Equal(t, 3, len(res))
require.Equal(t, MultiNode{3, 7}, res[0].Children)
require.Equal(t, MultiNode{2}, res[1].Children)
require.Equal(t, MultiNode{6}, res[2].Children)
})
})
t.Run("small batch size", func(t *testing.T) {
res, last := testSortedByFilename(t, MultiNode{RootID}, nil, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{1, 5}, res[0].Children)
res, last = testSortedByFilename(t, MultiNode{RootID}, last, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{9}, res[0].Children)
res, last = testSortedByFilename(t, MultiNode{RootID}, last, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{10}, res[0].Children)
res, _ = testSortedByFilename(t, MultiNode{RootID}, last, 1)
require.Equal(t, 0, len(res))
t.Run("multi-root", func(t *testing.T) {
res, last := testSortedByFilename(t, MultiNode{1, 5}, nil, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{3, 7}, res[0].Children)
res, last = testSortedByFilename(t, MultiNode{1, 5}, last, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{2}, res[0].Children)
res, last = testSortedByFilename(t, MultiNode{1, 5}, last, 1)
require.Equal(t, 1, len(res))
require.Equal(t, MultiNode{6}, res[0].Children)
res, _ = testSortedByFilename(t, MultiNode{RootID}, last, 1)
require.Equal(t, 0, len(res))
})
})
})
}

View file

@ -184,13 +184,12 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.MultiNode, last *string, count int) ([]pilorama.MultiNodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()

View file

@ -48,7 +48,7 @@ func TestGetSubTree(t *testing.T) {
acc := subTreeAcc{errIndex: errIndex}
err := getSubTree(context.Background(), &acc, d.CID, &GetSubTreeRequest_Body{
TreeId: treeID,
RootId: rootID,
RootId: []uint64{rootID},
Depth: depth,
}, p)
if errIndex == -1 {
@ -58,12 +58,12 @@ func TestGetSubTree(t *testing.T) {
}
// GetSubTree must return child only after is has returned the parent.
require.Equal(t, rootID, acc.seen[0].Body.NodeId)
require.Equal(t, rootID, acc.seen[0].Body.NodeId[0])
loop:
for i := 1; i < len(acc.seen); i++ {
parent := acc.seen[i].Body.ParentId
for j := 0; j < i; j++ {
if acc.seen[j].Body.NodeId == parent {
if acc.seen[j].Body.NodeId[0] == parent[0] {
continue loop
}
}
@ -73,16 +73,16 @@ func TestGetSubTree(t *testing.T) {
// GetSubTree must return valid meta.
for i := range acc.seen {
b := acc.seen[i].Body
meta, node, err := p.TreeGetMeta(context.Background(), d.CID, treeID, b.NodeId)
meta, node, err := p.TreeGetMeta(context.Background(), d.CID, treeID, b.NodeId[0])
require.NoError(t, err)
require.Equal(t, node, b.ParentId)
require.Equal(t, meta.Time, b.Timestamp)
require.Equal(t, node, b.ParentId[0])
require.Equal(t, meta.Time, b.Timestamp[0])
require.Equal(t, metaToProto(meta.Items), b.Meta)
}
ordered := make([]uint64, len(acc.seen))
for i := range acc.seen {
ordered[i] = acc.seen[i].Body.NodeId
ordered[i] = acc.seen[i].Body.NodeId[0]
}
return ordered
}
@ -184,7 +184,7 @@ func testGetSubTreeOrderAsc(t *testing.T, p pilorama.ForestStorage) {
}
found := false
for j := range tree {
if acc.seen[i].Body.NodeId == tree[j].id {
if acc.seen[i].Body.NodeId[0] == tree[j].id {
found = true
paths = append(paths, path.Join(tree[j].path...))
}
@ -205,7 +205,7 @@ func testGetSubTreeOrderAsc(t *testing.T, p pilorama.ForestStorage) {
}, p)
require.NoError(t, err)
require.Len(t, acc.seen, 1)
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId[0])
})
t.Run("depth=2", func(t *testing.T) {
acc := subTreeAcc{errIndex: -1}
@ -218,15 +218,16 @@ func testGetSubTreeOrderAsc(t *testing.T, p pilorama.ForestStorage) {
}, p)
require.NoError(t, err)
require.Len(t, acc.seen, 3)
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId)
require.Equal(t, uint64(0), acc.seen[1].GetBody().GetParentId())
require.Equal(t, uint64(0), acc.seen[2].GetBody().GetParentId())
require.Equal(t, uint64(0), acc.seen[0].Body.NodeId[0])
require.Equal(t, uint64(0), acc.seen[1].GetBody().GetParentId()[0])
require.Equal(t, uint64(0), acc.seen[2].GetBody().GetParentId()[0])
})
}
var (
errSubTreeSend = errors.New("send finished with error")
errSubTreeSendAfterError = errors.New("send was invoked after an error occurred")
errInvalidResponse = errors.New("send got invalid response")
)
type subTreeAcc struct {
@ -239,6 +240,16 @@ type subTreeAcc struct {
var _ TreeService_GetSubTreeServer = &subTreeAcc{}
func (s *subTreeAcc) Send(r *GetSubTreeResponse) error {
b := r.GetBody()
if len(b.GetNodeId()) > 1 {
return errInvalidResponse
}
if len(b.GetParentId()) > 1 {
return errInvalidResponse
}
if len(b.GetTimestamp()) > 1 {
return errInvalidResponse
}
s.seen = append(s.seen, r)
if s.errIndex >= 0 {
if len(s.seen) == s.errIndex+1 {

View file

@ -16,6 +16,8 @@ import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Service represents tree-service capable of working with multiple
@ -440,29 +442,50 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
return getSubTree(srv.Context(), srv, cid, b, s.forest)
}
type stackItem struct {
values []pilorama.MultiNodeInfo
parent pilorama.MultiNode
last *string
}
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
const batchSize = 1000
type stackItem struct {
values []pilorama.NodeInfo
parent pilorama.Node
last *string
// For backward compatibility.
rootIDs := b.GetRootId()
if len(rootIDs) == 0 {
rootIDs = []uint64{0}
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
// recursive implementation is not suitable here, so we maintain explicit stack.
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
var ms []pilorama.KeyValue
var ps []uint64
var ts []uint64
for _, rootID := range rootIDs {
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), rootID)
if err != nil {
return err
}
if ms == nil {
ms = m.Items
} else {
if len(m.Items) != 1 {
return status.Error(codes.InvalidArgument, "multiple non-internal nodes provided")
}
}
ts = append(ts, m.Time)
ps = append(ps, p)
}
stack := []stackItem{{
values: []pilorama.NodeInfo{{
ID: b.GetRootId(),
Meta: m,
ParentID: p,
values: []pilorama.MultiNodeInfo{{
Children: rootIDs,
Timestamps: ts,
Meta: ms,
Parents: ps,
}},
parent: p,
parent: ps,
}}
for {
@ -486,30 +509,20 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
}
}
node := stack[len(stack)-1].values[0]
stack[len(stack)-1].values = stack[len(stack)-1].values[1:]
err = srv.Send(&GetSubTreeResponse{
Body: &GetSubTreeResponse_Body{
NodeId: node.ID,
ParentId: node.ParentID,
Timestamp: node.Meta.Time,
Meta: metaToProto(node.Meta.Items),
},
})
node, err := stackPopAndSend(stack, srv)
if err != nil {
return err
}
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, nil, batchSize)
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.Children, nil, batchSize)
if err != nil {
return err
}
if len(children) != 0 {
stack = append(stack, stackItem{
values: children,
parent: node.ID,
parent: node.Children,
last: last,
})
}
@ -518,19 +531,38 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
return nil
}
func stackPopAndSend(stack []stackItem, srv TreeService_GetSubTreeServer) (pilorama.MultiNodeInfo, error) {
node := stack[len(stack)-1].values[0]
stack[len(stack)-1].values = stack[len(stack)-1].values[1:]
return node, srv.Send(&GetSubTreeResponse{
Body: &GetSubTreeResponse_Body{
NodeId: node.Children,
ParentId: node.Parents,
Timestamp: node.Timestamps,
Meta: metaToProto(node.Meta),
},
})
}
func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
if b.GetOrderBy().GetDirection() == GetSubTreeRequest_Body_Order_Asc {
return getSortedSubTree(ctx, srv, cid, b, forest)
}
var rootID uint64
if len(b.GetRootId()) > 0 {
rootID = b.GetRootId()[0]
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
// recursive implementation is not suitable here, so we maintain explicit stack.
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), rootID)
if err != nil {
return err
}
stack := [][]pilorama.NodeInfo{{{
ID: b.GetRootId(),
ID: rootID,
Meta: m,
ParentID: p,
}}}
@ -548,9 +580,9 @@ func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSD
err = srv.Send(&GetSubTreeResponse{
Body: &GetSubTreeResponse_Body{
NodeId: node.ID,
ParentId: node.ParentID,
Timestamp: node.Meta.Time,
NodeId: []uint64{node.ID},
ParentId: []uint64{node.ParentID},
Timestamp: []uint64{node.Meta.Time},
Meta: metaToProto(node.Meta.Items),
},
})

Binary file not shown.

View file

@ -242,8 +242,8 @@ message GetSubTreeRequest {
bytes container_id = 1;
// The name of the tree.
string tree_id = 2;
// ID of the root node of a subtree.
uint64 root_id = 3;
// IDs of the root nodes of a subtree forest.
repeated uint64 root_id = 3 [ packed = false ];
// Optional depth of the traversal. Zero means return only root.
// Maximum depth is 10.
uint32 depth = 4;
@ -262,11 +262,11 @@ message GetSubTreeRequest {
message GetSubTreeResponse {
message Body {
// ID of the node.
uint64 node_id = 1;
repeated uint64 node_id = 1 [ packed = false ];
// ID of the parent.
uint64 parent_id = 2;
repeated uint64 parent_id = 2 [ packed = false ];
// Time node was first added to a tree.
uint64 timestamp = 3;
repeated uint64 timestamp = 3 [ packed = false ];
// Node meta-information.
repeated KeyValue meta = 4;
}

Binary file not shown.

Binary file not shown.