diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 3853c4e3..15fb69c2 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -5,11 +5,13 @@ import ( "errors" "fmt" "strings" + "sync" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" @@ -56,6 +58,18 @@ func (s EvacuateScope) String() string { return sb.String() } +func (s EvacuateScope) WithObjects() bool { + return s&EvacuateScopeObjects == EvacuateScopeObjects +} + +func (s EvacuateScope) WithTrees() bool { + return s&EvacuateScopeTrees == EvacuateScopeTrees +} + +func (s EvacuateScope) TreesOnly() bool { + return s == EvacuateScopeTrees +} + // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { ShardID []*shard.ID @@ -264,7 +278,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) - err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res) + err = e.getTotals(ctx, prm, shardsToEvacuate, res) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) @@ -293,19 +307,28 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return nil } -func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount") +func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals") defer span.End() for _, sh := range shardsToEvacuate { - cnt, err := sh.LogicalObjectsCount(ctx) - if err != nil { - if errors.Is(err, shard.ErrDegradedMode) { - continue + if prm.Scope.WithObjects() { + cnt, err := sh.LogicalObjectsCount(ctx) + if err != nil { + if errors.Is(err, shard.ErrDegradedMode) { + continue + } + return err } - return err + res.objTotal.Add(cnt) + } + if prm.Scope.WithTrees() && sh.PiloramaEnabled() { + cnt, err := pilorama.TreeCountAll(ctx, sh) + if err != nil { + return err + } + res.trTotal.Add(cnt) } - res.objTotal.Add(cnt) } return nil } @@ -319,6 +342,24 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E )) defer span.End() + if prm.Scope.WithObjects() { + if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { + return err + } + } + + if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { + if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { + return err + } + } + + return nil +} + +func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, +) error { var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) @@ -349,6 +390,172 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E return nil } +func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, +) error { + sh := shardsToEvacuate[shardID] + + var listPrm pilorama.TreeListTreesPrm + first := true + + for len(listPrm.NextPageToken) > 0 || first { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + first = false + + listRes, err := sh.TreeListTrees(ctx, listPrm) + if err != nil { + return err + } + listPrm.NextPageToken = listRes.NextPageToken + if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, weights, shardsToEvacuate); err != nil { + return err + } + } + return nil +} + +func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID, + prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, + shardsToEvacuate map[string]*shard.Shard, +) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees", + trace.WithAttributes( + attribute.Int("trees_count", len(trees)), + )) + defer span.End() + + for _, contTree := range trees { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + success, _, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate) + if err != nil { + return err + } + if success { + res.trEvacuated.Add(1) + } else { + res.trFailed.Add(1) + } + } + return nil +} + +func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, + prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, +) (bool, string, error) { + target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, weights, shardsToEvacuate) + if err != nil { + return false, "", err + } + if !found { + return false, "", nil + } + const readBatchSize = 1000 + source := make(chan *pilorama.Move, readBatchSize) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + + wg.Add(1) + var applyErr error + go func() { + defer wg.Done() + + applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source) + if applyErr != nil { + cancel() + } + }() + + var height uint64 + for { + op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height) + if err != nil { + cancel() + wg.Wait() + close(source) // close after cancel to ctx.Done() hits first + if prm.IgnoreErrors { + return false, "", nil + } + return false, "", err + } + + if op.Time == 0 { // completed get op log + close(source) + wg.Wait() + if applyErr == nil { + return true, target.ID().String(), nil + } + if prm.IgnoreErrors { + return false, "", nil + } + return false, "", applyErr + } + + select { + case <-ctx.Done(): // apply stream failed or operation cancelled + wg.Wait() + if prm.IgnoreErrors { + return false, "", nil + } + if applyErr != nil { + return false, "", applyErr + } + return false, "", ctx.Err() + case source <- &op: + } + + height = op.Time + 1 + } +} + +// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists. +func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, +) (pooledShard, bool, error) { + hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString())) + var result pooledShard + var found bool + for _, target := range shards { + select { + case <-ctx.Done(): + return pooledShard{}, false, ctx.Err() + default: + } + + if _, ok := shardsToEvacuate[target.ID().String()]; ok { + continue + } + + if !target.PiloramaEnabled() || target.GetMode().ReadOnly() { + continue + } + + if !found { + result = target + found = true + } + + exists, err := target.TreeExists(ctx, tree.CID, tree.TreeID) + if err != nil { + continue + } + if exists { + return target, true, nil + } + } + return result, found, nil +} + func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) { e.mtx.RLock() defer e.mtx.RUnlock() @@ -362,9 +569,13 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) if !sh.GetMode().ReadOnly() { return nil, nil, ErrMustBeReadOnly } + + if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() { + return nil, nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID()) + } } - if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil { + if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() { return nil, nil, errMustHaveTwoShards } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index cadcc293..d234f8ee 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -14,6 +14,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" @@ -41,6 +42,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))), meta.WithPermissions(0o700), meta.WithEpochState(epochState{})), + shard.WithPiloramaOptions( + pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))), + pilorama.WithPerm(0o700), + ), } }) e, ids := te.engine, te.shardIDs @@ -48,36 +53,32 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng require.NoError(t, e.Init(context.Background())) objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) - - for _, sh := range ids { - obj := testutil.GenerateObjectWithCID(cidtest.ID()) - objects = append(objects, obj) - - var putPrm shard.PutPrm - putPrm.SetObject(obj) - _, err := e.shards[sh.String()].Put(context.Background(), putPrm) - require.NoError(t, err) + treeID := "version" + meta := []pilorama.KeyValue{ + {Key: pilorama.AttributeVersion, Value: []byte("XXX")}, + {Key: pilorama.AttributeFilename, Value: []byte("file.txt")}, } - for i := 0; ; i++ { - objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID())) + for _, sh := range ids { + for i := 0; i < objPerShard; i++ { + contID := cidtest.ID() + obj := testutil.GenerateObjectWithCID(contID) + objects = append(objects, obj) - var putPrm PutPrm - putPrm.WithObject(objects[len(objects)-1]) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[sh.String()].Put(context.Background(), putPrm) + require.NoError(t, err) - err := e.Put(context.Background(), putPrm) - require.NoError(t, err) - - res, err := e.shards[ids[len(ids)-1].String()].List(context.Background()) - require.NoError(t, err) - if len(res.AddressList()) == objPerShard { - break + _, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1}, + treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta) + require.NoError(t, err) } } return e, ids, objects } -func TestEvacuateShard(t *testing.T) { +func TestEvacuateShardObjects(t *testing.T) { t.Parallel() const objPerShard = 3 @@ -103,6 +104,7 @@ func TestEvacuateShard(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[2:3] + prm.Scope = EvacuateScopeObjects t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) @@ -137,7 +139,7 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) } -func TestEvacuateNetwork(t *testing.T) { +func TestEvacuateObjectsNetwork(t *testing.T) { t.Parallel() errReplication := errors.New("handler error") @@ -174,6 +176,7 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) @@ -198,6 +201,7 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.ObjectsHandler = acceptOneOf(objects, 2) + prm.Scope = EvacuateScopeObjects res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) @@ -235,6 +239,7 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = evacuateIDs prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) + prm.Scope = EvacuateScopeObjects res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) @@ -270,6 +275,7 @@ func TestEvacuateCancellation(t *testing.T) { } return nil } + prm.Scope = EvacuateScopeObjects ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -293,6 +299,7 @@ func TestEvacuateSingleProcess(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] + prm.Scope = EvacuateScopeObjects prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: @@ -321,7 +328,7 @@ func TestEvacuateSingleProcess(t *testing.T) { require.NoError(t, eg.Wait()) } -func TestEvacuateAsync(t *testing.T) { +func TestEvacuateObjectsAsync(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) @@ -335,6 +342,7 @@ func TestEvacuateAsync(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] + prm.Scope = EvacuateScopeObjects prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: @@ -393,3 +401,82 @@ func TestEvacuateAsync(t *testing.T) { require.NoError(t, eg.Wait()) } + +func TestEvacuateTreesLocal(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 3) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeTrees + + expectedShardIDs := make([]string, 0, 1) + for _, id := range ids[0:1] { + expectedShardIDs = append(expectedShardIDs, id.String()) + } + + st, err := e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get init state failed") + require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") + require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count") + require.Nil(t, st.StartedAt(), "invalid init started at") + require.Nil(t, st.FinishedAt(), "invalid init finished at") + require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid init error message") + + res, err := e.Evacuate(context.Background(), prm) + require.NotNil(t, res, "sync evacuation result must be not nil") + require.NoError(t, err, "evacuation failed") + + st, err = e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get evacuation state failed") + require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) + + require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") + require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") + require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") + require.NotNil(t, st.StartedAt(), "invalid final started at") + require.NotNil(t, st.FinishedAt(), "invalid final finished at") + require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid final error message") + + sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()]) + require.NoError(t, err, "list source trees failed") + require.Len(t, sourceTrees, 3) + + for _, tr := range sourceTrees { + exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID) + require.NoError(t, err, "failed to check tree existance") + require.True(t, exists, "tree doesn't exists on target shard") + + var height uint64 + var sourceOps []pilorama.Move + for { + op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) + require.NoError(t, err) + if op.Time == 0 { + break + } + sourceOps = append(sourceOps, op) + height = op.Time + 1 + } + + height = 0 + var targetOps []pilorama.Move + for { + op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) + require.NoError(t, err) + if op.Time == 0 { + break + } + targetOps = append(targetOps, op) + height = op.Time + 1 + } + + require.Equal(t, sourceOps, targetOps) + } +} diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 39400391..61318fff 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -549,6 +549,58 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string return metaerr.Wrap(err) } +// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed. +func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + t.metrics.AddMethodDuration("TreeApplyStream", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream", + trace.WithAttributes( + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + t.modeMtx.RLock() + defer t.modeMtx.RUnlock() + + if t.mode.NoMetabase() { + return ErrDegradedMode + } else if t.mode.ReadOnly() { + return ErrReadOnlyMode + } + + fullID := bucketName(cnr, treeID) + err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, fullID) + if err != nil { + return err + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + case m, ok := <-source: + if !ok { + return nil + } + var lm Move + if e := t.applyOperation(bLog, bTree, []*Move{m}, &lm); e != nil { + return e + } + } + } + })) + success = err == nil + return err +} + func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { t.mtx.Lock() for i := 0; i < len(t.batches); i++ { diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 8a1e8614..ab617400 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -311,3 +311,27 @@ func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (* } return &result, nil } + +// TreeApplyStream implements ForestStorage. +func (f *memoryForest) TreeApplyStream(ctx context.Context, cnr cid.ID, treeID string, source <-chan *Move) error { + fullID := cnr.String() + "/" + treeID + s, ok := f.treeMap[fullID] + if !ok { + s = newMemoryTree() + f.treeMap[fullID] = s + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case m, ok := <-source: + if !ok { + return nil + } + if e := s.Apply(m); e != nil { + return e + } + } + } +} diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 3efd1a68..b3391407 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -66,6 +66,7 @@ type ForestStorage interface { // TreeListTrees returns all pairs "containerID:treeID". TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) + TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error } const ( @@ -107,12 +108,17 @@ type TreeListTreesResult struct { Items []ContainerIDTreeID } -func TreeListAll(ctx context.Context, f ForestStorage) ([]ContainerIDTreeID, error) { +type treeList interface { + TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) +} + +func TreeListAll(ctx context.Context, f treeList) ([]ContainerIDTreeID, error) { return treeListAll(ctx, f, treeListTreesBatchSizeDefault) } -func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]ContainerIDTreeID, error) { +func treeListAll(ctx context.Context, f treeList, batchSize int) ([]ContainerIDTreeID, error) { var prm TreeListTreesPrm + prm.BatchSize = batchSize var result []ContainerIDTreeID first := true @@ -129,3 +135,22 @@ func treeListAll(ctx context.Context, f ForestStorage, batchSize int) ([]Contain return result, nil } + +func TreeCountAll(ctx context.Context, f treeList) (uint64, error) { + var prm TreeListTreesPrm + var result uint64 + first := true + + for len(prm.NextPageToken) > 0 || first { + first = false + + res, err := f.TreeListTrees(ctx, prm) + if err != nil { + return 0, err + } + prm.NextPageToken = res.NextPageToken + result += uint64(len(res.Items)) + } + + return result, nil +} diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index 017b3450..e78f2927 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -374,3 +374,32 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm } return s.pilorama.TreeListTrees(ctx, prm) } + +func (s *Shard) PiloramaEnabled() bool { + return s.pilorama != nil +} + +func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID)), + ) + defer span.End() + + if s.pilorama == nil { + return ErrPiloramaDisabled + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source) +}