From 8e2a0611f4a93ec580122bb18920a3a49325f3b3 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Mon, 5 Feb 2024 14:09:58 +0300
Subject: [PATCH] [#947] tree: Add method to list all trees

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 pkg/local_object_storage/pilorama/boltdb.go   | 62 +++++++++++++++++++
 pkg/local_object_storage/pilorama/forest.go   | 51 +++++++++++++++
 .../pilorama/forest_test.go                   | 56 +++++++++++++++++
 .../pilorama/interface.go                     | 44 +++++++++++++
 pkg/local_object_storage/shard/tree.go        | 21 +++++++
 5 files changed, 234 insertions(+)

diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go
index 52cdec586..39400391f 100644
--- a/pkg/local_object_storage/pilorama/boltdb.go
+++ b/pkg/local_object_storage/pilorama/boltdb.go
@@ -1134,6 +1134,68 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
 	return err
 }
 
+// TreeListTrees implements ForestStorage.
+func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success)
+	}()
+
+	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees")
+	defer span.End()
+
+	t.modeMtx.RLock()
+	defer t.modeMtx.RUnlock()
+
+	if t.mode.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+
+	batchSize := prm.BatchSize
+	if batchSize <= 0 {
+		batchSize = treeListTreesBatchSizeDefault
+	}
+	var res TreeListTreesResult
+	err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
+		c := tx.Cursor()
+		checkNextPageToken := true
+		for k, _ := c.Seek(prm.NextPageToken); k != nil; k, _ = c.Next() {
+			if bytes.Equal(k, dataBucket) || bytes.Equal(k, logBucket) {
+				continue
+			}
+
+			if checkNextPageToken && bytes.Equal(k, prm.NextPageToken) {
+				checkNextPageToken = false
+				continue
+			}
+
+			var contID cidSDK.ID
+			if err := contID.Decode(k[:32]); err != nil {
+				return fmt.Errorf("failed to decode containerID: %w", err)
+			}
+			res.Items = append(res.Items, ContainerIDTreeID{
+				CID:    contID,
+				TreeID: string(k[32:]),
+			})
+
+			if len(res.Items) == batchSize {
+				res.NextPageToken = make([]byte, len(k))
+				copy(res.NextPageToken, k)
+				break
+			}
+		}
+		return nil
+	}))
+	success = err == nil
+	if err != nil {
+		return nil, err
+	}
+	return &res, nil
+}
+
 func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
 	c := bTree.Cursor()
 
diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go
index b82e5b3bd..8a1e86147 100644
--- a/pkg/local_object_storage/pilorama/forest.go
+++ b/pkg/local_object_storage/pilorama/forest.go
@@ -2,6 +2,7 @@ package pilorama
 
 import (
 	"context"
+	"fmt"
 	"sort"
 	"strings"
 
@@ -260,3 +261,53 @@ func (f *memoryForest) TreeLastSyncHeight(_ context.Context, cid cid.ID, treeID
 	}
 	return t.syncHeight, nil
 }
+
+// TreeListTrees implements Forest.
+func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
+	batchSize := prm.BatchSize
+	if batchSize <= 0 {
+		batchSize = treeListTreesBatchSizeDefault
+	}
+	tmpSlice := make([]string, 0, len(f.treeMap))
+	for k := range f.treeMap {
+		tmpSlice = append(tmpSlice, k)
+	}
+	sort.Strings(tmpSlice)
+	var idx int
+	if len(prm.NextPageToken) > 0 {
+		last := string(prm.NextPageToken)
+		idx, _ = sort.Find(len(tmpSlice), func(i int) int {
+			return -1 * strings.Compare(tmpSlice[i], last)
+		})
+		if idx == len(tmpSlice) {
+			return &TreeListTreesResult{}, nil
+		}
+		if tmpSlice[idx] == last {
+			idx++
+		}
+	}
+
+	var result TreeListTreesResult
+	for idx < len(tmpSlice) {
+		cidAndTree := strings.Split(tmpSlice[idx], "/")
+		if len(cidAndTree) != 2 {
+			return nil, fmt.Errorf("invalid format: key must be cid and treeID")
+		}
+		var contID cid.ID
+		if err := contID.DecodeString(cidAndTree[0]); err != nil {
+			return nil, fmt.Errorf("invalid format: %w", err)
+		}
+
+		result.Items = append(result.Items, ContainerIDTreeID{
+			CID:    contID,
+			TreeID: cidAndTree[1],
+		})
+
+		if len(result.Items) == batchSize {
+			result.NextPageToken = []byte(tmpSlice[idx])
+			break
+		}
+		idx++
+	}
+	return &result, nil
+}
diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go
index 5813395f0..2e7c1f529 100644
--- a/pkg/local_object_storage/pilorama/forest_test.go
+++ b/pkg/local_object_storage/pilorama/forest_test.go
@@ -1189,3 +1189,59 @@ func testTreeLastSyncHeight(t *testing.T, f ForestStorage) {
 		require.ErrorIs(t, err, ErrTreeNotFound)
 	})
 }
