[#947] engine: Evacuate trees to local shards

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2024-02-06 13:59:50 +03:00
parent e4064c4394
commit 728150d1d2
6 changed files with 463 additions and 35 deletions
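A minimal usage sketch of the new tree-evacuation scope (it mirrors TestEvacuateTreesLocal below; `e` is assumed to be an initialized *StorageEngine and `ids` its shard IDs, already switched to read-only):

var prm EvacuateShardPrm
prm.ShardID = ids[0:1]         // shard(s) to drain; must be in read-only mode
prm.Scope = EvacuateScopeTrees // trees only; WithObjects()/WithTrees() suggest the scope is a bit mask, so scopes can presumably be combined
_, err := e.Evacuate(context.Background(), prm)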

View file

@@ -5,11 +5,13 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
@@ -56,6 +58,18 @@ func (s EvacuateScope) String() string {
return sb.String()
}
func (s EvacuateScope) WithObjects() bool {
return s&EvacuateScopeObjects == EvacuateScopeObjects
}
func (s EvacuateScope) WithTrees() bool {
return s&EvacuateScopeTrees == EvacuateScopeTrees
}
func (s EvacuateScope) TreesOnly() bool {
return s == EvacuateScopeTrees
}
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
ShardID []*shard.ID
@@ -264,7 +278,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
-err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
+err = e.getTotals(ctx, prm, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
@@ -293,11 +307,12 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return nil
}
-func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
+func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
-ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
+ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End()
for _, sh := range shardsToEvacuate {
if prm.Scope.WithObjects() {
cnt, err := sh.LogicalObjectsCount(ctx)
if err != nil {
if errors.Is(err, shard.ErrDegradedMode) {
@@ -307,6 +322,14 @@ func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacua
}
res.objTotal.Add(cnt)
}
if prm.Scope.WithTrees() && sh.PiloramaEnabled() {
cnt, err := pilorama.TreeCountAll(ctx, sh)
if err != nil {
return err
}
res.trTotal.Add(cnt)
}
}
return nil
}
@@ -319,6 +342,24 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
))
defer span.End()
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@@ -349,6 +390,172 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil
}
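// evacuateShardTrees pages through all trees of the shard being evacuated and moves each of them to another local shard.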
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
var listPrm pilorama.TreeListTreesPrm
first := true
for len(listPrm.NextPageToken) > 0 || first {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
first = false
listRes, err := sh.TreeListTrees(ctx, listPrm)
if err != nil {
return err
}
listPrm.NextPageToken = listRes.NextPageToken
if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64,
shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
trace.WithAttributes(
attribute.Int("trees_count", len(trees)),
))
defer span.End()
for _, contTree := range trees {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
success, _, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
if err != nil {
return err
}
if success {
res.trEvacuated.Add(1)
} else {
res.trFailed.Add(1)
}
}
return nil
}
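// tryEvacuateTreeLocal streams the tree's operation log from sh into a suitable local shard chosen by
// findShardToEvacuateTree; it reports whether the move succeeded and, on success, the target shard ID.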
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (bool, string, error) {
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, weights, shardsToEvacuate)
if err != nil {
return false, "", err
}
if !found {
return false, "", nil
}
const readBatchSize = 1000
source := make(chan *pilorama.Move, readBatchSize)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
var applyErr error
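// The goroutine below applies operations arriving on `source` to the target shard via TreeApplyStream,
// while the loop further down reads the source shard's op log and feeds the channel;
// cancel() tears the pipeline down as soon as either side fails.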
go func() {
defer wg.Done()
applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source)
if applyErr != nil {
cancel()
}
}()
var height uint64
for {
op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height)
if err != nil {
cancel()
wg.Wait()
close(source) // closed after cancel so that ctx.Done() is observed first
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", err
}
if op.Time == 0 { // the op log has been fully read
close(source)
wg.Wait()
if applyErr == nil {
return true, target.ID().String(), nil
}
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", applyErr
}
select {
case <-ctx.Done(): // apply stream failed or operation cancelled
wg.Wait()
if prm.IgnoreErrors {
return false, "", nil
}
if applyErr != nil {
return false, "", applyErr
}
return false, "", ctx.Err()
case source <- &op:
}
height = op.Time + 1
}
}
// findShardToEvacuateTree returns the first shard, in HRW order, on which the tree already exists, or else the first suitable shard in HRW order.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString()))
var result pooledShard
var found bool
for _, target := range shards {
select {
case <-ctx.Done():
return pooledShard{}, false, ctx.Err()
default:
}
if _, ok := shardsToEvacuate[target.ID().String()]; ok {
continue
}
if !target.PiloramaEnabled() || target.GetMode().ReadOnly() {
continue
}
if !found {
result = target
found = true
}
exists, err := target.TreeExists(ctx, tree.CID, tree.TreeID)
if err != nil {
continue
}
if exists {
return target, true, nil
}
}
return result, found, nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
@@ -362,9 +569,13 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
if !sh.GetMode().ReadOnly() {
return nil, nil, ErrMustBeReadOnly
}
if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() {
return nil, nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
}
}
-if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil {
+if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() {
return nil, nil, errMustHaveTwoShards
}

View file

@@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
@@ -41,6 +42,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{})),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
pilorama.WithPerm(0o700),
),
}
})
e, ids := te.engine, te.shardIDs
@@ -48,36 +53,32 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
treeID := "version"
meta := []pilorama.KeyValue{
{Key: pilorama.AttributeVersion, Value: []byte("XXX")},
{Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
}
for _, sh := range ids {
-obj := testutil.GenerateObjectWithCID(cidtest.ID())
+for i := 0; i < objPerShard; i++ {
+contID := cidtest.ID()
+obj := testutil.GenerateObjectWithCID(contID)
objects = append(objects, obj)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
-}
-for i := 0; ; i++ {
-objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID()))
-var putPrm PutPrm
-putPrm.WithObject(objects[len(objects)-1])
-err := e.Put(context.Background(), putPrm)
+_, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1},
+treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta)
require.NoError(t, err)
-res, err := e.shards[ids[len(ids)-1].String()].List(context.Background())
-require.NoError(t, err)
-if len(res.AddressList()) == objPerShard {
-break
-}
+}
}
return e, ids, objects
}
-func TestEvacuateShard(t *testing.T) {
+func TestEvacuateShardObjects(t *testing.T) {
t.Parallel()
const objPerShard = 3
@@ -103,6 +104,7 @@ func TestEvacuateShard(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[2:3]
prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) { t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
@ -137,7 +139,7 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t) checkHasObjects(t)
} }
-func TestEvacuateNetwork(t *testing.T) {
+func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel()
errReplication := errors.New("handler error")
@@ -174,6 +176,7 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
@@ -198,6 +201,7 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
@@ -235,6 +239,7 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = evacuateIDs
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
@@ -270,6 +275,7 @@ func TestEvacuateCancellation(t *testing.T) {
}
return nil
}
prm.Scope = EvacuateScopeObjects
ctx, cancel := context.WithCancel(context.Background())
cancel()
@@ -293,6 +299,7 @@ func TestEvacuateSingleProcess(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
@@ -321,7 +328,7 @@ func TestEvacuateSingleProcess(t *testing.T) {
require.NoError(t, eg.Wait())
}
-func TestEvacuateAsync(t *testing.T) {
+func TestEvacuateObjectsAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
@@ -335,6 +342,7 @@ func TestEvacuateAsync(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
@@ -393,3 +401,82 @@ func TestEvacuateAsync(t *testing.T) {
require.NoError(t, eg.Wait())
}
func TestEvacuateTreesLocal(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeTrees
expectedShardIDs := make([]string, 0, 1)
for _, id := range ids[0:1] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()])
require.NoError(t, err, "list source trees failed")
require.Len(t, sourceTrees, 3)
for _, tr := range sourceTrees {
exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID)
require.NoError(t, err, "failed to check tree existance")
require.True(t, exists, "tree doesn't exists on target shard")
var height uint64
var sourceOps []pilorama.Move
for {
op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
sourceOps = append(sourceOps, op)
height = op.Time + 1
}
height = 0
var targetOps []pilorama.Move
for {
op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
targetOps = append(targetOps, op)
height = op.Time + 1
}
require.Equal(t, sourceOps, targetOps)
}
}

View file

@@ -549,6 +549,58 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
return metaerr.Wrap(err)
}
// TreeApplyStream should be used with caution: this method blocks other write transactions until `source` is closed.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApplyStream", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
fullID := bucketName(cnr, treeID)
err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
var lm Move
if e := t.applyOperation(bLog, bTree, []*Move{m}, &lm); e != nil {
return e
}
}
}
}))
success = err == nil
return err
}
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {

View file

@@ -311,3 +311,27 @@ func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (*
}
return &result, nil
}
// TreeApplyStream implements ForestStorage.
func (f *memoryForest) TreeApplyStream(ctx context.Context, cnr cid.ID, treeID string, source <-chan *Move) error {
fullID := cnr.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newMemoryTree()
f.treeMap[fullID] = s
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
if e := s.Apply(m); e != nil {
return e
}
}
}
}

View file

@@ -66,6 +66,7 @@ type ForestStorage interface {
// TreeListTrees returns all pairs "containerID:treeID".
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error
}
const (
@@ -107,12 +108,17 @@ type TreeListTreesResult struct {
Items []ContainerIDTreeID
}
-func TreeListAll(ctx context.Context, f ForestStorage) ([]ContainerIDTreeID, error) {
+type treeList interface {
+TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
+}
+func TreeListAll(ctx context.Context, f treeList) ([]ContainerIDTreeID, error) {
return treeListAll(ctx, f, treeListTreesBatchSizeDefault)
}
-func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]ContainerIDTreeID, error) {
+func treeListAll(ctx context.Context, f treeList, batchSize int) ([]ContainerIDTreeID, error) {
var prm TreeListTreesPrm
+prm.BatchSize = batchSize
var result []ContainerIDTreeID
first := true
@@ -129,3 +135,22 @@ func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]Contain
return result, nil
}
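// TreeCountAll returns the total number of trees by paging through TreeListTrees.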
func TreeCountAll(ctx context.Context, f treeList) (uint64, error) {
var prm TreeListTreesPrm
var result uint64
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return 0, err
}
prm.NextPageToken = res.NextPageToken
result += uint64(len(res.Items))
}
return result, nil
}

View file

@@ -374,3 +374,32 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm
}
return s.pilorama.TreeListTrees(ctx, prm)
}
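// PiloramaEnabled returns true if pilorama (tree storage) is configured for the shard.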
func (s *Shard) PiloramaEnabled() bool {
return s.pilorama != nil
}
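// TreeApplyStream applies tree operations received from source to the shard's pilorama.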
func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID)),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}