From 85cf1f47acd6283f2e9595e7f80bbf53cb777fe1 Mon Sep 17 00:00:00 2001
From: Anton Nikiforov
Date: Fri, 27 Jan 2023 18:12:03 +0300
Subject: [PATCH] [#1465] node: Prevent process from being killed by systemd
 when shutting down

Signed-off-by: Anton Nikiforov
---
 CHANGELOG.md                      |   1 +
 cmd/frostfs-node/config.go        | 121 +++++++++++++++++++-----------
 cmd/frostfs-node/main.go          |  39 +++-------
 pkg/morph/client/multi.go         |  15 +++-
 pkg/morph/client/notifications.go |   2 +-
 5 files changed, 104 insertions(+), 74 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 41ed456c71..9002ee10ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@ Changelog for FrostFS Node
 - Correct status error for expired session token (#2207)
 - Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
 - Fix `dirty` suffix in debian package version (#53)
+- Prevent node process from being killed by systemd when shutting down (#1465)
 
 ### Removed
 ### Updated
diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index bf3493de01..d20e9b1cc4 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -911,60 +911,91 @@ type dCfg struct {
     }
 }
 
-func (c *cfg) configWatcher(ctx context.Context) {
+func (c *cfg) signalWatcher() {
     ch := make(chan os.Signal, 1)
-    signal.Notify(ch, syscall.SIGHUP)
+    signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
 
     for {
         select {
-        case <-ch:
-            c.log.Info("SIGHUP has been received, rereading configuration...")
+        case sig := <-ch:
+            switch sig {
+            case syscall.SIGHUP:
+                c.reloadConfig()
+            case syscall.SIGTERM, syscall.SIGINT:
+                c.log.Info("termination signal has been received, stopping...")
+                // TODO (@acid-ant): #49 need to cover case when stuck in the middle (node health UNDEFINED or STARTING)
 
-            err := c.readConfig(c.appCfg)
-            if err != nil {
-                c.log.Error("configuration reading", zap.Error(err))
-                continue
+                c.shutdown()
+
+                c.log.Info("termination signal processing is complete")
+                return
             }
+        case err := <-c.internalErr: // internal application error
+            c.log.Warn("internal application error",
+                zap.String("message", err.Error()))
 
-            // all the components are expected to support
-            // Logger's dynamic reconfiguration approach
-            var components []dCfg
+            c.shutdown()
 
-            // Logger
-
-            logPrm, err := c.loggerPrm()
-            if err != nil {
-                c.log.Error("logger configuration preparation", zap.Error(err))
-                continue
-            }
-
-            components = append(components, dCfg{name: "logger", cfg: logPrm})
-
-            // Storage Engine
-
-            var rcfg engine.ReConfiguration
-            for _, optsWithID := range c.shardOpts() {
-                rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
-            }
-
-            err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
-            if err != nil {
-                c.log.Error("storage engine configuration update", zap.Error(err))
-                continue
-            }
-
-            for _, component := range components {
-                err = component.cfg.Reload()
-                if err != nil {
-                    c.log.Error("updated configuration applying",
-                        zap.String("component", component.name),
-                        zap.Error(err))
-                }
-            }
-
-            c.log.Info("configuration has been reloaded successfully")
-        case <-ctx.Done():
+            c.log.Info("internal error processing is complete")
             return
         }
     }
 }
+
+func (c *cfg) reloadConfig() {
+    c.log.Info("SIGHUP has been received, rereading configuration...")
+
+    err := c.readConfig(c.appCfg)
+    if err != nil {
+        c.log.Error("configuration reading", zap.Error(err))
+        return
+    }
+
+    // all the components are expected to support
+    // Logger's dynamic reconfiguration approach
+    var components []dCfg
+
+    // Logger
+
+    logPrm, err := c.loggerPrm()
+    if err != nil {
+        c.log.Error("logger configuration preparation", zap.Error(err))
+        return
+    }
+
+    components = append(components, dCfg{name: "logger", cfg: logPrm})
+
+    // Storage Engine
+
+    var rcfg engine.ReConfiguration
+    for _, optsWithID := range c.shardOpts() {
+        rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
+    }
+
+    err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
+    if err != nil {
+        c.log.Error("storage engine configuration update", zap.Error(err))
+        return
+    }
+
+    for _, component := range components {
+        err = component.cfg.Reload()
+        if err != nil {
+            c.log.Error("updated configuration applying",
+                zap.String("component", component.name),
+                zap.Error(err))
+        }
+    }
+
+    c.log.Info("configuration has been reloaded successfully")
+}
+
+func (c *cfg) shutdown() {
+    c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
+
+    c.ctxCancel()
+    for i := range c.closers {
+        c.closers[len(c.closers)-1-i]()
+    }
+    close(c.internalErr)
+}
diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go
index 35c18bafe4..cdc3c94b34 100644
--- a/cmd/frostfs-node/main.go
+++ b/cmd/frostfs-node/main.go
@@ -6,8 +6,6 @@ import (
     "fmt"
     "log"
     "os"
-    "os/signal"
-    "syscall"
 
     "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
     "github.com/TrueCloudLab/frostfs-node/misc"
@@ -66,10 +64,6 @@ func main() {
     c.setHealthStatus(control.HealthStatus_READY)
 
     wait(c)
-
-    c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
-
-    shutdown(c)
 }
 
 func initAndLog(c *cfg, name string, initializer func(*cfg)) {
@@ -79,9 +73,18 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
 }
 
 func initApp(c *cfg) {
-    initLocalStorage(c)
+    c.ctx, c.ctxCancel = context.WithCancel(context.Background())
 
-    c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+    c.wg.Add(1)
+    go func() {
+        c.signalWatcher()
+        c.wg.Done()
+    }()
+
+    initAndLog(c, "pprof", initProfiler)
+    initAndLog(c, "prometheus", initMetrics)
+
+    initLocalStorage(c)
 
     initAndLog(c, "storage engine", func(c *cfg) {
         fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
@@ -96,14 +99,10 @@ func initApp(c *cfg) {
     initAndLog(c, "reputation", initReputationService)
     initAndLog(c, "notification", initNotifications)
     initAndLog(c, "object", initObjectService)
-    initAndLog(c, "pprof", initProfiler)
-    initAndLog(c, "prometheus", initMetrics)
     initAndLog(c, "tree", initTreeService)
     initAndLog(c, "control", initControlService)
     initAndLog(c, "morph notifications", listenMorphNotifications)
-
-    c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))
 }
 
 func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
@@ -128,21 +127,7 @@ func wait(c *cfg) {
     c.log.Info("application started",
         zap.String("version", misc.Version))
 
-    select {
-    case <-c.ctx.Done(): // graceful shutdown
-    case err := <-c.internalErr: // internal application error
-        close(c.internalErr)
-        c.ctxCancel()
-
-        c.log.Warn("internal application error",
-            zap.String("message", err.Error()))
-    }
-}
-
-func shutdown(c *cfg) {
-    for _, closer := range c.closers {
-        closer()
-    }
+    <-c.ctx.Done() // graceful shutdown
 
     c.log.Debug("waiting for all processes to stop")
diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go
index cdd5326761..e0eecd9295 100644
--- a/pkg/morph/client/multi.go
+++ b/pkg/morph/client/multi.go
@@ -126,7 +126,20 @@ func (c *Client) notificationLoop() {
                 continue
             }
 
-            c.notifications <- n
+            select {
+            case c.notifications <- n:
+                continue
+            case <-c.cfg.ctx.Done():
+                _ = c.UnsubscribeAll()
+                c.close()
+
+                return
+            case <-c.closeChan:
+                _ = c.UnsubscribeAll()
+                c.close()
+
+                return
+            }
         }
     }
 }
diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go
index 34f87f3974..8eaf617b2a 100644
--- a/pkg/morph/client/notifications.go
+++ b/pkg/morph/client/notifications.go
@@ -14,7 +14,7 @@ func (c *Client) Close() {
     // closing should be done via the channel
     // to prevent switching to another RPC node
     // in the notification loop
-    c.closeChan <- struct{}{}
+    close(c.closeChan)
 }
 
 // SubscribeForExecutionNotifications adds subscription for notifications
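
Notes on the shutdown pattern (illustrative, not part of the commit):

The patch moves all signal handling into a single watcher goroutine: SIGHUP
triggers a configuration reload, while SIGINT and SIGTERM (the signal systemd
sends on `systemctl stop`) run an ordered shutdown so the process exits
cleanly instead of being killed mid-stop. The sketch below reproduces that
pattern as a self-contained program; the `app` type, its field names, and the
log messages are stand-ins for the patch's `cfg`, not frostfs-node code.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// app stands in for frostfs-node's cfg: a cancellable context, closers
// registered in startup order, and a channel for internal errors.
type app struct {
	ctx         context.Context
	ctxCancel   context.CancelFunc
	wg          sync.WaitGroup
	closers     []func()
	internalErr chan error
}

// signalWatcher is the single owner of process signals: SIGHUP means
// "reload configuration", SIGINT/SIGTERM mean "ordered shutdown".
func (a *app) signalWatcher() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-ch:
			switch sig {
			case syscall.SIGHUP:
				log.Println("SIGHUP received, rereading configuration...")
			case syscall.SIGINT, syscall.SIGTERM:
				log.Println("termination signal received, stopping...")
				a.shutdown()
				return
			}
		case err := <-a.internalErr: // internal application error
			log.Println("internal application error:", err)
			a.shutdown()
			return
		}
	}
}

// shutdown cancels the shared context, then runs closers in reverse
// registration order (LIFO), mirroring the patch's loop over c.closers.
func (a *app) shutdown() {
	a.ctxCancel()
	for i := range a.closers {
		a.closers[len(a.closers)-1-i]()
	}
	close(a.internalErr)
}

func main() {
	a := &app{internalErr: make(chan error, 1)}
	a.ctx, a.ctxCancel = context.WithCancel(context.Background())

	// components register their cleanup as they start up
	a.closers = append(a.closers, func() { log.Println("component closed") })

	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		a.signalWatcher()
	}()

	<-a.ctx.Done() // graceful shutdown: unblocks once shutdown() runs
	a.wg.Wait()    // let the watcher finish running the closers
	log.Println("application stopped")
}

The ordering is the point of the fix: cancel the shared context first so every
component sees the stop, run closers last-in-first-out so dependents stop
before their dependencies, and only let main return once the watcher has
finished, so systemd observes a normal exit rather than escalating to SIGKILL
after its stop timeout.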
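
The notifications.go hunk replaces a value send with close(c.closeChan).
Closing appears to be the safer choice because the notification loop can now
also exit via ctx.Done(): if it has already returned, a send on closeChan
would block Close() forever, while close() never blocks and wakes any number
of receivers. A toy program showing just that property (the loop body is a
stand-in, not the morph client's):

package main

import (
	"context"
	"fmt"
	"time"
)

// loop mimics notificationLoop: it exits either on context cancellation
// or when closeChan is closed, whichever happens first.
func loop(ctx context.Context, closeChan <-chan struct{}) {
	select {
	case <-ctx.Done():
		fmt.Println("loop: context cancelled, exiting")
	case <-closeChan:
		fmt.Println("loop: closeChan closed, exiting")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	closeChan := make(chan struct{})

	go loop(ctx, closeChan)

	cancel() // shutdown path: the loop exits on its own
	time.Sleep(50 * time.Millisecond)

	// With the old `closeChan <- struct{}{}` this would block forever,
	// because the only receiver is gone. close() never blocks and wakes
	// any number of receivers, so Close() is safe whichever path won.
	close(closeChan)
	fmt.Println("Close() returned without blocking")
}

One caveat of the pattern: closing an already-closed channel panics, so
Close() must run at most once (or be guarded by sync.Once).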