From e4889e06ba07a32241c4eb4cd723f8aec189eb23 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 4 May 2023 13:58:26 +0300 Subject: [PATCH] [#329] node: Make evacuate async Now it's possible to run evacuate shard in async. Also only one evacuate process can be in progress. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 4 + pkg/local_object_storage/engine/engine.go | 4 + pkg/local_object_storage/engine/evacuate.go | 194 ++++++++++++++++-- .../engine/evacuate_limiter.go | 178 ++++++++++++++++ .../engine/evacuate_test.go | 139 +++++++++++-- pkg/local_object_storage/shard/count.go | 31 +++ pkg/services/control/server/convert.go | 60 ++++++ pkg/services/control/server/evacuate.go | 2 +- pkg/services/control/server/evacuate_async.go | 91 +++++++- pkg/services/control/service.proto | 1 + pkg/services/control/service_grpc.pb.go | Bin 23469 -> 23641 bytes 11 files changed, 667 insertions(+), 37 deletions(-) create mode 100644 pkg/local_object_storage/engine/evacuate_limiter.go create mode 100644 pkg/local_object_storage/shard/count.go create mode 100644 pkg/services/control/server/convert.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 742f6a8f..b84746a7 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -484,5 +484,9 @@ const ( ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed" ShardGCRemoveGarbageStarted = "garbage remove started" ShardGCRemoveGarbageCompleted = "garbage remove completed" + EngineShardsEvacuationFailedToCount = "failed to get total objects count to evacuate" + EngineShardsEvacuationFailedToListObjects = "failed to list objects to evacuate" + EngineShardsEvacuationFailedToReadObject = "failed to read object to evacuate" + EngineShardsEvacuationFailedToMoveObject = "failed to evacuate object to other node" ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked" ) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 20c8a946..b7be4756 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -35,6 +35,7 @@ type StorageEngine struct { err error } + evacuateLimiter *evacuationLimiter } type shardWrapper struct { @@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine { shardPools: make(map[string]util.WorkerPool), closeCh: make(chan struct{}), setModeCh: make(chan setModeRequest), + evacuateLimiter: &evacuationLimiter{ + guard: &sync.RWMutex{}, + }, } } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 761ed24b..4693b261 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" @@ -14,6 +15,9 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -24,11 +28,23 @@ type EvacuateShardPrm struct { shardID []*shard.ID handler func(context.Context, oid.Address, *objectSDK.Object) error ignoreErrors bool + async bool } // EvacuateShardRes represents result of the EvacuateShard operation. type EvacuateShardRes struct { - count int + evacuated *atomic.Uint64 + total *atomic.Uint64 + failed *atomic.Uint64 +} + +// NewEvacuateShardRes creates new EvacuateShardRes instance. +func NewEvacuateShardRes() *EvacuateShardRes { + return &EvacuateShardRes{ + evacuated: atomic.NewUint64(0), + total: atomic.NewUint64(0), + failed: atomic.NewUint64(0), + } } // WithShardIDList sets shard ID. @@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, p.handler = f } -// Count returns amount of evacuated objects. +// WithAsync sets flag to run evacuate async. +func (p *EvacuateShardPrm) WithAsync(async bool) { + p.async = async +} + +// Evacuated returns amount of evacuated objects. // Objects for which handler returned no error are also assumed evacuated. -func (p EvacuateShardRes) Count() int { - return p.count +func (p *EvacuateShardRes) Evacuated() uint64 { + if p == nil { + return 0 + } + return p.evacuated.Load() +} + +// Total returns total count objects to evacuate. +func (p *EvacuateShardRes) Total() uint64 { + if p == nil { + return 0 + } + return p.total.Load() +} + +// Failed returns count of failed objects to evacuate. +func (p *EvacuateShardRes) Failed() uint64 { + if p == nil { + return 0 + } + return p.failed.Load() +} + +// DeepCopy returns deep copy of result instance. +func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { + if p == nil { + return nil + } + return &EvacuateShardRes{ + evacuated: atomic.NewUint64(p.evacuated.Load()), + total: atomic.NewUint64(p.total.Load()), + failed: atomic.NewUint64(p.failed.Load()), + } } const defaultEvacuateBatchSize = 100 @@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. -func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) { +func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + shardIDs := make([]string, len(prm.shardID)) for i := range prm.shardID { shardIDs[i] = prm.shardID[i].String() } + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", + trace.WithAttributes( + attribute.StringSlice("shardIDs", shardIDs), + attribute.Bool("async", prm.async), + attribute.Bool("ignoreErrors", prm.ignoreErrors), + )) + defer span.End() + shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil) if err != nil { - return EvacuateShardRes{}, err + return nil, err } shardsToEvacuate := make(map[string]*shard.Shard) @@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva } } + res := NewEvacuateShardRes() + ctx = ctxOrBackground(ctx, prm.async) + eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) + + if err != nil { + return nil, err + } + + eg.Go(func() error { + return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate) + }) + + if prm.async { + return nil, nil + } + + return res, eg.Wait() +} + +func ctxOrBackground(ctx context.Context, background bool) context.Context { + if background { + return context.Background() + } + return ctx +} + +func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { + var err error + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", + trace.WithAttributes( + attribute.StringSlice("shardIDs", shardIDs), + attribute.Bool("async", prm.async), + attribute.Bool("ignoreErrors", prm.ignoreErrors), + )) + + defer func() { + span.End() + e.evacuateLimiter.Complete(err) + }() + e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs)) - var res EvacuateShardRes + err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err)) + return err + } for _, shardID := range shardIDs { - if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil { + if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs)) - return res, err + return err } } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs)) - return res, nil + return nil +} + +func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount") + defer span.End() + + for _, sh := range shardsToEvacuate { + cnt, err := sh.LogicalObjectsCount(ctx) + if err != nil { + if errors.Is(err, shard.ErrDegradedMode) { + continue + } + return err + } + res.total.Add(cnt) + } + return nil } func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", + trace.WithAttributes( + attribute.String("shardID", shardID), + )) + defer span.End() + var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) @@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { break } + e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err)) return err } @@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", + trace.WithAttributes( + attribute.Int("objects_count", len(toEvacuate)), + )) + defer span.End() + for i := range toEvacuate { select { case <-ctx.Done(): @@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to getRes, err := sh.Get(ctx, getPrm) if err != nil { if prm.ignoreErrors { + res.failed.Inc() continue } + e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err)) return err } - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res) if err != nil { return err } @@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to err = prm.handler(ctx, addr, getRes.Object()) if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err)) return err } - res.count++ + res.evacuated.Inc() } return nil } -func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, - shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) { +func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) { hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) for j := range shards { select { @@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object) if putDone || exists { if putDone { + res.evacuated.Inc() e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, zap.Stringer("from", sh.ID()), zap.Stringer("to", shards[j].ID()), zap.Stringer("addr", addr)) - res.count++ } return true, nil } @@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add return false, nil } + +func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + return e.evacuateLimiter.GetState(), nil +} + +func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + return e.evacuateLimiter.CancelIfRunning() +} diff --git a/pkg/local_object_storage/engine/evacuate_limiter.go b/pkg/local_object_storage/engine/evacuate_limiter.go new file mode 100644 index 00000000..425fdc77 --- /dev/null +++ b/pkg/local_object_storage/engine/evacuate_limiter.go @@ -0,0 +1,178 @@ +package engine + +import ( + "context" + "fmt" + "sync" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "golang.org/x/sync/errgroup" +) + +type EvacuateProcessState int + +const ( + EvacuateProcessStateUndefined EvacuateProcessState = iota + EvacuateProcessStateRunning + EvacuateProcessStateCompleted +) + +type EvacuationState struct { + shardIDs []string + processState EvacuateProcessState + startedAt time.Time + finishedAt time.Time + result *EvacuateShardRes + errMessage string +} + +func (s *EvacuationState) ShardIDs() []string { + if s == nil { + return nil + } + return s.shardIDs +} + +func (s *EvacuationState) Evacuated() uint64 { + if s == nil { + return 0 + } + return s.result.Evacuated() +} + +func (s *EvacuationState) Total() uint64 { + if s == nil { + return 0 + } + return s.result.Total() +} + +func (s *EvacuationState) Failed() uint64 { + if s == nil { + return 0 + } + return s.result.Failed() +} + +func (s *EvacuationState) ProcessingStatus() EvacuateProcessState { + if s == nil { + return EvacuateProcessStateUndefined + } + return s.processState +} + +func (s *EvacuationState) StartedAt() *time.Time { + if s == nil { + return nil + } + defaultTime := time.Time{} + if s.startedAt == defaultTime { + return nil + } + return &s.startedAt +} + +func (s *EvacuationState) FinishedAt() *time.Time { + if s == nil { + return nil + } + defaultTime := time.Time{} + if s.finishedAt == defaultTime { + return nil + } + return &s.finishedAt +} + +func (s *EvacuationState) ErrorMessage() string { + if s == nil { + return "" + } + return s.errMessage +} + +func (s *EvacuationState) DeepCopy() *EvacuationState { + if s == nil { + return nil + } + shardIDs := make([]string, len(s.shardIDs)) + copy(shardIDs, s.shardIDs) + + return &EvacuationState{ + shardIDs: shardIDs, + processState: s.processState, + startedAt: s.startedAt, + finishedAt: s.finishedAt, + errMessage: s.errMessage, + result: s.result.DeepCopy(), + } +} + +type evacuationLimiter struct { + state EvacuationState + eg *errgroup.Group + cancel context.CancelFunc + + guard *sync.RWMutex +} + +func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) { + l.guard.Lock() + defer l.guard.Unlock() + + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } + + if l.state.processState == EvacuateProcessStateRunning { + return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs)) + } + + var egCtx context.Context + egCtx, l.cancel = context.WithCancel(ctx) + l.eg, egCtx = errgroup.WithContext(egCtx) + l.state = EvacuationState{ + shardIDs: shardIDs, + processState: EvacuateProcessStateRunning, + startedAt: time.Now().UTC(), + result: result, + } + + return l.eg, egCtx, nil +} + +func (l *evacuationLimiter) Complete(err error) { + l.guard.Lock() + defer l.guard.Unlock() + + errMsq := "" + if err != nil { + errMsq = err.Error() + } + l.state.processState = EvacuateProcessStateCompleted + l.state.errMessage = errMsq + l.state.finishedAt = time.Now().UTC() + + l.eg = nil +} + +func (l *evacuationLimiter) GetState() *EvacuationState { + l.guard.RLock() + defer l.guard.RUnlock() + + return l.state.DeepCopy() +} + +func (l *evacuationLimiter) CancelIfRunning() error { + l.guard.Lock() + defer l.guard.Unlock() + + if l.state.processState != EvacuateProcessStateRunning { + return logicerr.New("there is no running evacuation task") + } + + l.cancel() + return nil +} diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index bc5b05ef..43737e7f 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "testing" + "time" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" @@ -21,6 +22,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "golang.org/x/sync/errgroup" ) func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { @@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) { t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, ErrMustBeReadOnly) - require.Equal(t, 0, res.Count()) + require.Equal(t, uint64(0), res.Evacuated()) }) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, objPerShard, res.count) + require.Equal(t, uint64(objPerShard), res.Evacuated()) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presense @@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) { // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. res, err = e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, 0, res.count) + require.Equal(t, uint64(0), res.Evacuated()) checkHasObjects(t) @@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) { var errReplication = errors.New("handler error") - acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error { - var n int + acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error { + var n uint64 return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error { if n == max { return errReplication @@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, 0, res.Count()) + require.Equal(t, uint64(0), res.Evacuated()) prm.handler = acceptOneOf(objects, 2) res, err = e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, uint64(2), res.Evacuated()) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Parallel() @@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, uint64(2), res.Evacuated()) t.Run("no errors", func(t *testing.T) { prm.handler = acceptOneOf(objects, 3) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, 3, res.Count()) + require.Equal(t, uint64(3), res.Evacuated()) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) { e, ids, objects := newEngineEvacuate(t, 4, 5) evacuateIDs := ids[0:3] - var totalCount int + var totalCount uint64 for i := range evacuateIDs { res, err := e.shards[ids[i].String()].List() require.NoError(t, err) - totalCount += len(res.AddressList()) + totalCount += uint64(len(res.AddressList())) } for i := range ids { @@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) - require.Equal(t, totalCount-1, res.Count()) + require.Equal(t, totalCount-1, res.Evacuated()) t.Run("no errors", func(t *testing.T) { prm.handler = acceptOneOf(objects, totalCount) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, totalCount, res.Count()) + require.Equal(t, totalCount, res.Evacuated()) }) }) } @@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) { res, err := e.Evacuate(ctx, prm) require.ErrorContains(t, err, "context canceled") - require.Equal(t, 0, res.Count()) + require.Equal(t, uint64(0), res.Evacuated()) +} + +func TestEvacuateSingleProcess(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 3) + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + blocker := make(chan interface{}) + running := make(chan interface{}) + + var prm EvacuateShardPrm + prm.shardID = ids[1:2] + prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + select { + case <-running: + default: + close(running) + } + <-blocker + return nil + } + + eg, egCtx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + res, err := e.Evacuate(egCtx, prm) + require.NoError(t, err, "first evacuation failed") + require.Equal(t, uint64(3), res.Evacuated()) + return nil + }) + eg.Go(func() error { + <-running + res, err := e.Evacuate(egCtx, prm) + require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed") + require.Equal(t, uint64(0), res.Evacuated()) + close(blocker) + return nil + }) + require.NoError(t, eg.Wait()) +} + +func TestEvacuateAsync(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 3) + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + blocker := make(chan interface{}) + running := make(chan interface{}) + + var prm EvacuateShardPrm + prm.shardID = ids[1:2] + prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + select { + case <-running: + default: + close(running) + } + <-blocker + return nil + } + + st, err := e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get init state failed") + require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") + require.Equal(t, uint64(0), st.Evacuated(), "invalid init count") + require.Nil(t, st.StartedAt(), "invalid init started at") + require.Nil(t, st.FinishedAt(), "invalid init finished at") + require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid init error message") + + eg, egCtx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + res, err := e.Evacuate(egCtx, prm) + require.NoError(t, err, "first evacuation failed") + require.Equal(t, uint64(3), res.Evacuated()) + return nil + }) + + <-running + + st, err = e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get running state failed") + require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state") + require.Equal(t, uint64(0), st.Evacuated(), "invalid running count") + require.NotNil(t, st.StartedAt(), "invalid running started at") + require.Nil(t, st.FinishedAt(), "invalid init finished at") + expectedShardIDs := make([]string, 0, 2) + for _, id := range ids[1:2] { + expectedShardIDs = append(expectedShardIDs, id.String()) + } + require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid init error message") + + close(blocker) + + require.Eventually(t, func() bool { + st, err = e.GetEvacuationState(context.Background()) + return st.ProcessingStatus() == EvacuateProcessStateCompleted + }, 3*time.Second, 10*time.Millisecond, "invalid final state") + + require.NoError(t, err, "get final state failed") + require.Equal(t, uint64(3), st.Evacuated(), "invalid final count") + require.NotNil(t, st.StartedAt(), "invalid final started at") + require.NotNil(t, st.FinishedAt(), "invalid final finished at") + require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid final error message") + + require.NoError(t, eg.Wait()) } diff --git a/pkg/local_object_storage/shard/count.go b/pkg/local_object_storage/shard/count.go new file mode 100644 index 00000000..b68c2f43 --- /dev/null +++ b/pkg/local_object_storage/shard/count.go @@ -0,0 +1,31 @@ +package shard + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// LogicalObjectsCount returns logical objects count. +func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) { + _, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.GetMode().NoMetabase() { + return 0, ErrDegradedMode + } + + cc, err := s.metaBase.ObjectCounters() + if err != nil { + return 0, err + } + return cc.Logic(), nil +} diff --git a/pkg/services/control/server/convert.go b/pkg/services/control/server/convert.go new file mode 100644 index 00000000..1d29ed40 --- /dev/null +++ b/pkg/services/control/server/convert.go @@ -0,0 +1,60 @@ +package control + +import ( + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "github.com/mr-tron/base58" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) { + shardIDs := make([][]byte, 0, len(state.ShardIDs())) + for _, shID := range state.ShardIDs() { + id, err := base58.Decode(shID) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID)) + } + shardIDs = append(shardIDs, id) + } + var evacStatus control.GetShardEvacuationStatusResponse_Body_Status + switch state.ProcessingStatus() { + case engine.EvacuateProcessStateRunning: + evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING + case engine.EvacuateProcessStateCompleted: + evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED + default: + evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED + } + var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp + if state.StartedAt() != nil { + startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{ + Value: state.StartedAt().Unix(), + } + } + var duration *control.GetShardEvacuationStatusResponse_Body_Duration + if state.StartedAt() != nil { + end := time.Now().UTC() + if state.FinishedAt() != nil { + end = *state.FinishedAt() + } + duration = &control.GetShardEvacuationStatusResponse_Body_Duration{ + Seconds: int64(end.Sub(*state.StartedAt()).Seconds()), + } + } + return &control.GetShardEvacuationStatusResponse{ + Body: &control.GetShardEvacuationStatusResponse_Body{ + Shard_ID: shardIDs, + Evacuated: state.Evacuated(), + Total: state.Total(), + Failed: state.Failed(), + Status: evacStatus, + StartedAt: startedAt, + Duration: duration, + ErrorMessage: state.ErrorMessage(), + }, + }, nil +} diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index afa4011b..fc6dd3f6 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe resp := &control.EvacuateShardResponse{ Body: &control.EvacuateShardResponse_Body{ - Count: uint32(res.Count()), + Count: uint32(res.Evacuated()), }, } diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 94ddc73d..cdf3656e 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -2,19 +2,96 @@ package control import ( "context" - "fmt" + "errors" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -func (s *Server) StartShardEvacuation(context.Context, *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) { - return nil, fmt.Errorf("not implemented") +func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + var prm engine.EvacuateShardPrm + prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) + prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) + prm.WithFaultHandler(s.replicate) + prm.WithAsync(true) + + _, err = s.s.Evacuate(ctx, prm) + if err != nil { + var logicalErr logicerr.Logical + if errors.As(err, &logicalErr) { + return nil, status.Error(codes.Aborted, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &control.StartShardEvacuationResponse{ + Body: &control.StartShardEvacuationResponse_Body{}, + } + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil } -func (s *Server) GetShardEvacuationStatus(context.Context, *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) { - return nil, fmt.Errorf("not implemented") +func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + state, err := s.s.GetEvacuationState(ctx) + if err != nil { + var logicalErr logicerr.Logical + if errors.As(err, &logicalErr) { + return nil, status.Error(codes.Aborted, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + resp, err := stateToResponse(state) + if err != nil { + return nil, err + } + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil } -func (s *Server) StopShardEvacuation(context.Context, *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) { - return nil, fmt.Errorf("not implemented") +func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + err = s.s.EnqueRunningEvacuationStop(ctx) + if err != nil { + var logicalErr logicerr.Logical + if errors.As(err, &logicalErr) { + return nil, status.Error(codes.Aborted, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &control.StopShardEvacuationResponse{ + Body: &control.StopShardEvacuationResponse_Body{}, + } + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil } diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index ca35ae04..a80deb2d 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -27,6 +27,7 @@ service ControlService { rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse); // EvacuateShard moves all data from one shard to the others. + // Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse); // StartShardEvacuation starts moving all data from one shard to the others. diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index 96642153363125f6062c7ca65bf27631ad2f980b..8afc6086aabe13e9d7226973518d980aac746c5e 100644 GIT binary patch delta 89 zcmZ3xo$=-l#tm9LLi+j&E~y1YsmX~YsVP KQD?83X9fW2dm5eq delta 19 bcmcb)gK_P4#tm9Ln;Uq