[#1465] node: Prevent process from being killed by systemd when shutting down

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
Author: Anton Nikiforov, 2023-01-27 18:12:03 +03:00 (committed by fyrchik)
parent 362f24953a
commit 85cf1f47ac
5 changed files with 104 additions and 74 deletions

View file

@@ -25,6 +25,7 @@ Changelog for FrostFS Node
 - Correct status error for expired session token (#2207)
 - Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
 - Fix `dirty` suffix in debian package version (#53)
+- Prevent node process from killing by systemd when shutting down (#1465)
 
 ### Removed
 ### Updated

View file

@@ -911,60 +911,91 @@ type dCfg struct {
 	}
 }
 
-func (c *cfg) configWatcher(ctx context.Context) {
+func (c *cfg) signalWatcher() {
 	ch := make(chan os.Signal, 1)
-	signal.Notify(ch, syscall.SIGHUP)
+	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
 
 	for {
 		select {
-		case <-ch:
-			c.log.Info("SIGHUP has been received, rereading configuration...")
-
-			err := c.readConfig(c.appCfg)
-			if err != nil {
-				c.log.Error("configuration reading", zap.Error(err))
-				continue
-			}
-
-			// all the components are expected to support
-			// Logger's dynamic reconfiguration approach
-
-			var components []dCfg
-
-			// Logger
-
-			logPrm, err := c.loggerPrm()
-			if err != nil {
-				c.log.Error("logger configuration preparation", zap.Error(err))
-				continue
-			}
-
-			components = append(components, dCfg{name: "logger", cfg: logPrm})
-
-			// Storage Engine
-
-			var rcfg engine.ReConfiguration
-			for _, optsWithID := range c.shardOpts() {
-				rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
-			}
-
-			err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
-			if err != nil {
-				c.log.Error("storage engine configuration update", zap.Error(err))
-				continue
-			}
-
-			for _, component := range components {
-				err = component.cfg.Reload()
-				if err != nil {
-					c.log.Error("updated configuration applying",
-						zap.String("component", component.name),
-						zap.Error(err))
-				}
-			}
-
-			c.log.Info("configuration has been reloaded successfully")
-		case <-ctx.Done():
+		case sig := <-ch:
+			switch sig {
+			case syscall.SIGHUP:
+				c.reloadConfig()
+			case syscall.SIGTERM, syscall.SIGINT:
+				c.log.Info("termination signal has been received, stopping...")
+				// TODO (@acid-ant): #49 need to cover case when stuck at the middle(node health UNDEFINED or STARTING)
+				c.shutdown()
+
+				c.log.Info("termination signal processing is complete")
+				return
+			}
+		case err := <-c.internalErr: // internal application error
+			c.log.Warn("internal application error",
+				zap.String("message", err.Error()))
+
+			c.shutdown()
+
+			c.log.Info("internal error processing is complete")
 			return
 		}
 	}
 }
+
+func (c *cfg) reloadConfig() {
+	c.log.Info("SIGHUP has been received, rereading configuration...")
+
+	err := c.readConfig(c.appCfg)
+	if err != nil {
+		c.log.Error("configuration reading", zap.Error(err))
+		return
+	}
+
+	// all the components are expected to support
+	// Logger's dynamic reconfiguration approach
+
+	var components []dCfg
+
+	// Logger
+
+	logPrm, err := c.loggerPrm()
+	if err != nil {
+		c.log.Error("logger configuration preparation", zap.Error(err))
+		return
+	}
+
+	components = append(components, dCfg{name: "logger", cfg: logPrm})
+
+	// Storage Engine
+
+	var rcfg engine.ReConfiguration
+	for _, optsWithID := range c.shardOpts() {
+		rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
+	}
+
+	err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
+	if err != nil {
+		c.log.Error("storage engine configuration update", zap.Error(err))
+		return
+	}
+
+	for _, component := range components {
+		err = component.cfg.Reload()
+		if err != nil {
+			c.log.Error("updated configuration applying",
+				zap.String("component", component.name),
+				zap.Error(err))
+		}
+	}
+
+	c.log.Info("configuration has been reloaded successfully")
+}
+
+func (c *cfg) shutdown() {
+	c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
+
+	c.ctxCancel()
+	for i := range c.closers {
+		c.closers[len(c.closers)-1-i]()
+	}
+	close(c.internalErr)
+}
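Note: the new signalWatcher makes the process itself consume SIGTERM/SIGINT. systemd delivers SIGTERM on `systemctl stop`, and a process that finishes its cleanup and exits on its own gives systemd no reason to escalate to SIGKILL after its stop timeout. Also note that shutdown() runs the registered closers in reverse registration order, so components are torn down opposite to the order they were started, and only then closes internalErr. A minimal standalone sketch of the watcher pattern (the name watch and the bare fmt.Println calls are illustrative, not from this commit):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// watch mirrors the watcher above: SIGHUP means "reload",
// SIGINT/SIGTERM mean "run one orderly shutdown and return".
func watch(ctx context.Context, cancel context.CancelFunc) {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-ch:
			switch sig {
			case syscall.SIGHUP:
				fmt.Println("reload requested")
			case syscall.SIGINT, syscall.SIGTERM:
				fmt.Println("termination signal received, stopping...")
				cancel() // releases everything waiting on ctx
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go watch(ctx, cancel)
	<-ctx.Done() // main exits only after the watcher has decided to stop
}

Because the watcher returns only after calling cancel, everything blocked on the context unwinds before the process exits.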

View file

@@ -6,8 +6,6 @@ import (
 	"fmt"
 	"log"
 	"os"
-	"os/signal"
-	"syscall"
 
 	"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
 	"github.com/TrueCloudLab/frostfs-node/misc"
@@ -66,10 +64,6 @@ func main() {
 	c.setHealthStatus(control.HealthStatus_READY)
 
 	wait(c)
-
-	c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
-
-	shutdown(c)
 }
 
 func initAndLog(c *cfg, name string, initializer func(*cfg)) {
@@ -79,9 +73,18 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
 }
 
 func initApp(c *cfg) {
-	initLocalStorage(c)
+	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
 
-	c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	c.wg.Add(1)
+	go func() {
+		c.signalWatcher()
+		c.wg.Done()
+	}()
+
+	initAndLog(c, "pprof", initProfiler)
+	initAndLog(c, "prometheus", initMetrics)
+
+	initLocalStorage(c)
 
 	initAndLog(c, "storage engine", func(c *cfg) {
 		fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
@@ -96,14 +99,10 @@ func initApp(c *cfg) {
 	initAndLog(c, "reputation", initReputationService)
 	initAndLog(c, "notification", initNotifications)
 	initAndLog(c, "object", initObjectService)
-	initAndLog(c, "pprof", initProfiler)
-	initAndLog(c, "prometheus", initMetrics)
 	initAndLog(c, "tree", initTreeService)
 	initAndLog(c, "control", initControlService)
 	initAndLog(c, "morph notifications", listenMorphNotifications)
-
-	c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))
 }
 
 func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
@@ -128,21 +127,7 @@ func wait(c *cfg) {
 	c.log.Info("application started",
 		zap.String("version", misc.Version))
 
-	select {
-	case <-c.ctx.Done(): // graceful shutdown
-	case err := <-c.internalErr: // internal application error
-		close(c.internalErr)
-		c.ctxCancel()
-
-		c.log.Warn("internal application error",
-			zap.String("message", err.Error()))
-	}
-}
-
-func shutdown(c *cfg) {
-	for _, closer := range c.closers {
-		closer()
-	}
+	<-c.ctx.Done() // graceful shutdown
 
 	c.log.Debug("waiting for all processes to stop")
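With these hunks, main no longer races the signal handler: cancellation is owned by the watcher goroutine started in initApp, and wait simply blocks on the context. The pprof and prometheus services are also started earlier, presumably so diagnostics are available while the heavier components initialize. A minimal sketch of the same lifecycle wiring (illustrative names, not the commit's code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// the watcher goroutine owns cancellation; main only waits for it
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Second) // stand-in for signalWatcher deciding to stop
		cancel()
	}()

	<-ctx.Done() // graceful shutdown
	wg.Wait()    // never return from main while the watcher still runs
	fmt.Println("all processes stopped")
}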

View file

@@ -126,7 +126,20 @@ func (c *Client) notificationLoop() {
 				continue
 			}
 
-			c.notifications <- n
+			select {
+			case c.notifications <- n:
+				continue
+			case <-c.cfg.ctx.Done():
+				_ = c.UnsubscribeAll()
+				c.close()
+				return
+			case <-c.closeChan:
+				_ = c.UnsubscribeAll()
+				c.close()
+				return
+			}
 		}
 	}
 }
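The old unconditional `c.notifications <- n` is the kind of send that can wedge a shutdown: once the consumer stops reading, the loop blocks forever and never notices that the client was closed. Wrapping the send in a select with the context and closeChan makes it abortable. A self-contained sketch of that producer pattern (the produce helper is hypothetical, not this client's API):

package main

import (
	"context"
	"fmt"
	"time"
)

// produce delivers values until the consumer goes away; the select
// turns a potentially forever-blocking send into an abortable one.
func produce(ctx context.Context, out chan<- int) {
	for i := 0; ; i++ {
		select {
		case out <- i: // delivered
		case <-ctx.Done(): // consumer gone: clean up and exit
			fmt.Println("producer: aborting blocked send")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int) // unbuffered: a send blocks until received

	go produce(ctx, ch)
	fmt.Println(<-ch, <-ch) // consume two values

	cancel() // stop consuming; without the select the producer would hang
	time.Sleep(100 * time.Millisecond) // give the producer time to print
}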

View file

@@ -14,7 +14,7 @@ func (c *Client) Close() {
 	// closing should be done via the channel
 	// to prevent switching to another RPC node
 	// in the notification loop
-	c.closeChan <- struct{}{}
+	close(c.closeChan)
 }
 
 // SubscribeForExecutionNotifications adds subscription for notifications
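Switching from a send to close(c.closeChan) changes the semantics in two useful ways: a close never blocks (the old send would hang Close if the notification loop happened not to be at its receive), and a close is observed by every receiver, present and future, rather than by exactly one. The trade-off is that Close must not run twice, since closing an already-closed channel panics. A small sketch of the broadcast behavior (illustrative code, not from the commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})

	var wg sync.WaitGroup
	// Several receivers can all observe a single close();
	// a send (done <- struct{}{}) would wake at most one of them,
	// and would block the caller if nobody were receiving at all.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // unblocks once the channel is closed
			fmt.Println("worker", id, "stopped")
		}(i)
	}

	close(done) // non-blocking broadcast to all receivers
	wg.Wait()
}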