+
+func TestForest_ListTrees(t *testing.T) {
+	for i := range providers {
+		i := i
+		t.Run(providers[i].name, func(t *testing.T) {
+			testTreeListTrees(t, providers[i].construct)
+		})
+	}
+}
+
+func testTreeListTrees(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage) {
+	batchSize := 10
+	t.Run("empty", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, 0)
+	})
+	t.Run("count lower than batch size", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, batchSize-1)
+	})
+	t.Run("count equals batch size", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, batchSize)
+	})
+	t.Run("count greater than batch size", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, batchSize+1)
+	})
+	t.Run("count equals multiplied batch size", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, 3*batchSize)
+	})
+	t.Run("count equals multiplied batch size with addition", func(t *testing.T) {
+		testTreeListTreesCount(t, constructor, batchSize, 3*batchSize+batchSize/2)
+	})
+}
+
+func testTreeListTreesCount(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage, batchSize, count int) {
+	f := constructor(t)
+	var expected []ContainerIDTreeID
+
+	treeIDs := []string{"version", "system", "s", "avada kedavra"}
+	for i := 0; i < count; i++ {
+		cid := cidtest.ID()
+		treeID := treeIDs[i%len(treeIDs)]
+		expected = append(expected, ContainerIDTreeID{
+			CID:    cid,
+			TreeID: treeID,
+		})
+
+		ops := prepareRandomTree(5, 5)
+		for _, op := range ops {
+			require.NoError(t, f.TreeApply(context.Background(), cid, treeID, &op, false))
+		}
+	}
+
+	actual, err := treeListAll(context.Background(), f, batchSize)
+	require.NoError(t, err)
+
+	require.ElementsMatch(t, expected, actual)
+}
diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go
index e7f7eb512..3efd1a68b 100644
--- a/pkg/local_object_storage/pilorama/interface.go
+++ b/pkg/local_object_storage/pilorama/interface.go
@@ -63,6 +63,9 @@ type ForestStorage interface {
 	SetMode(m mode.Mode) error
 	SetParentID(id string)
 	Forest
+
+	// TreeListTrees returns all pairs "containerID:treeID".
+	TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
 }
 
 const (
@@ -85,3 +88,44 @@ var ErrInvalidCIDDescriptor = logicerr.New("cid descriptor is invalid")
 func (d CIDDescriptor) checkValid() bool {
 	return 0 <= d.Position && d.Position < d.Size
 }
+
+var treeListTreesBatchSizeDefault = 1000
+
+type ContainerIDTreeID struct {
+	CID    cidSDK.ID
+	TreeID string
+}
+
+type TreeListTreesPrm struct {
+	NextPageToken []byte
+	// BatchSize is batch size to list trees. If not lower or equals zero, than treeListTreesBatchSizeDefault is used.
+	BatchSize int
+}
+
+type TreeListTreesResult struct {
+	NextPageToken []byte
+	Items         []ContainerIDTreeID
+}
+
+func TreeListAll(ctx context.Context, f ForestStorage) ([]ContainerIDTreeID, error) {
+	return treeListAll(ctx, f, treeListTreesBatchSizeDefault)
+}
+
+func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]ContainerIDTreeID, error) {
+	var prm TreeListTreesPrm
+	var result []ContainerIDTreeID
+	first := true
+
+	for len(prm.NextPageToken) > 0 || first {
+		first = false
+
+		res, err := f.TreeListTrees(ctx, prm)
+		if err != nil {
+			return nil, err
+		}
+		prm.NextPageToken = res.NextPageToken
+		result = append(result, res.Items...)
+	}
+
+	return result, nil
+}
diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go
index 7795b820d..017b3450d 100644
--- a/pkg/local_object_storage/shard/tree.go
+++ b/pkg/local_object_storage/shard/tree.go
@@ -353,3 +353,24 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
 	}
 	return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
 }
+
+func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+		),
+	)
+	defer span.End()
+
+	if s.pilorama == nil {
+		return nil, ErrPiloramaDisabled
+	}
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+	return s.pilorama.TreeListTrees(ctx, prm)
+}