From 51e64c3101c5cf9d272d9cc63e6331253e9aae8f Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Wed, 18 Sep 2024 12:15:32 +0300 Subject: [PATCH 1/3] [#1356] engine: Evacuate object from shards concurrently Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/control/evacuation.go | 15 +- cmd/frostfs-node/config.go | 7 + cmd/frostfs-node/config/engine/config.go | 22 ++ config/example/node.env | 2 + config/example/node.json | 2 + config/example/node.yaml | 2 + docs/storage-node-configuration.md | 14 +- pkg/local_object_storage/engine/engine.go | 20 ++ pkg/local_object_storage/engine/evacuate.go | 240 ++++++++++-------- .../engine/evacuate_test.go | 14 +- pkg/local_object_storage/metabase/list.go | 168 ++++++++++++ pkg/local_object_storage/shard/list.go | 59 +++++ pkg/services/control/server/evacuate_async.go | 14 +- pkg/services/control/service.proto | 4 + pkg/services/control/service_frostfs.pb.go | Bin 404856 -> 406555 bytes 15 files changed, 463 insertions(+), 120 deletions(-) diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 6fa5ed75c..04a67e5b5 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -21,6 +21,9 @@ const ( noProgressFlag = "no-progress" scopeFlag = "scope" + containerWorkerCountFlag = "container-worker-count" + objectWorkerCountFlag = "object-worker-count" + scopeAll = "all" scopeObjects = "objects" scopeTrees = "trees" @@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) + objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ - Shard_ID: getShardIDList(cmd), - IgnoreErrors: ignoreErrors, - Scope: getEvacuationScope(cmd), + Shard_ID: getShardIDList(cmd), + IgnoreErrors: ignoreErrors, + Scope: getEvacuationScope(cmd), + ContainerWorkerCount: containerWorkerCount, + ObjectWorkerCount: objectWorkerCount, }, } @@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() { flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") + flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 63f410b89..1780f41d0 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -110,6 +110,9 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []shardCfg lowMem bool + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } // if need to run node in compatibility with other versions mode @@ -228,6 +231,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) + a.EngineCfg.evacuationObjectWorkerCount = 
engineconfig.EngineEvacuationObjectWorkerCount(c) + a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) } @@ -829,6 +834,8 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithLogger(c.log), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), + engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount), + engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount), ) if c.metricsCollector != nil { diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go index c944d1c58..e5fe638b1 100644 --- a/cmd/frostfs-node/config/engine/config.go +++ b/cmd/frostfs-node/config/engine/config.go @@ -15,6 +15,12 @@ const ( // ShardPoolSizeDefault is a default value of routine pool size per-shard to // process object PUT operations in a storage engine. ShardPoolSizeDefault = 20 + // EvacuationContainerWorkerCountDefault is a default value of the count of + // concurrent container evacuation workers. + EvacuationContainerWorkerCountDefault = 10 + // EvacuationObjectWorkerCountDefault is a default value of the count of + // concurrent object evacuation workers. + EvacuationObjectWorkerCountDefault = 10 ) // ErrNoShardConfigured is returned when at least 1 shard is required but none are found. @@ -88,3 +94,19 @@ func ShardErrorThreshold(c *config.Config) uint32 { func EngineLowMemoryConsumption(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "low_mem") } + +// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section. +func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 { + if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 { + return v + } + return EvacuationObjectWorkerCountDefault +} + +// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section. 
+func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationContainerWorkerCountDefault
+}
diff --git a/config/example/node.env b/config/example/node.env
index 6618a981a..6f43ce1bc 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -91,6 +91,8 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
+FROSTFS_STORAGE_EVACUATION_OBJECT_WORKER_COUNT=10
+FROSTFS_STORAGE_EVACUATION_CONTAINER_WORKER_COUNT=10
 ## 0 shard
 ### Flag to refill Metabase from BlobStor
 FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
diff --git a/config/example/node.json b/config/example/node.json
index 0d100ed80..7f252e4fb 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -136,6 +136,8 @@
   "storage": {
     "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,
+    "evacuation_container_worker_count": 10,
+    "evacuation_object_worker_count": 10,
     "shard": {
       "0": {
         "mode": "read-only",
diff --git a/config/example/node.yaml b/config/example/node.yaml
index 86be35ba8..db0f380da 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -118,6 +118,8 @@ storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
   shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
   shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
+  evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers
+  evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers
   shard:
     default: # section with the default shard parameters
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index c74695e2b..519ffab0a 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -167,12 +167,14 @@ morph:

 Local storage engine configuration.

-| Parameter | Type | Default value | Description |
-|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
-| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
-| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
-| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
-| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
+| Parameter | Type | Default value | Description |
+|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
+| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
+| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
+| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. 
| +| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers. | +| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | +| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | ## `shard` subsection diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 5e883a641..4c78ee278 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -213,12 +213,18 @@ type cfg struct { lowMem bool containerSource atomic.Pointer[containerSource] + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } func defaultCfg() *cfg { res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, + + evacuationObjectWorkerCount: 10, + evacuationContainerWorkerCount: 10, } res.containerSource.Store(&containerSource{}) return res @@ -277,6 +283,20 @@ func WithLowMemoryConsumption(lowMemCons bool) Option { } } +// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container. +func WithEvacuationObjectWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationObjectWorkerCount = count + } +} + +// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard. +func WithEvacuationContainerWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationContainerWorkerCount = count + } +} + // SetContainerSource sets container source. func (e *StorageEngine) SetContainerSource(cs container.Source) { e.containerSource.Store(&containerSource{cs: cs}) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 7bef6edfb..ec731e914 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -10,7 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -24,6 +23,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -79,6 +79,9 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + + ContainerWorkerCount uint32 + ObjectWorkerCount uint32 } // EvacuateShardRes represents result of the EvacuateShard operation. 
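// NOTE (editor's illustrative sketch, not part of the patch): the new
// ContainerWorkerCount and ObjectWorkerCount fields added above let a caller
// bound evacuation concurrency per request; zero values fall back to the
// engine-wide defaults set via WithEvacuationContainerWorkerCount and
// WithEvacuationObjectWorkerCount. The shardIDs, storageEngine and ctx
// variables below are assumed to exist:
//
//	var prm engine.EvacuateShardPrm
//	prm.ShardID = shardIDs
//	prm.Scope = engine.EvacuateScopeObjects
//	prm.ContainerWorkerCount = 10 // concurrent container evacuation workers
//	prm.ObjectWorkerCount = 10    // concurrent object evacuation workers
//	res, err := storageEngine.Evacuate(ctx, prm)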
@@ -189,8 +192,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { return res } -const defaultEvacuateBatchSize = 100 - type pooledShard struct { hashedShard pool util.WorkerPool @@ -242,8 +243,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return nil, err } + var mtx sync.RWMutex + copyShards := func() []pooledShard { + mtx.RLock() + defer mtx.RUnlock() + t := make([]pooledShard, len(shards)) + copy(t, shards) + return t + } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate) + return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) if prm.Async { @@ -261,7 +270,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context { } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", @@ -287,12 +296,46 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return err } + egShard, _ := errgroup.WithContext(ctx) + + egContainer, _ := errgroup.WithContext(ctx) + containerWorkerCount := prm.ContainerWorkerCount + if containerWorkerCount == 0 { + containerWorkerCount = e.cfg.evacuationContainerWorkerCount + } + egContainer.SetLimit(int(containerWorkerCount)) + + egObject, _ := errgroup.WithContext(ctx) + objectWorkerCount := prm.ObjectWorkerCount + if objectWorkerCount == 0 { + objectWorkerCount = e.cfg.evacuationObjectWorkerCount + } + egObject.SetLimit(int(objectWorkerCount)) + for _, shardID := range shardIDs { - if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) - return err - } + egShard.Go(func() error { + if err := e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil { + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) + return err + } + return nil + }) + } + errShard := egShard.Wait() + errContainer := egContainer.Wait() + errObject := egObject.Wait() + if errShard != nil { + err = errShard + return errShard + } + if errContainer != nil { + err = errContainer + return errContainer + } + if errObject != nil { + err = errObject + return errObject } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, @@ -336,7 +379,8 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha } func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( @@ -344,59 +388,56 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E )) 
defer span.End() + eg, egCtx := errgroup.WithContext(ctx) if prm.Scope.WithObjects() { - if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - return err - } + eg.Go(func() error { + return e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject) + }) } - if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { - if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - return err - } + eg.Go(func() error { + return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate) + }) } - return nil + return eg.Wait() } func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { - var listPrm shard.ListWithCursorPrm - listPrm.WithCount(defaultEvacuateBatchSize) - sh := shardsToEvacuate[shardID] - sh.SetEvacuationInProgress(true) - - var c *meta.Cursor - for { - listPrm.WithCursor(c) - - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(ctx, listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { - break + var cntPrm shard.IterateOverContainersPrm + cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + egContainer.Go(func() error { + var objPrm shard.IterateOverObjectsInContainerPrm + objPrm.BucketName = name + objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { + egObject.Go(func() error { + return e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) + }) + return nil } - e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - - if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil { - return err - } - - c = listRes.Cursor() + return sh.IterateOverObjectsInContainer(ctx, objPrm) + }) + return nil } - return nil + + sh.SetEvacuationInProgress(true) + err := sh.IterateOverContainers(ctx, cntPrm) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err } func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { sh := shardsToEvacuate[shardID] + shards := getShards() var listPrm pilorama.TreeListTreesPrm first := true @@ -637,68 +678,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) return shards, nil } -func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm 
EvacuateShardPrm, res *EvacuateShardRes, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", - trace.WithAttributes( - attribute.Int("objects_count", len(toEvacuate)), - )) + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() - for i := range toEvacuate { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - addr := toEvacuate[i].Address + select { + case <-ctx.Done(): + return ctx.Err() + default: + } - var getPrm shard.GetPrm - getPrm.SetAddress(addr) - getPrm.SkipEvacCheck(true) + shards := getShards() + addr := objInfo.Address - getRes, err := sh.Get(ctx, getPrm) - if err != nil { - if prm.IgnoreErrors { - res.objFailed.Add(1) - continue - } - e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } + var getPrm shard.GetPrm + getPrm.SetAddress(addr) + getPrm.SkipEvacCheck(true) - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res) - if err != nil { - return err - } - - if evacuatedLocal { - continue - } - - if prm.ObjectsHandler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) - } - - moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) - if err != nil { - e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - if moved { - res.objEvacuated.Add(1) - } else if prm.IgnoreErrors { + getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm) + if err != nil { + if prm.IgnoreErrors { res.objFailed.Add(1) - e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } else { - return fmt.Errorf("object %s was not replicated", addr) + return nil } + e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) + if err != nil { + return err + } + + if evacuatedLocal { + return nil + } + + if prm.ObjectsHandler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. 
+ return fmt.Errorf("%w: %s", errPutShard, objInfo) + } + + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 28529fab9..8fdc4daae 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -6,6 +6,8 @@ import ( "fmt" "path/filepath" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -29,7 +31,8 @@ import ( func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() - te := testNewEngine(t). + te := testNewEngine(t, + WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)). setShardsNumOpts(t, shardNum, func(id int) []shard.Option { return []shard.Option{ shard.WithLogger(test.NewLogger(t)), @@ -174,13 +177,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { - var n uint64 + var n atomic.Uint64 return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { - if n == max { + if n.Load() == max { return false, errReplication } - n++ + n.Add(1) for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) @@ -531,6 +534,7 @@ func TestEvacuateTreesRemote(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) var prm EvacuateShardPrm prm.ShardID = ids @@ -545,7 +549,9 @@ func TestEvacuateTreesRemote(t *testing.T) { if op.Time == 0 { return true, "", nil } + mutex.Lock() evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) + mutex.Unlock() height = op.Time + 1 } } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index b4326a92c..3a9cea5b4 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -61,6 +61,20 @@ func (l ListRes) Cursor() *Cursor { return l.cursor } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon objects in db. 
+ Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -259,3 +273,157 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) return rawID, name[0] } + +// IterateOverContainers lists physical containers available in metabase starting from first. +func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverContainers(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error { + c := tx.Cursor() + name, _ := c.First() + + var containerID cid.ID + for ; name != nil; name, _ = c.Next() { + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) + if cidRaw == nil { + continue + } + if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix { + continue + } + + bkt := tx.Bucket(name) + if bkt != nil { + bktName := make([]byte, len(name)) + copy(bktName, name) + var cnt cid.ID + copy(cnt[:], containerID[:]) + err := prm.Handler(ctx, bktName, cnt) + if err != nil { + return err + } + } + } + + return nil +} + +// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first. 
+func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverObjectsInContainer(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error { + bkt := tx.Bucket(prm.BucketName) + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, v := c.First() + + var containerID cid.ID + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName) + if cidRaw == nil { + return nil + } + + var objType objectSDK.Type + + switch prefix { + case primaryPrefix: + objType = objectSDK.TypeRegular + case lockersPrefix: + objType = objectSDK.TypeLock + case tombstonePrefix: + objType = objectSDK.TypeTombstone + default: + return nil + } + + for ; k != nil; k, v = c.Next() { + var obj oid.ID + if err := obj.Decode(k); err != nil { + break + } + + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + + var isLinkingObj bool + var ecInfo *objectcore.ECInfo + if objType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return err + } + isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } + } + } + + var a oid.Address + a.SetContainer(containerID) + a.SetObject(obj) + objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} + err := prm.Handler(ctx, &objInfo) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 08ea81a0c..7ea0df2bd 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID { return r.containers } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon containers in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -164,3 +178,48 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List cursor: res.Cursor(), }, nil } + +// IterateOverContainers lists physical containers presented in shard. 
+func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverContainersPrm + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverContainers(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over containers: %w", err) + } + + return nil +} + +// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name. +func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverObjectsInContainerPrm + metaPrm.BucketName = prm.BucketName + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over objects: %w", err) + } + + return nil +} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index aacebe9e3..bdc6f7c38 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha } prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicateObject, - TreeHandler: s.replicateTree, - Async: true, - Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + ObjectsHandler: s.replicateObject, + TreeHandler: s.replicateTree, + Async: true, + Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), + ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 04994328a..88a06de22 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -394,6 +394,10 @@ message StartShardEvacuationRequest { bool ignore_errors = 2; // Evacuation scope. uint32 scope = 3; + // Count of concurrent container evacuation workers. + uint32 container_worker_count = 4; + // Count of concurrent object evacuation workers. 
+ uint32 object_worker_count = 5; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 019cac290b7733eb3572ae9d4d24009eab31115b..e92a8acd1f75be83532662057f0bd05a9d051fbb 100644 GIT binary patch delta 1264 zcmZ`(Ur19?80Vb3vvF_QTrN?E?Km?uZL@L^GQr`4MU+B;5EZ-46&l^Uw(FmY;S5pe zK}x~zp@*=bu!rg)7(qe!*vl#-NSZF82%?@M5=1)pUW>Eqx^N!O$M^ewzwdXx@BUt@ zy#2K@J&vB@Q%QKeNC>=LB)i~S29-Lf`N3Ec`{6+n(^0WgRW!L@i6wf~#Kl;mQyo+^ zI@qsh2M@61$Ust6+I-O>Nxq0AcAp!FMKvDDFjlZ?O=8HDBn`<4+5neEP#pw3b`i1@ z*aa=q*bd$i!45scLLE%q!|vG>-e!a{5e@%BP`-wDK=wUxRJAtK5q~lLP_~A58ab|@ zp2N!o`Vqw-KZbqKeaQ9<((|~2QIx=wVZ2@?_Mb0a6IwnHPwRgf8znI%nP8qwchf*t zY|&vD4VNF{INZF>)ONJlUckx}_UIlEc)mp5WpY=s4^>CyWQ?}9(LjJ&nuYiZik23< zAG}#XZlI&c(ab+fJMEJ-SrSE~SkNfO3)L&h0X|~r^Rd1}y)#iPbkS5FUa;+%%@GMA zGv?blz?&lx?)vBsX@-|MQqILn50f+S;})*gJ9WT~6#fof(|EUe%`OvrnRO!J(=yrX zv_xV5GO6e4sv&F-T5=SvVQ6z88|gKPbz!m4Y@}lvHyQNKu9(KV0Q>T&$;n3;z(O26 zQHU{0T4^mM6m6z9bYyc3!$KN1e*U#FeMQ&~qi1d75bz?B=~+M&!PGIf!bit&2zoaJ zuWQRX0$L^h@*=&>)XMcvwQvKCwltV=vkQQ6o12wsvMsl3pSMvHqphNnM%v}AJrS21|U%9!$s7U<=+ zZi0J8m{2NiD|+r(4D~Ge6#i*L{6vVMQXs$bUbw}8z0*;yi$O@U-IBZ{F!dHy1pjo3nc(T-uwRs+bv`fR9RiaIXryDl? zCl6_QfvyOV1W#&4-V>1DW{YrF+a|~ABeNr3HQqKBS@0&XaV*RU5l9(wmclX_2nJnR ISmv)6zw;D+bpQYW -- 2.45.2 From bcc0fed99cf3a0951c1be756d1a60e2bf73f46ef Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 3 Sep 2024 15:42:38 +0300 Subject: [PATCH 2/3] [#1350] node: Add ability to evacuate objects from `REP 1` only Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/control/evacuation.go | 4 + docs/evacuation.md | 4 +- pkg/local_object_storage/engine/evacuate.go | 35 +++++++- .../engine/evacuate_test.go | 83 ++++++++++++++++++ pkg/local_object_storage/metabase/list.go | 47 ++++++++++ pkg/local_object_storage/shard/list.go | 25 ++++++ pkg/services/control/server/evacuate_async.go | 1 + pkg/services/control/service.proto | 2 + pkg/services/control/service_frostfs.pb.go | Bin 406555 -> 407248 bytes 9 files changed, 197 insertions(+), 4 deletions(-) diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 04a67e5b5..fffc5e33e 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -20,6 +20,7 @@ const ( awaitFlag = "await" noProgressFlag = "no-progress" scopeFlag = "scope" + repOneOnlyFlag = "rep-one-only" containerWorkerCountFlag = "container-worker-count" objectWorkerCountFlag = "object-worker-count" @@ -69,6 +70,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) + repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ @@ -77,6 +79,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { Scope: getEvacuationScope(cmd), ContainerWorkerCount: containerWorkerCount, ObjectWorkerCount: objectWorkerCount, + RepOneOnly: repOneOnly, }, } @@ -380,6 +383,7 @@ func initControlStartEvacuationShardCmd() { flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") + flags.Bool(repOneOnlyFlag, false, "Evacuate objects 
only from containers with policy 'REP 1 ...'") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/docs/evacuation.md b/docs/evacuation.md index 885ce169a..322689446 100644 --- a/docs/evacuation.md +++ b/docs/evacuation.md @@ -20,7 +20,9 @@ Because it is necessary to prevent removing by policer objects with policy `REP ## Commands -`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. +By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`. `frostfs-cli control shards evacuation stop` stops running evacuation process. diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index ec731e914..da0cfb087 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -79,6 +79,7 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + RepOneOnly bool ContainerWorkerCount uint32 ObjectWorkerCount uint32 @@ -279,6 +280,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), + attribute.Bool("repOneOnly", prm.RepOneOnly), )) defer func() { @@ -297,14 +299,12 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p } egShard, _ := errgroup.WithContext(ctx) - egContainer, _ := errgroup.WithContext(ctx) containerWorkerCount := prm.ContainerWorkerCount if containerWorkerCount == 0 { containerWorkerCount = e.cfg.evacuationContainerWorkerCount } egContainer.SetLimit(int(containerWorkerCount)) - egObject, _ := errgroup.WithContext(ctx) objectWorkerCount := prm.ObjectWorkerCount if objectWorkerCount == 0 { @@ -409,8 +409,23 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string ) error { sh := shardsToEvacuate[shardID] var cntPrm shard.IterateOverContainersPrm - cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error { egContainer.Go(func() error { + if prm.RepOneOnly { + notRepOne, err := e.isNotRepOne(cnt) + if err != nil { + return err + } + if notRepOne { + countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name} + count, err := sh.CountAliveObjectsInBucket(ctx, countPrm) + if err != nil { + return err + } + res.objSkipped.Add(count) + return nil + } + } var objPrm shard.IterateOverObjectsInContainerPrm objPrm.BucketName = name objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { @@ -741,6 +756,20 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI return nil } +func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) { + c, err := e.containerSource.Load().cs.Get(cid) + if err != nil { + return false, err + } + p := c.Value.PlacementPolicy() + for i := range p.NumberOfReplicas() { + if 
p.ReplicaDescriptor(i).NumberOfObjects() == 1 { + return false, nil + } + } + return true, nil +} + func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 8fdc4daae..b04192dfe 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" @@ -20,14 +21,31 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +type containerStorage struct { + cntmap map[cid.ID]*container.Container +} + +func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { + coreCnt := coreContainer.Container{ + Value: *cs.cntmap[id], + } + return &coreCnt, nil +} + +func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) { + return nil, nil +} + func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() @@ -611,3 +629,68 @@ func TestEvacuateTreesRemote(t *testing.T) { require.Equal(t, expectedTreeOps, evacuatedTreeOps) } + +func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + // Create container with policy REP 2 + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap := make(map[cid.ID]*container.Container) + var cids []cid.ID + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + + // Create container with policy REP 1 + cnr2 := container.Container{} + p2 := netmap.PlacementPolicy{} + p2.SetContainerBackupFactor(1) + x2 := netmap.ReplicaDescriptor{} + x2.SetNumberOfObjects(1) + p2.AddReplicas(x2) + cnr2.SetPlacementPolicy(p2) + + var idCnr2 cid.ID + container.CalculateID(&idCnr2, cnr2) + cnrmap[idCnr2] = &cnr2 + cids = append(cids, idCnr2) + + e.SetContainerSource(&containerStorage{cntmap: cnrmap}) + + for _, sh := range ids { + for j := range 2 { + for range 4 { + obj := testutil.GenerateObjectWithCID(cids[j]) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[sh.String()].Put(context.Background(), putPrm) + 
require.NoError(t, err) + } + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + res, err := e.Evacuate(context.Background(), prm) + require.NoError(t, err) + require.Equal(t, uint64(4), res.ObjectsEvacuated()) + require.Equal(t, uint64(4), res.ObjectsSkipped()) + require.Equal(t, uint64(0), res.ObjectsFailed()) +} diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 3a9cea5b4..65426cbb5 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -75,6 +75,12 @@ type IterateOverObjectsInContainerPrm struct { Handler func(context.Context, *objectcore.Info) error } +// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation. +type CountAliveObjectsInBucketPrm struct { + // BucketName container's bucket name + BucketName []byte +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -427,3 +433,44 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p } return nil } + +// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. +func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return 0, ErrDegradedMode + } + var count uint64 + err := db.boltDB.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(prm.BucketName) + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, _ := c.First() + cidRaw := prm.BucketName[1:bucketKeySize] + if cidRaw == nil { + return nil + } + for ; k != nil; k, _ = c.Next() { + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + count++ + } + return nil + }) + success = err == nil + return count, metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 7ea0df2bd..028a31456 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -48,6 +48,12 @@ type IterateOverObjectsInContainerPrm struct { Handler func(context.Context, *objectcore.Info) error } +// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation. +type CountAliveObjectsInBucketPrm struct { + // BucketName container's bucket name + BucketName []byte +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -223,3 +229,22 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv return nil } + +// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. 
+func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer") + defer span.End() + + if s.GetMode().NoMetabase() { + return 0, ErrDegradedMode + } + + var metaPrm meta.CountAliveObjectsInBucketPrm + metaPrm.BucketName = prm.BucketName + count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm) + if err != nil { + return 0, fmt.Errorf("could not count alive objects in bucket: %w", err) + } + + return count, nil +} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index bdc6f7c38..146ac7e16 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -31,6 +31,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha Scope: engine.EvacuateScope(req.GetBody().GetScope()), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), + RepOneOnly: req.GetBody().GetRepOneOnly(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 88a06de22..ae1939e13 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -398,6 +398,8 @@ message StartShardEvacuationRequest { uint32 container_worker_count = 4; // Count of concurrent object evacuation workers. uint32 object_worker_count = 5; + // Choose for evacuation objects in `REP 1` containers only. + bool rep_one_only = 6; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index e92a8acd1f75be83532662057f0bd05a9d051fbb..e16f082b13a254d4b1d626c468fde46aa7f40241 100644 GIT binary patch delta 677 zcmbRJLgK<(iG~)&Elgo02An~u1^#)d{&_i-3J8#tpPvKfC1e%n=UFKg!4*w6WMng+ zuHVZfIDJDE6Wipz1M1WDS22lAFQ{XdoPOaQt1^=0yx;26-}N#{gOyH~+sQa_y1giWAV=8tAqvmPJqB(9Wc_{Xi9y9g9eEVsWa1nU#XR zz5?87lP?O3OrKE0jm&g`5LSL2`-+CLm@8VwUX=y{xyF0RUy9>6`!n delta 300 zcmcccR$}%GiG~)&Elgo0(^D%LwWn`b#3VZXLq8MibcG5gw&@0q%$(B`ZZXPDPher= zn*MGL6A!8^-*g3YMycuZKeBRSk=2~O;3F#s774cL3;r^9PY;;LBn`5NeS3TvlLF&( zjXR97)A_rYl%^-NF|kj6(7b8;g9@fSjFZ*M!lny!F= Date: Mon, 23 Sep 2024 12:19:15 +0300 Subject: [PATCH 3/3] [#1350] node: Add unit test to measure time required for evacuation Signed-off-by: Anton Nikiforov --- .../engine/evacuate_test.go | 60 ++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index b04192dfe..e3a026d9e 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -32,10 +32,12 @@ import ( ) type containerStorage struct { - cntmap map[cid.ID]*container.Container + cntmap map[cid.ID]*container.Container + latency time.Duration } func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { + time.Sleep(cs.latency) coreCnt := coreContainer.Container{ Value: *cs.cntmap[id], } @@ -694,3 +696,59 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { require.Equal(t, uint64(4), res.ObjectsSkipped()) require.Equal(t, uint64(0), res.ObjectsFailed()) } + +func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { + t.Skip() + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + cnrmap := 
make(map[cid.ID]*container.Container) + var cids []cid.ID + // Create containers with policy REP 1 + for i := range 10_000 { + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + cnr1.SetAttribute("i", strconv.Itoa(i)) + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + } + + e.SetContainerSource(&containerStorage{ + cntmap: cnrmap, + latency: time.Millisecond * 100, + }) + + for _, cnt := range cids { + for range 1 { + obj := testutil.GenerateObjectWithCID(cnt) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[ids[0].String()].Put(context.Background(), putPrm) + require.NoError(t, err) + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + prm.ContainerWorkerCount = 10 + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + start := time.Now() + _, err := e.Evacuate(context.Background(), prm) + t.Logf("evacuate took %v\n", time.Since(start)) + require.NoError(t, err) +} -- 2.45.2
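For reference, below is a minimal, self-contained Go sketch of the nested errgroup pattern that the first patch builds evacuation on: a bounded container-level pool whose tasks submit work into a shared, bounded object-level pool. The package layout, loop counts and limits are illustrative only and are not taken from the patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx := context.Background()

	// Container-level and object-level pools, each with its own bound,
	// analogous to evacuationContainerWorkerCount / evacuationObjectWorkerCount.
	egContainer, _ := errgroup.WithContext(ctx)
	egContainer.SetLimit(10)
	egObject, _ := errgroup.WithContext(ctx)
	egObject.SetLimit(10)

	for c := 0; c < 3; c++ {
		c := c
		egContainer.Go(func() error {
			// Iterating a "container" submits per-object tasks into the
			// shared object pool instead of processing them inline.
			for o := 0; o < 5; o++ {
				o := o
				egObject.Go(func() error {
					fmt.Printf("evacuate object %d from container %d\n", o, c)
					return nil
				})
			}
			return nil
		})
	}

	// Wait order matters: draining the container group first guarantees
	// every object task has been submitted before the object group is waited on.
	if err := egContainer.Wait(); err != nil {
		fmt.Println("container stage:", err)
	}
	if err := egObject.Wait(); err != nil {
		fmt.Println("object stage:", err)
	}
}

The wait order mirrors evacuateShards in the patch. Note that a single object group is shared by all containers and shards there, so the object worker count bounds the total number of in-flight object copies for the whole evacuation rather than per container.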