Prevent process from being killed by systemd when shutting down #34
@@ -25,6 +25,7 @@ Changelog for FrostFS Node
- Correct status error for expired session token (#2207)
- Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
- Fix `dirty` suffix in debian package version (#53)
- Prevent node process from being killed by systemd when shutting down (#1465)

### Removed

### Updated
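For context on the changelog entry above: when systemd stops a unit it sends SIGTERM and, after the stop timeout expires, SIGKILL, so a process that ignores SIGTERM ends up being killed instead of shutting down gracefully. The sketch below is a minimal, hypothetical illustration of the `signal.NotifyContext` pattern the diff below adopts in `initApp`; it is not FrostFS code, and the `stopServices` helper is an assumed stand-in for the node's real teardown.

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Cancel the root context when systemd (or the user) asks us to stop.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ... start services here ...

	<-ctx.Done() // graceful shutdown requested
	log.Println("termination signal received, stopping...")

	// Hypothetical cleanup; in the real node this is the closers loop in shutdown().
	stopServices(5 * time.Second)
}

// stopServices stands in for whatever orderly teardown the application needs.
func stopServices(timeout time.Duration) {
	time.Sleep(timeout / 10) // pretend to flush and close things
	log.Println("all components stopped")
}
```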
@@ -911,60 +911,91 @@ type dCfg struct {
}
}

func (c *cfg) configWatcher(ctx context.Context) {
func (c *cfg) signalWatcher() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case <-ch:
c.log.Info("SIGHUP has been received, rereading configuration...")
case sig := <-ch:
switch sig {
case syscall.SIGHUP:
c.reloadConfig()
case syscall.SIGTERM, syscall.SIGINT:
c.log.Info("termination signal has been received, stopping...")
// TODO (@acid-ant): #49 need to cover case when stuck at the middle(node health UNDEFINED or STARTING)

err := c.readConfig(c.appCfg)
if err != nil {
c.log.Error("configuration reading", zap.Error(err))
continue
c.shutdown()

c.log.Info("termination signal processing is complete")
return
}
case err := <-c.internalErr: // internal application error
c.log.Warn("internal application error",
zap.String("message", err.Error()))

// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg
c.shutdown()

// Logger

logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error("logger configuration preparation", zap.Error(err))
continue
}

components = append(components, dCfg{name: "logger", cfg: logPrm})

// Storage Engine

var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
}

err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
if err != nil {
c.log.Error("storage engine configuration update", zap.Error(err))
continue
}

for _, component := range components {
err = component.cfg.Reload()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
zap.Error(err))
}
}

c.log.Info("configuration has been reloaded successfully")
case <-ctx.Done():
c.log.Info("internal error processing is complete")
return
}
}
}

func (c *cfg) reloadConfig() {
c.log.Info("SIGHUP has been received, rereading configuration...")

err := c.readConfig(c.appCfg)
if err != nil {
c.log.Error("configuration reading", zap.Error(err))
return
}

// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg

// Logger

logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error("logger configuration preparation", zap.Error(err))
return
}

components = append(components, dCfg{name: "logger", cfg: logPrm})

No, we don't need this line at all, removed.
Now I see that it is not necessary, thanks. Merged the two functions into one.
// Storage Engine

var rcfg engine.ReConfiguration
Why do we listen to `ctx` here? I would keep the cancel func inside `terminateWatcher` and not share it via `cfg` at all. Also, it would save us from the double `c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)`, I guess.
@carpawell we listen here for the context because we may want to exit by internal error.
Yes, and my question is: would it be better to just move listening for internal errors here too (in the same goroutine)? It would be the only code able to cancel the ctx; all the others would just wait for its closing. A more straightforward scheme IMO (if it is possible and I have not missed something).
Looks better, moved listening for internal errors here too.
So can we remove `ctx.Done` now? The problem I see is that this is not a real "worker": it doesn't use the context at all, but instead _manages_ it. Why not have this `select` executed synchronously in `wait`?
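A rough sketch of what the reviewer's suggestion might look like, purely for illustration: the termination/internal-error `select` runs synchronously in `wait`, so no separate goroutine has to manage the context. This is not the merged code; it reuses the `cfg` fields, methods, and imports from the surrounding diff and assumes `reloadConfig` and `shutdown` behave as shown there.

```go
// Hypothetical variant (not the merged implementation): wait() itself
// blocks on signals and internal errors, so signalWatcher and the shared
// ctx cancellation would no longer be needed.
func wait(c *cfg) {
	c.log.Info("application started", zap.String("version", misc.Version))

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-ch:
			switch sig {
			case syscall.SIGHUP:
				c.reloadConfig()
			case syscall.SIGINT, syscall.SIGTERM:
				c.log.Info("termination signal has been received, stopping...")
				c.shutdown()
				return
			}
		case err := <-c.internalErr: // internal application error
			c.log.Warn("internal application error", zap.String("message", err.Error()))
			c.shutdown()
			return
		}
	}
}
```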
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
}

err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
if err != nil {
c.log.Error("storage engine configuration update", zap.Error(err))
return
}

for _, component := range components {
err = component.cfg.Reload()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
zap.Error(err))
}
}

c.log.Info("configuration has been reloaded successfully")
}

func (c *cfg) shutdown() {
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)

c.ctxCancel()
for i := range c.closers {
c.closers[len(c.closers)-1-i]()
}
close(c.internalErr)
}
@@ -6,8 +6,6 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/TrueCloudLab/frostfs-node/misc"
@@ -66,10 +64,6 @@ func main() {
c.setHealthStatus(control.HealthStatus_READY)

wait(c)

c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)

shutdown(c)
}

func initAndLog(c *cfg, name string, initializer func(*cfg)) {
@@ -79,9 +73,18 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
}

func initApp(c *cfg) {
initLocalStorage(c)
c.ctx, c.ctxCancel = context.WithCancel(context.Background())

c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
c.wg.Add(1)
go func() {
c.signalWatcher()
c.wg.Done()
}()

initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)

initLocalStorage(c)

initAndLog(c, "storage engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
@@ -96,14 +99,10 @@ func initApp(c *cfg) {
initAndLog(c, "reputation", initReputationService)
initAndLog(c, "notification", initNotifications)
initAndLog(c, "object", initObjectService)
initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)

initAndLog(c, "morph notifications", listenMorphNotifications)

c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))
}

func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
@@ -128,21 +127,7 @@ func wait(c *cfg) {
c.log.Info("application started",
zap.String("version", misc.Version))

select {
case <-c.ctx.Done(): // graceful shutdown
case err := <-c.internalErr: // internal application error
close(c.internalErr)
c.ctxCancel()

c.log.Warn("internal application error",
zap.String("message", err.Error()))
}
}

func shutdown(c *cfg) {
for _, closer := range c.closers {
closer()
}
<-c.ctx.Done() // graceful shutdown

c.log.Debug("waiting for all processes to stop")
@@ -56,7 +56,10 @@ func initMorphComponents(c *cfg) {
fatalOnErr(err)
}

c.onShutdown(cli.Close)
c.onShutdown(func() {
c.log.Info("closing morph components...")
cli.Close()
Thanks, moved to another.
})

if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
@@ -126,7 +126,20 @@ func (c *Client) notificationLoop() {
continue
}

c.notifications <- n
select {
case c.notifications <- n:
continue
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()

return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()

return
}
}
}
}
@@ -14,7 +14,7 @@ func (c *Client) Close() {
// closing should be done via the channel
// to prevent switching to another RPC node
// in the notification loop
c.closeChan <- struct{}{}
close(c.closeChan)
}

// SubscribeForExecutionNotifications adds subscription for notifications
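A side note on the `close(c.closeChan)` change above, illustrated with a tiny standalone program (not FrostFS code): closing the channel never blocks and acts as a broadcast that the `case <-c.closeChan:` branch in `notificationLoop` observes, whereas the previous unbuffered send blocks until the loop is ready to receive it and would hang forever if the loop had already exited.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-done // released as soon as done is closed, just like a select case
		fmt.Println("loop stopped")
	}()

	close(done) // broadcast: never blocks, wakes every pending receiver
	wg.Wait()
}
```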
Why not just `nil`? Also, do we need it at all?
Why did you decide to have a separate function?