diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 3f0ca72a..6c090bb5 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/profiler" + "github.com/panjf2000/ants/v2" "github.com/pkg/errors" "github.com/spf13/viper" "go.etcd.io/bbolt" @@ -93,6 +94,13 @@ const ( cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapInterval = "bootstrap.periodic.interval" + + cfgObjectPutPoolSize = "pool.object.put.size" + cfgObjectGetPoolSize = "pool.object.get.size" + cfgObjectHeadPoolSize = "pool.object.head.size" + cfgObjectSearchPoolSize = "pool.object.search.size" + cfgObjectRangePoolSize = "pool.object.range.size" + cfgObjectRangeHashPoolSize = "pool.object.rangehash.size" ) const ( @@ -202,6 +210,12 @@ type cfgObject struct { blobstorage bucket.Bucket cnrClient *wrapper.Wrapper + + pool cfgObjectRoutines +} + +type cfgObjectRoutines struct { + get, head, put, search, rng, rngHash *ants.Pool } const ( @@ -274,6 +288,9 @@ func initCfg(path string) *cfg { respSvc: response.NewService( response.WithNetworkState(state), ), + cfgObject: cfgObject{ + pool: initObjectPool(viperCfg), + }, } initLocalStorage(c) @@ -349,6 +366,13 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs + + v.SetDefault(cfgObjectGetPoolSize, 10) + v.SetDefault(cfgObjectHeadPoolSize, 10) + v.SetDefault(cfgObjectPutPoolSize, 10) + v.SetDefault(cfgObjectSearchPoolSize, 10) + v.SetDefault(cfgObjectRangePoolSize, 10) + v.SetDefault(cfgObjectRangeHashPoolSize, 10) } func (c *cfg) LocalAddress() *network.Address { @@ -393,3 +417,39 @@ func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) { return bucket, nil } + +func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { + var err error + + pool.get, err = ants.NewPool(cfg.GetInt(cfgObjectGetPoolSize)) + if err != nil { + fatalOnErr(err) + } + + pool.head, err = ants.NewPool(cfg.GetInt(cfgObjectHeadPoolSize)) + if err != nil { + fatalOnErr(err) + } + + pool.search, err = ants.NewPool(cfg.GetInt(cfgObjectSearchPoolSize)) + if err != nil { + fatalOnErr(err) + } + + pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize)) + if err != nil { + fatalOnErr(err) + } + + pool.rng, err = ants.NewPool(cfg.GetInt(cfgObjectRangePoolSize)) + if err != nil { + fatalOnErr(err) + } + + pool.rngHash, err = ants.NewPool(cfg.GetInt(cfgObjectRangeHashPoolSize)) + if err != nil { + fatalOnErr(err) + } + + return pool +} diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 14e605ce..2c52e2f7 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -36,7 +36,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -253,6 +252,7 @@ func initObjectService(c *cfg) { objectCore.WithDeleteHandler(c.cfgObject.metastorage), ), putsvc.WithNetworkState(c.cfgNetmap.state), + putsvc.WithWorkerPool(c.cfgObject.pool.put), ) sPutV2 := putsvcV2.NewService( @@ -265,6 +265,7 @@ func initObjectService(c *cfg) { searchsvc.WithContainerSource(c.cfgObject.cnrStorage), searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), searchsvc.WithLocalAddressSource(c), + searchsvc.WithWorkerPool(c.cfgObject.pool.search), ) sSearchV2 := searchsvcV2.NewService( @@ -278,22 +279,20 @@ func initObjectService(c *cfg) { headsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), headsvc.WithLocalAddressSource(c), headsvc.WithRightChildSearcher(searchsvc.NewRightChildSearcher(sSearch)), + headsvc.WithWorkerPool(c.cfgObject.pool.head), ) sHeadV2 := headsvcV2.NewService( headsvcV2.WithInternalService(sHead), ) - pool, err := ants.NewPool(10) - fatalOnErr(err) - sRange := rangesvc.NewService( rangesvc.WithKeyStorage(keyStorage), rangesvc.WithLocalStorage(ls), rangesvc.WithContainerSource(c.cfgObject.cnrStorage), rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage), rangesvc.WithLocalAddressSource(c), - rangesvc.WithWorkerPool(pool), + rangesvc.WithWorkerPool(c.cfgObject.pool.rng), rangesvc.WithHeadService(sHead), ) @@ -317,6 +316,7 @@ func initObjectService(c *cfg) { rangehashsvc.WithLocalAddressSource(c), rangehashsvc.WithHeadService(sHead), rangehashsvc.WithRangeService(sRange), + rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash), ) sRangeHashV2 := rangehashsvcV2.NewService(