From b3e9139629c4a7537cdf6adf57522ef719ef3307 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Wed, 18 Sep 2024 12:15:32 +0300 Subject: [PATCH] [#1356] engine: Evacuate object from shards concurrently Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/control/evacuation.go | 15 +- cmd/frostfs-node/config.go | 7 + cmd/frostfs-node/config/engine/config.go | 22 ++ config/example/node.env | 2 + config/example/node.json | 2 + config/example/node.yaml | 2 + docs/storage-node-configuration.md | 14 +- pkg/local_object_storage/engine/engine.go | 20 ++ pkg/local_object_storage/engine/evacuate.go | 261 +++++++++++------- .../engine/evacuate_test.go | 44 ++- pkg/local_object_storage/metabase/list.go | 168 +++++++++++ pkg/local_object_storage/shard/list.go | 59 ++++ pkg/services/control/server/evacuate_async.go | 14 +- pkg/services/control/service.proto | 4 + pkg/services/control/service_frostfs.pb.go | Bin 404856 -> 406555 bytes 15 files changed, 519 insertions(+), 115 deletions(-) diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 6fa5ed75c..04a67e5b5 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -21,6 +21,9 @@ const ( noProgressFlag = "no-progress" scopeFlag = "scope" + containerWorkerCountFlag = "container-worker-count" + objectWorkerCountFlag = "object-worker-count" + scopeAll = "all" scopeObjects = "objects" scopeTrees = "trees" @@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) + objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ - Shard_ID: getShardIDList(cmd), - IgnoreErrors: ignoreErrors, - Scope: getEvacuationScope(cmd), + Shard_ID: getShardIDList(cmd), + IgnoreErrors: ignoreErrors, + Scope: getEvacuationScope(cmd), + ContainerWorkerCount: containerWorkerCount, + ObjectWorkerCount: objectWorkerCount, }, } @@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() { flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") + flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 63f410b89..1780f41d0 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -110,6 +110,9 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []shardCfg lowMem bool + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } // if need to run node in compatibility with other versions mode @@ -228,6 +231,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) + a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c) + a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) } @@ -829,6 +834,8 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithLogger(c.log), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), + engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount), + engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount), ) if c.metricsCollector != nil { diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go index c944d1c58..e5fe638b1 100644 --- a/cmd/frostfs-node/config/engine/config.go +++ b/cmd/frostfs-node/config/engine/config.go @@ -15,6 +15,12 @@ const ( // ShardPoolSizeDefault is a default value of routine pool size per-shard to // process object PUT operations in a storage engine. ShardPoolSizeDefault = 20 + // EvacuationContainerWorkerCountDefault is a default value of the count of + // concurrent container evacuation workers. + EvacuationContainerWorkerCountDefault = 10 + // EvacuationObjectWorkerCountDefault is a default value of the count of + // concurrent object evacuation workers. + EvacuationObjectWorkerCountDefault = 10 ) // ErrNoShardConfigured is returned when at least 1 shard is required but none are found. @@ -88,3 +94,19 @@ func ShardErrorThreshold(c *config.Config) uint32 { func EngineLowMemoryConsumption(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "low_mem") } + +// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section. +func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 { + if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 { + return v + } + return EvacuationObjectWorkerCountDefault +} + +// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section. +func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 { + if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 { + return v + } + return EvacuationContainerWorkerCountDefault +} diff --git a/config/example/node.env b/config/example/node.env index 6618a981a..6f43ce1bc 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -91,6 +91,8 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 +EVACUATION_OBJECT_WORKER_COUNT=10 +EVACUATION_CONTAINER_WORKER_COUNT=10 ## 0 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false diff --git a/config/example/node.json b/config/example/node.json index 0d100ed80..7f252e4fb 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -136,6 +136,8 @@ "storage": { "shard_pool_size": 15, "shard_ro_error_threshold": 100, + "evacuation_container_worker_count": 10, + "evacuation_object_worker_count": 10, "shard": { "0": { "mode": "read-only", diff --git a/config/example/node.yaml b/config/example/node.yaml index 86be35ba8..db0f380da 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -118,6 +118,8 @@ storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) + evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers + evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers shard: default: # section with the default shard parameters diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index c74695e2b..519ffab0a 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -167,12 +167,14 @@ morph: Local storage engine configuration. -| Parameter | Type | Default value | Description | -|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| -| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | -| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | -| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | -| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | +| Parameter | Type | Default value | Description | +|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| +| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | +| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | +| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. | +| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers. | +| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | +| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | ## `shard` subsection diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 5e883a641..4c78ee278 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -213,12 +213,18 @@ type cfg struct { lowMem bool containerSource atomic.Pointer[containerSource] + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } func defaultCfg() *cfg { res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, + + evacuationObjectWorkerCount: 10, + evacuationContainerWorkerCount: 10, } res.containerSource.Store(&containerSource{}) return res @@ -277,6 +283,20 @@ func WithLowMemoryConsumption(lowMemCons bool) Option { } } +// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container. +func WithEvacuationObjectWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationObjectWorkerCount = count + } +} + +// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard. +func WithEvacuationContainerWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationContainerWorkerCount = count + } +} + // SetContainerSource sets container source. func (e *StorageEngine) SetContainerSource(cs container.Source) { e.containerSource.Store(&containerSource{cs: cs}) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 7bef6edfb..65b7abe38 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -10,7 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -24,6 +23,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -79,6 +79,9 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + + ContainerWorkerCount uint32 + ObjectWorkerCount uint32 } // EvacuateShardRes represents result of the EvacuateShard operation. @@ -189,8 +192,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { return res } -const defaultEvacuateBatchSize = 100 - type pooledShard struct { hashedShard pool util.WorkerPool @@ -242,8 +243,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return nil, err } + var mtx sync.RWMutex + copyShards := func() []pooledShard { + mtx.RLock() + defer mtx.RUnlock() + t := make([]pooledShard, len(shards)) + copy(t, shards) + return t + } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate) + return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) if prm.Async { @@ -261,7 +270,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context { } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", @@ -287,13 +296,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return err } - for _, shardID := range shardIDs { - if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) - return err + ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm) + continueLoop := true + for i := 0; continueLoop && i < len(shardIDs); i++ { + select { + case <-ctx.Done(): + continueLoop = false + default: + egShard.Go(func() error { + err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject) + if err != nil { + cancel(err) + } + return err + }) } } + err = egShard.Wait() + if err != nil { + err = fmt.Errorf("shard error: %w", err) + } + errContainer := egContainer.Wait() + errObject := egObject.Wait() + if errContainer != nil { + err = errors.Join(err, fmt.Errorf("container error: %w", errContainer)) + } + if errObject != nil { + err = errors.Join(err, fmt.Errorf("object error: %w", errObject)) + } + if err != nil { + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) + return err + } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs), @@ -309,6 +344,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return nil } +func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) ( + context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group, +) { + operationCtx, cancel := context.WithCancelCause(ctx) + egObject, _ := errgroup.WithContext(operationCtx) + objectWorkerCount := prm.ObjectWorkerCount + if objectWorkerCount == 0 { + objectWorkerCount = e.cfg.evacuationObjectWorkerCount + } + egObject.SetLimit(int(objectWorkerCount)) + egContainer, _ := errgroup.WithContext(operationCtx) + containerWorkerCount := prm.ContainerWorkerCount + if containerWorkerCount == 0 { + containerWorkerCount = e.cfg.evacuationContainerWorkerCount + } + egContainer.SetLimit(int(containerWorkerCount)) + egShard, _ := errgroup.WithContext(operationCtx) + + return operationCtx, cancel, egShard, egContainer, egObject +} + func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals") defer span.End() @@ -335,8 +391,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha return nil } -func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( @@ -345,11 +402,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E defer span.End() if prm.Scope.WithObjects() { - if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { + if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil { return err } } - if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { return err @@ -359,44 +415,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E return nil } -func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { - var listPrm shard.ListWithCursorPrm - listPrm.WithCount(defaultEvacuateBatchSize) - sh := shardsToEvacuate[shardID] - sh.SetEvacuationInProgress(true) - - var c *meta.Cursor - for { - listPrm.WithCursor(c) - - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(ctx, listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { - break + var cntPrm shard.IterateOverContainersPrm + cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + egContainer.Go(func() error { + var objPrm shard.IterateOverObjectsInContainerPrm + objPrm.BucketName = name + objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + egObject.Go(func() error { + err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) + if err != nil { + cancel(err) + } + return err + }) + return nil + } + err := sh.IterateOverObjectsInContainer(ctx, objPrm) + if err != nil { + cancel(err) } - e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err - } - - if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil { - return err - } - - c = listRes.Cursor() + }) + return nil } - return nil + + sh.SetEvacuationInProgress(true) + err := sh.IterateOverContainers(ctx, cntPrm) + if err != nil { + cancel(err) + e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err } func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { sh := shardsToEvacuate[shardID] + shards := getShards() var listPrm pilorama.TreeListTreesPrm first := true @@ -637,68 +709,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) return shards, nil } -func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", - trace.WithAttributes( - attribute.Int("objects_count", len(toEvacuate)), - )) + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() - for i := range toEvacuate { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - addr := toEvacuate[i].Address + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } - var getPrm shard.GetPrm - getPrm.SetAddress(addr) - getPrm.SkipEvacCheck(true) + shards := getShards() + addr := objInfo.Address - getRes, err := sh.Get(ctx, getPrm) - if err != nil { - if prm.IgnoreErrors { - res.objFailed.Add(1) - continue - } - e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } + var getPrm shard.GetPrm + getPrm.SetAddress(addr) + getPrm.SkipEvacCheck(true) - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res) - if err != nil { - return err - } - - if evacuatedLocal { - continue - } - - if prm.ObjectsHandler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) - } - - moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) - if err != nil { - e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - if moved { - res.objEvacuated.Add(1) - } else if prm.IgnoreErrors { + getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm) + if err != nil { + if prm.IgnoreErrors { res.objFailed.Add(1) - e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } else { - return fmt.Errorf("object %s was not replicated", addr) + return nil } + e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) + if err != nil { + return err + } + + if evacuatedLocal { + return nil + } + + if prm.ObjectsHandler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return fmt.Errorf("%w: %s", errPutShard, objInfo) + } + + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 28529fab9..3635438fb 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -6,6 +6,8 @@ import ( "fmt" "path/filepath" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -29,7 +31,8 @@ import ( func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() - te := testNewEngine(t). + te := testNewEngine(t, + WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)). setShardsNumOpts(t, shardNum, func(id int) []shard.Option { return []shard.Option{ shard.WithLogger(test.NewLogger(t)), @@ -174,13 +177,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { - var n uint64 + var n atomic.Uint64 return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { - if n == max { + if n.Load() == max { return false, errReplication } - n++ + n.Add(1) for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) @@ -314,6 +317,36 @@ func TestEvacuateCancellation(t *testing.T) { require.Equal(t, uint64(0), res.ObjectsEvacuated()) } +func TestEvacuateCancellationByError(t *testing.T) { + t.Parallel() + e, ids, _ := newEngineEvacuate(t, 2, 10) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.ShardID = ids[1:2] + var once atomic.Bool + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { + var err error + flag := true + if once.CompareAndSwap(false, true) { + err = errors.New("test error") + flag = false + } + return flag, err + } + prm.Scope = EvacuateScopeObjects + prm.ObjectWorkerCount = 2 + prm.ContainerWorkerCount = 2 + + _, err := e.Evacuate(context.Background(), prm) + require.ErrorContains(t, err, "test error") +} + func TestEvacuateSingleProcess(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { @@ -531,6 +564,7 @@ func TestEvacuateTreesRemote(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) var prm EvacuateShardPrm prm.ShardID = ids @@ -545,7 +579,9 @@ func TestEvacuateTreesRemote(t *testing.T) { if op.Time == 0 { return true, "", nil } + mutex.Lock() evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) + mutex.Unlock() height = op.Time + 1 } } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index b4326a92c..3a9cea5b4 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -61,6 +61,20 @@ func (l ListRes) Cursor() *Cursor { return l.cursor } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon objects in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -259,3 +273,157 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) return rawID, name[0] } + +// IterateOverContainers lists physical containers available in metabase starting from first. +func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverContainers(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error { + c := tx.Cursor() + name, _ := c.First() + + var containerID cid.ID + for ; name != nil; name, _ = c.Next() { + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) + if cidRaw == nil { + continue + } + if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix { + continue + } + + bkt := tx.Bucket(name) + if bkt != nil { + bktName := make([]byte, len(name)) + copy(bktName, name) + var cnt cid.ID + copy(cnt[:], containerID[:]) + err := prm.Handler(ctx, bktName, cnt) + if err != nil { + return err + } + } + } + + return nil +} + +// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first. +func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverObjectsInContainer(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error { + bkt := tx.Bucket(prm.BucketName) + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, v := c.First() + + var containerID cid.ID + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName) + if cidRaw == nil { + return nil + } + + var objType objectSDK.Type + + switch prefix { + case primaryPrefix: + objType = objectSDK.TypeRegular + case lockersPrefix: + objType = objectSDK.TypeLock + case tombstonePrefix: + objType = objectSDK.TypeTombstone + default: + return nil + } + + for ; k != nil; k, v = c.Next() { + var obj oid.ID + if err := obj.Decode(k); err != nil { + break + } + + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + + var isLinkingObj bool + var ecInfo *objectcore.ECInfo + if objType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return err + } + isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } + } + } + + var a oid.Address + a.SetContainer(containerID) + a.SetObject(obj) + objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} + err := prm.Handler(ctx, &objInfo) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 08ea81a0c..7ea0df2bd 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID { return r.containers } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon containers in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -164,3 +178,48 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List cursor: res.Cursor(), }, nil } + +// IterateOverContainers lists physical containers presented in shard. +func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverContainersPrm + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverContainers(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over containers: %w", err) + } + + return nil +} + +// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name. +func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverObjectsInContainerPrm + metaPrm.BucketName = prm.BucketName + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over objects: %w", err) + } + + return nil +} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index aacebe9e3..bdc6f7c38 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha } prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicateObject, - TreeHandler: s.replicateTree, - Async: true, - Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + ObjectsHandler: s.replicateObject, + TreeHandler: s.replicateTree, + Async: true, + Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), + ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 04994328a..88a06de22 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -394,6 +394,10 @@ message StartShardEvacuationRequest { bool ignore_errors = 2; // Evacuation scope. uint32 scope = 3; + // Count of concurrent container evacuation workers. + uint32 container_worker_count = 4; + // Count of concurrent object evacuation workers. + uint32 object_worker_count = 5; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 019cac290b7733eb3572ae9d4d24009eab31115b..e92a8acd1f75be83532662057f0bd05a9d051fbb 100644 GIT binary patch delta 1264 zcmZ`(Ur19?80Vb3vvF_QTrN?E?Km?uZL@L^GQr`4MU+B;5EZ-46&l^Uw(FmY;S5pe zK}x~zp@*=bu!rg)7(qe!*vl#-NSZF82%?@M5=1)pUW>Eqx^N!O$M^ewzwdXx@BUt@ zy#2K@J&vB@Q%QKeNC>=LB)i~S29-Lf`N3Ec`{6+n(^0WgRW!L@i6wf~#Kl;mQyo+^ zI@qsh2M@61$Ust6+I-O>Nxq0AcAp!FMKvDDFjlZ?O=8HDBn`<4+5neEP#pw3b`i1@ z*aa=q*bd$i!45scLLE%q!|vG>-e!a{5e@%BP`-wDK=wUxRJAtK5q~lLP_~A58ab|@ zp2N!o`Vqw-KZbqKeaQ9<((|~2QIx=wVZ2@?_Mb0a6IwnHPwRgf8znI%nP8qwchf*t zY|&vD4VNF{INZF>)ONJlUckx}_UIlEc)mp5WpY=s4^>CyWQ?}9(LjJ&nuYiZik23< zAG}#XZlI&c(ab+fJMEJ-SrSE~SkNfO3)L&h0X|~r^Rd1}y)#iPbkS5FUa;+%%@GMA zGv?blz?&lx?)vBsX@-|MQqILn50f+S;})*gJ9WT~6#fof(|EUe%`OvrnRO!J(=yrX zv_xV5GO6e4sv&F-T5=SvVQ6z88|gKPbz!m4Y@}lvHyQNKu9(KV0Q>T&$;n3;z(O26 zQHU{0T4^mM6m6z9bYyc3!$KN1e*U#FeMQ&~qi1d75bz?B=~+M&!PGIf!bit&2zoaJ zuWQRX0$L^h@*=&>)XMcvwQvKCwltV=vkQQ6o12wsvMsl3pSMvHqphNnM%v}AJrS21|U%9!$s7U<=+ zZi0J8m{2NiD|+r(4D~Ge6#i*L{6vVMQXs$bUbw}8z0*;yi$O@U-IBZ{F!dHy1pjo3nc(T-uwRs+bv`fR9RiaIXryDl? zCl6_QfvyOV1W#&4-V>1DW{YrF+a|~ABeNr3HQqKBS@0&XaV*RU5l9(wmclX_2nJnR ISmv)6zw;D+bpQYW