[#476] cmd/cfg: Add worker pools

Add worker pools to Netmap and Container
config structures. Add its initialization
that depends on environmental variables(
sync/async; worker pool size).

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-04-13 15:15:36 +03:00 committed by Alex Vanin
parent 9a961e21b1
commit 7cd4e409eb

View file

@ -89,10 +89,14 @@ const (
// config keys for cfgNetmap // config keys for cfgNetmap
cfgNetmapContract = "netmap.scripthash" cfgNetmapContract = "netmap.scripthash"
cfgNetmapFee = "netmap.fee" cfgNetmapFee = "netmap.fee"
cfgNetmapWorkerPoolEnabled = "netmap.async_worker.enabled"
cfgNetmapWorkerPoolSize = "netmap.async_worker.size"
// config keys for cfgContainer // config keys for cfgContainer
cfgContainerContract = "container.scripthash" cfgContainerContract = "container.scripthash"
cfgContainerFee = "container.fee" cfgContainerFee = "container.fee"
cfgContainerWorkerPoolEnabled = "container.async_worker.enabled"
cfgContainerWorkerPoolSize = "container.async_worker.size"
cfgGCQueueSize = "gc.queuesize" cfgGCQueueSize = "gc.queuesize"
cfgGCQueueTick = "gc.duration.sleep" cfgGCQueueTick = "gc.duration.sleep"
@ -237,6 +241,7 @@ type cfgContainer struct {
parsers map[event.Type]event.Parser parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
} }
type cfgNetmap struct { type cfgNetmap struct {
@ -248,6 +253,7 @@ type cfgNetmap struct {
parsers map[event.Type]event.Parser parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
state *networkState state *networkState
@ -336,6 +342,13 @@ func initCfg(path string) *cfg {
state := newNetworkState() state := newNetworkState()
// initialize async workers if it is configured so
containerWorkerPool, err := initContainerWorkerPool(viperCfg)
fatalOnErr(err)
netmapWorkerPool, err := initNetmapWorkerPool(viperCfg)
fatalOnErr(err)
c := &cfg{ c := &cfg{
ctx: context.Background(), ctx: context.Background(),
internalErr: make(chan error), internalErr: make(chan error),
@ -351,11 +364,13 @@ func initCfg(path string) *cfg {
cfgContainer: cfgContainer{ cfgContainer: cfgContainer{
scriptHash: u160Container, scriptHash: u160Container,
fee: fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)), fee: fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)),
workerPool: containerWorkerPool,
}, },
cfgNetmap: cfgNetmap{ cfgNetmap: cfgNetmap{
scriptHash: u160Netmap, scriptHash: u160Netmap,
fee: fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)), fee: fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)),
state: state, state: state,
workerPool: netmapWorkerPool,
reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval), reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval),
reBootstrapEnabled: viperCfg.GetBool(cfgReBootstrapEnabled), reBootstrapEnabled: viperCfg.GetBool(cfgReBootstrapEnabled),
goOfflineEnabled: viperCfg.GetBool(cfgShutdownOfflineEnabled), goOfflineEnabled: viperCfg.GetBool(cfgShutdownOfflineEnabled),
@ -428,9 +443,13 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039") v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039")
v.SetDefault(cfgContainerFee, "1") v.SetDefault(cfgContainerFee, "1")
v.SetDefault(cfgContainerWorkerPoolEnabled, true)
v.SetDefault(cfgContainerWorkerPoolSize, 10)
v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3")
v.SetDefault(cfgNetmapFee, "1") v.SetDefault(cfgNetmapFee, "1")
v.SetDefault(cfgNetmapWorkerPoolEnabled, true)
v.SetDefault(cfgNetmapWorkerPoolSize, 10)
v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgLogLevel, "info")
v.SetDefault(cfgLogFormat, "console") v.SetDefault(cfgLogFormat, "console")
@ -716,6 +735,26 @@ func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
return pool return pool
} }
func initNetmapWorkerPool(v *viper.Viper) (util2.WorkerPool, error) {
if v.GetBool(cfgNetmapWorkerPoolEnabled) {
// return async worker pool
return ants.NewPool(v.GetInt(cfgNetmapWorkerPoolSize))
}
// return sync worker pool
return util2.SyncWorkerPool{}, nil
}
func initContainerWorkerPool(v *viper.Viper) (util2.WorkerPool, error) {
if v.GetBool(cfgContainerWorkerPoolEnabled) {
// return async worker pool
return ants.NewPool(v.GetInt(cfgContainerWorkerPoolSize))
}
// return sync worker pool
return util2.SyncWorkerPool{}, nil
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
ni := c.localNodeInfo() ni := c.localNodeInfo()
return ni.ToV2(), nil return ni.ToV2(), nil