[#1770] node: Support logger config rereading

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
remotes/fyrchik/neofs-adm-fix-commands
Pavel Karpy 2022-09-28 12:19:23 +03:00 committed by Pavel Karpy
parent 8c75cb1dad
commit b6806ea6b9
2 changed files with 123 additions and 63 deletions

View File

@ -54,7 +54,7 @@ func main() {
)
exitErr(err)
log, err := logger.NewLogger(logPrm)
log, err := logger.NewLogger(&logPrm)
exitErr(err)
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

View File

@ -87,6 +87,10 @@ type applicationConfiguration struct {
// has already been read
_read bool
LoggerCfg struct {
level string
}
EngineCfg struct {
errorThreshold uint32
shardPoolSize uint32
@ -182,6 +186,12 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a._read = true
// Logger
a.LoggerCfg.level = loggerconfig.Level(c)
// Storage Engine
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
@ -346,10 +356,17 @@ type shared struct {
metricsCollector *metrics.NodeMetrics
}
// dynamicConfiguration stores parameters of the
// components that supports runtime reconfigurations
type dynamicConfiguration struct {
logger *logger.Prm
}
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
// configuration of the internal
// services
@ -483,16 +500,19 @@ type cfgReputation struct {
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(appCfg *config.Config) *cfg {
c := &cfg{}
err := c.readConfig(appCfg)
if err != nil {
panic(fmt.Errorf("config reading: %w", err))
}
key := nodeconfig.Key(appCfg)
var logPrm logger.Prm
err := logPrm.SetLevelString(
loggerconfig.Level(appCfg),
)
logPrm, err := c.loggerPrm()
fatalOnErr(err)
log, err := logger.NewLogger(&logPrm)
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
var netAddr network.AddressGroup
@ -519,63 +539,55 @@ func initCfg(appCfg *config.Config) *cfg {
reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
c := &cfg{
internals: internals{
ctx: context.Background(),
appCfg: appCfg,
internalErr: make(chan error),
log: log,
wg: new(sync.WaitGroup),
apiVersion: version.Current(),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
},
shared: shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(response.WithNetworkState(netState)),
clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
}),
persistate: persistate,
},
cfgAccounting: cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
},
cfgContainer: cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
},
cfgNetmap: cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly),
},
cfgGRPC: cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
},
cfgMorph: cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
},
cfgObject: cfgObject{
pool: initObjectPool(appCfg),
},
cfgReputation: cfgReputation{
scriptHash: contractsconfig.Reputation(appCfg),
workerPool: reputationWorkerPool,
},
c.internals = internals{
ctx: context.Background(),
appCfg: appCfg,
internalErr: make(chan error),
log: log,
wg: new(sync.WaitGroup),
apiVersion: version.Current(),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
}
// returned err must be nil during first time read
err = c.readConfig(appCfg)
if err != nil {
panic(fmt.Errorf("config reading: %w", err))
c.shared = shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(response.WithNetworkState(netState)),
clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
}),
persistate: persistate,
}
c.cfgAccounting = cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
}
c.cfgContainer = cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
}
c.cfgNetmap = cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly),
}
c.cfgGRPC = cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
}
c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
}
c.cfgObject = cfgObject{
pool: initObjectPool(appCfg),
}
c.cfgReputation = cfgReputation{
scriptHash: contractsconfig.Reputation(appCfg),
workerPool: reputationWorkerPool,
}
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
@ -721,6 +733,22 @@ func (c *cfg) shardOpts() []shardOptsWithID {
return shards
}
func (c *cfg) loggerPrm() (*logger.Prm, error) {
// check if it has been inited before
if c.dynamicConfiguration.logger == nil {
c.dynamicConfiguration.logger = new(logger.Prm)
}
// (re)init read configuration
err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
if err != nil {
// not expected since validation should be performed before
panic(fmt.Sprintf("incorrect log level format: %s", c.LoggerCfg.level))
}
return c.dynamicConfiguration.logger, nil
}
func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
@ -829,6 +857,13 @@ func (c *cfg) ObjectServiceLoad() float64 {
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
}
type dCfg struct {
name string
cfg interface {
Reload() error
}
}
func (c *cfg) configWatcher(ctx context.Context) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP)
@ -844,6 +879,22 @@ func (c *cfg) configWatcher(ctx context.Context) {
continue
}
// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg
// Logger
logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error("logger configuration preparation", zap.Error(err))
continue
}
components = append(components, dCfg{name: "logger", cfg: logPrm})
// Storage Engine
var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
@ -855,6 +906,15 @@ func (c *cfg) configWatcher(ctx context.Context) {
continue
}
for _, component := range components {
err = component.cfg.Reload()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
zap.Error(err))
}
}
c.log.Info("configuration has been reloaded successfully")
case <-ctx.Done():
return