forked from TrueCloudLab/frostfs-node
[#1770] node: Support configuration reread on SIGHUP
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
887afeaddb
commit
6fc3268ebf
2 changed files with 55 additions and 8 deletions
|
@ -6,8 +6,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
atomicstd "sync/atomic"
|
atomicstd "sync/atomic"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -562,8 +565,13 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) shardOpts() [][]shard.Option {
|
type shardOptsWithMetaPath struct {
|
||||||
oo := make([][]shard.Option, 0, len(c.EngineCfg.shards))
|
metaPath string
|
||||||
|
shOpts []shard.Option
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cfg) shardOpts() []shardOptsWithMetaPath {
|
||||||
|
shards := make([]shardOptsWithMetaPath, 0, len(c.EngineCfg.shards))
|
||||||
|
|
||||||
for _, shCfg := range c.EngineCfg.shards {
|
for _, shCfg := range c.EngineCfg.shards {
|
||||||
var writeCacheOpts []writecache.Option
|
var writeCacheOpts []writecache.Option
|
||||||
|
@ -626,7 +634,9 @@ func (c *cfg) shardOpts() [][]shard.Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
oo = append(oo, []shard.Option{
|
var sh shardOptsWithMetaPath
|
||||||
|
sh.metaPath = shCfg.metaCfg.path
|
||||||
|
sh.shOpts = []shard.Option{
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
|
@ -660,10 +670,12 @@ func (c *cfg) shardOpts() [][]shard.Option {
|
||||||
|
|
||||||
return pool
|
return pool
|
||||||
}),
|
}),
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return oo
|
shards = append(shards, sh)
|
||||||
|
}
|
||||||
|
|
||||||
|
return shards
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) LocalAddress() network.AddressGroup {
|
func (c *cfg) LocalAddress() network.AddressGroup {
|
||||||
|
@ -690,8 +702,8 @@ func initLocalStorage(c *cfg) {
|
||||||
tombstone.WithTombstoneSource(tombstoneSrc),
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, opts := range c.shardOpts() {
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
id, err := ls.AddShard(append(opts, shard.WithTombstoneSource(tombstoneSource))...)
|
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
c.log.Info("shard attached to engine",
|
c.log.Info("shard attached to engine",
|
||||||
|
@ -773,3 +785,36 @@ func (c *cfg) needBootstrap() bool {
|
||||||
func (c *cfg) ObjectServiceLoad() float64 {
|
func (c *cfg) ObjectServiceLoad() float64 {
|
||||||
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
|
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) configWatcher(ctx context.Context) {
|
||||||
|
ch := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(ch, syscall.SIGHUP)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
c.log.Info("SIGHUP has been received, rereading configuration...")
|
||||||
|
|
||||||
|
err := c.readConfig(c.appCfg)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error("configuration reading", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var rcfg engine.ReConfiguration
|
||||||
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
|
rcfg.AddShard(optsWithMeta.metaPath, optsWithMeta.shOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error("storage engine configuration update", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.log.Info("configuration has been reloaded successfully")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
|
||||||
func initApp(c *cfg) {
|
func initApp(c *cfg) {
|
||||||
initLocalStorage(c)
|
initLocalStorage(c)
|
||||||
|
|
||||||
c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
|
c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
initAndLog(c, "storage engine", func(c *cfg) {
|
initAndLog(c, "storage engine", func(c *cfg) {
|
||||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
||||||
|
@ -102,6 +102,8 @@ func initApp(c *cfg) {
|
||||||
initAndLog(c, "control", initControlService)
|
initAndLog(c, "control", initControlService)
|
||||||
|
|
||||||
initAndLog(c, "morph notifications", listenMorphNotifications)
|
initAndLog(c, "morph notifications", listenMorphNotifications)
|
||||||
|
|
||||||
|
c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
|
func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
|
||||||
|
|
Loading…
Reference in a new issue