diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index d6c2a0b9b..329cb9100 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -19,6 +19,7 @@ func initControlShardsCmd() { shardsCmd.AddCommand(doctorCmd) shardsCmd.AddCommand(writecacheShardCmd) shardsCmd.AddCommand(shardsDetachCmd) + shardsCmd.AddCommand(shardsRebuildCmd) initControlShardsListCmd() initControlSetShardModeCmd() diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 110281418..b59518d14 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -109,7 +109,6 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []shardCfg lowMem bool - rebuildWorkers uint32 } // if need to run node in compatibility with other versions mode @@ -127,6 +126,8 @@ type shardCfg struct { refillMetabaseWorkersCount int mode shardmode.Mode + rebuildWorkersCount uint32 + metaCfg struct { path string perm fs.FileMode @@ -230,7 +231,6 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) - a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) } @@ -240,6 +240,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig newConfig.refillMetabase = oldConfig.RefillMetabase() newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount() + newConfig.rebuildWorkersCount = oldConfig.RebuildWorkerCount() newConfig.mode = oldConfig.Mode() newConfig.compress = oldConfig.Compress() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() @@ -835,7 +836,6 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithLogger(c.log), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), - engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers), ) if c.metricsCollector != nil { @@ -998,6 +998,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID shard.WithLogger(c.log), shard.WithRefillMetabase(shCfg.refillMetabase), shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount), + shard.WithRebuildWorkersCount(shCfg.rebuildWorkersCount), shard.WithMode(shCfg.mode), shard.WithBlobStorOptions(blobstoreOpts...), shard.WithMetaBaseOptions(mbOptions...), diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go index baa4e3c9d..c944d1c58 100644 --- a/cmd/frostfs-node/config/engine/config.go +++ b/cmd/frostfs-node/config/engine/config.go @@ -15,9 +15,6 @@ const ( // ShardPoolSizeDefault is a default value of routine pool size per-shard to // process object PUT operations in a storage engine. ShardPoolSizeDefault = 20 - // RebuildWorkersCountDefault is a default value of the workers count to - // process storage rebuild operations in a storage engine. - RebuildWorkersCountDefault = 100 ) // ErrNoShardConfigured is returned when at least 1 shard is required but none are found. @@ -91,11 +88,3 @@ func ShardErrorThreshold(c *config.Config) uint32 { func EngineLowMemoryConsumption(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "low_mem") } - -// EngineRebuildWorkersCount returns value of "rebuild_workers_count" config parmeter from "storage" section. -func EngineRebuildWorkersCount(c *config.Config) uint32 { - if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 { - return v - } - return RebuildWorkersCountDefault -} diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index d53207ccc..464d72556 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -39,7 +39,6 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty)) require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty)) require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode()) - require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty)) }) const path = "../../../../config/example/node" @@ -49,7 +48,6 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 15, engineconfig.ShardPoolSize(c)) - require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c)) err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error { defer func() { @@ -121,6 +119,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, mode.ReadOnly, sc.Mode()) require.Equal(t, 100, sc.RefillMetabaseWorkersCount()) + require.Equal(t, uint32(1000), sc.RebuildWorkerCount()) case 1: require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, fs.FileMode(0o644), pl.Perm()) @@ -176,6 +175,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, mode.ReadWrite, sc.Mode()) require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount()) + require.Equal(t, uint32(shardconfig.RebuildWorkersCountDefault), sc.RebuildWorkerCount()) } return nil }) diff --git a/cmd/frostfs-node/config/engine/shard/config.go b/cmd/frostfs-node/config/engine/shard/config.go index 0620c9f63..ec9df0e89 100644 --- a/cmd/frostfs-node/config/engine/shard/config.go +++ b/cmd/frostfs-node/config/engine/shard/config.go @@ -19,6 +19,7 @@ const ( SmallSizeLimitDefault = 1 << 20 EstimateCompressibilityThresholdDefault = 0.1 RefillMetabaseWorkersCountDefault = 500 + RebuildWorkersCountDefault = 5 ) // From wraps config section into Config. @@ -149,6 +150,20 @@ func (x *Config) RefillMetabaseWorkersCount() int { return RefillMetabaseWorkersCountDefault } +// RebuildWorkersCount returns the value of "resync_metabase_worker_count" config parameter. +// +// Returns RebuildWorkersCountDefault if the value is not a positive number. +func (x *Config) RebuildWorkerCount() uint32 { + v := config.Uint32Safe( + (*config.Config)(x), + "rebuild_worker_count", + ) + if v > 0 { + return v + } + return RebuildWorkersCountDefault +} + // Mode return the value of "mode" config parameter. // // Panics if read the value is not one of predefined diff --git a/config/example/node.env b/config/example/node.env index b39423ffb..1eccd8a5d 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -90,11 +90,11 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 -FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000 ## 0 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100 +FROSTFS_STORAGE_SHARD_0_REBUILD_WORKER_COUNT=1000 ### Flag to set shard mode FROSTFS_STORAGE_SHARD_0_MODE=read-only ### Write cache config diff --git a/config/example/node.json b/config/example/node.json index fe2de0e01..be7ced77a 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -135,12 +135,12 @@ "storage": { "shard_pool_size": 15, "shard_ro_error_threshold": 100, - "rebuild_workers_count": 1000, "shard": { "0": { "mode": "read-only", "resync_metabase": false, "resync_metabase_worker_count": 100, + "rebuild_worker_count": 1000, "writecache": { "enabled": false, "no_sync": true, diff --git a/config/example/node.yaml b/config/example/node.yaml index cc339a427..4b9720655 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -117,7 +117,6 @@ storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) - rebuild_workers_count: 1000 # count of rebuild storage concurrent workers shard: default: # section with the default shard parameters @@ -165,6 +164,7 @@ storage: # disabled (do not work with the shard, allows to not remove it from the config) resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding resync_metabase_worker_count: 100 + rebuild_worker_count: 1000 # count of rebuild storage concurrent workers writecache: enabled: false diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 5bf35cd65..f390d84a4 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -189,6 +189,7 @@ The following table describes configuration for each shard. | `mode` | `string` | `read-write` | Shard Mode.
Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | | `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. | +| `rebuild_worker_count` | `int` | `5` | Count of concurrent workers to rebuild blobstore. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index b87d77e6c..5e883a641 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -40,7 +40,6 @@ type StorageEngine struct { err error } evacuateLimiter *evacuationLimiter - rebuildLimiter *rebuildLimiter } type shardWrapper struct { @@ -213,16 +212,13 @@ type cfg struct { lowMem bool - rebuildWorkersCount uint32 - containerSource atomic.Pointer[containerSource] } func defaultCfg() *cfg { res := &cfg{ - log: &logger.Logger{Logger: zap.L()}, - shardPoolSize: 20, - rebuildWorkersCount: 100, + log: &logger.Logger{Logger: zap.L()}, + shardPoolSize: 20, } res.containerSource.Store(&containerSource{}) return res @@ -243,7 +239,6 @@ func New(opts ...Option) *StorageEngine { closeCh: make(chan struct{}), setModeCh: make(chan setModeRequest), evacuateLimiter: &evacuationLimiter{}, - rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount), } } @@ -282,13 +277,6 @@ func WithLowMemoryConsumption(lowMemCons bool) Option { } } -// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers. -func WithRebuildWorkersCount(count uint32) Option { - return func(c *cfg) { - c.rebuildWorkersCount = count - } -} - // SetContainerSource sets container source. func (e *StorageEngine) SetContainerSource(cs container.Source) { e.containerSource.Store(&containerSource{cs: cs}) diff --git a/pkg/local_object_storage/engine/rebuild.go b/pkg/local_object_storage/engine/rebuild.go index 3970aae89..83c6a54ed 100644 --- a/pkg/local_object_storage/engine/rebuild.go +++ b/pkg/local_object_storage/engine/rebuild.go @@ -41,7 +41,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes } resGuard := &sync.Mutex{} - limiter := newRebuildLimiter(prm.ConcurrencyLimit) + limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit) eg, egCtx := errgroup.WithContext(ctx) for _, shardID := range prm.ShardIDs { diff --git a/pkg/local_object_storage/engine/rebuild_limiter.go b/pkg/local_object_storage/engine/rebuild_limiter.go deleted file mode 100644 index 28b02b0a3..000000000 --- a/pkg/local_object_storage/engine/rebuild_limiter.go +++ /dev/null @@ -1,26 +0,0 @@ -package engine - -import "context" - -type rebuildLimiter struct { - semaphore chan struct{} -} - -func newRebuildLimiter(workersCount uint32) *rebuildLimiter { - return &rebuildLimiter{ - semaphore: make(chan struct{}, workersCount), - } -} - -func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error { - select { - case l.semaphore <- struct{}{}: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (l *rebuildLimiter) ReleaseWorkSlot() { - <-l.semaphore -} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 40584149e..2ad6859e4 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -137,7 +137,6 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithReportErrorFunc(e.reportShardErrorBackground), - shard.WithRebuildWorkerLimiter(e.rebuildLimiter), shard.WithZeroSizeCallback(e.processZeroSizeContainers), shard.WithZeroCountCallback(e.processZeroCountContainers), )...) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 6efe4ec37..5e9639a7b 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -129,7 +129,7 @@ func (s *Shard) Init(ctx context.Context) error { s.gc.init(ctx) - s.rb = newRebuilder(s.rebuildLimiter) + s.rb = newRebuilder(NewRebuildLimiter(s.rebuildWorkersCount)) if !m.NoMetabase() && !s.rebuildDisabled { s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) } diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index f8051999e..2eef456be 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -26,7 +26,7 @@ type rebuildLimiter struct { semaphore chan struct{} } -func newRebuildLimiter(workersCount uint32) *rebuildLimiter { +func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter { return &rebuildLimiter{ semaphore: make(chan struct{}, workersCount), } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 1eaee8815..1e2bb7900 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -140,7 +140,7 @@ type cfg struct { reportErrorFunc func(selfID string, message string, err error) - rebuildLimiter RebuildWorkerLimiter + rebuildWorkersCount uint32 rebuildDisabled bool } @@ -151,7 +151,7 @@ func defaultCfg() *cfg { log: &logger.Logger{Logger: zap.L()}, gcCfg: defaultGCCfg(), reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: newRebuildLimiter(1), + rebuildWorkersCount: 1, zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, zeroCountContainersCallback: func(context.Context, []cid.ID) {}, } @@ -384,11 +384,11 @@ func WithExpiredCollectorWorkerCount(count int) Option { } } -// WithRebuildWorkerLimiter return option to set concurrent +// WithRebuildWorkersCount return option to set concurrent // workers count of storage rebuild operation. -func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { +func WithRebuildWorkersCount(count uint32) Option { return func(c *cfg) { - c.rebuildLimiter = l + c.rebuildWorkersCount = count } }