Prevent process from being killed by systemd when shutting down #34

Merged
acid-ant merged 2 commits from bugfix/1465-fix-node-shutdown into master 2023-02-17 09:13:01 +00:00
6 changed files with 108 additions and 75 deletions

View file

@@ -25,6 +25,7 @@ Changelog for FrostFS Node
- Correct status error for expired session token (#2207)
- Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
- Fix `dirty` suffix in debian package version (#53)
- Prevent node process from killing by systemd when shutting down (#1465)
### Removed
### Updated

View file

@@ -911,60 +911,91 @@ type dCfg struct {
}
}
func (c *cfg) configWatcher(ctx context.Context) {
func (c *cfg) signalWatcher() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-ch:
c.log.Info("SIGHUP has been received, rereading configuration...")
case sig := <-ch:
switch sig {
case syscall.SIGHUP:
c.reloadConfig()
case syscall.SIGTERM, syscall.SIGINT:
c.log.Info("termination signal has been received, stopping...")
// TODO (@acid-ant): #49 need to cover case when stuck at the middle(node health UNDEFINED or STARTING)
err := c.readConfig(c.appCfg)
if err != nil {
c.log.Error("configuration reading", zap.Error(err))
continue
c.shutdown()
c.log.Info("termination signal processing is complete")
return
}
case err := <-c.internalErr: // internal application error
c.log.Warn("internal application error",
zap.String("message", err.Error()))
// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg
c.shutdown()
// Logger
logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error("logger configuration preparation", zap.Error(err))
continue
}
components = append(components, dCfg{name: "logger", cfg: logPrm})
// Storage Engine
var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
}
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
if err != nil {
c.log.Error("storage engine configuration update", zap.Error(err))
continue
}
for _, component := range components {
err = component.cfg.Reload()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
zap.Error(err))
}
}
c.log.Info("configuration has been reloaded successfully")
case <-ctx.Done():
c.log.Info("internal error processing is complete")
return
}
}
}
func (c *cfg) reloadConfig() {
c.log.Info("SIGHUP has been received, rereading configuration...")
err := c.readConfig(c.appCfg)
if err != nil {
c.log.Error("configuration reading", zap.Error(err))
return
}
// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg
// Logger
logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error("logger configuration preparation", zap.Error(err))
return
}
components = append(components, dCfg{name: "logger", cfg: logPrm})
fyrchik commented 2023-02-06 13:46:29 +00:00 (Migrated from github.com)

Why not just `nil`? Also, do we need it at all?
fyrchik commented 2023-02-06 18:28:21 +00:00 (Migrated from github.com)

Why did you decide to have a separate function?
acid-ant commented 2023-02-07 08:20:29 +00:00 (Migrated from github.com)

No, we don't need this line at all, removed.
acid-ant commented 2023-02-07 08:22:18 +00:00 (Migrated from github.com)

Now I see that it is not necessary, thanks. Merged two functions in one.
// Storage Engine
var rcfg engine.ReConfiguration
carpawell commented 2023-02-01 19:58:20 +00:00 (Migrated from github.com)

why do we listen to `ctx` here? i would close cancel func with `terminateWatcher` and do not share it via `cfg` at all

also, it would save us from double `c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)` i guess
acid-ant commented 2023-02-02 06:42:17 +00:00 (Migrated from github.com)

@carpawell we listen here for context because we may want to exit by internal error.
carpawell commented 2023-02-02 07:40:41 +00:00 (Migrated from github.com)

yes, and my question is: would it be better to just move listening internal error here too (in the same goroutine)? it would be the only code that is possible to cancel ctx, all the others would just wait for its closing. more straight scheme IMO (if it is possible and i have not missed something)
acid-ant commented 2023-02-02 12:25:01 +00:00 (Migrated from github.com)

Looks better, moved listening for internal errors here too.
fyrchik commented 2023-02-06 18:36:56 +00:00 (Migrated from github.com)

So can we remove `ctx.Done` now? The problem I see is that this is not a real "worker": it doesn't use context at all, but instead _manages_. Why not have this `select` executed synchronously in `wait`?
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
}
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
if err != nil {
c.log.Error("storage engine configuration update", zap.Error(err))
return
}
for _, component := range components {
err = component.cfg.Reload()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
zap.Error(err))
}
}
c.log.Info("configuration has been reloaded successfully")
}
func (c *cfg) shutdown() {
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
c.ctxCancel()
for i := range c.closers {
c.closers[len(c.closers)-1-i]()
}
close(c.internalErr)
}
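
Since the hunk above interleaves removed and added lines, here is a best-effort reconstruction of how the new `signalWatcher` and `shutdown` read once the change is applied, assembled from the added lines only (the former SIGHUP body moves into `reloadConfig` unchanged, so it is omitted); consult the actual file for the authoritative version:

func (c *cfg) signalWatcher() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-ch:
			switch sig {
			case syscall.SIGHUP:
				c.reloadConfig()
			case syscall.SIGTERM, syscall.SIGINT:
				c.log.Info("termination signal has been received, stopping...")
				c.shutdown()
				c.log.Info("termination signal processing is complete")
				return
			}
		case err := <-c.internalErr: // internal application error
			c.log.Warn("internal application error",
				zap.String("message", err.Error()))
			c.shutdown()
			c.log.Info("internal error processing is complete")
			return
		}
	}
}

func (c *cfg) shutdown() {
	c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
	c.ctxCancel()
	// run closers in reverse registration order
	for i := range c.closers {
		c.closers[len(c.closers)-1-i]()
	}
	close(c.internalErr)
}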

View file

@@ -6,8 +6,6 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/TrueCloudLab/frostfs-node/misc"
@@ -66,10 +64,6 @@ func main() {
c.setHealthStatus(control.HealthStatus_READY)
wait(c)
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
shutdown(c)
}
func initAndLog(c *cfg, name string, initializer func(*cfg)) {
@@ -79,9 +73,18 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
}
func initApp(c *cfg) {
initLocalStorage(c)
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
c.wg.Add(1)
go func() {
c.signalWatcher()
c.wg.Done()
}()
initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)
initLocalStorage(c)
initAndLog(c, "storage engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
@@ -96,14 +99,10 @@ func initApp(c *cfg) {
initAndLog(c, "reputation", initReputationService)
initAndLog(c, "notification", initNotifications)
initAndLog(c, "object", initObjectService)
initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)
initAndLog(c, "morph notifications", listenMorphNotifications)
c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))
}
func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
@@ -128,21 +127,7 @@ func wait(c *cfg) {
c.log.Info("application started",
zap.String("version", misc.Version))
select {
case <-c.ctx.Done(): // graceful shutdown
case err := <-c.internalErr: // internal application error
close(c.internalErr)
c.ctxCancel()
c.log.Warn("internal application error",
zap.String("message", err.Error()))
}
}
func shutdown(c *cfg) {
for _, closer := range c.closers {
closer()
}
<-c.ctx.Done() // graceful shutdown
c.log.Debug("waiting for all processes to stop")
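
Pieced together from the added lines, the startup and shutdown path in main.go now reads roughly as follows; this is a simplified sketch, and the component initialization calls plus anything outside this hunk are elided:

func initApp(c *cfg) {
	// The context is cancelled either by SIGINT/SIGTERM (via signal.NotifyContext)
	// or explicitly by c.ctxCancel() inside c.shutdown().
	c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)

	c.wg.Add(1)
	go func() {
		c.signalWatcher()
		c.wg.Done()
	}()

	// ... initAndLog calls for pprof, prometheus, storage engine, services, etc. ...
}

func wait(c *cfg) {
	c.log.Info("application started",
		zap.String("version", misc.Version))

	<-c.ctx.Done() // graceful shutdown

	c.log.Debug("waiting for all processes to stop")
	// the remainder of wait() is outside this hunk
}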

View file

@@ -56,7 +56,10 @@ func initMorphComponents(c *cfg) {
fatalOnErr(err)
}
c.onShutdown(cli.Close)
c.onShutdown(func() {
c.log.Info("closing morph components...")
cli.Close()
acid-ant commented 2023-02-10 06:23:43 +00:00 (Migrated from github.com)

Thanks, moved to another.
})
if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))

View file

@@ -126,7 +126,20 @@ func (c *Client) notificationLoop() {
continue
}
c.notifications <- n
select {
case c.notifications <- n:
continue
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
}
}
}
}
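
The shape of this change is the usual Go pattern of guarding a channel send with shutdown channels, so the loop cannot block forever once the consumer is gone. In isolation the pattern looks like this (a generic sketch with made-up names, not FrostFS code):

// forward pushes items to out, but gives up as soon as either stop channel
// fires, so a stuck or departed consumer cannot wedge the goroutine.
func forward(out chan<- int, items <-chan int, done, closed <-chan struct{}) {
	for item := range items {
		select {
		case out <- item:
		case <-done:
			return
		case <-closed:
			return
		}
	}
}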

View file

@@ -14,7 +14,7 @@ func (c *Client) Close() {
// closing should be done via the channel
// to prevent switching to another RPC node
// in the notification loop
c.closeChan <- struct{}{}
close(c.closeChan)
}
// SubscribeForExecutionNotifications adds subscription for notifications
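
Replacing the single send with `close(c.closeChan)` is worth a note: closing a channel is observed by every current and future receive and never blocks the caller, while `c.closeChan <- struct{}{}` (assuming an unbuffered channel) satisfies exactly one receiver and blocks until someone is receiving. A generic illustration of the broadcast behaviour (not FrostFS code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // every waiter unblocks once stop is closed
			fmt.Println("worker stopped:", id)
		}(i)
	}

	close(stop) // broadcast shutdown to all waiters without blocking
	wg.Wait()
}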