From b6806ea6b9e39a1be70b7bf677ca601456161139 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 28 Sep 2022 12:19:23 +0300 Subject: [PATCH] [#1770] node: Support logger config rereading Signed-off-by: Pavel Karpy --- cmd/neofs-ir/main.go | 2 +- cmd/neofs-node/config.go | 184 ++++++++++++++++++++++++++------------- 2 files changed, 123 insertions(+), 63 deletions(-) diff --git a/cmd/neofs-ir/main.go b/cmd/neofs-ir/main.go index 7274d2c9c4..a640858846 100644 --- a/cmd/neofs-ir/main.go +++ b/cmd/neofs-ir/main.go @@ -54,7 +54,7 @@ func main() { ) exitErr(err) - log, err := logger.NewLogger(logPrm) + log, err := logger.NewLogger(&logPrm) exitErr(err) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 38255bf506..e23dd26162 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -87,6 +87,10 @@ type applicationConfiguration struct { // has already been read _read bool + LoggerCfg struct { + level string + } + EngineCfg struct { errorThreshold uint32 shardPoolSize uint32 @@ -182,6 +186,12 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a._read = true + // Logger + + a.LoggerCfg.level = loggerconfig.Level(c) + + // Storage Engine + a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) @@ -346,10 +356,17 @@ type shared struct { metricsCollector *metrics.NodeMetrics } +// dynamicConfiguration stores parameters of the +// components that supports runtime reconfigurations +type dynamicConfiguration struct { + logger *logger.Prm +} + type cfg struct { applicationConfiguration internals shared + dynamicConfiguration // configuration of the internal // services @@ -483,16 +500,19 @@ type cfgReputation struct { var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block") func initCfg(appCfg *config.Config) *cfg { + c := &cfg{} + + err := c.readConfig(appCfg) + if err != nil { + panic(fmt.Errorf("config reading: %w", err)) + } + key := nodeconfig.Key(appCfg) - var logPrm logger.Prm - - err := logPrm.SetLevelString( - loggerconfig.Level(appCfg), - ) + logPrm, err := c.loggerPrm() fatalOnErr(err) - log, err := logger.NewLogger(&logPrm) + log, err := logger.NewLogger(logPrm) fatalOnErr(err) var netAddr network.AddressGroup @@ -519,63 +539,55 @@ func initCfg(appCfg *config.Config) *cfg { reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) - c := &cfg{ - internals: internals{ - ctx: context.Background(), - appCfg: appCfg, - internalErr: make(chan error), - log: log, - wg: new(sync.WaitGroup), - apiVersion: version.Current(), - healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), - }, - shared: shared{ - key: key, - binPublicKey: key.PublicKey().Bytes(), - localAddr: netAddr, - respSvc: response.NewService(response.WithNetworkState(netState)), - clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{ - DialTimeout: apiclientconfig.DialTimeout(appCfg), - StreamTimeout: apiclientconfig.StreamTimeout(appCfg), - Key: &key.PrivateKey, - AllowExternal: apiclientconfig.AllowExternal(appCfg), - }), - persistate: persistate, - }, - cfgAccounting: cfgAccounting{ - scriptHash: contractsconfig.Balance(appCfg), - }, - cfgContainer: cfgContainer{ - scriptHash: contractsconfig.Container(appCfg), - workerPool: containerWorkerPool, - }, - cfgNetmap: cfgNetmap{ - scriptHash: contractsconfig.Netmap(appCfg), - state: netState, - workerPool: netmapWorkerPool, - needBootstrap: !relayOnly, - reBoostrapTurnedOff: atomic.NewBool(relayOnly), - }, - cfgGRPC: cfgGRPC{ - maxChunkSize: maxChunkSize, - maxAddrAmount: maxAddrAmount, - }, - cfgMorph: cfgMorph{ - proxyScriptHash: contractsconfig.Proxy(appCfg), - }, - cfgObject: cfgObject{ - pool: initObjectPool(appCfg), - }, - cfgReputation: cfgReputation{ - scriptHash: contractsconfig.Reputation(appCfg), - workerPool: reputationWorkerPool, - }, + c.internals = internals{ + ctx: context.Background(), + appCfg: appCfg, + internalErr: make(chan error), + log: log, + wg: new(sync.WaitGroup), + apiVersion: version.Current(), + healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), } - - // returned err must be nil during first time read - err = c.readConfig(appCfg) - if err != nil { - panic(fmt.Errorf("config reading: %w", err)) + c.shared = shared{ + key: key, + binPublicKey: key.PublicKey().Bytes(), + localAddr: netAddr, + respSvc: response.NewService(response.WithNetworkState(netState)), + clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{ + DialTimeout: apiclientconfig.DialTimeout(appCfg), + StreamTimeout: apiclientconfig.StreamTimeout(appCfg), + Key: &key.PrivateKey, + AllowExternal: apiclientconfig.AllowExternal(appCfg), + }), + persistate: persistate, + } + c.cfgAccounting = cfgAccounting{ + scriptHash: contractsconfig.Balance(appCfg), + } + c.cfgContainer = cfgContainer{ + scriptHash: contractsconfig.Container(appCfg), + workerPool: containerWorkerPool, + } + c.cfgNetmap = cfgNetmap{ + scriptHash: contractsconfig.Netmap(appCfg), + state: netState, + workerPool: netmapWorkerPool, + needBootstrap: !relayOnly, + reBoostrapTurnedOff: atomic.NewBool(relayOnly), + } + c.cfgGRPC = cfgGRPC{ + maxChunkSize: maxChunkSize, + maxAddrAmount: maxAddrAmount, + } + c.cfgMorph = cfgMorph{ + proxyScriptHash: contractsconfig.Proxy(appCfg), + } + c.cfgObject = cfgObject{ + pool: initObjectPool(appCfg), + } + c.cfgReputation = cfgReputation{ + scriptHash: contractsconfig.Reputation(appCfg), + workerPool: reputationWorkerPool, } user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey) @@ -721,6 +733,22 @@ func (c *cfg) shardOpts() []shardOptsWithID { return shards } +func (c *cfg) loggerPrm() (*logger.Prm, error) { + // check if it has been inited before + if c.dynamicConfiguration.logger == nil { + c.dynamicConfiguration.logger = new(logger.Prm) + } + + // (re)init read configuration + err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level) + if err != nil { + // not expected since validation should be performed before + panic(fmt.Sprintf("incorrect log level format: %s", c.LoggerCfg.level)) + } + + return c.dynamicConfiguration.logger, nil +} + func (c *cfg) LocalAddress() network.AddressGroup { return c.localAddr } @@ -829,6 +857,13 @@ func (c *cfg) ObjectServiceLoad() float64 { return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) } +type dCfg struct { + name string + cfg interface { + Reload() error + } +} + func (c *cfg) configWatcher(ctx context.Context) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP) @@ -844,6 +879,22 @@ func (c *cfg) configWatcher(ctx context.Context) { continue } + // all the components are expected to support + // Logger's dynamic reconfiguration approach + var components []dCfg + + // Logger + + logPrm, err := c.loggerPrm() + if err != nil { + c.log.Error("logger configuration preparation", zap.Error(err)) + continue + } + + components = append(components, dCfg{name: "logger", cfg: logPrm}) + + // Storage Engine + var rcfg engine.ReConfiguration for _, optsWithID := range c.shardOpts() { rcfg.AddShard(optsWithID.configID, optsWithID.shOpts) @@ -855,6 +906,15 @@ func (c *cfg) configWatcher(ctx context.Context) { continue } + for _, component := range components { + err = component.cfg.Reload() + if err != nil { + c.log.Error("updated configuration applying", + zap.String("component", component.name), + zap.Error(err)) + } + } + c.log.Info("configuration has been reloaded successfully") case <-ctx.Done(): return