[#329] node: Make evacuate async

Now it's possible to run evacuate shard in async.
Also only one evacuate process can be in progress.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-05-04 13:58:26 +03:00 committed by Evgenii Stratonikov
parent 100b1b5128
commit e4889e06ba
11 changed files with 669 additions and 37 deletions

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -14,6 +15,9 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -24,11 +28,23 @@ type EvacuateShardPrm struct {
shardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool
async bool
}
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
count int
evacuated *atomic.Uint64
total *atomic.Uint64
failed *atomic.Uint64
}
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: atomic.NewUint64(0),
total: atomic.NewUint64(0),
failed: atomic.NewUint64(0),
}
}
// WithShardIDList sets shard ID.
@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,
p.handler = f
}
// Count returns amount of evacuated objects.
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
p.async = async
}
// Evacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p EvacuateShardRes) Count() int {
return p.count
func (p *EvacuateShardRes) Evacuated() uint64 {
if p == nil {
return 0
}
return p.evacuated.Load()
}
// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
if p == nil {
return 0
}
return p.total.Load()
}
// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
if p == nil {
return 0
}
return p.failed.Load()
}
// DeepCopy returns deep copy of result instance.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
if p == nil {
return nil
}
return &EvacuateShardRes{
evacuated: atomic.NewUint64(p.evacuated.Load()),
total: atomic.NewUint64(p.total.Load()),
failed: atomic.NewUint64(p.failed.Load()),
}
}
const defaultEvacuateBatchSize = 100
@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
}
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
if err != nil {
return EvacuateShardRes{}, err
return nil, err
}
shardsToEvacuate := make(map[string]*shard.Shard)
@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva
}
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
})
if prm.async {
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer func() {
span.End()
e.evacuateLimiter.Complete(err)
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
var res EvacuateShardRes
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
return res, err
return err
}
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
return res, nil
return nil
}
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
defer span.End()
for _, sh := range shardsToEvacuate {
cnt, err := sh.LogicalObjectsCount(ctx)
if err != nil {
if errors.Is(err, shard.ErrDegradedMode) {
continue
}
return err
}
res.total.Add(cnt)
}
return nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
attribute.String("shardID", shardID),
))
defer span.End()
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
return err
}
@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
defer span.End()
for i := range toEvacuate {
select {
case <-ctx.Done():
@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
res.failed.Inc()
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
if err != nil {
return err
}
@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
err = prm.handler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
res.count++
res.evacuated.Inc()
}
return nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
select {
@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
res.evacuated.Inc()
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", addr))
res.count++
}
return true, nil
}
@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
return false, nil
}
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return e.evacuateLimiter.GetState(), nil
}
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return e.evacuateLimiter.CancelIfRunning()
}