[#674] node: Configure size of per-shard worker pools
Add `shard_pool_size` config to `storage` section. Set app default to 20. Pass the value to `WithShardPoolSize` option. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
5b1975d52a
commit
2126235f0e
6 changed files with 35 additions and 3 deletions
|
@ -318,7 +318,10 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
||||||
func initLocalStorage(c *cfg) {
|
func initLocalStorage(c *cfg) {
|
||||||
initShardOptions(c)
|
initShardOptions(c)
|
||||||
|
|
||||||
engineOpts := []engine.Option{engine.WithLogger(c.log)}
|
engineOpts := []engine.Option{
|
||||||
|
engine.WithLogger(c.log),
|
||||||
|
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c.appCfg)),
|
||||||
|
}
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
|
engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,13 +7,21 @@ import (
|
||||||
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
|
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
subsection = "storage"
|
||||||
|
|
||||||
|
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||||
|
// process object PUT operations in storage engine.
|
||||||
|
ShardPoolSizeDefault = 20
|
||||||
|
)
|
||||||
|
|
||||||
// IterateShards iterates over subsections ["0":"N") (N - "shard_num" value)
|
// IterateShards iterates over subsections ["0":"N") (N - "shard_num" value)
|
||||||
// of "shard" subsection of "storage" section of c, wrap them into
|
// of "shard" subsection of "storage" section of c, wrap them into
|
||||||
// shardconfig.Config and passes to f.
|
// shardconfig.Config and passes to f.
|
||||||
//
|
//
|
||||||
// Panics if N is not a positive number.
|
// Panics if N is not a positive number.
|
||||||
func IterateShards(c *config.Config, f func(*shardconfig.Config)) {
|
func IterateShards(c *config.Config, f func(*shardconfig.Config)) {
|
||||||
c = c.Sub("storage")
|
c = c.Sub(subsection)
|
||||||
|
|
||||||
num := config.Uint(c, "shard_num")
|
num := config.Uint(c, "shard_num")
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
|
@ -32,3 +40,15 @@ func IterateShards(c *config.Config, f func(*shardconfig.Config)) {
|
||||||
f(sc)
|
f(sc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ShardPoolSize returns value of "shard_pool_size" config parameter from "storage" section.
|
||||||
|
//
|
||||||
|
// Returns ShardPoolSizeDefault if value is not a positive number.
|
||||||
|
func ShardPoolSize(c *config.Config) uint32 {
|
||||||
|
v := config.Uint32Safe(c.Sub(subsection), "shard_pool_size")
|
||||||
|
if v > 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
return ShardPoolSizeDefault
|
||||||
|
}
|
||||||
|
|
|
@ -14,9 +14,13 @@ import (
|
||||||
|
|
||||||
func TestEngineSection(t *testing.T) {
|
func TestEngineSection(t *testing.T) {
|
||||||
t.Run("defaults", func(t *testing.T) {
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Panics(t, func() {
|
require.Panics(t, func() {
|
||||||
engineconfig.IterateShards(configtest.EmptyConfig(), nil)
|
engineconfig.IterateShards(empty, nil)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
@ -24,6 +28,8 @@ func TestEngineSection(t *testing.T) {
|
||||||
var fileConfigTest = func(c *config.Config) {
|
var fileConfigTest = func(c *config.Config) {
|
||||||
num := 0
|
num := 0
|
||||||
|
|
||||||
|
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
||||||
|
|
||||||
engineconfig.IterateShards(c, func(sc *shardconfig.Config) {
|
engineconfig.IterateShards(c, func(sc *shardconfig.Config) {
|
||||||
defer func() {
|
defer func() {
|
||||||
num++
|
num++
|
||||||
|
|
|
@ -68,6 +68,7 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
||||||
NEOFS_OBJECT_PUT_POOL_SIZE_LOCAL=101
|
NEOFS_OBJECT_PUT_POOL_SIZE_LOCAL=101
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
|
NEOFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
NEOFS_STORAGE_SHARD_NUM=2
|
NEOFS_STORAGE_SHARD_NUM=2
|
||||||
## 0 shard
|
## 0 shard
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
|
|
|
@ -102,6 +102,7 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
"shard_pool_size": 15,
|
||||||
"shard_num": 2,
|
"shard_num": 2,
|
||||||
"shard": {
|
"shard": {
|
||||||
"0": {
|
"0": {
|
||||||
|
|
|
@ -88,6 +88,7 @@ object:
|
||||||
pool_size_local: 101 # number of async workers for local PUT operations
|
pool_size_local: 101 # number of async workers for local PUT operations
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
shard_num: 2 # total number of shards
|
shard_num: 2 # total number of shards
|
||||||
shard:
|
shard:
|
||||||
0:
|
0:
|
||||||
|
|
Loading…
Reference in a new issue