[#1337] config: Move rebuild_worker_count
to shard section
Some checks failed
Tests and linters / Run gofumpt (pull_request) Successful in 1m53s
DCO action / DCO (pull_request) Successful in 2m4s
Pre-commit hooks / Pre-commit (pull_request) Failing after 2m56s
Vulncheck / Vulncheck (pull_request) Successful in 2m41s
Tests and linters / Tests (1.23) (pull_request) Successful in 2m52s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m55s
Build / Build Components (1.22) (pull_request) Successful in 2m54s
Build / Build Components (1.23) (pull_request) Successful in 3m7s
Tests and linters / Lint (pull_request) Successful in 3m40s
Tests and linters / Staticcheck (pull_request) Successful in 3m34s
Tests and linters / Tests with -race (pull_request) Successful in 4m14s
Tests and linters / gopls check (pull_request) Successful in 4m22s
Some checks failed
Tests and linters / Run gofumpt (pull_request) Successful in 1m53s
DCO action / DCO (pull_request) Successful in 2m4s
Pre-commit hooks / Pre-commit (pull_request) Failing after 2m56s
Vulncheck / Vulncheck (pull_request) Successful in 2m41s
Tests and linters / Tests (1.23) (pull_request) Successful in 2m52s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m55s
Build / Build Components (1.22) (pull_request) Successful in 2m54s
Build / Build Components (1.23) (pull_request) Successful in 3m7s
Tests and linters / Lint (pull_request) Successful in 3m40s
Tests and linters / Staticcheck (pull_request) Successful in 3m34s
Tests and linters / Tests with -race (pull_request) Successful in 4m14s
Tests and linters / gopls check (pull_request) Successful in 4m22s
This makes it simple to limit performance degradation for every shard because of rebuild. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f48a68c274
commit
6fc79de12f
15 changed files with 35 additions and 68 deletions
|
@ -109,7 +109,6 @@ type applicationConfiguration struct {
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
shards []shardCfg
|
shards []shardCfg
|
||||||
lowMem bool
|
lowMem bool
|
||||||
rebuildWorkers uint32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if need to run node in compatibility with other versions mode
|
// if need to run node in compatibility with other versions mode
|
||||||
|
@ -127,6 +126,8 @@ type shardCfg struct {
|
||||||
refillMetabaseWorkersCount int
|
refillMetabaseWorkersCount int
|
||||||
mode shardmode.Mode
|
mode shardmode.Mode
|
||||||
|
|
||||||
|
rebuildWorkersCount uint32
|
||||||
|
|
||||||
metaCfg struct {
|
metaCfg struct {
|
||||||
path string
|
path string
|
||||||
perm fs.FileMode
|
perm fs.FileMode
|
||||||
|
@ -230,7 +231,6 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||||
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||||
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
|
|
||||||
|
|
||||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||||
}
|
}
|
||||||
|
@ -240,6 +240,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
|
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||||
|
newConfig.rebuildWorkersCount = oldConfig.RebuildWorkerCount()
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
@ -835,7 +836,6 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||||
engine.WithLogger(c.log),
|
engine.WithLogger(c.log),
|
||||||
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||||
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
|
@ -998,6 +998,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||||
|
shard.WithRebuildWorkersCount(shCfg.rebuildWorkersCount),
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||||
shard.WithMetaBaseOptions(mbOptions...),
|
shard.WithMetaBaseOptions(mbOptions...),
|
||||||
|
|
|
@ -15,9 +15,6 @@ const (
|
||||||
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||||
// process object PUT operations in a storage engine.
|
// process object PUT operations in a storage engine.
|
||||||
ShardPoolSizeDefault = 20
|
ShardPoolSizeDefault = 20
|
||||||
// RebuildWorkersCountDefault is a default value of the workers count to
|
|
||||||
// process storage rebuild operations in a storage engine.
|
|
||||||
RebuildWorkersCountDefault = 100
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
||||||
|
@ -91,11 +88,3 @@ func ShardErrorThreshold(c *config.Config) uint32 {
|
||||||
func EngineLowMemoryConsumption(c *config.Config) bool {
|
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||||
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||||
}
|
}
|
||||||
|
|
||||||
// EngineRebuildWorkersCount returns value of "rebuild_workers_count" config parmeter from "storage" section.
|
|
||||||
func EngineRebuildWorkersCount(c *config.Config) uint32 {
|
|
||||||
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
return RebuildWorkersCountDefault
|
|
||||||
}
|
|
||||||
|
|
|
@ -39,7 +39,6 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
||||||
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||||
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
||||||
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
@ -49,7 +48,6 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
||||||
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
||||||
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
|
|
||||||
|
|
||||||
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -121,6 +119,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, false, sc.RefillMetabase())
|
require.Equal(t, false, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||||
|
require.Equal(t, uint32(1000), sc.RebuildWorkerCount())
|
||||||
case 1:
|
case 1:
|
||||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||||
|
@ -176,6 +175,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, true, sc.RefillMetabase())
|
require.Equal(t, true, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||||
|
require.Equal(t, uint32(shardconfig.RebuildWorkersCountDefault), sc.RebuildWorkerCount())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -19,6 +19,7 @@ const (
|
||||||
SmallSizeLimitDefault = 1 << 20
|
SmallSizeLimitDefault = 1 << 20
|
||||||
EstimateCompressibilityThresholdDefault = 0.1
|
EstimateCompressibilityThresholdDefault = 0.1
|
||||||
RefillMetabaseWorkersCountDefault = 500
|
RefillMetabaseWorkersCountDefault = 500
|
||||||
|
RebuildWorkersCountDefault = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
// From wraps config section into Config.
|
// From wraps config section into Config.
|
||||||
|
@ -149,6 +150,20 @@ func (x *Config) RefillMetabaseWorkersCount() int {
|
||||||
return RefillMetabaseWorkersCountDefault
|
return RefillMetabaseWorkersCountDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RebuildWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
|
||||||
|
//
|
||||||
|
// Returns RebuildWorkersCountDefault if the value is not a positive number.
|
||||||
|
func (x *Config) RebuildWorkerCount() uint32 {
|
||||||
|
v := config.Uint32Safe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"rebuild_worker_count",
|
||||||
|
)
|
||||||
|
if v > 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return RebuildWorkersCountDefault
|
||||||
|
}
|
||||||
|
|
||||||
// Mode return the value of "mode" config parameter.
|
// Mode return the value of "mode" config parameter.
|
||||||
//
|
//
|
||||||
// Panics if read the value is not one of predefined
|
// Panics if read the value is not one of predefined
|
||||||
|
|
|
@ -90,11 +90,11 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||||
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
|
|
||||||
## 0 shard
|
## 0 shard
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
||||||
|
FROSTFS_STORAGE_SHARD_0_REBUILD_WORKER_COUNT=1000
|
||||||
### Flag to set shard mode
|
### Flag to set shard mode
|
||||||
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
||||||
### Write cache config
|
### Write cache config
|
||||||
|
|
|
@ -135,12 +135,12 @@
|
||||||
"storage": {
|
"storage": {
|
||||||
"shard_pool_size": 15,
|
"shard_pool_size": 15,
|
||||||
"shard_ro_error_threshold": 100,
|
"shard_ro_error_threshold": 100,
|
||||||
"rebuild_workers_count": 1000,
|
|
||||||
"shard": {
|
"shard": {
|
||||||
"0": {
|
"0": {
|
||||||
"mode": "read-only",
|
"mode": "read-only",
|
||||||
"resync_metabase": false,
|
"resync_metabase": false,
|
||||||
"resync_metabase_worker_count": 100,
|
"resync_metabase_worker_count": 100,
|
||||||
|
"rebuild_worker_count": 1000,
|
||||||
"writecache": {
|
"writecache": {
|
||||||
"enabled": false,
|
"enabled": false,
|
||||||
"no_sync": true,
|
"no_sync": true,
|
||||||
|
|
|
@ -117,7 +117,6 @@ storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
||||||
rebuild_workers_count: 1000 # count of rebuild storage concurrent workers
|
|
||||||
|
|
||||||
shard:
|
shard:
|
||||||
default: # section with the default shard parameters
|
default: # section with the default shard parameters
|
||||||
|
@ -165,6 +164,7 @@ storage:
|
||||||
# disabled (do not work with the shard, allows to not remove it from the config)
|
# disabled (do not work with the shard, allows to not remove it from the config)
|
||||||
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
||||||
resync_metabase_worker_count: 100
|
resync_metabase_worker_count: 100
|
||||||
|
rebuild_worker_count: 1000 # count of rebuild storage concurrent workers
|
||||||
|
|
||||||
writecache:
|
writecache:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
|
@ -189,6 +189,7 @@ The following table describes configuration for each shard.
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||||
|
| `rebuild_worker_count` | `int` | `5` | Count of concurrent workers to rebuild blobstore. |
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
|
|
@ -40,7 +40,6 @@ type StorageEngine struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
evacuateLimiter *evacuationLimiter
|
evacuateLimiter *evacuationLimiter
|
||||||
rebuildLimiter *rebuildLimiter
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type shardWrapper struct {
|
type shardWrapper struct {
|
||||||
|
@ -213,16 +212,13 @@ type cfg struct {
|
||||||
|
|
||||||
lowMem bool
|
lowMem bool
|
||||||
|
|
||||||
rebuildWorkersCount uint32
|
|
||||||
|
|
||||||
containerSource atomic.Pointer[containerSource]
|
containerSource atomic.Pointer[containerSource]
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
res := &cfg{
|
res := &cfg{
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
shardPoolSize: 20,
|
shardPoolSize: 20,
|
||||||
rebuildWorkersCount: 100,
|
|
||||||
}
|
}
|
||||||
res.containerSource.Store(&containerSource{})
|
res.containerSource.Store(&containerSource{})
|
||||||
return res
|
return res
|
||||||
|
@ -243,7 +239,6 @@ func New(opts ...Option) *StorageEngine {
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
setModeCh: make(chan setModeRequest),
|
setModeCh: make(chan setModeRequest),
|
||||||
evacuateLimiter: &evacuationLimiter{},
|
evacuateLimiter: &evacuationLimiter{},
|
||||||
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,13 +277,6 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
|
|
||||||
func WithRebuildWorkersCount(count uint32) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.rebuildWorkersCount = count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetContainerSource sets container source.
|
// SetContainerSource sets container source.
|
||||||
func (e *StorageEngine) SetContainerSource(cs container.Source) {
|
func (e *StorageEngine) SetContainerSource(cs container.Source) {
|
||||||
e.containerSource.Store(&containerSource{cs: cs})
|
e.containerSource.Store(&containerSource{cs: cs})
|
||||||
|
|
|
@ -41,7 +41,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
|
||||||
}
|
}
|
||||||
resGuard := &sync.Mutex{}
|
resGuard := &sync.Mutex{}
|
||||||
|
|
||||||
limiter := newRebuildLimiter(prm.ConcurrencyLimit)
|
limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit)
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
for _, shardID := range prm.ShardIDs {
|
for _, shardID := range prm.ShardIDs {
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
package engine
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
type rebuildLimiter struct {
|
|
||||||
semaphore chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
|
|
||||||
return &rebuildLimiter{
|
|
||||||
semaphore: make(chan struct{}, workersCount),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
|
|
||||||
select {
|
|
||||||
case l.semaphore <- struct{}{}:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *rebuildLimiter) ReleaseWorkSlot() {
|
|
||||||
<-l.semaphore
|
|
||||||
}
|
|
|
@ -133,7 +133,6 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh
|
||||||
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
||||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||||
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
|
|
||||||
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
|
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
|
||||||
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||||
)...)
|
)...)
|
||||||
|
|
|
@ -129,7 +129,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
|
|
||||||
s.gc.init(ctx)
|
s.gc.init(ctx)
|
||||||
|
|
||||||
s.rb = newRebuilder(s.rebuildLimiter)
|
s.rb = newRebuilder(NewRebuildLimiter(s.rebuildWorkersCount))
|
||||||
if !m.NoMetabase() {
|
if !m.NoMetabase() {
|
||||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ type rebuildLimiter struct {
|
||||||
semaphore chan struct{}
|
semaphore chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
|
func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter {
|
||||||
return &rebuildLimiter{
|
return &rebuildLimiter{
|
||||||
semaphore: make(chan struct{}, workersCount),
|
semaphore: make(chan struct{}, workersCount),
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ type cfg struct {
|
||||||
|
|
||||||
reportErrorFunc func(selfID string, message string, err error)
|
reportErrorFunc func(selfID string, message string, err error)
|
||||||
|
|
||||||
rebuildLimiter RebuildWorkerLimiter
|
rebuildWorkersCount uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -147,7 +147,7 @@ func defaultCfg() *cfg {
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
gcCfg: defaultGCCfg(),
|
gcCfg: defaultGCCfg(),
|
||||||
reportErrorFunc: func(string, string, error) {},
|
reportErrorFunc: func(string, string, error) {},
|
||||||
rebuildLimiter: newRebuildLimiter(1),
|
rebuildWorkersCount: 1,
|
||||||
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
}
|
}
|
||||||
|
@ -380,11 +380,11 @@ func WithExpiredCollectorWorkerCount(count int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRebuildWorkerLimiter return option to set concurrent
|
// WithRebuildWorkersCount return option to set concurrent
|
||||||
// workers count of storage rebuild operation.
|
// workers count of storage rebuild operation.
|
||||||
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
|
func WithRebuildWorkersCount(count uint32) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.rebuildLimiter = l
|
c.rebuildWorkersCount = count
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue