frostfs-node/pkg/local_object_storage/engine/evacuate.go
2024-12-18 15:52:26 +00:00

878 lines
25 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
// containerWorkerCountDefault is the number of concurrent container
// evacuation workers used when EvacuateShardPrm.ContainerWorkerCount is zero.
containerWorkerCountDefault = 10
// objectWorkerCountDefault is the number of concurrent object
// evacuation workers used when EvacuateShardPrm.ObjectWorkerCount is zero.
objectWorkerCountDefault = 10
)
var (
// ErrMustBeReadOnly is returned when a shard requested for evacuation
// is not in read-only mode.
ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
// evacuationOperationLogField is attached to the log records of the
// evacuation code so they carry operation=evacuation.
evacuationOperationLogField = zap.String("operation", "evacuation")
)
// EvacuateScope is a bit mask selecting which kinds of data an evacuation
// covers. Keep in sync with pkg/services/control/service.proto.
type EvacuateScope uint32

var (
	// EvacuateScopeObjects is the flag selecting objects.
	EvacuateScopeObjects EvacuateScope = 1
	// EvacuateScopeTrees is the flag selecting trees.
	EvacuateScopeTrees EvacuateScope = 2
)

// String renders the enabled flags as "objects", "trees" or "objects;trees".
// A scope with neither flag set yields an empty string.
func (s EvacuateScope) String() string {
	names := make([]string, 0, 2)
	if s.WithObjects() {
		names = append(names, "objects")
	}
	if s.WithTrees() {
		names = append(names, "trees")
	}
	return strings.Join(names, ";")
}

// WithObjects reports whether the objects flag is set.
func (s EvacuateScope) WithObjects() bool {
	masked := s & EvacuateScopeObjects
	return masked == EvacuateScopeObjects
}

// WithTrees reports whether the trees flag is set.
func (s EvacuateScope) WithTrees() bool {
	masked := s & EvacuateScopeTrees
	return masked == EvacuateScopeTrees
}

// TreesOnly reports whether the scope is exactly the trees flag.
func (s EvacuateScope) TreesOnly() bool {
	return s == EvacuateScopeTrees
}
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
// ShardID lists the shards to evacuate.
ShardID []*shard.ID
// ObjectsHandler is called for an object that could not be put to any
// other local shard. It reports whether the object was moved. When it is
// nil, such an object makes the evacuation fail.
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
// TreeHandler is called for a tree that could not be evacuated to a local
// shard. It reports whether the tree was moved; the returned string is
// logged as the destination node.
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
// IgnoreErrors makes some failures (object read errors, objects/trees
// that were not moved, local tree streaming errors) be counted or skipped
// instead of aborting the evacuation.
IgnoreErrors bool
// Scope selects whether objects, trees or both are evacuated.
Scope EvacuateScope
// RepOneOnly makes the evacuation skip (and count as skipped) objects of
// containers whose placement policy has a replica descriptor with more
// than one object.
RepOneOnly bool
// ContainerWorkerCount limits concurrently processed containers;
// zero means containerWorkerCountDefault.
ContainerWorkerCount uint32
// ObjectWorkerCount limits concurrently processed objects;
// zero means objectWorkerCountDefault.
ObjectWorkerCount uint32
}
// EvacuateShardRes represents result of the EvacuateShard operation.
// All counters are atomics, so they can be updated and read concurrently.
// Instances must be created with NewEvacuateShardRes (or DeepCopy).
type EvacuateShardRes struct {
	objEvacuated *atomic.Uint64
	objTotal     *atomic.Uint64
	objFailed    *atomic.Uint64
	objSkipped   *atomic.Uint64

	trEvacuated *atomic.Uint64
	trTotal     *atomic.Uint64
	trFailed    *atomic.Uint64
}

// NewEvacuateShardRes creates new EvacuateShardRes instance with all
// counters allocated and set to zero.
func NewEvacuateShardRes() *EvacuateShardRes {
	var r EvacuateShardRes

	r.objEvacuated = new(atomic.Uint64)
	r.objTotal = new(atomic.Uint64)
	r.objFailed = new(atomic.Uint64)
	r.objSkipped = new(atomic.Uint64)

	r.trEvacuated = new(atomic.Uint64)
	r.trTotal = new(atomic.Uint64)
	r.trFailed = new(atomic.Uint64)

	return &r
}

// ObjectsEvacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
	if p != nil {
		return p.objEvacuated.Load()
	}
	return 0
}

