forked from TrueCloudLab/frostfs-node
[#304] cmd/neofs-node: Catch closing channel of listener endpoint
As in #72 storage application should behave the same way at remote RPC node failures. The simplest way is to restart application. Later we can reinitialize it without downtime. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
1e170c3812
commit
3774c5d69a
3 changed files with 27 additions and 9 deletions
|
@ -142,6 +142,10 @@ const (
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
|
ctxCancel func()
|
||||||
|
|
||||||
|
internalErr chan error // channel for internal application errors at runtime
|
||||||
|
|
||||||
viper *viper.Viper
|
viper *viper.Viper
|
||||||
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
@ -291,6 +295,7 @@ func initCfg(path string) *cfg {
|
||||||
|
|
||||||
c := &cfg{
|
c := &cfg{
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
|
internalErr: make(chan error),
|
||||||
viper: viperCfg,
|
viper: viperCfg,
|
||||||
log: log,
|
log: log,
|
||||||
wg: new(sync.WaitGroup),
|
wg: new(sync.WaitGroup),
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
|
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func fatalOnErr(err error) {
|
func fatalOnErr(err error) {
|
||||||
|
@ -29,7 +31,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initApp(c *cfg) {
|
func initApp(c *cfg) {
|
||||||
c.ctx = grace.NewGracefulContext(nil)
|
c.ctx, c.ctxCancel = context.WithCancel(grace.NewGracefulContext(nil))
|
||||||
|
|
||||||
initGRPC(c)
|
initGRPC(c)
|
||||||
|
|
||||||
|
@ -56,7 +58,15 @@ func bootUp(c *cfg) {
|
||||||
func wait(c *cfg) {
|
func wait(c *cfg) {
|
||||||
c.log.Info("application started")
|
c.log.Info("application started")
|
||||||
|
|
||||||
<-c.ctx.Done()
|
select {
|
||||||
|
case <-c.ctx.Done(): // graceful shutdown
|
||||||
|
case err := <-c.internalErr: // internal application error
|
||||||
|
close(c.internalErr)
|
||||||
|
c.ctxCancel()
|
||||||
|
|
||||||
|
c.log.Warn("internal application error",
|
||||||
|
zap.String("message", err.Error()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func shutdown(c *cfg) {
|
func shutdown(c *cfg) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -51,7 +52,9 @@ func listenMorphNotifications(c *cfg) {
|
||||||
})
|
})
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(lis.Listen))
|
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||||
|
lis.ListenWithError(ctx, c.internalErr)
|
||||||
|
}))
|
||||||
|
|
||||||
setNetmapNotificationParser(c, newEpochNotification, netmapEvent.ParseNewEpoch)
|
setNetmapNotificationParser(c, newEpochNotification, netmapEvent.ParseNewEpoch)
|
||||||
registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers)
|
registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers)
|
||||||
|
|
Loading…
Reference in a new issue