package engine import ( "context" "errors" "fmt" "strings" "sync" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) const ( // containerWorkerCountDefault is a default value of the count of // concurrent container evacuation workers. containerWorkerCountDefault = 10 // objectWorkerCountDefault is a default value of the count of // concurrent object evacuation workers. objectWorkerCountDefault = 10 ) var ( ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode") evacuationOperationLogField = zap.String("operation", "evacuation") ) // EvacuateScope is an evacuation scope. Keep in sync with pkg/services/control/service.proto. type EvacuateScope uint32 var ( EvacuateScopeObjects EvacuateScope = 1 EvacuateScopeTrees EvacuateScope = 2 ) func (s EvacuateScope) String() string { var sb strings.Builder first := true if s&EvacuateScopeObjects == EvacuateScopeObjects { if !first { sb.WriteString(";") } sb.WriteString("objects") first = false } if s&EvacuateScopeTrees == EvacuateScopeTrees { if !first { sb.WriteString(";") } sb.WriteString("trees") } return sb.String() } func (s EvacuateScope) WithObjects() bool { return s&EvacuateScopeObjects == EvacuateScopeObjects } func (s EvacuateScope) WithTrees() bool { return s&EvacuateScopeTrees == EvacuateScopeTrees } func (s EvacuateScope) TreesOnly() bool { return s == EvacuateScopeTrees } // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { ShardID []*shard.ID ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error) IgnoreErrors bool Async bool Scope EvacuateScope RepOneOnly bool ContainerWorkerCount uint32 ObjectWorkerCount uint32 } // EvacuateShardRes represents result of the EvacuateShard operation. type EvacuateShardRes struct { objEvacuated *atomic.Uint64 objTotal *atomic.Uint64 objFailed *atomic.Uint64 objSkipped *atomic.Uint64 trEvacuated *atomic.Uint64 trTotal *atomic.Uint64 trFailed *atomic.Uint64 } // NewEvacuateShardRes creates new EvacuateShardRes instance. func NewEvacuateShardRes() *EvacuateShardRes { return &EvacuateShardRes{ objEvacuated: new(atomic.Uint64), objTotal: new(atomic.Uint64), objFailed: new(atomic.Uint64), objSkipped: new(atomic.Uint64), trEvacuated: new(atomic.Uint64), trTotal: new(atomic.Uint64), trFailed: new(atomic.Uint64), } } // ObjectsEvacuated returns amount of evacuated objects. // Objects for which handler returned no error are also assumed evacuated. func (p *EvacuateShardRes) ObjectsEvacuated() uint64 { if p == nil { return 0 } return p.objEvacuated.Load() } // ObjectsTotal returns total count objects to evacuate. func (p *EvacuateShardRes) ObjectsTotal() uint64 { if p == nil { return 0 } return p.objTotal.Load() } // ObjectsFailed returns count of failed objects to evacuate. func (p *EvacuateShardRes) ObjectsFailed() uint64 { if p == nil { return 0 } return p.objFailed.Load() } // ObjectsSkipped returns count of skipped objects. func (p *EvacuateShardRes) ObjectsSkipped() uint64 { if p == nil { return 0 } return p.objSkipped.Load() } // TreesEvacuated returns amount of evacuated trees. func (p *EvacuateShardRes) TreesEvacuated() uint64 { if p == nil { return 0 } return p.trEvacuated.Load() } // TreesTotal returns total count trees to evacuate. func (p *EvacuateShardRes) TreesTotal() uint64 { if p == nil { return 0 } return p.trTotal.Load() } // TreesFailed returns count of failed trees to evacuate. func (p *EvacuateShardRes) TreesFailed() uint64 { if p == nil { return 0 } return p.trFailed.Load() } // DeepCopy returns deep copy of result instance. func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { if p == nil { return nil } res := &EvacuateShardRes{ objEvacuated: new(atomic.Uint64), objTotal: new(atomic.Uint64), objFailed: new(atomic.Uint64), objSkipped: new(atomic.Uint64), trEvacuated: new(atomic.Uint64), trTotal: new(atomic.Uint64), trFailed: new(atomic.Uint64), } res.objEvacuated.Store(p.objEvacuated.Load()) res.objTotal.Store(p.objTotal.Load()) res.objFailed.Store(p.objFailed.Load()) res.objSkipped.Store(p.objSkipped.Load()) res.trTotal.Store(p.trTotal.Load()) res.trEvacuated.Store(p.trEvacuated.Load()) res.trFailed.Store(p.trFailed.Load()) return res } type pooledShard struct { hashedShard pool util.WorkerPool } var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } shardIDs := make([]string, len(prm.ShardID)) for i := range prm.ShardID { shardIDs[i] = prm.ShardID[i].String() } ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), )) defer span.End() shards, err := e.getActualShards(shardIDs, prm) if err != nil { return nil, err } shardsToEvacuate := make(map[string]*shard.Shard) for i := range shardIDs { for j := range shards { if shards[j].ID().String() == shardIDs[i] { shardsToEvacuate[shardIDs[i]] = shards[j].Shard } } } res := NewEvacuateShardRes() ctx = ctxOrBackground(ctx, prm.Async) eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) if err != nil { return nil, err } var mtx sync.RWMutex copyShards := func() []pooledShard { mtx.RLock() defer mtx.RUnlock() t := make([]pooledShard, len(shards)) copy(t, shards) return t } eg.Go(func() error { return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) if prm.Async { return nil, nil } return res, eg.Wait() } func ctxOrBackground(ctx context.Context, background bool) context.Context { if background { return context.Background() } return ctx } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), attribute.Bool("repOneOnly", prm.RepOneOnly), )) defer func() { span.End() e.evacuateLimiter.Complete(err) }() e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) err = e.getTotals(ctx, prm, shardsToEvacuate, res) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) return err } ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm) continueLoop := true for i := 0; continueLoop && i < len(shardIDs); i++ { select { case <-ctx.Done(): continueLoop = false default: egShard.Go(func() error { err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject) if err != nil { cancel(err) } return err }) } } err = egShard.Wait() if err != nil { err = fmt.Errorf("shard error: %w", err) } errContainer := egContainer.Wait() errObject := egObject.Wait() if errContainer != nil { err = errors.Join(err, fmt.Errorf("container error: %w", errContainer)) } if errObject != nil { err = errors.Join(err, fmt.Errorf("object error: %w", errObject)) } if err != nil { e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) return err } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.Uint64("total_objects", res.ObjectsTotal()), zap.Uint64("evacuated_objects", res.ObjectsEvacuated()), zap.Uint64("failed_objects", res.ObjectsFailed()), zap.Uint64("skipped_objects", res.ObjectsSkipped()), zap.Uint64("total_trees", res.TreesTotal()), zap.Uint64("evacuated_trees", res.TreesEvacuated()), zap.Uint64("failed_trees", res.TreesFailed()), ) return nil } func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) ( context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group, ) { operationCtx, cancel := context.WithCancelCause(ctx) egObject, _ := errgroup.WithContext(operationCtx) objectWorkerCount := prm.ObjectWorkerCount if objectWorkerCount == 0 { objectWorkerCount = objectWorkerCountDefault } egObject.SetLimit(int(objectWorkerCount)) egContainer, _ := errgroup.WithContext(operationCtx) containerWorkerCount := prm.ContainerWorkerCount if containerWorkerCount == 0 { containerWorkerCount = containerWorkerCountDefault } egContainer.SetLimit(int(containerWorkerCount)) egShard, _ := errgroup.WithContext(operationCtx) return operationCtx, cancel, egShard, egContainer, egObject } func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals") defer span.End() for _, sh := range shardsToEvacuate { if prm.Scope.WithObjects() { cnt, err := sh.LogicalObjectsCount(ctx) if err != nil { if errors.Is(err, shard.ErrDegradedMode) { continue } return err } res.objTotal.Add(cnt) } if prm.Scope.WithTrees() && sh.PiloramaEnabled() { cnt, err := pilorama.TreeCountAll(ctx, sh) if err != nil { return err } res.trTotal.Add(cnt) } } return nil } func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, egContainer *errgroup.Group, egObject *errgroup.Group, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( attribute.String("shardID", shardID), )) defer span.End() if prm.Scope.WithObjects() { if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil { return err } } if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { return err } } return nil } func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, egContainer *errgroup.Group, egObject *errgroup.Group, ) error { sh := shardsToEvacuate[shardID] var cntPrm shard.IterateOverContainersPrm cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error { select { case <-ctx.Done(): return context.Cause(ctx) default: } egContainer.Go(func() error { var skip bool c, err := e.containerSource.Load().cs.Get(cnt) if err != nil { if client.IsErrContainerNotFound(err) { skip = true } else { return err } } if !skip && prm.RepOneOnly { skip = e.isNotRepOne(c) } if skip { countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name} count, err := sh.CountAliveObjectsInBucket(ctx, countPrm) if err != nil { return err } res.objSkipped.Add(count) return nil } var objPrm shard.IterateOverObjectsInContainerPrm objPrm.BucketName = name objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { select { case <-ctx.Done(): return context.Cause(ctx) default: } egObject.Go(func() error { err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) if err != nil { cancel(err) } return err }) return nil } err = sh.IterateOverObjectsInContainer(ctx, objPrm) if err != nil { cancel(err) } return err }) return nil } sh.SetEvacuationInProgress(true) err := sh.IterateOverContainers(ctx, cntPrm) if err != nil { cancel(err) e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } return err } func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { sh := shardsToEvacuate[shardID] shards := getShards() var listPrm pilorama.TreeListTreesPrm first := true for len(listPrm.NextPageToken) > 0 || first { select { case <-ctx.Done(): return ctx.Err() default: } first = false listRes, err := sh.TreeListTrees(ctx, listPrm) if err != nil { return err } listPrm.NextPageToken = listRes.NextPageToken if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, shardsToEvacuate); err != nil { return err } } return nil } func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees", trace.WithAttributes( attribute.Int("trees_count", len(trees)), )) defer span.End() for _, contTree := range trees { select { case <-ctx.Done(): return ctx.Err() default: } success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, shardsToEvacuate) if err != nil { return err } if success { e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedLocal, zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID), zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) res.trEvacuated.Add(1) continue } moved, nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree, zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID), zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField, zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } if moved { e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote, zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID), zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) res.trEvacuated.Add(1) } else if prm.IgnoreErrors { res.trFailed.Add(1) e.log.Warn(logs.EngineShardsEvacuationFailedToMoveTree, zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID), zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField, zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } else { e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree, zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID), zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField, zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", contTree.TreeID, contTree.CID) } } return nil } func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) { if prm.TreeHandler == nil { return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID()) } return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh) } func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) (bool, string, error) { target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate) if err != nil { return false, "", err } if !found { return false, "", nil } const readBatchSize = 1000 source := make(chan *pilorama.Move, readBatchSize) ctx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup wg.Add(1) var applyErr error go func() { defer wg.Done() applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source) if applyErr != nil { cancel() } }() var height uint64 for { op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height) if err != nil { cancel() wg.Wait() close(source) // close after cancel to ctx.Done() hits first if prm.IgnoreErrors { return false, "", nil } return false, "", err } if op.Time == 0 { // completed get op log close(source) wg.Wait() if applyErr == nil { return true, target.ID().String(), nil } if prm.IgnoreErrors { return false, "", nil } return false, "", applyErr } select { case <-ctx.Done(): // apply stream failed or operation cancelled wg.Wait() if prm.IgnoreErrors { return false, "", nil } if applyErr != nil { return false, "", applyErr } return false, "", ctx.Err() case source <- &op: } height = op.Time + 1 } } // findShardToEvacuateTree returns first shard according HRW or first shard with tree exists. func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) (pooledShard, bool, error) { hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString())) var result pooledShard var found bool for _, target := range shards { select { case <-ctx.Done(): return pooledShard{}, false, ctx.Err() default: } if _, ok := shardsToEvacuate[target.ID().String()]; ok { continue } if !target.PiloramaEnabled() || target.GetMode().ReadOnly() { continue } if !found { result = target found = true } exists, err := target.TreeExists(ctx, tree.CID, tree.TreeID) if err != nil { continue } if exists { return target, true, nil } } return result, found, nil } func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) { e.mtx.RLock() defer e.mtx.RUnlock() for i := range shardIDs { sh, ok := e.shards[shardIDs[i]] if !ok { return nil, errShardNotFound } if !sh.GetMode().ReadOnly() { return nil, ErrMustBeReadOnly } if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() { return nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID()) } } if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() { return nil, errMustHaveTwoShards } if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() { return nil, errMustHaveTwoShards } // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. shards := make([]pooledShard, 0, len(e.shards)) for id := range e.shards { shards = append(shards, pooledShard{ hashedShard: hashedShard(e.shards[id]), pool: e.shardPools[id], }) } return shards, nil } func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() select { case <-ctx.Done(): return context.Cause(ctx) default: } shards := getShards() addr := objInfo.Address var getPrm shard.GetPrm getPrm.SetAddress(addr) getPrm.SkipEvacCheck(true) getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm) if err != nil { if prm.IgnoreErrors { res.objFailed.Add(1) return nil } e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) if err != nil { return err } if evacuatedLocal { return nil } if prm.ObjectsHandler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put make this command kinda useless. return fmt.Errorf("%w: %s", errPutShard, objInfo) } moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } if moved { res.objEvacuated.Add(1) } else if prm.IgnoreErrors { res.objFailed.Add(1) e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } else { return fmt.Errorf("object %s was not replicated", addr) } return nil } func (e *StorageEngine) isNotRepOne(c *container.Container) bool { p := c.Value.PlacementPolicy() for i := range p.NumberOfReplicas() { if p.ReplicaDescriptor(i).NumberOfObjects() > 1 { return true } } return false } func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString())) for j := range shards { select { case <-ctx.Done(): return false, ctx.Err() default: } if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object).status { case putToShardSuccess: res.objEvacuated.Add(1) e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, zap.Stringer("from", sh.ID()), zap.Stringer("to", shards[j].ID()), zap.Stringer("addr", addr), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return true, nil case putToShardExists, putToShardRemoved: res.objSkipped.Add(1) return true, nil default: continue } } return false, nil } func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } return e.evacuateLimiter.GetState(), nil } func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } return e.evacuateLimiter.CancelIfRunning() } func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } return e.evacuateLimiter.ResetEvacuationStatus() } func (e *StorageEngine) ResetEvacuationStatusForShards() { e.mtx.RLock() defer e.mtx.RUnlock() for _, sh := range e.shards { sh.SetEvacuationInProgress(false) } }