[#1059] services/tree: Fast sorted listing

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
pull/1049/head
Evgenii Stratonikov 2024-03-28 15:53:26 +03:00 committed by Evgenii Stratonikov
parent f23e38c285
commit e12fcc041d
10 changed files with 638 additions and 29 deletions

2
go.mod
View File

@ -14,6 +14,7 @@ require (
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
@ -64,7 +65,6 @@ require (
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidmz/go-pageant v1.0.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect

View File

@ -209,6 +209,39 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
return nil, err
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
var err error
var nodes []pilorama.NodeInfo
var cursor string
for _, sh := range e.sortShards(cid) {
nodes, cursor, err = sh.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
if err != nil {
if err == shard.ErrPiloramaDisabled {
break
}
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't perform `TreeSortedByFilename`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
return nodes, cursor, nil
}
return nil, last, err
}
// TreeGetOpLog implements the pilorama.Forest interface.
func (e *StorageEngine) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetOpLog",

View File

@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
@ -986,6 +987,149 @@ func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID stri
return m, parentID, metaerr.Wrap(err)
}
func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeID Node, threshold int) bool {
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
count := 0
c := b.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
if count++; count > threshold {
return false
}
}
return true
}
// TreeSortedByFilename implements the Forest interface.
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeSortedByFilename", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, "", ErrDegradedMode
}
h := newHeap(last, count)
key := make([]byte, 9)
var result []NodeInfo
var fewChildren bool
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot == nil {
return ErrTreeNotFound
}
b := treeRoot.Bucket(dataBucket)
// If the node is a leaf, we could scan all filenames in the tree.
// To prevent this we first count the number of children: if it is less than
// the number of nodes we need to return, fallback to TreeGetChildren() implementation.
if fewChildren = t.hasFewChildren(b, nodeID, count); fewChildren {
var err error
result, err = t.getChildren(b, nodeID)
return err
}
t.fillSortedChildren(b, nodeID, h)
for info, ok := h.pop(); ok; info, ok = h.pop() {
childInfo, err := t.getChildInfo(b, key, info.id)
if err != nil {
return err
}
result = append(result, childInfo)
}
return nil
})
success = err == nil
if err != nil {
return nil, last, metaerr.Wrap(err)
}
if fewChildren {
sort.Slice(result, func(i, j int) bool {
return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range result {
if bytes.Compare([]byte(last), result[i].Meta.GetAttr(AttributeFilename)) == -1 {
result = result[i:]
break
}
}
}
if len(result) != 0 {
last = string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
}
return result, last, metaerr.Wrap(err)
}
func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (NodeInfo, error) {
childInfo := NodeInfo{ID: childID}
parentID, _, metaBytes, found := t.getState(b, stateKey(key, childID))
if found {
childInfo.ParentID = parentID
if err := childInfo.Meta.FromBytes(metaBytes); err != nil {
return NodeInfo{}, err
}
}
return childInfo, nil
}
func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeID Node, h *fixedHeap) {
c := b.Cursor()
prefix := internalKeyPrefix(nil, AttributeFilename)
length := uint16(0)
count := 0
for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() {
if len(k) < len(prefix)+2+16 {
continue
}
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if parentID != nodeID {
continue
}
actualLength := binary.LittleEndian.Uint16(k[len(prefix):])
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
filename := string(k[len(prefix)+2 : len(k)-16])
processed := h.push(childID, filename)
if actualLength != length {
length = actualLength
count = 1
} else if processed {
if count++; count > h.count {
length = actualLength + 1
c.Seek(append(prefix, byte(length), byte(length>>8)))
c.Prev() // c.Next() will be performed by for loop
}
}
}
}
// TreeGetChildren implements the Forest interface.
func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error) {
var (
@ -1012,10 +1156,6 @@ func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID
return nil, ErrDegradedMode
}
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
var result []NodeInfo
err := t.db.View(func(tx *bbolt.Tx) error {
@ -1025,27 +1165,34 @@ func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID
}
b := treeRoot.Bucket(dataBucket)
c := b.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
childID := binary.LittleEndian.Uint64(k[9:])
childInfo := NodeInfo{
ID: childID,
}
parentID, _, metaBytes, found := t.getState(b, stateKey(key, childID))
if found {
childInfo.ParentID = parentID
if err := childInfo.Meta.FromBytes(metaBytes); err != nil {
return err
}
}
result = append(result, childInfo)
}
return nil
var err error
result, err = t.getChildren(b, nodeID)
return err
})
success = err == nil
return result, metaerr.Wrap(err)
}
func (t *boltForest) getChildren(b *bbolt.Bucket, nodeID Node) ([]NodeInfo, error) {
var result []NodeInfo
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
c := b.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
childID := binary.LittleEndian.Uint64(k[9:])
childInfo, err := t.getChildInfo(b, key, childID)
if err != nil {
return nil, err
}
result = append(result, childInfo)
}
return result, nil
}
// TreeList implements the Forest interface.
func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
var (
@ -1358,6 +1505,16 @@ func childrenKey(key []byte, child, parent Node) []byte {
return key[:childrenKeySize]
}
func internalKeyPrefix(key []byte, k string) []byte {
key = key[:0]
key = append(key, 'i')
l := len(k)
key = append(key, byte(l), byte(l>>8))
key = append(key, k...)
return key
}
// 'i' + attribute name (string) + attribute value (string) + parent (id) + node (id) -> 0/1.
func internalKey(key []byte, k, v string, parent, node Node) []byte {
size := 1 /* prefix */ + 2*2 /* len */ + 2*8 /* nodes */ + len(k) + len(v)
@ -1365,14 +1522,9 @@ func internalKey(key []byte, k, v string, parent, node Node) []byte {
key = make([]byte, 0, size)
}
key = key[:0]
key = append(key, 'i')
key = internalKeyPrefix(key, k)
l := len(k)
key = append(key, byte(l), byte(l>>8))
key = append(key, k...)
l = len(v)
l := len(v)
key = append(key, byte(l), byte(l>>8))
key = append(key, v...)

View File

@ -1,6 +1,7 @@
package pilorama
import (
"bytes"
"context"
"errors"
"fmt"
@ -154,6 +155,48 @@ func (f *memoryForest) TreeGetMeta(_ context.Context, cid cid.ID, treeID string,
return s.infoMap[nodeID].Meta, s.infoMap[nodeID].Parent, nil
}
// TreeSortedByFilename implements the Forest interface.
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start string, count int) ([]NodeInfo, string, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
return nil, "", ErrTreeNotFound
}
if count == 0 {
return nil, start, nil
}
children := s.tree.getChildren(nodeID)
res := make([]NodeInfo, 0, len(children))
for _, childID := range children {
if len(s.infoMap[childID].Meta.GetAttr(AttributeFilename)) == 0 {
continue
}
res = append(res, NodeInfo{
ID: childID,
Meta: s.infoMap[childID].Meta,
ParentID: s.infoMap[childID].Parent,
})
}
if len(res) == 0 {
return res, "", nil
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range res {
if string(res[i].Meta.GetAttr(AttributeFilename)) > start {
finish := i + count
if len(res) < finish {
finish = len(res)
}
return res[i:finish], string(res[finish-1].Meta.GetAttr(AttributeFilename)), nil
}
}
return nil, string(res[len(res)-1].Meta.GetAttr(AttributeFilename)), nil
}
// TreeGetChildren implements the Forest interface.
func (f *memoryForest) TreeGetChildren(_ context.Context, cid cid.ID, treeID string, nodeID Node) ([]NodeInfo, error) {
fullID := cid.String() + "/" + treeID

View File

@ -7,6 +7,7 @@ import (
mrand "math/rand"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -15,6 +16,8 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
@ -178,6 +181,205 @@ func testForestTreeGetChildren(t *testing.T, s ForestStorage) {
})
}
func BenchmarkForestSortedIteration(b *testing.B) {
for i := range providers {
if providers[i].name == "inmemory" {
continue
}
cnr := cidtest.ID()
treeID := "version"
f := providers[i].construct(b)
const total = 100_000
d := CIDDescriptor{cnr, 0, 1}
for i := 0; i < total; i++ {
u, err := uuid.NewRandom()
if err != nil {
b.FailNow()
}
_, err = f.TreeMove(context.Background(), d, treeID, &Move{
Parent: RootID,
Child: RootID + Node(i+1),
Meta: Meta{
Time: Timestamp(i + 1),
Items: []KeyValue{{
Key: AttributeFilename, Value: []byte(u.String()),
}},
},
})
if err != nil {
b.FailNow()
}
}
b.Run(providers[i].name+",root", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, "", 100)
if err != nil || len(res) != 100 {
b.Fatalf("err %v, count %d", err, len(res))
}
}
})
b.Run(providers[i].name+",leaf", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, "", 100)
if err != nil || len(res) != 0 {
b.FailNow()
}
}
})
}
}
func TestForest_TreeSortedIteration(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeSortedIteration(t, providers[i].construct(t))
})
}
}
func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
defer func() { require.NoError(t, s.Close()) }()
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
treeAdd := func(t *testing.T, ts int) {
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
Child: RootID + uint64(ts),
Parent: RootID,
Meta: Meta{
Time: Timestamp(ts),
Items: []KeyValue{
{Key: AttributeFilename, Value: []byte(strconv.Itoa(ts))},
},
},
})
require.NoError(t, err)
}
const count = 9
for i := 0; i < count; i++ {
treeAdd(t, i+1)
}
var result []NodeInfo
treeAppend := func(t *testing.T, last string, count int) string {
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, RootID, last, count)
require.NoError(t, err)
result = append(result, res...)
spew.Dump(last, res)
return cursor
}
last := treeAppend(t, "", 2)
last = treeAppend(t, last, 3)
last = treeAppend(t, last, 0)
last = treeAppend(t, last, 1)
_ = treeAppend(t, last, 10)
require.Len(t, result, count)
for i := range result {
require.Equal(t, RootID+uint64(i+1), result[i].ID)
require.Equal(t, strconv.Itoa(RootID+i+1), string(result[i].Meta.GetAttr(AttributeFilename)))
}
}
func TestForest_TreeSortedFilename(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeSortedByFilename(t, providers[i].construct(t))
})
}
}
func testForestTreeSortedByFilename(t *testing.T, s ForestStorage) {
defer func() { require.NoError(t, s.Close()) }()
const controlAttr = "control_attr"
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
treeAddByPath := func(t *testing.T, filename string) {
path := strings.Split(filename, "/")
_, err := s.TreeAddByPath(context.Background(), d, treeID, AttributeFilename, path[:len(path)-1],
[]KeyValue{
{Key: AttributeFilename, Value: []byte(path[len(path)-1])},
{Key: controlAttr, Value: []byte(filename)},
},
)
require.NoError(t, err)
}
expectAttributes := func(t *testing.T, attr string, expected []string, res []NodeInfo) {
require.Equal(t, len(expected), len(res))
actual := make([]string, len(res))
for i := range actual {
actual[i] = string(res[i].Meta.GetAttr(attr))
}
require.Equal(t, expected, actual)
}
items := []string{
"a/bbb/ccc",
"a/bbb/xxx",
"a/bbb/z",
"b/bbb/ccc",
"b/xxx/z",
"c",
}
// Ensure we do not depend on insertion order in any way.
mrand.Shuffle(len(items), func(i, j int) {
items[i], items[j] = items[j], items[i]
})
for i := range items {
treeAddByPath(t, items[i])
}
getChildren := func(t *testing.T, id Node) []NodeInfo {
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, "", len(items))
require.NoError(t, err)
return res
}
res := getChildren(t, RootID)
expectAttributes(t, AttributeFilename, []string{"a", "b", "c"}, res)
expectAttributes(t, controlAttr, []string{"", "", "c"}, res)
{
ra := getChildren(t, res[0].ID)
expectAttributes(t, AttributeFilename, []string{"bbb"}, ra)
expectAttributes(t, controlAttr, []string{""}, ra)
rabbb := getChildren(t, ra[0].ID)
expectAttributes(t, AttributeFilename, []string{"ccc", "xxx", "z"}, rabbb)
expectAttributes(t, controlAttr, []string{"a/bbb/ccc", "a/bbb/xxx", "a/bbb/z"}, rabbb)
}
{
rb := getChildren(t, res[1].ID)
expectAttributes(t, AttributeFilename, []string{"bbb", "xxx"}, rb)
expectAttributes(t, controlAttr, []string{"", ""}, rb)
rbbbb := getChildren(t, rb[0].ID)
expectAttributes(t, AttributeFilename, []string{"ccc"}, rbbbb)
expectAttributes(t, controlAttr, []string{"b/bbb/ccc"}, rbbbb)
rbxxx := getChildren(t, rb[1].ID)
expectAttributes(t, AttributeFilename, []string{"z"}, rbxxx)
expectAttributes(t, controlAttr, []string{"b/xxx/z"}, rbxxx)
}
{
rc := getChildren(t, res[2].ID)
require.Len(t, rc, 0)
}
}
func TestForest_TreeDrop(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {

View File

@ -0,0 +1,64 @@
package pilorama
import (
"container/heap"
)
type heapInfo struct {
id Node
filename string
}
type filenameHeap []heapInfo
func (h filenameHeap) Len() int { return len(h) }
func (h filenameHeap) Less(i, j int) bool { return h[i].filename < h[j].filename }
func (h filenameHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *filenameHeap) Push(x any) {
*h = append(*h, x.(heapInfo))
}
func (h *filenameHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// fixedHeap maintains a fixed number of smallest elements started at some point.
type fixedHeap struct {
start string
max string
count int
h *filenameHeap
}
func newHeap(start string, count int) *fixedHeap {
h := new(filenameHeap)
heap.Init(h)
return &fixedHeap{
start: start,
max: "",
count: count,
h: h,
}
}
func (h *fixedHeap) push(id Node, filename string) bool {
if filename == "" || filename <= h.start {
return false
}
heap.Push(h.h, heapInfo{id: id, filename: filename})
if h.h.Len() > h.count {
heap.Remove(h.h, h.h.Len()-1)
}
return true
}
func (h *fixedHeap) pop() (heapInfo, bool) {
if h.h.Len() != 0 {
return heap.Pop(h.h).(heapInfo), true
}
return heapInfo{}, false
}

View File

@ -33,6 +33,9 @@ type Forest interface {
// TreeGetChildren returns children of the node with the specified ID. The order is arbitrary.
// Should return ErrTreeNotFound if the tree is not found, and empty result if the node is not in the tree.
TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error)
// TreeSortedByFilename returns children of the node with the specified ID. The nodes are sorted by the filename attribute..
// Should return ErrTreeNotFound if the tree is not found, and empty result if the node is not in the tree.
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error)
// TreeGetOpLog returns first log operation stored at or above the height.
// In case no such operation is found, empty Move and nil error should be returned.
TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error)

View File

@ -183,6 +183,31 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
if s.pilorama == nil {
return nil, "", ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, "", ErrDegradedMode
}
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}
// TreeGetOpLog implements the pilorama.Forest interface.
func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetOpLog",

