package engine

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/hrw"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

var (
	// ErrMustBeReadOnly is returned when a shard requested for evacuation
	// is not in read-only mode.
	ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")

	// evacuationOperationLogField is attached to every log record emitted
	// by the evacuation code in this file.
	evacuationOperationLogField = zap.String("operation", "evacuation")
)

// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
	// shardID lists the shards whose objects must be moved away.
	shardID []*shard.ID
	// handler, when set, is invoked for an object that could not be
	// stored on any other local shard.
	handler func(context.Context, oid.Address, *objectSDK.Object) error
	// ignoreErrors makes object read failures count as "failed"
	// instead of aborting the evacuation.
	ignoreErrors bool
	// async makes Evacuate return without waiting for completion.
	async bool
}

// EvacuateShardRes represents result of the EvacuateShard operation.
//
// All counters are kept behind pointers to atomics so that they can be
// updated while the operation is running.
type EvacuateShardRes struct {
	evacuated *atomic.Uint64
	total     *atomic.Uint64
	failed    *atomic.Uint64
	skipped   *atomic.Uint64
}

// NewEvacuateShardRes creates new EvacuateShardRes instance
// with all counters allocated and set to zero.
func NewEvacuateShardRes() *EvacuateShardRes {
	var res EvacuateShardRes

	res.evacuated = &atomic.Uint64{}
	res.total = &atomic.Uint64{}
	res.failed = &atomic.Uint64{}
	res.skipped = &atomic.Uint64{}

	return &res
}

// WithShardIDList sets the list of IDs of the shards to evacuate.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
	p.shardID = id
}

// WithIgnoreErrors sets flag to ignore errors.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } // WithFaultHandler sets handler to call for objects which cannot be saved on other shards. func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) { p.handler = f } // WithAsync sets flag to run evacuate async. func (p *EvacuateShardPrm) WithAsync(async bool) { p.async = async } // Evacuated returns amount of evacuated objects. // Objects for which handler returned no error are also assumed evacuated. func (p *EvacuateShardRes) Evacuated() uint64 { if p == nil { return 0 } return p.evacuated.Load() } // Total returns total count objects to evacuate. func (p *EvacuateShardRes) Total() uint64 { if p == nil { return 0 } return p.total.Load() } // Failed returns count of failed objects to evacuate. func (p *EvacuateShardRes) Failed() uint64 { if p == nil { return 0 } return p.failed.Load() } // Skipped returns count of skipped objects. func (p *EvacuateShardRes) Skipped() uint64 { if p == nil { return 0 } return p.skipped.Load() } // DeepCopy returns deep copy of result instance. func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { if p == nil { return nil } res := &EvacuateShardRes{ evacuated: new(atomic.Uint64), total: new(atomic.Uint64), failed: new(atomic.Uint64), skipped: new(atomic.Uint64), } res.evacuated.Store(p.evacuated.Load()) res.total.Store(p.total.Load()) res.failed.Store(p.failed.Load()) res.skipped.Store(p.skipped.Load()) return res } const defaultEvacuateBatchSize = 100 type pooledShard struct { hashedShard pool util.WorkerPool } var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. 
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } shardIDs := make([]string, len(prm.shardID)) for i := range prm.shardID { shardIDs[i] = prm.shardID[i].String() } ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), attribute.Bool("async", prm.async), attribute.Bool("ignoreErrors", prm.ignoreErrors), )) defer span.End() shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil) if err != nil { return nil, err } shardsToEvacuate := make(map[string]*shard.Shard) for i := range shardIDs { for j := range shards { if shards[j].ID().String() == shardIDs[i] { shardsToEvacuate[shardIDs[i]] = shards[j].Shard } } } res := NewEvacuateShardRes() ctx = ctxOrBackground(ctx, prm.async) eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) if err != nil { return nil, err } eg.Go(func() error { return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate) }) if prm.async { return nil, nil } return res, eg.Wait() } func ctxOrBackground(ctx context.Context, background bool) context.Context { if background { return context.Background() } return ctx } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), attribute.Bool("async", prm.async), attribute.Bool("ignoreErrors", prm.ignoreErrors), )) defer func() { span.End() e.evacuateLimiter.Complete(err) }() e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) err = 
e.getTotalObjectsCount(ctx, shardsToEvacuate, res) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } for _, shardID := range shardIDs { if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField) return err } } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, zap.Uint64("total", res.Total()), zap.Uint64("evacuated", res.Evacuated()), zap.Uint64("failed", res.Failed()), zap.Uint64("skipped", res.Skipped()), ) return nil } func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount") defer span.End() for _, sh := range shardsToEvacuate { cnt, err := sh.LogicalObjectsCount(ctx) if err != nil { if errors.Is(err, shard.ErrDegradedMode) { continue } return err } res.total.Add(cnt) } return nil } func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( attribute.String("shardID", shardID), )) defer span.End() var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) sh := shardsToEvacuate[shardID] var c *meta.Cursor for { listPrm.WithCursor(c) // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes // because ListWithCursor works only with the metabase. 
listRes, err := sh.ListWithCursor(ctx, listPrm) if err != nil { if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { break } e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil { return err } c = listRes.Cursor() } return nil } func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) { e.mtx.RLock() defer e.mtx.RUnlock() for i := range shardIDs { sh, ok := e.shards[shardIDs[i]] if !ok { return nil, nil, errShardNotFound } if !sh.GetMode().ReadOnly() { return nil, nil, ErrMustBeReadOnly } } if len(e.shards)-len(shardIDs) < 1 && !handlerDefined { return nil, nil, errMustHaveTwoShards } // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. 
shards := make([]pooledShard, 0, len(e.shards)) for id := range e.shards { shards = append(shards, pooledShard{ hashedShard: hashedShard(e.shards[id]), pool: e.shardPools[id], }) } weights := make([]float64, 0, len(shards)) for i := range shards { weights = append(weights, e.shardWeight(shards[i].Shard)) } return shards, weights, nil } func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", trace.WithAttributes( attribute.Int("objects_count", len(toEvacuate)), )) defer span.End() for i := range toEvacuate { select { case <-ctx.Done(): return ctx.Err() default: } addr := toEvacuate[i].Address var getPrm shard.GetPrm getPrm.SetAddress(addr) getRes, err := sh.Get(ctx, getPrm) if err != nil { if prm.ignoreErrors { res.failed.Add(1) continue } e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res) if err != nil { return err } if evacuatedLocal { continue } if prm.handler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put make this command kinda useless. 
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } err = prm.handler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } res.evacuated.Add(1) } return nil } func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(addr.EncodeToString())) for j := range shards { select { case <-ctx.Done(): return false, ctx.Err() default: } if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object) if putDone || exists { if putDone { res.evacuated.Add(1) e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, zap.Stringer("from", sh.ID()), zap.Stringer("to", shards[j].ID()), zap.Stringer("addr", addr), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } if exists { res.skipped.Add(1) } return true, nil } } return false, nil } func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } return e.evacuateLimiter.GetState(), nil } func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } return e.evacuateLimiter.CancelIfRunning() }