// ObjectsTotal returns total count objects to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsTotal() uint64 {
	if p != nil {
		return p.objTotal.Load()
	}
	return 0
}

// ObjectsFailed returns count of failed objects to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsFailed() uint64 {
	if p != nil {
		return p.objFailed.Load()
	}
	return 0
}

// ObjectsSkipped returns count of skipped objects.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
	if p != nil {
		return p.objSkipped.Load()
	}
	return 0
}

// TreesEvacuated returns amount of evacuated trees.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
	if p != nil {
		return p.trEvacuated.Load()
	}
	return 0
}

// TreesTotal returns total count trees to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesTotal() uint64 {
	if p != nil {
		return p.trTotal.Load()
	}
	return 0
}

// TreesFailed returns count of failed trees to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesFailed() uint64 {
	if p != nil {
		return p.trFailed.Load()
	}
	return 0
}

// DeepCopy returns deep copy of result instance: the copy owns its own
// counters, initialized with the values read from p. A nil receiver yields nil.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
	if p == nil {
		return nil
	}

	dup := NewEvacuateShardRes()

	dup.objEvacuated.Store(p.objEvacuated.Load())
	dup.objTotal.Store(p.objTotal.Load())
	dup.objFailed.Store(p.objFailed.Load())
	dup.objSkipped.Store(p.objSkipped.Load())

	dup.trTotal.Store(p.trTotal.Load())
	dup.trEvacuated.Store(p.trEvacuated.Load())
	dup.trFailed.Store(p.trFailed.Load())

	return dup
}
// pooledShard pairs a shard with the worker pool registered for it in the
// engine (see getActualShards); both are handed to putToShard when an object
// is moved to that shard.
type pooledShard struct {
hashedShard
pool util.WorkerPool
}
// errMustHaveTwoShards is returned when no shard would remain after excluding
// the shards being evacuated and no remote handler is set for the requested scope.
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from the shards listed in prm.ShardID to the others.
// The shards being moved must be in read-only mode.
//
// The call validates the request, registers it with the evacuation limiter
// and schedules the actual work on the group returned by the limiter, using a
// context that is detached from the caller's cancellation. A nil result means
// the evacuation was scheduled, not that it has finished.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	shardIDs := make([]string, 0, len(prm.ShardID))
	for _, id := range prm.ShardID {
		shardIDs = append(shardIDs, id.String())
	}

	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
		trace.WithAttributes(
			attribute.StringSlice("shardIDs", shardIDs),
			attribute.Bool("ignoreErrors", prm.IgnoreErrors),
			attribute.Stringer("scope", prm.Scope),
		))
	defer span.End()

	shards, err := e.getActualShards(shardIDs, prm)
	if err != nil {
		return err
	}

	// Pick the shards that were requested for evacuation out of the full list.
	requested := make(map[string]struct{}, len(shardIDs))
	for _, id := range shardIDs {
		requested[id] = struct{}{}
	}
	shardsToEvacuate := make(map[string]*shard.Shard)
	for i := range shards {
		id := shards[i].ID().String()
		if _, ok := requested[id]; ok {
			shardsToEvacuate[id] = shards[i].Shard
		}
	}

	res := NewEvacuateShardRes()
	// The evacuation must outlive the request that started it.
	eg, ctx, err := e.evacuateLimiter.TryStart(context.WithoutCancel(ctx), shardIDs, res)
	if err != nil {
		return err
	}

	// Every worker gets its own copy of the shard list because the list is
	// sorted in place per object/tree.
	var mtx sync.RWMutex
	copyShards := func() []pooledShard {
		mtx.RLock()
		defer mtx.RUnlock()

		dup := make([]pooledShard, len(shards))
		copy(dup, shards)
		return dup
	}

	eg.Go(func() error {
		return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate)
	})
	return nil
}
// evacuateShards runs the whole evacuation of the given shards: it counts the
// totals, starts one worker per shard and waits for the shard, container and
// object worker groups to finish. The outcome is reported to the evacuation
// limiter and returned.
//
// shards returns a private copy of the engine shard list on every call;
// shardsToEvacuate maps the IDs from shardIDs to the shards being evacuated.
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
	var err error
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
		trace.WithAttributes(
			attribute.StringSlice("shardIDs", shardIDs),
			attribute.Bool("ignoreErrors", prm.IgnoreErrors),
			attribute.Stringer("scope", prm.Scope),
			attribute.Bool("repOneOnly", prm.RepOneOnly),
		))

	// The deferred call reads err, so every exit path below leaves the final
	// outcome in it before returning.
	defer func() {
		span.End()
		e.evacuateLimiter.Complete(err)
	}()

	e.log.Info(ctx, logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
		zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))

	err = e.getTotals(ctx, prm, shardsToEvacuate, res)
	if err != nil {
		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
		return err
	}

	ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
	// Release the operation context on every exit path. All worker groups have
	// been waited for by the time this runs, and a cause recorded by an earlier
	// cancel(err) is not overwritten.
	defer cancel(nil)

	continueLoop := true
	for i := 0; continueLoop && i < len(shardIDs); i++ {
		select {
		case <-ctx.Done():
			continueLoop = false
		default:
			// Bind the ID here so the worker does not depend on the loop variable.
			shardID := shardIDs[i]
			egShard.Go(func() error {
				err := e.evacuateShard(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject)
				if err != nil {
					cancel(err)
				}
				return err
			})
		}
	}

	// Wait order matters: shard workers schedule container workers, which in
	// turn schedule object workers.
	err = egShard.Wait()
	if err != nil {
		err = fmt.Errorf("shard error: %w", err)
	}
	errContainer := egContainer.Wait()
	errObject := egObject.Wait()
	if errContainer != nil {
		err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
	}
	if errObject != nil {
		err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
	}
	if err != nil {
		e.log.Error(ctx, logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
		return err
	}

	e.log.Info(ctx, logs.EngineFinishedSuccessfullyShardsEvacuation,
		zap.Strings("shard_ids", shardIDs),
		evacuationOperationLogField,
		zap.Uint64("total_objects", res.ObjectsTotal()),
		zap.Uint64("evacuated_objects", res.ObjectsEvacuated()),
		zap.Uint64("failed_objects", res.ObjectsFailed()),
		zap.Uint64("skipped_objects", res.ObjectsSkipped()),
		zap.Uint64("total_trees", res.TreesTotal()),
		zap.Uint64("evacuated_trees", res.TreesEvacuated()),
		zap.Uint64("failed_trees", res.TreesFailed()),
	)
	return nil
}
// createErrorGroupsForEvacuation derives a cancellable operation context from
// ctx and creates the three worker groups used by an evacuation. It returns
// the operation context, its cancel function and the shard, container and
// object groups, in that order.
//
// The container and object groups are limited by prm.ContainerWorkerCount and
// prm.ObjectWorkerCount (defaults apply when these are zero); the shard group
// is unlimited. The contexts derived by the groups themselves are discarded,
// so a failing worker stops the operation only when cancel is called.
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
	context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
	workerLimit := func(configured, fallback uint32) int {
		if configured != 0 {
			return int(configured)
		}
		return int(fallback)
	}

	operationCtx, cancel := context.WithCancelCause(ctx)

	egObject, _ := errgroup.WithContext(operationCtx)
	egObject.SetLimit(workerLimit(prm.ObjectWorkerCount, objectWorkerCountDefault))

	egContainer, _ := errgroup.WithContext(operationCtx)
	egContainer.SetLimit(workerLimit(prm.ContainerWorkerCount, containerWorkerCountDefault))

	egShard, _ := errgroup.WithContext(operationCtx)

	return operationCtx, cancel, egShard, egContainer, egObject
}
// getTotals adds the number of logical objects and trees stored on the shards
// being evacuated to the total counters of res, according to prm.Scope.
//
// A shard whose object count fails with shard.ErrDegradedMode is skipped
// entirely (its trees are not counted either); any other error aborts.
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
	defer span.End()

	for _, sh := range shardsToEvacuate {
		if prm.Scope.WithObjects() {
			objects, err := sh.LogicalObjectsCount(ctx)
			switch {
			case err == nil:
				res.objTotal.Add(objects)
			case errors.Is(err, shard.ErrDegradedMode):
				continue
			default:
				return err
			}
		}

		if !prm.Scope.WithTrees() || !sh.PiloramaEnabled() {
			continue
		}
		trees, err := pilorama.TreeCountAll(ctx, sh)
		if err != nil {
			return err
		}
		res.trTotal.Add(trees)
	}
	return nil
}
// evacuateShard evacuates a single shard: first its objects (when the scope
// includes objects), then its trees (when the scope includes trees and the
// shard has pilorama enabled). The first error stops the sequence.
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
	egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
		trace.WithAttributes(
			attribute.String("shardID", shardID),
		))
	defer span.End()

	if prm.Scope.WithObjects() {
		err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject)
		if err != nil {
			return err
		}
	}

	if !prm.Scope.WithTrees() || !shardsToEvacuate[shardID].PiloramaEnabled() {
		return nil
	}
	return e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate)
}
// evacuateShardObjects schedules evacuation of the objects stored on shard
// shardID. Containers are iterated synchronously; each one is processed by a
// worker of egContainer, which in turn schedules one egObject worker per
// object. The function returns when container iteration is over, not when the
// scheduled workers are done: the caller has to wait for both groups.
//
// cancel is called with the error when iteration or object evacuation fails.
// NOTE(review): a container lookup error and a CountAliveObjectsInContainer
// error are returned from the container worker without calling cancel, so the
// rest of the operation seems to keep running until it finishes on its own —
// confirm this is intended.
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm
// The handler receives an object type and a container ID and hands the
// pair over to a container worker.
cntPrm.Handler = func(ctx context.Context, objType objectSDK.Type, cnt cid.ID) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egContainer.Go(func() error {
var skip bool
c, err := e.containerSource.Load().cs.Get(cnt)
if err != nil {
// A missing container is not an error: its objects are skipped.
if client.IsErrContainerNotFound(err) {
skip = true
} else {
return err
}
}
if !skip && prm.RepOneOnly {
skip = e.isNotRepOne(c)
}
if skip {
// Account for the skipped objects without touching them.
countPrm := shard.CountAliveObjectsInContainerPrm{
ObjectType: objType,
ContainerID: cnt,
}
count, err := sh.CountAliveObjectsInContainer(ctx, countPrm)
if err != nil {
return err
}
res.objSkipped.Add(count)
return nil
}
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.ObjectType = objType
objPrm.ContainerID = cnt
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
// c is non-nil here: this point is reached only when the container
// was loaded successfully (skip is false).
egObject.Go(func() error {
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate, c.Value)
if err != nil {
cancel(err)
}
return err
})
return nil
}
err = sh.IterateOverObjectsInContainer(ctx, objPrm)
if err != nil {
cancel(err)
}
return err
})
return nil
}
// Mark the shard before iteration starts; the flag is cleared by
// ResetEvacuationStatusForShards.
sh.SetEvacuationInProgress(true)
err := sh.IterateOverContainers(ctx, cntPrm)
if err != nil {
cancel(err)
e.log.Error(ctx, logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
// evacuateShardTrees evacuates all trees of shard shardID. Trees are listed
// page by page; every page is passed to evacuateTrees before the next one is
// requested. Listing stops when the returned page token is empty, on the
// first error, or when ctx is done.
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
	source := shardsToEvacuate[shardID]
	targets := getShards()

	var listPrm pilorama.TreeListTreesPrm
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		page, err := source.TreeListTrees(ctx, listPrm)
		if err != nil {
			return err
		}
		listPrm.NextPageToken = page.NextPageToken

		if err := e.evacuateTrees(ctx, source, page.Items, prm, res, targets, shardsToEvacuate); err != nil {
			return err
		}
		if len(listPrm.NextPageToken) == 0 {
			return nil
		}
	}
}
// evacuateTrees evacuates the given trees of shard sh one by one. Each tree is
// first offered to another local shard; when that does not succeed, it is
// passed to the remote tree handler.
//
// A tree that the remote handler reports as not moved is counted as failed
// and skipped when prm.IgnoreErrors is set; otherwise it aborts the evacuation.
// Errors returned by the local attempt or by the remote handler always abort.
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
	prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
		trace.WithAttributes(
			attribute.Int("trees_count", len(trees)),
		))
	defer span.End()

	for _, contTree := range trees {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, shardsToEvacuate)
		if err != nil {
			return err
		}
		if success {
			e.log.Debug(ctx, logs.EngineShardsEvacuationTreeEvacuatedLocal,
				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
				zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
				evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
			res.trEvacuated.Add(1)
			continue
		}

		moved, nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
		if err != nil {
			e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
			return err
		}
		if moved {
			// Same field keys as the other records of this function.
			e.log.Debug(ctx, logs.EngineShardsEvacuationTreeEvacuatedRemote,
				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
				zap.String("from_shard_id", sh.ID().String()), zap.String("to_node", nodePK),
				evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
			res.trEvacuated.Add(1)
			continue
		}

		// The handler reported "not moved" without an error; build the reason
		// here so the log records below actually carry it.
		err = fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", contTree.TreeID, contTree.CID)
		if !prm.IgnoreErrors {
			e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
			return err
		}
		res.trFailed.Add(1)
		e.log.Warn(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
			zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
			zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
			zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	}
	return nil
}
// evacuateTreeToOtherNode hands the tree over to prm.TreeHandler and returns
// its result unchanged. Without a handler the call fails, since the tree has
// already been refused locally.
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
	if handler := prm.TreeHandler; handler != nil {
		return handler(ctx, tree.CID, tree.TreeID, sh)
	}
	return false, "", fmt.Errorf("evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
}
// tryEvacuateTreeLocal tries to copy the tree from sh to another local shard
// by streaming its operation log into the target.
//
// It returns whether the tree was evacuated and, if so, the ID of the target
// shard. When no suitable target exists it returns (false, "", nil). With
// prm.IgnoreErrors set, read and apply failures are also reported as
// (false, "", nil); otherwise the error is returned.
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) (bool, string, error) {
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate)
if err != nil {
return false, "", err
}
if !found {
return false, "", nil
}
const readBatchSize = 1000
source := make(chan *pilorama.Move, readBatchSize)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
// applyErr is written by the goroutine below and read only after wg.Wait().
var applyErr error
go func() {
defer wg.Done()
// Consumer: applies the operations sent to source on the target shard.
applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source)
if applyErr != nil {
// Wake up the producer loop below.
cancel()
}
}()
// Producer: reads the operation log of the source shard starting at height.
var height uint64
for {
op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height)
if err != nil {
cancel()
wg.Wait()
close(source) // close after cancel so that ctx.Done() hits first
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", err
}
if op.Time == 0 { // completed get op log
// Closing the channel lets the consumer finish; its result decides the outcome.
close(source)
wg.Wait()
if applyErr == nil {
return true, target.ID().String(), nil
}
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", applyErr
}
select {
case <-ctx.Done(): // apply stream failed or operation cancelled
wg.Wait()
if prm.IgnoreErrors {
return false, "", nil
}
if applyErr != nil {
return false, "", applyErr
}
return false, "", ctx.Err()
case source <- &op:
}
// Continue reading after the operation that has just been sent.
height = op.Time + 1
}
}
// findShardToEvacuateTree picks the shard a tree should be evacuated to.
// shards is sorted in place by HRW for the tree's container ID. Shards being
// evacuated, shards without pilorama and read-only shards are not considered.
// The first remaining shard that already has the tree wins; if none has it,
// the first remaining shard in HRW order is returned. A shard whose existence
// check fails is treated as not having the tree.
//
// The boolean result tells whether any suitable shard was found.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
	hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString()))

	var (
		fallback     pooledShard
		haveFallback bool
	)
	for i := range shards {
		if err := ctx.Err(); err != nil {
			return pooledShard{}, false, err
		}

		candidate := shards[i]
		if _, evacuating := shardsToEvacuate[candidate.ID().String()]; evacuating {
			continue
		}
		if !candidate.PiloramaEnabled() {
			continue
		}
		if candidate.GetMode().ReadOnly() {
			continue
		}

		if !haveFallback {
			fallback, haveFallback = candidate, true
		}

		exists, err := candidate.TreeExists(ctx, tree.CID, tree.TreeID)
		if err == nil && exists {
			return candidate, true, nil
		}
	}
	return fallback, haveFallback, nil
}
// getActualShards validates the evacuation request and returns all shards of
// the engine paired with their worker pools.
//
// Every requested shard must exist, be in read-only mode and, for a
// trees-only scope, have pilorama enabled. If no shard would remain besides
// the requested ones, a remote handler must be set for each requested scope.
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for _, id := range shardIDs {
		sh, found := e.shards[id]
		switch {
		case !found:
			return nil, errShardNotFound
		case !sh.GetMode().ReadOnly():
			return nil, ErrMustBeReadOnly
		case prm.Scope.TreesOnly() && !sh.PiloramaEnabled():
			return nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
		}
	}

	if spare := len(e.shards) - len(shardIDs); spare < 1 {
		objectsStuck := prm.ObjectsHandler == nil && prm.Scope.WithObjects()
		treesStuck := prm.TreeHandler == nil && prm.Scope.WithTrees()
		if objectsStuck || treesStuck {
			return nil, errMustHaveTwoShards
		}
	}

	// We must have all shards, to have correct information about their
	// indexes in a sorted slice and set appropriate marks in the metabase.
	// Evacuated shard is skipped during put.
	all := make([]pooledShard, 0, len(e.shards))
	for id, sh := range e.shards {
		all = append(all, pooledShard{
			hashedShard: hashedShard(sh),
			pool:        e.shardPools[id],
		})
	}
	return all, nil
}
// evacuateObject moves a single object away from shard shardID. The object is
// read from the source shard and offered to the other local shards; if none
// of them takes it, prm.ObjectsHandler is asked to move it.
//
// With prm.IgnoreErrors set, a read failure and an object the handler reports
// as not moved are counted as failed and nil is returned. A missing handler
// and a handler error always fail the call. cnr is the container of the object.
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
	defer span.End()

	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	default:
	}

	shards := getShards()
	addr := objInfo.Address

	var getPrm shard.GetPrm
	getPrm.SetAddress(addr)
	getPrm.SkipEvacCheck(true)

	getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
	if err != nil {
		if prm.IgnoreErrors {
			res.objFailed.Add(1)
			return nil
		}
		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		return err
	}

	evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res, cnr)
	if err != nil {
		return err
	}
	if evacuatedLocal {
		return nil
	}

	if prm.ObjectsHandler == nil {
		// Do not check ignoreErrors flag here because
		// ignoring errors on put make this command kinda useless.
		// The object is identified by its address, as in the other messages.
		return fmt.Errorf("%w: %s", errPutShard, addr)
	}

	moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
	if err != nil {
		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		return err
	}
	if moved {
		res.objEvacuated.Add(1)
		return nil
	}
	if !prm.IgnoreErrors {
		return fmt.Errorf("object %s was not replicated", addr)
	}

	res.objFailed.Add(1)
	// The handler returned no error here, so there is no error to attach.
	e.log.Warn(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), evacuationOperationLogField,
		zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	return nil
}
// isNotRepOne reports whether the placement policy of the container has at
// least one replica descriptor that requires more than one object.
// c must not be nil.
func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
	policy := c.Value.PlacementPolicy()
	replicas := policy.NumberOfReplicas()
	for idx := range replicas {
		if policy.ReplicaDescriptor(idx).NumberOfObjects() <= 1 {
			continue
		}
		return true
	}
	return false
}
// tryEvacuateObjectLocal tries to put the object to one of the local shards
// that are not being evacuated, in HRW order for the object address (shards
// is sorted in place). sh is the source shard and is used for logging only.
//
// It returns true when the object is accounted for: either stored on another
// shard (counted as evacuated) or reported by a shard as already existing or
// removed (counted as skipped). False means no shard accepted the object.
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
) (bool, error) {
	hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))

	for i := range shards {
		if err := ctx.Err(); err != nil {
			return false, err
		}

		dst := &shards[i]
		if _, evacuating := shardsToEvacuate[dst.ID().String()]; evacuating {
			continue
		}

		putRes := e.putToShard(ctx, dst.hashedShard, dst.pool, addr, object, container.IsIndexedContainer(cnr))
		switch putRes.status {
		case putToShardExists, putToShardRemoved:
			res.objSkipped.Add(1)
			return true, nil
		case putToShardSuccess:
			res.objEvacuated.Add(1)
			e.log.Debug(ctx, logs.EngineObjectIsMovedToAnotherShard,
				zap.Stringer("from", sh.ID()),
				zap.Stringer("to", dst.ID()),
				zap.Stringer("addr", addr),
				evacuationOperationLogField,
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
			return true, nil
		}
		// Any other status: try the next shard.
	}
	return false, nil
}
// GetEvacuationState returns the state kept by the evacuation limiter.
// It fails with the context error if ctx is already done.
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return e.evacuateLimiter.GetState(), nil
}
// EnqueRunningEvacuationStop asks the evacuation limiter to cancel the
// evacuation if one is running. It fails with the context error if ctx is
// already done.
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return e.evacuateLimiter.CancelIfRunning()
}
// ResetEvacuationStatus resets the evacuation status kept by the evacuation
// limiter. It fails with the context error if ctx is already done.
func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return e.evacuateLimiter.ResetEvacuationStatus()
}
// ResetEvacuationStatusForShards clears the evacuation-in-progress flag on
// every shard of the engine.
func (e *StorageEngine) ResetEvacuationStatusForShards() {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for id := range e.shards {
		e.shards[id].SetEvacuationInProgress(false)
	}
}