View File

@ -174,7 +174,7 @@ func TestGetSubTreeOrderAsc(t *testing.T) {
paths = append(paths, path.Join(tree[j].path...))
}
}
require.True(t, found, "unknown node")
require.True(t, found, "unknown node %d %v", i, acc.seen[i].GetBody().GetNodeId())
}
require.True(t, sort.SliceIsSorted(paths, func(i, j int) bool {

View File

@ -440,7 +440,94 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
return getSubTree(srv.Context(), srv, cid, b, s.forest)
}
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
const batchSize = 1000
type stackItem struct {
values []pilorama.NodeInfo
parent pilorama.Node
last string
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
// recursive implementation is not suitable here, so we maintain explicit stack.
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
if err != nil {
return err
}
err = srv.Send(&GetSubTreeResponse{
Body: &GetSubTreeResponse_Body{
NodeId: b.GetRootId(),
ParentId: p,
Timestamp: m.Time,
Meta: metaToProto(m.Items),
},
})
if err != nil {
return err
}
stack := []stackItem{{
values: nil,
parent: b.GetRootId(),
last: "",
}}
for {
if len(stack) == 0 {
break
} else if item := &stack[len(stack)-1]; len(item.values) == 0 {
nodes, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), item.parent, item.last, batchSize)
if err != nil {
return err
}
item.values = nodes
item.last = last
if len(nodes) == 0 {
stack = stack[:len(stack)-1]
continue
}
}
node := stack[len(stack)-1].values[0]
stack[len(stack)-1].values = stack[len(stack)-1].values[1:]
err = srv.Send(&GetSubTreeResponse{
Body: &GetSubTreeResponse_Body{
NodeId: node.ID,
ParentId: node.ParentID,
Timestamp: node.Meta.Time,
Meta: metaToProto(node.Meta.Items),
},
})
if err != nil {
return err
}
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, "", batchSize)
if err != nil {
return err
}
if len(children) != 0 {
stack = append(stack, stackItem{
values: children,
parent: node.ID,
last: last,
})
}
}
}
return nil
}
func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
if b.GetOrderBy().GetDirection() == GetSubTreeRequest_Body_Order_Asc {
return getSortedSubTree(ctx, srv, cid, b, forest)
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
// recursive implementation is not suitable here, so we maintain explicit stack.
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())