diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index ad432e40..3853c4e3 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -58,62 +58,93 @@ func (s EvacuateScope) String() string { // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { - ShardID []*shard.ID - Handler func(context.Context, oid.Address, *objectSDK.Object) error - IgnoreErrors bool - Async bool - Scope EvacuateScope + ShardID []*shard.ID + ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error + IgnoreErrors bool + Async bool + Scope EvacuateScope } // EvacuateShardRes represents result of the EvacuateShard operation. type EvacuateShardRes struct { - evacuated *atomic.Uint64 - total *atomic.Uint64 - failed *atomic.Uint64 - skipped *atomic.Uint64 + objEvacuated *atomic.Uint64 + objTotal *atomic.Uint64 + objFailed *atomic.Uint64 + objSkipped *atomic.Uint64 + + trEvacuated *atomic.Uint64 + trTotal *atomic.Uint64 + trFailed *atomic.Uint64 } // NewEvacuateShardRes creates new EvacuateShardRes instance. func NewEvacuateShardRes() *EvacuateShardRes { return &EvacuateShardRes{ - evacuated: new(atomic.Uint64), - total: new(atomic.Uint64), - failed: new(atomic.Uint64), - skipped: new(atomic.Uint64), + objEvacuated: new(atomic.Uint64), + objTotal: new(atomic.Uint64), + objFailed: new(atomic.Uint64), + objSkipped: new(atomic.Uint64), + trEvacuated: new(atomic.Uint64), + trTotal: new(atomic.Uint64), + trFailed: new(atomic.Uint64), } } -// Evacuated returns amount of evacuated objects. +// ObjectsEvacuated returns amount of evacuated objects. // Objects for which handler returned no error are also assumed evacuated. -func (p *EvacuateShardRes) Evacuated() uint64 { +func (p *EvacuateShardRes) ObjectsEvacuated() uint64 { if p == nil { return 0 } - return p.evacuated.Load() + return p.objEvacuated.Load() } -// Total returns total count objects to evacuate. -func (p *EvacuateShardRes) Total() uint64 { +// ObjectsTotal returns total count objects to evacuate. +func (p *EvacuateShardRes) ObjectsTotal() uint64 { if p == nil { return 0 } - return p.total.Load() + return p.objTotal.Load() } -// Failed returns count of failed objects to evacuate. -func (p *EvacuateShardRes) Failed() uint64 { +// ObjectsFailed returns count of failed objects to evacuate. +func (p *EvacuateShardRes) ObjectsFailed() uint64 { if p == nil { return 0 } - return p.failed.Load() + return p.objFailed.Load() } -// Skipped returns count of skipped objects. -func (p *EvacuateShardRes) Skipped() uint64 { +// ObjectsSkipped returns count of skipped objects. +func (p *EvacuateShardRes) ObjectsSkipped() uint64 { if p == nil { return 0 } - return p.skipped.Load() + return p.objSkipped.Load() +} + +// TreesEvacuated returns amount of evacuated trees. +func (p *EvacuateShardRes) TreesEvacuated() uint64 { + if p == nil { + return 0 + } + return p.trEvacuated.Load() +} + +// TreesTotal returns total count trees to evacuate. +func (p *EvacuateShardRes) TreesTotal() uint64 { + if p == nil { + return 0 + } + return p.trTotal.Load() +} + +// TreesFailed returns count of failed trees to evacuate. +func (p *EvacuateShardRes) TreesFailed() uint64 { + if p == nil { + return 0 + } + return p.trFailed.Load() } // DeepCopy returns deep copy of result instance. @@ -123,16 +154,22 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { } res := &EvacuateShardRes{ - evacuated: new(atomic.Uint64), - total: new(atomic.Uint64), - failed: new(atomic.Uint64), - skipped: new(atomic.Uint64), + objEvacuated: new(atomic.Uint64), + objTotal: new(atomic.Uint64), + objFailed: new(atomic.Uint64), + objSkipped: new(atomic.Uint64), + trEvacuated: new(atomic.Uint64), + trTotal: new(atomic.Uint64), + trFailed: new(atomic.Uint64), } - res.evacuated.Store(p.evacuated.Load()) - res.total.Store(p.total.Load()) - res.failed.Store(p.failed.Load()) - res.skipped.Store(p.skipped.Load()) + res.objEvacuated.Store(p.objEvacuated.Load()) + res.objTotal.Store(p.objTotal.Load()) + res.objFailed.Store(p.objFailed.Load()) + res.objSkipped.Store(p.objSkipped.Load()) + res.trTotal.Store(p.trTotal.Load()) + res.trEvacuated.Store(p.trEvacuated.Load()) + res.trFailed.Store(p.trFailed.Load()) return res } @@ -168,7 +205,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev )) defer span.End() - shards, weights, err := e.getActualShards(shardIDs, prm.Handler != nil) + shards, weights, err := e.getActualShards(shardIDs, prm) if err != nil { return nil, err } @@ -216,6 +253,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p attribute.StringSlice("shardIDs", shardIDs), attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), + attribute.Stringer("scope", prm.Scope), )) defer func() { @@ -224,18 +262,19 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p }() e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) return err } for _, shardID := range shardIDs { if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { - e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField) + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) return err } } @@ -243,10 +282,13 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.Uint64("total", res.Total()), - zap.Uint64("evacuated", res.Evacuated()), - zap.Uint64("failed", res.Failed()), - zap.Uint64("skipped", res.Skipped()), + zap.Uint64("total_objects", res.ObjectsTotal()), + zap.Uint64("evacuated_objects", res.ObjectsEvacuated()), + zap.Uint64("failed_objects", res.ObjectsFailed()), + zap.Uint64("skipped_objects", res.ObjectsSkipped()), + zap.Uint64("total_trees", res.TreesTotal()), + zap.Uint64("evacuated_trees", res.TreesEvacuated()), + zap.Uint64("failed_trees", res.TreesFailed()), ) return nil } @@ -263,7 +305,7 @@ func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacua } return err } - res.total.Add(cnt) + res.objTotal.Add(cnt) } return nil } @@ -307,7 +349,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E return nil } -func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) { +func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) { e.mtx.RLock() defer e.mtx.RUnlock() @@ -322,7 +364,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) } } - if len(e.shards)-len(shardIDs) < 1 && !handlerDefined { + if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil { return nil, nil, errMustHaveTwoShards } @@ -368,7 +410,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to getRes, err := sh.Get(ctx, getPrm) if err != nil { if prm.IgnoreErrors { - res.failed.Add(1) + res.objFailed.Add(1) continue } e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, @@ -385,19 +427,19 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to continue } - if prm.Handler == nil { + if prm.ObjectsHandler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put make this command kinda useless. return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } - err = prm.Handler(ctx, addr, getRes.Object()) + err = prm.ObjectsHandler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - res.evacuated.Add(1) + res.objEvacuated.Add(1) } return nil } @@ -418,7 +460,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add } switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status { case putToShardSuccess: - res.evacuated.Add(1) + res.objEvacuated.Add(1) e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, zap.Stringer("from", sh.ID()), zap.Stringer("to", shards[j].ID()), @@ -427,7 +469,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return true, nil case putToShardExists, putToShardRemoved: - res.skipped.Add(1) + res.objSkipped.Add(1) return true, nil default: continue diff --git a/pkg/local_object_storage/engine/evacuate_limiter.go b/pkg/local_object_storage/engine/evacuate_limiter.go index 83acd30d..63960e23 100644 --- a/pkg/local_object_storage/engine/evacuate_limiter.go +++ b/pkg/local_object_storage/engine/evacuate_limiter.go @@ -34,32 +34,53 @@ func (s *EvacuationState) ShardIDs() []string { return s.shardIDs } -func (s *EvacuationState) Evacuated() uint64 { +func (s *EvacuationState) ObjectsEvacuated() uint64 { if s == nil { return 0 } - return s.result.Evacuated() + return s.result.ObjectsEvacuated() } -func (s *EvacuationState) Total() uint64 { +func (s *EvacuationState) ObjectsTotal() uint64 { if s == nil { return 0 } - return s.result.Total() + return s.result.ObjectsTotal() } -func (s *EvacuationState) Failed() uint64 { +func (s *EvacuationState) ObjectsFailed() uint64 { if s == nil { return 0 } - return s.result.Failed() + return s.result.ObjectsFailed() } -func (s *EvacuationState) Skipped() uint64 { +func (s *EvacuationState) ObjectsSkipped() uint64 { if s == nil { return 0 } - return s.result.Skipped() + return s.result.ObjectsSkipped() +} + +func (s *EvacuationState) TreesEvacuated() uint64 { + if s == nil { + return 0 + } + return s.result.TreesEvacuated() +} + +func (s *EvacuationState) TreesTotal() uint64 { + if s == nil { + return 0 + } + return s.result.TreesTotal() +} + +func (s *EvacuationState) TreesFailed() uint64 { + if s == nil { + return 0 + } + return s.result.TreesFailed() } func (s *EvacuationState) ProcessingStatus() EvacuateProcessState { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 2dc4a177..cadcc293 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -107,14 +107,14 @@ func TestEvacuateShard(t *testing.T) { t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, ErrMustBeReadOnly) - require.Equal(t, uint64(0), res.Evacuated()) + require.Equal(t, uint64(0), res.ObjectsEvacuated()) }) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(objPerShard), res.Evacuated()) + require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated()) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presense @@ -125,7 +125,7 @@ func TestEvacuateShard(t *testing.T) { // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. res, err = e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(0), res.Evacuated()) + require.Equal(t, uint64(0), res.ObjectsEvacuated()) checkHasObjects(t) @@ -177,13 +177,13 @@ func TestEvacuateNetwork(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, uint64(0), res.Evacuated()) + require.Equal(t, uint64(0), res.ObjectsEvacuated()) - prm.Handler = acceptOneOf(objects, 2) + prm.ObjectsHandler = acceptOneOf(objects, 2) res, err = e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.Evacuated()) + require.Equal(t, uint64(2), res.ObjectsEvacuated()) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Parallel() @@ -197,18 +197,18 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.Handler = acceptOneOf(objects, 2) + prm.ObjectsHandler = acceptOneOf(objects, 2) res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.Evacuated()) + require.Equal(t, uint64(2), res.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { - prm.Handler = acceptOneOf(objects, 3) + prm.ObjectsHandler = acceptOneOf(objects, 3) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(3), res.Evacuated()) + require.Equal(t, uint64(3), res.ObjectsEvacuated()) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -234,18 +234,18 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = evacuateIDs - prm.Handler = acceptOneOf(objects, totalCount-1) + prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, totalCount-1, res.Evacuated()) + require.Equal(t, totalCount-1, res.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { - prm.Handler = acceptOneOf(objects, totalCount) + prm.ObjectsHandler = acceptOneOf(objects, totalCount) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, totalCount, res.Evacuated()) + require.Equal(t, totalCount, res.ObjectsEvacuated()) }) }) } @@ -262,7 +262,7 @@ func TestEvacuateCancellation(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-ctx.Done(): return ctx.Err() @@ -276,7 +276,7 @@ func TestEvacuateCancellation(t *testing.T) { res, err := e.Evacuate(ctx, prm) require.ErrorContains(t, err, "context canceled") - require.Equal(t, uint64(0), res.Evacuated()) + require.Equal(t, uint64(0), res.ObjectsEvacuated()) } func TestEvacuateSingleProcess(t *testing.T) { @@ -293,7 +293,7 @@ func TestEvacuateSingleProcess(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: default: @@ -307,14 +307,14 @@ func TestEvacuateSingleProcess(t *testing.T) { eg.Go(func() error { res, err := e.Evacuate(egCtx, prm) require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.Evacuated()) + require.Equal(t, uint64(3), res.ObjectsEvacuated()) return nil }) eg.Go(func() error { <-running res, err := e.Evacuate(egCtx, prm) require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed") - require.Equal(t, uint64(0), res.Evacuated()) + require.Equal(t, uint64(0), res.ObjectsEvacuated()) close(blocker) return nil }) @@ -335,7 +335,7 @@ func TestEvacuateAsync(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: default: @@ -348,7 +348,7 @@ func TestEvacuateAsync(t *testing.T) { st, err := e.GetEvacuationState(context.Background()) require.NoError(t, err, "get init state failed") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") - require.Equal(t, uint64(0), st.Evacuated(), "invalid init count") + require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count") require.Nil(t, st.StartedAt(), "invalid init started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") @@ -358,7 +358,7 @@ func TestEvacuateAsync(t *testing.T) { eg.Go(func() error { res, err := e.Evacuate(egCtx, prm) require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.Evacuated()) + require.Equal(t, uint64(3), res.ObjectsEvacuated()) return nil }) @@ -367,7 +367,7 @@ func TestEvacuateAsync(t *testing.T) { st, err = e.GetEvacuationState(context.Background()) require.NoError(t, err, "get running state failed") require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state") - require.Equal(t, uint64(0), st.Evacuated(), "invalid running count") + require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count") require.NotNil(t, st.StartedAt(), "invalid running started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") expectedShardIDs := make([]string, 0, 2) @@ -385,7 +385,7 @@ func TestEvacuateAsync(t *testing.T) { }, 3*time.Second, 10*time.Millisecond, "invalid final state") require.NoError(t, err, "get final state failed") - require.Equal(t, uint64(3), st.Evacuated(), "invalid final count") + require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") diff --git a/pkg/services/control/server/convert.go b/pkg/services/control/server/convert.go index f922d727..4f313235 100644 --- a/pkg/services/control/server/convert.go +++ b/pkg/services/control/server/convert.go @@ -47,15 +47,18 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation } return &control.GetShardEvacuationStatusResponse{ Body: &control.GetShardEvacuationStatusResponse_Body{ - Shard_ID: shardIDs, - Evacuated: state.Evacuated(), - Total: state.Total(), - Failed: state.Failed(), - Status: evacStatus, - StartedAt: startedAt, - Duration: duration, - ErrorMessage: state.ErrorMessage(), - Skipped: state.Skipped(), + Shard_ID: shardIDs, + EvacuatedObjects: state.ObjectsEvacuated(), + TotalObjects: state.ObjectsTotal(), + FailedObjects: state.ObjectsFailed(), + Status: evacStatus, + StartedAt: startedAt, + Duration: duration, + ErrorMessage: state.ErrorMessage(), + SkippedObjects: state.ObjectsSkipped(), + TotalTrees: state.TreesTotal(), + EvacuatedTrees: state.TreesEvacuated(), + FailedTrees: state.TreesFailed(), }, }, nil } diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 6cba72d7..ac8b3d54 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -26,10 +26,10 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe } prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - Handler: s.replicate, - Scope: engine.EvacuateScopeObjects, + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + ObjectsHandler: s.replicate, + Scope: engine.EvacuateScopeObjects, } res, err := s.s.Evacuate(ctx, prm) @@ -39,7 +39,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe resp := &control.EvacuateShardResponse{ Body: &control.EvacuateShardResponse_Body{ - Count: uint32(res.Evacuated()), + Count: uint32(res.ObjectsEvacuated()), }, } diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 0b285127..91f0731c 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -22,11 +22,11 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha } prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - Handler: s.replicate, - Async: true, - Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + ObjectsHandler: s.replicate, + Async: true, + Scope: engine.EvacuateScope(req.GetBody().GetScope()), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 9c1a8741..4e95ac61 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 73e8ca90..16e0f707 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -394,11 +394,11 @@ message GetShardEvacuationStatusResponse { } // Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion. - uint64 total = 1; + uint64 total_objects = 1; // Evacuated objects count. - uint64 evacuated = 2; + uint64 evacuated_objects = 2; // Failed objects count. - uint64 failed = 3; + uint64 failed_objects = 3; // Shard IDs. repeated bytes shard_ID = 4; @@ -412,7 +412,14 @@ message GetShardEvacuationStatusResponse { string error_message = 8; // Skipped objects count. - uint64 skipped = 9; + uint64 skipped_objects = 9; + + // Total trees to evacuate count. + uint64 total_trees = 10; + // Evacuated trees count. + uint64 evacuated_trees = 11; + // Failed trees count. + uint64 failed_trees = 12; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index b18d30db..37527aa8 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