[#947] tree: Add method to list all trees

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-02-05 14:09:58 +03:00
parent 80b581d499
commit 8e2a0611f4
5 changed files with 234 additions and 0 deletions

View file

@ -1134,6 +1134,68 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
return err
}
// TreeListTrees implements ForestStorage.
func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees")
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
var res TreeListTreesResult
err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
c := tx.Cursor()
checkNextPageToken := true
for k, _ := c.Seek(prm.NextPageToken); k != nil; k, _ = c.Next() {
if bytes.Equal(k, dataBucket) || bytes.Equal(k, logBucket) {
continue
}
if checkNextPageToken && bytes.Equal(k, prm.NextPageToken) {
checkNextPageToken = false
continue
}
var contID cidSDK.ID
if err := contID.Decode(k[:32]); err != nil {
return fmt.Errorf("failed to decode containerID: %w", err)
}
res.Items = append(res.Items, ContainerIDTreeID{
CID: contID,
TreeID: string(k[32:]),
})
if len(res.Items) == batchSize {
res.NextPageToken = make([]byte, len(k))
copy(res.NextPageToken, k)
break
}
}
return nil
}))
success = err == nil
if err != nil {
return nil, err
}
return &res, nil
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
c := bTree.Cursor()

View file

@ -2,6 +2,7 @@ package pilorama
import (
"context"
"fmt"
"sort"
"strings"
@ -260,3 +261,53 @@ func (f *memoryForest) TreeLastSyncHeight(_ context.Context, cid cid.ID, treeID
}
return t.syncHeight, nil
}
// TreeListTrees implements Forest.
func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
tmpSlice := make([]string, 0, len(f.treeMap))
for k := range f.treeMap {
tmpSlice = append(tmpSlice, k)
}
sort.Strings(tmpSlice)
var idx int
if len(prm.NextPageToken) > 0 {
last := string(prm.NextPageToken)
idx, _ = sort.Find(len(tmpSlice), func(i int) int {
return -1 * strings.Compare(tmpSlice[i], last)
})
if idx == len(tmpSlice) {
return &TreeListTreesResult{}, nil
}
if tmpSlice[idx] == last {
idx++
}
}
var result TreeListTreesResult
for idx < len(tmpSlice) {
cidAndTree := strings.Split(tmpSlice[idx], "/")
if len(cidAndTree) != 2 {
return nil, fmt.Errorf("invalid format: key must be cid and treeID")
}
var contID cid.ID
if err := contID.DecodeString(cidAndTree[0]); err != nil {
return nil, fmt.Errorf("invalid format: %w", err)
}
result.Items = append(result.Items, ContainerIDTreeID{
CID: contID,
TreeID: cidAndTree[1],
})
if len(result.Items) == batchSize {
result.NextPageToken = []byte(tmpSlice[idx])
break
}
idx++
}
return &result, nil
}

View file

@ -1189,3 +1189,59 @@ func testTreeLastSyncHeight(t *testing.T, f ForestStorage) {
require.ErrorIs(t, err, ErrTreeNotFound)
})
}
func TestForest_ListTrees(t *testing.T) {
for i := range providers {
i := i
t.Run(providers[i].name, func(t *testing.T) {
testTreeListTrees(t, providers[i].construct)
})
}
}
func testTreeListTrees(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage) {
batchSize := 10
t.Run("empty", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 0)
})
t.Run("count lower than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize-1)
})
t.Run("count equals batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize)
})
t.Run("count greater than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize+1)
})
t.Run("count equals multiplied batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize)
})
t.Run("count equals multiplied batch size with addition", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize+batchSize/2)
})
}
func testTreeListTreesCount(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage, batchSize, count int) {
f := constructor(t)
var expected []ContainerIDTreeID
treeIDs := []string{"version", "system", "s", "avada kedavra"}
for i := 0; i < count; i++ {
cid := cidtest.ID()
treeID := treeIDs[i%len(treeIDs)]
expected = append(expected, ContainerIDTreeID{
CID: cid,
TreeID: treeID,
})
ops := prepareRandomTree(5, 5)
for _, op := range ops {
require.NoError(t, f.TreeApply(context.Background(), cid, treeID, &op, false))
}
}
actual, err := treeListAll(context.Background(), f, batchSize)
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}

View file

@ -63,6 +63,9 @@ type ForestStorage interface {
SetMode(m mode.Mode) error
SetParentID(id string)
Forest
// TreeListTrees returns all pairs "containerID:treeID".
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
}
const (
@ -85,3 +88,44 @@ var ErrInvalidCIDDescriptor = logicerr.New("cid descriptor is invalid")
func (d CIDDescriptor) checkValid() bool {
return 0 <= d.Position && d.Position < d.Size
}
var treeListTreesBatchSizeDefault = 1000
type ContainerIDTreeID struct {
CID cidSDK.ID
TreeID string
}
type TreeListTreesPrm struct {
NextPageToken []byte
// BatchSize is batch size to list trees. If not lower or equals zero, than treeListTreesBatchSizeDefault is used.
BatchSize int
}
type TreeListTreesResult struct {
NextPageToken []byte
Items []ContainerIDTreeID
}
func TreeListAll(ctx context.Context, f ForestStorage) ([]ContainerIDTreeID, error) {
return treeListAll(ctx, f, treeListTreesBatchSizeDefault)
}
func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]ContainerIDTreeID, error) {
var prm TreeListTreesPrm
var result []ContainerIDTreeID
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return nil, err
}
prm.NextPageToken = res.NextPageToken
result = append(result, res.Items...)
}
return result, nil
}

View file

@ -353,3 +353,24 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
}
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeListTrees(ctx, prm)
}