From 7cd4e409eb437c78bd454e1e2db2d56c967119b3 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 13 Apr 2021 15:15:36 +0300 Subject: [PATCH] [#476] cmd/cfg: Add worker pools Add worker pools to Netmap and Container config structures. Add its initialization that depends on environmental variables( sync/async; worker pool size). Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 47 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 42210bca8..8e4983f46 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -87,12 +87,16 @@ const ( cfgAccountingFee = "accounting.fee" // config keys for cfgNetmap - cfgNetmapContract = "netmap.scripthash" - cfgNetmapFee = "netmap.fee" + cfgNetmapContract = "netmap.scripthash" + cfgNetmapFee = "netmap.fee" + cfgNetmapWorkerPoolEnabled = "netmap.async_worker.enabled" + cfgNetmapWorkerPoolSize = "netmap.async_worker.size" // config keys for cfgContainer - cfgContainerContract = "container.scripthash" - cfgContainerFee = "container.fee" + cfgContainerContract = "container.scripthash" + cfgContainerFee = "container.fee" + cfgContainerWorkerPoolEnabled = "container.async_worker.enabled" + cfgContainerWorkerPoolSize = "container.async_worker.size" cfgGCQueueSize = "gc.queuesize" cfgGCQueueTick = "gc.duration.sleep" @@ -237,6 +241,7 @@ type cfgContainer struct { parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler + workerPool util2.WorkerPool // pool for asynchronous handlers } type cfgNetmap struct { @@ -248,6 +253,7 @@ type cfgNetmap struct { parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler + workerPool util2.WorkerPool // pool for asynchronous handlers state *networkState @@ -336,6 +342,13 @@ func initCfg(path string) *cfg { state := newNetworkState() + // initialize async workers if it is configured so + containerWorkerPool, err := initContainerWorkerPool(viperCfg) + fatalOnErr(err) + + netmapWorkerPool, err := initNetmapWorkerPool(viperCfg) + fatalOnErr(err) + c := &cfg{ ctx: context.Background(), internalErr: make(chan error), @@ -351,11 +364,13 @@ func initCfg(path string) *cfg { cfgContainer: cfgContainer{ scriptHash: u160Container, fee: fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)), + workerPool: containerWorkerPool, }, cfgNetmap: cfgNetmap{ scriptHash: u160Netmap, fee: fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)), state: state, + workerPool: netmapWorkerPool, reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval), reBootstrapEnabled: viperCfg.GetBool(cfgReBootstrapEnabled), goOfflineEnabled: viperCfg.GetBool(cfgShutdownOfflineEnabled), @@ -428,9 +443,13 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039") v.SetDefault(cfgContainerFee, "1") + v.SetDefault(cfgContainerWorkerPoolEnabled, true) + v.SetDefault(cfgContainerWorkerPoolSize, 10) v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapFee, "1") + v.SetDefault(cfgNetmapWorkerPoolEnabled, true) + v.SetDefault(cfgNetmapWorkerPoolSize, 10) v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgLogFormat, "console") @@ -716,6 +735,26 @@ func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { return pool } +func initNetmapWorkerPool(v *viper.Viper) (util2.WorkerPool, error) { + if v.GetBool(cfgNetmapWorkerPoolEnabled) { + // return async worker pool + return ants.NewPool(v.GetInt(cfgNetmapWorkerPoolSize)) + } + + // return sync worker pool + return util2.SyncWorkerPool{}, nil +} + +func initContainerWorkerPool(v *viper.Viper) (util2.WorkerPool, error) { + if v.GetBool(cfgContainerWorkerPoolEnabled) { + // return async worker pool + return ants.NewPool(v.GetInt(cfgContainerWorkerPoolSize)) + } + + // return sync worker pool + return util2.SyncWorkerPool{}, nil +} + func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { ni := c.localNodeInfo() return ni.ToV2(), nil