From 79e839aa632d651b09872521c2b9c389d1a25db7 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Thu, 4 May 2023 13:58:26 +0300
Subject: [PATCH] [#329] node: Make evacuate async

Shard evacuation can now be run asynchronously.
Only one evacuation process may be in progress at a time.
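
The intended call sequence at the engine level is: start the evacuation with
the async flag set (the shards to evacuate must already be read-only), then
poll the evacuation state until it reports completion. Below is a minimal
sketch against the API added by this patch; the startAndWait helper, its
polling interval and error handling are illustrative only.

    import (
        "context"
        "errors"
        "time"

        "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
        "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
        objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
        oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    )

    // startAndWait starts an asynchronous evacuation and polls until it finishes.
    // The handler is called for objects that cannot be placed on another local
    // shard (e.g. the node's replication callback).
    func startAndWait(ctx context.Context, e *engine.StorageEngine, ids []*shard.ID,
        handler func(context.Context, oid.Address, *objectSDK.Object) error) error {
        var prm engine.EvacuateShardPrm
        prm.WithShardIDList(ids)
        prm.WithFaultHandler(handler)
        prm.WithAsync(true) // Evacuate returns as soon as the task is accepted

        // Only one evacuation may be in progress: a second call while the first
        // one is still running fails with a logical "already running" error.
        if _, err := e.Evacuate(ctx, prm); err != nil {
            return err
        }

        for {
            st, err := e.GetEvacuationState(ctx)
            if err != nil {
                return err
            }
            if st.ProcessingStatus() == engine.EvacuateProcessStateCompleted {
                if msg := st.ErrorMessage(); msg != "" {
                    return errors.New(msg)
                }
                return nil
            }
            time.Sleep(100 * time.Millisecond)
        }
    }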

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 internal/logs/logs.go                         |   4 +
 pkg/local_object_storage/engine/engine.go     |   4 +
 pkg/local_object_storage/engine/evacuate.go   | 194 ++++++++++++++++--
 .../engine/evacuate_limiter.go                | 178 ++++++++++++++++
 .../engine/evacuate_test.go                   | 139 +++++++++++--
 pkg/local_object_storage/shard/count.go       |  31 +++
 pkg/services/control/server/convert.go        |  60 ++++++
 pkg/services/control/server/evacuate.go       |   2 +-
 pkg/services/control/server/evacuate_async.go |  91 +++++++-
 pkg/services/control/service.proto            |   1 +
 pkg/services/control/service_grpc.pb.go       | Bin 23469 -> 23641 bytes
 11 files changed, 667 insertions(+), 37 deletions(-)
 create mode 100644 pkg/local_object_storage/engine/evacuate_limiter.go
 create mode 100644 pkg/local_object_storage/shard/count.go
 create mode 100644 pkg/services/control/server/convert.go

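Stopping a running evacuation is asynchronous as well: StopShardEvacuation
only requests cancellation via EnqueRunningEvacuationStop, which cancels the
context of the running task, and callers keep polling the status until it
reports COMPLETED (the error message then typically reflects the context
cancellation). A sketch against the engine API added in this patch (imports
as in the sketch above; the stopAndWait helper name is illustrative):

    // stopAndWait requests cancellation of a running evacuation and waits
    // until the task actually reaches the completed state.
    func stopAndWait(ctx context.Context, e *engine.StorageEngine) error {
        // Returns a logical error if no evacuation is currently running.
        if err := e.EnqueRunningEvacuationStop(ctx); err != nil {
            return err
        }
        for {
            st, err := e.GetEvacuationState(ctx)
            if err != nil {
                return err
            }
            if st.ProcessingStatus() == engine.EvacuateProcessStateCompleted {
                return nil
            }
            time.Sleep(100 * time.Millisecond)
        }
    }
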
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index 742f6a8f7..b84746a72 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -484,5 +484,9 @@ const (
 	ShardGCCollectingExpiredLocksCompleted                                  = "collecting expired locks completed"
 	ShardGCRemoveGarbageStarted                                             = "garbage remove started"
 	ShardGCRemoveGarbageCompleted                                           = "garbage remove completed"
+	EngineShardsEvacuationFailedToCount                                     = "failed to get total objects count to evacuate"
+	EngineShardsEvacuationFailedToListObjects                               = "failed to list objects to evacuate"
+	EngineShardsEvacuationFailedToReadObject                                = "failed to read object to evacuate"
+	EngineShardsEvacuationFailedToMoveObject                                = "failed to evacuate object to other node"
 	ShardGCFailedToGetExpiredWithLinked                                     = "failed to get expired objects with linked"
 )
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index 20c8a946b..b7be4756d 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -35,6 +35,7 @@ type StorageEngine struct {
 
 		err error
 	}
+	evacuateLimiter *evacuationLimiter
 }
 
 type shardWrapper struct {
@@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine {
 		shardPools: make(map[string]util.WorkerPool),
 		closeCh:    make(chan struct{}),
 		setModeCh:  make(chan setModeRequest),
+		evacuateLimiter: &evacuationLimiter{
+			guard: &sync.RWMutex{},
+		},
 	}
 }
 
diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 761ed24b9..4693b2618 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -14,6 +15,9 @@ import (
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/hrw"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -24,11 +28,23 @@ type EvacuateShardPrm struct {
 	shardID      []*shard.ID
 	handler      func(context.Context, oid.Address, *objectSDK.Object) error
 	ignoreErrors bool
+	async        bool
 }
 
 // EvacuateShardRes represents result of the EvacuateShard operation.
 type EvacuateShardRes struct {
-	count int
+	evacuated *atomic.Uint64
+	total     *atomic.Uint64
+	failed    *atomic.Uint64
+}
+
+// NewEvacuateShardRes creates a new EvacuateShardRes instance.
+func NewEvacuateShardRes() *EvacuateShardRes {
+	return &EvacuateShardRes{
+		evacuated: atomic.NewUint64(0),
+		total:     atomic.NewUint64(0),
+		failed:    atomic.NewUint64(0),
+	}
 }
 
 // WithShardIDList sets shard ID.
@@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,
 	p.handler = f
 }
 
-// Count returns amount of evacuated objects.
+// WithAsync sets the flag to run evacuation asynchronously.
+func (p *EvacuateShardPrm) WithAsync(async bool) {
+	p.async = async
+}
+
+// Evacuated returns the number of evacuated objects.
 // Objects for which handler returned no error are also assumed evacuated.
-func (p EvacuateShardRes) Count() int {
-	return p.count
+func (p *EvacuateShardRes) Evacuated() uint64 {
+	if p == nil {
+		return 0
+	}
+	return p.evacuated.Load()
+}
+
+// Total returns the total number of objects to evacuate.
+func (p *EvacuateShardRes) Total() uint64 {
+	if p == nil {
+		return 0
+	}
+	return p.total.Load()
+}
+
+// Failed returns the number of objects that failed to evacuate.
+func (p *EvacuateShardRes) Failed() uint64 {
+	if p == nil {
+		return 0
+	}
+	return p.failed.Load()
+}
+
+// DeepCopy returns a deep copy of the result instance.
+func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
+	if p == nil {
+		return nil
+	}
+	return &EvacuateShardRes{
+		evacuated: atomic.NewUint64(p.evacuated.Load()),
+		total:     atomic.NewUint64(p.total.Load()),
+		failed:    atomic.NewUint64(p.failed.Load()),
+	}
 }
 
 const defaultEvacuateBatchSize = 100
@@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
 
 // Evacuate moves data from one shard to the others.
 // The shard being moved must be in read-only mode.
-func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
+func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
 	shardIDs := make([]string, len(prm.shardID))
 	for i := range prm.shardID {
 		shardIDs[i] = prm.shardID[i].String()
 	}
 
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
+		trace.WithAttributes(
+			attribute.StringSlice("shardIDs", shardIDs),
+			attribute.Bool("async", prm.async),
+			attribute.Bool("ignoreErrors", prm.ignoreErrors),
+		))
+	defer span.End()
+
 	shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
 	if err != nil {
-		return EvacuateShardRes{}, err
+		return nil, err
 	}
 
 	shardsToEvacuate := make(map[string]*shard.Shard)
@@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva
 		}
 	}
 
+	res := NewEvacuateShardRes()
+	ctx = ctxOrBackground(ctx, prm.async)
+	eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
+
+	if err != nil {
+		return nil, err
+	}
+
+	eg.Go(func() error {
+		return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
+	})
+
+	if prm.async {
+		return nil, nil
+	}
+
+	return res, eg.Wait()
+}
+
+func ctxOrBackground(ctx context.Context, background bool) context.Context {
+	if background {
+		return context.Background()
+	}
+	return ctx
+}
+
+func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
+	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
+	var err error
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
+		trace.WithAttributes(
+			attribute.StringSlice("shardIDs", shardIDs),
+			attribute.Bool("async", prm.async),
+			attribute.Bool("ignoreErrors", prm.ignoreErrors),
+		))
+
+	defer func() {
+		span.End()
+		e.evacuateLimiter.Complete(err)
+	}()
+
 	e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
 
-	var res EvacuateShardRes
+	err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
+	if err != nil {
+		e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
+		return err
+	}
 
 	for _, shardID := range shardIDs {
-		if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
+		if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
 			e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
-			return res, err
+			return err
 		}
 	}
 
 	e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
-	return res, nil
+	return nil
+}
+
+func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
+	defer span.End()
+
+	for _, sh := range shardsToEvacuate {
+		cnt, err := sh.LogicalObjectsCount(ctx)
+		if err != nil {
+			if errors.Is(err, shard.ErrDegradedMode) {
+				continue
+			}
+			return err
+		}
+		res.total.Add(cnt)
+	}
+	return nil
 }
 
 func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
+		trace.WithAttributes(
+			attribute.String("shardID", shardID),
+		))
+	defer span.End()
+
 	var listPrm shard.ListWithCursorPrm
 	listPrm.WithCount(defaultEvacuateBatchSize)
 
@@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
 			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
 				break
 			}
+			e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
 			return err
 		}
 
@@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
 
 func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
+		trace.WithAttributes(
+			attribute.Int("objects_count", len(toEvacuate)),
+		))
+	defer span.End()
+
 	for i := range toEvacuate {
 		select {
 		case <-ctx.Done():
@@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 		getRes, err := sh.Get(ctx, getPrm)
 		if err != nil {
 			if prm.ignoreErrors {
+				res.failed.Inc()
 				continue
 			}
+			e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
 			return err
 		}
 
-		evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
+		evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
 		if err != nil {
 			return err
 		}
@@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 
 		err = prm.handler(ctx, addr, getRes.Object())
 		if err != nil {
+			e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
 			return err
 		}
-		res.count++
+		res.evacuated.Inc()
 	}
 	return nil
 }
 
-func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
-	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
+func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
+	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
 	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
 	for j := range shards {
 		select {
@@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
 		putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
 		if putDone || exists {
 			if putDone {
+				res.evacuated.Inc()
 				e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
 					zap.Stringer("from", sh.ID()),
 					zap.Stringer("to", shards[j].ID()),
 					zap.Stringer("addr", addr))
-				res.count++
 			}
 			return true, nil
 		}
@@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
 
 	return false, nil
 }
+
+func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	return e.evacuateLimiter.GetState(), nil
+}
+
+func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
+
+	return e.evacuateLimiter.CancelIfRunning()
+}
diff --git a/pkg/local_object_storage/engine/evacuate_limiter.go b/pkg/local_object_storage/engine/evacuate_limiter.go
new file mode 100644
index 000000000..425fdc775
--- /dev/null
+++ b/pkg/local_object_storage/engine/evacuate_limiter.go
@@ -0,0 +1,178 @@
+package engine
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
+	"golang.org/x/sync/errgroup"
+)
+
+type EvacuateProcessState int
+
+const (
+	EvacuateProcessStateUndefined EvacuateProcessState = iota
+	EvacuateProcessStateRunning
+	EvacuateProcessStateCompleted
+)
+
+type EvacuationState struct {
+	shardIDs     []string
+	processState EvacuateProcessState
+	startedAt    time.Time
+	finishedAt   time.Time
+	result       *EvacuateShardRes
+	errMessage   string
+}
+
+func (s *EvacuationState) ShardIDs() []string {
+	if s == nil {
+		return nil
+	}
+	return s.shardIDs
+}
+
+func (s *EvacuationState) Evacuated() uint64 {
+	if s == nil {
+		return 0
+	}
+	return s.result.Evacuated()
+}
+
+func (s *EvacuationState) Total() uint64 {
+	if s == nil {
+		return 0
+	}
+	return s.result.Total()
+}
+
+func (s *EvacuationState) Failed() uint64 {
+	if s == nil {
+		return 0
+	}
+	return s.result.Failed()
+}
+
+func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
+	if s == nil {
+		return EvacuateProcessStateUndefined
+	}
+	return s.processState
+}
+
+func (s *EvacuationState) StartedAt() *time.Time {
+	if s == nil {
+		return nil
+	}
+	defaultTime := time.Time{}
+	if s.startedAt == defaultTime {
+		return nil
+	}
+	return &s.startedAt
+}
+
+func (s *EvacuationState) FinishedAt() *time.Time {
+	if s == nil {
+		return nil
+	}
+	defaultTime := time.Time{}
+	if s.finishedAt == defaultTime {
+		return nil
+	}
+	return &s.finishedAt
+}
+
+func (s *EvacuationState) ErrorMessage() string {
+	if s == nil {
+		return ""
+	}
+	return s.errMessage
+}
+
+func (s *EvacuationState) DeepCopy() *EvacuationState {
+	if s == nil {
+		return nil
+	}
+	shardIDs := make([]string, len(s.shardIDs))
+	copy(shardIDs, s.shardIDs)
+
+	return &EvacuationState{
+		shardIDs:     shardIDs,
+		processState: s.processState,
+		startedAt:    s.startedAt,
+		finishedAt:   s.finishedAt,
+		errMessage:   s.errMessage,
+		result:       s.result.DeepCopy(),
+	}
+}
+
+type evacuationLimiter struct {
+	state  EvacuationState
+	eg     *errgroup.Group
+	cancel context.CancelFunc
+
+	guard *sync.RWMutex
+}
+
+func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
+	l.guard.Lock()
+	defer l.guard.Unlock()
+
+	select {
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	default:
+	}
+
+	if l.state.processState == EvacuateProcessStateRunning {
+		return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
+	}
+
+	var egCtx context.Context
+	egCtx, l.cancel = context.WithCancel(ctx)
+	l.eg, egCtx = errgroup.WithContext(egCtx)
+	l.state = EvacuationState{
+		shardIDs:     shardIDs,
+		processState: EvacuateProcessStateRunning,
+		startedAt:    time.Now().UTC(),
+		result:       result,
+	}
+
+	return l.eg, egCtx, nil
+}
+
+func (l *evacuationLimiter) Complete(err error) {
+	l.guard.Lock()
+	defer l.guard.Unlock()
+
+	errMsg := ""
+	if err != nil {
+		errMsg = err.Error()
+	}
+	l.state.processState = EvacuateProcessStateCompleted
+	l.state.errMessage = errMsg
+	l.state.finishedAt = time.Now().UTC()
+
+	l.eg = nil
+}
+
+func (l *evacuationLimiter) GetState() *EvacuationState {
+	l.guard.RLock()
+	defer l.guard.RUnlock()
+
+	return l.state.DeepCopy()
+}
+
+func (l *evacuationLimiter) CancelIfRunning() error {
+	l.guard.Lock()
+	defer l.guard.Unlock()
+
+	if l.state.processState != EvacuateProcessStateRunning {
+		return logicerr.New("there is no running evacuation task")
+	}
+
+	l.cancel()
+	return nil
+}
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index bc5b05ef0..43737e7f7 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"testing"
+	"time"
 
 	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -21,6 +22,7 @@ import (
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"
+	"golang.org/x/sync/errgroup"
 )
 
 func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
@@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) {
 	t.Run("must be read-only", func(t *testing.T) {
 		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, ErrMustBeReadOnly)
-		require.Equal(t, 0, res.Count())
+		require.Equal(t, uint64(0), res.Evacuated())
 	})
 
 	require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
 
 	res, err := e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
-	require.Equal(t, objPerShard, res.count)
+	require.Equal(t, uint64(objPerShard), res.Evacuated())
 
 	// We check that all objects are available both before and after shard removal.
 	// First case is a real-world use-case. It ensures that an object can be put in presense
@@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) {
 	// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
 	res, err = e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
-	require.Equal(t, 0, res.count)
+	require.Equal(t, uint64(0), res.Evacuated())
 
 	checkHasObjects(t)
 
@@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) {
 
 	var errReplication = errors.New("handler error")
 
-	acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error {
-		var n int
+	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
+		var n uint64
 		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
 			if n == max {
 				return errReplication
@@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) {
 
 		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errMustHaveTwoShards)
-		require.Equal(t, 0, res.Count())
+		require.Equal(t, uint64(0), res.Evacuated())
 
 		prm.handler = acceptOneOf(objects, 2)
 
 		res, err = e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
-		require.Equal(t, 2, res.Count())
+		require.Equal(t, uint64(2), res.Evacuated())
 	})
 	t.Run("multiple shards, evacuate one", func(t *testing.T) {
 		t.Parallel()
@@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) {
 
 		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
-		require.Equal(t, 2, res.Count())
+		require.Equal(t, uint64(2), res.Evacuated())
 
 		t.Run("no errors", func(t *testing.T) {
 			prm.handler = acceptOneOf(objects, 3)
 
 			res, err := e.Evacuate(context.Background(), prm)
 			require.NoError(t, err)
-			require.Equal(t, 3, res.Count())
+			require.Equal(t, uint64(3), res.Evacuated())
 		})
 	})
 	t.Run("multiple shards, evacuate many", func(t *testing.T) {
@@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) {
 		e, ids, objects := newEngineEvacuate(t, 4, 5)
 		evacuateIDs := ids[0:3]
 
-		var totalCount int
+		var totalCount uint64
 		for i := range evacuateIDs {
 			res, err := e.shards[ids[i].String()].List()
 			require.NoError(t, err)
 
-			totalCount += len(res.AddressList())
+			totalCount += uint64(len(res.AddressList()))
 		}
 
 		for i := range ids {
@@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) {
 
 		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
-		require.Equal(t, totalCount-1, res.Count())
+		require.Equal(t, totalCount-1, res.Evacuated())
 
 		t.Run("no errors", func(t *testing.T) {
 			prm.handler = acceptOneOf(objects, totalCount)
 
 			res, err := e.Evacuate(context.Background(), prm)
 			require.NoError(t, err)
-			require.Equal(t, totalCount, res.Count())
+			require.Equal(t, totalCount, res.Evacuated())
 		})
 	})
 }
@@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) {
 
 	res, err := e.Evacuate(ctx, prm)
 	require.ErrorContains(t, err, "context canceled")
-	require.Equal(t, 0, res.Count())
+	require.Equal(t, uint64(0), res.Evacuated())
+}
+
+func TestEvacuateSingleProcess(t *testing.T) {
+	e, ids, _ := newEngineEvacuate(t, 2, 3)
+
+	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
+	require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
+
+	blocker := make(chan interface{})
+	running := make(chan interface{})
+
+	var prm EvacuateShardPrm
+	prm.shardID = ids[1:2]
+	prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
+		select {
+		case <-running:
+		default:
+			close(running)
+		}
+		<-blocker
+		return nil
+	}
+
+	eg, egCtx := errgroup.WithContext(context.Background())
+	eg.Go(func() error {
+		res, err := e.Evacuate(egCtx, prm)
+		require.NoError(t, err, "first evacuation failed")
+		require.Equal(t, uint64(3), res.Evacuated())
+		return nil
+	})
+	eg.Go(func() error {
+		<-running
+		res, err := e.Evacuate(egCtx, prm)
+		require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation did not fail")
+		require.Equal(t, uint64(0), res.Evacuated())
+		close(blocker)
+		return nil
+	})
+	require.NoError(t, eg.Wait())
+}
+
+func TestEvacuateAsync(t *testing.T) {
+	e, ids, _ := newEngineEvacuate(t, 2, 3)
+
+	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
+	require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
+
+	blocker := make(chan interface{})
+	running := make(chan interface{})
+
+	var prm EvacuateShardPrm
+	prm.shardID = ids[1:2]
+	prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
+		select {
+		case <-running:
+		default:
+			close(running)
+		}
+		<-blocker
+		return nil
+	}
+
+	st, err := e.GetEvacuationState(context.Background())
+	require.NoError(t, err, "get init state failed")
+	require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
+	require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
+	require.Nil(t, st.StartedAt(), "invalid init started at")
+	require.Nil(t, st.FinishedAt(), "invalid init finished at")
+	require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
+	require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
+
+	eg, egCtx := errgroup.WithContext(context.Background())
+	eg.Go(func() error {
+		res, err := e.Evacuate(egCtx, prm)
+		require.NoError(t, err, "first evacuation failed")
+		require.Equal(t, uint64(3), res.Evacuated())
+		return nil
+	})
+
+	<-running
+
+	st, err = e.GetEvacuationState(context.Background())
+	require.NoError(t, err, "get running state failed")
+	require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
+	require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
+	require.NotNil(t, st.StartedAt(), "invalid running started at")
+	require.Nil(t, st.FinishedAt(), "invalid running finished at")
+	expectedShardIDs := make([]string, 0, 2)
+	for _, id := range ids[1:2] {
+		expectedShardIDs = append(expectedShardIDs, id.String())
+	}
+	require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
+	require.Equal(t, "", st.ErrorMessage(), "invalid running error message")
+
+	close(blocker)
+
+	require.Eventually(t, func() bool {
+		st, err = e.GetEvacuationState(context.Background())
+		return st.ProcessingStatus() == EvacuateProcessStateCompleted
+	}, 3*time.Second, 10*time.Millisecond, "invalid final state")
+
+	require.NoError(t, err, "get final state failed")
+	require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
+	require.NotNil(t, st.StartedAt(), "invalid final started at")
+	require.NotNil(t, st.FinishedAt(), "invalid final finished at")
+	require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
+	require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
+
+	require.NoError(t, eg.Wait())
 }
diff --git a/pkg/local_object_storage/shard/count.go b/pkg/local_object_storage/shard/count.go
new file mode 100644
index 000000000..b68c2f43e
--- /dev/null
+++ b/pkg/local_object_storage/shard/count.go
@@ -0,0 +1,31 @@
+package shard
+
+import (
+	"context"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// LogicalObjectsCount returns the number of logical objects stored in the shard.
+func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
+	_, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+		))
+	defer span.End()
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.GetMode().NoMetabase() {
+		return 0, ErrDegradedMode
+	}
+
+	cc, err := s.metaBase.ObjectCounters()
+	if err != nil {
+		return 0, err
+	}
+	return cc.Logic(), nil
+}
diff --git a/pkg/services/control/server/convert.go b/pkg/services/control/server/convert.go
new file mode 100644
index 000000000..1d29ed406
--- /dev/null
+++ b/pkg/services/control/server/convert.go
@@ -0,0 +1,60 @@
+package control
+
+import (
+	"fmt"
+	"time"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"github.com/mr-tron/base58"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) {
+	shardIDs := make([][]byte, 0, len(state.ShardIDs()))
+	for _, shID := range state.ShardIDs() {
+		id, err := base58.Decode(shID)
+		if err != nil {
+			return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID))
+		}
+		shardIDs = append(shardIDs, id)
+	}
+	var evacStatus control.GetShardEvacuationStatusResponse_Body_Status
+	switch state.ProcessingStatus() {
+	case engine.EvacuateProcessStateRunning:
+		evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING
+	case engine.EvacuateProcessStateCompleted:
+		evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED
+	default:
+		evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED
+	}
+	var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp
+	if state.StartedAt() != nil {
+		startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{
+			Value: state.StartedAt().Unix(),
+		}
+	}
+	var duration *control.GetShardEvacuationStatusResponse_Body_Duration
+	if state.StartedAt() != nil {
+		end := time.Now().UTC()
+		if state.FinishedAt() != nil {
+			end = *state.FinishedAt()
+		}
+		duration = &control.GetShardEvacuationStatusResponse_Body_Duration{
+			Seconds: int64(end.Sub(*state.StartedAt()).Seconds()),
+		}
+	}
+	return &control.GetShardEvacuationStatusResponse{
+		Body: &control.GetShardEvacuationStatusResponse_Body{
+			Shard_ID:     shardIDs,
+			Evacuated:    state.Evacuated(),
+			Total:        state.Total(),
+			Failed:       state.Failed(),
+			Status:       evacStatus,
+			StartedAt:    startedAt,
+			Duration:     duration,
+			ErrorMessage: state.ErrorMessage(),
+		},
+	}, nil
+}
diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go
index afa4011b9..fc6dd3f60 100644
--- a/pkg/services/control/server/evacuate.go
+++ b/pkg/services/control/server/evacuate.go
@@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
 
 	resp := &control.EvacuateShardResponse{
 		Body: &control.EvacuateShardResponse_Body{
-			Count: uint32(res.Count()),
+			Count: uint32(res.Evacuated()),
 		},
 	}
 
diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go
index 94ddc73d1..cdf3656e2 100644
--- a/pkg/services/control/server/evacuate_async.go
+++ b/pkg/services/control/server/evacuate_async.go
@@ -2,19 +2,96 @@ package control
 
 import (
 	"context"
-	"fmt"
+	"errors"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
-func (s *Server) StartShardEvacuation(context.Context, *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
-	return nil, fmt.Errorf("not implemented")
+func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+
+	var prm engine.EvacuateShardPrm
+	prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
+	prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
+	prm.WithFaultHandler(s.replicate)
+	prm.WithAsync(true)
+
+	_, err = s.s.Evacuate(ctx, prm)
+	if err != nil {
+		var logicalErr logicerr.Logical
+		if errors.As(err, &logicalErr) {
+			return nil, status.Error(codes.Aborted, err.Error())
+		}
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp := &control.StartShardEvacuationResponse{
+		Body: &control.StartShardEvacuationResponse_Body{},
+	}
+
+	err = SignMessage(s.key, resp)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	return resp, nil
 }
 
-func (s *Server) GetShardEvacuationStatus(context.Context, *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
-	return nil, fmt.Errorf("not implemented")
+func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+
+	state, err := s.s.GetEvacuationState(ctx)
+	if err != nil {
+		var logicalErr logicerr.Logical
+		if errors.As(err, &logicalErr) {
+			return nil, status.Error(codes.Aborted, err.Error())
+		}
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp, err := stateToResponse(state)
+	if err != nil {
+		return nil, err
+	}
+
+	err = SignMessage(s.key, resp)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	return resp, nil
 }
 
-func (s *Server) StopShardEvacuation(context.Context, *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
-	return nil, fmt.Errorf("not implemented")
+func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+
+	err = s.s.EnqueRunningEvacuationStop(ctx)
+	if err != nil {
+		var logicalErr logicerr.Logical
+		if errors.As(err, &logicalErr) {
+			return nil, status.Error(codes.Aborted, err.Error())
+		}
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp := &control.StopShardEvacuationResponse{
+		Body: &control.StopShardEvacuationResponse_Body{},
+	}
+
+	err = SignMessage(s.key, resp)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	return resp, nil
 }
diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto
index ca35ae043..a80deb2da 100644
--- a/pkg/services/control/service.proto
+++ b/pkg/services/control/service.proto
@@ -27,6 +27,7 @@ service ControlService {
     rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
 
     // EvacuateShard moves all data from one shard to the others.
+    // Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
     rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse);
 
     // StartShardEvacuation starts moving all data from one shard to the others.
diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go
index 96642153363125f6062c7ca65bf27631ad2f980b..8afc6086aabe13e9d7226973518d980aac746c5e 100644
GIT binary patch
delta 89
zcmZ3xo$=-l#tm9LLi+j&E~y1YsmX~YsVP<pp~b0_3-v`A^(QOxNdU=%tiqc$cqAo>
KQD?83X9fW2dm5eq

delta 19
bcmcb)gK_P4#tm9Ln;Uq<B{#oS%`pQ2Q}+k7