[#166] Use async pools in object service handlers

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-11-09 18:40:06 +03:00 committed by Alex Vanin
parent ded45e1fbc
commit eb13322bf0
2 changed files with 65 additions and 5 deletions

View file

@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/services/util/response"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/nspcc-dev/neofs-node/pkg/util/profiler" "github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
@ -93,6 +94,13 @@ const (
cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapEnabled = "bootstrap.periodic.enabled"
cfgReBootstrapInterval = "bootstrap.periodic.interval" cfgReBootstrapInterval = "bootstrap.periodic.interval"
cfgObjectPutPoolSize = "pool.object.put.size"
cfgObjectGetPoolSize = "pool.object.get.size"
cfgObjectHeadPoolSize = "pool.object.head.size"
cfgObjectSearchPoolSize = "pool.object.search.size"
cfgObjectRangePoolSize = "pool.object.range.size"
cfgObjectRangeHashPoolSize = "pool.object.rangehash.size"
) )
const ( const (
@ -202,6 +210,12 @@ type cfgObject struct {
blobstorage bucket.Bucket blobstorage bucket.Bucket
cnrClient *wrapper.Wrapper cnrClient *wrapper.Wrapper
pool cfgObjectRoutines
}
type cfgObjectRoutines struct {
get, head, put, search, rng, rngHash *ants.Pool
} }
const ( const (
@ -274,6 +288,9 @@ func initCfg(path string) *cfg {
respSvc: response.NewService( respSvc: response.NewService(
response.WithNetworkState(state), response.WithNetworkState(state),
), ),
cfgObject: cfgObject{
pool: initObjectPool(viperCfg),
},
} }
initLocalStorage(c) initLocalStorage(c)
@ -349,6 +366,13 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapEnabled, false) // in epochs
v.SetDefault(cfgReBootstrapInterval, 2) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs
v.SetDefault(cfgObjectGetPoolSize, 10)
v.SetDefault(cfgObjectHeadPoolSize, 10)
v.SetDefault(cfgObjectPutPoolSize, 10)
v.SetDefault(cfgObjectSearchPoolSize, 10)
v.SetDefault(cfgObjectRangePoolSize, 10)
v.SetDefault(cfgObjectRangeHashPoolSize, 10)
} }
func (c *cfg) LocalAddress() *network.Address { func (c *cfg) LocalAddress() *network.Address {
@ -393,3 +417,39 @@ func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) {
return bucket, nil return bucket, nil
} }
func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
var err error
pool.get, err = ants.NewPool(cfg.GetInt(cfgObjectGetPoolSize))
if err != nil {
fatalOnErr(err)
}
pool.head, err = ants.NewPool(cfg.GetInt(cfgObjectHeadPoolSize))
if err != nil {
fatalOnErr(err)
}
pool.search, err = ants.NewPool(cfg.GetInt(cfgObjectSearchPoolSize))
if err != nil {
fatalOnErr(err)
}
pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize))
if err != nil {
fatalOnErr(err)
}
pool.rng, err = ants.NewPool(cfg.GetInt(cfgObjectRangePoolSize))
if err != nil {
fatalOnErr(err)
}
pool.rngHash, err = ants.NewPool(cfg.GetInt(cfgObjectRangeHashPoolSize))
if err != nil {
fatalOnErr(err)
}
return pool
}

View file

@ -36,7 +36,6 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator" "github.com/nspcc-dev/neofs-node/pkg/services/replicator"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -253,6 +252,7 @@ func initObjectService(c *cfg) {
objectCore.WithDeleteHandler(c.cfgObject.metastorage), objectCore.WithDeleteHandler(c.cfgObject.metastorage),
), ),
putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPool(c.cfgObject.pool.put),
) )
sPutV2 := putsvcV2.NewService( sPutV2 := putsvcV2.NewService(
@ -265,6 +265,7 @@ func initObjectService(c *cfg) {
searchsvc.WithContainerSource(c.cfgObject.cnrStorage), searchsvc.WithContainerSource(c.cfgObject.cnrStorage),
searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
searchsvc.WithLocalAddressSource(c), searchsvc.WithLocalAddressSource(c),
searchsvc.WithWorkerPool(c.cfgObject.pool.search),
) )
sSearchV2 := searchsvcV2.NewService( sSearchV2 := searchsvcV2.NewService(
@ -278,22 +279,20 @@ func initObjectService(c *cfg) {
headsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), headsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
headsvc.WithLocalAddressSource(c), headsvc.WithLocalAddressSource(c),
headsvc.WithRightChildSearcher(searchsvc.NewRightChildSearcher(sSearch)), headsvc.WithRightChildSearcher(searchsvc.NewRightChildSearcher(sSearch)),
headsvc.WithWorkerPool(c.cfgObject.pool.head),
) )
sHeadV2 := headsvcV2.NewService( sHeadV2 := headsvcV2.NewService(
headsvcV2.WithInternalService(sHead), headsvcV2.WithInternalService(sHead),
) )
pool, err := ants.NewPool(10)
fatalOnErr(err)
sRange := rangesvc.NewService( sRange := rangesvc.NewService(
rangesvc.WithKeyStorage(keyStorage), rangesvc.WithKeyStorage(keyStorage),
rangesvc.WithLocalStorage(ls), rangesvc.WithLocalStorage(ls),
rangesvc.WithContainerSource(c.cfgObject.cnrStorage), rangesvc.WithContainerSource(c.cfgObject.cnrStorage),
rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage), rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
rangesvc.WithLocalAddressSource(c), rangesvc.WithLocalAddressSource(c),
rangesvc.WithWorkerPool(pool), rangesvc.WithWorkerPool(c.cfgObject.pool.rng),
rangesvc.WithHeadService(sHead), rangesvc.WithHeadService(sHead),
) )
@ -317,6 +316,7 @@ func initObjectService(c *cfg) {
rangehashsvc.WithLocalAddressSource(c), rangehashsvc.WithLocalAddressSource(c),
rangehashsvc.WithHeadService(sHead), rangehashsvc.WithHeadService(sHead),
rangehashsvc.WithRangeService(sRange), rangehashsvc.WithRangeService(sRange),
rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash),
) )
sRangeHashV2 := rangehashsvcV2.NewService( sRangeHashV2 := rangehashsvcV2.NewService(