diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index edb1412a..f7100d0b 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) { }) fatalOnErr(err) + c.onShutdown(func() { + lis.Stop() + }) + c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) { runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) { lis.ListenWithError(lCtx, c.internalErr) diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 290e651f..05083864 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -73,6 +73,8 @@ type Client struct { // channel for internal stop closeChan chan struct{} + closed atomic.Bool + wg sync.WaitGroup // indicates that Client is not able to // establish connection to any of the diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index e006ca69..25ec626d 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool { } func (c *Client) closeWaiter(ctx context.Context) { + c.wg.Add(1) + defer c.wg.Done() select { case <-ctx.Done(): case <-c.closeChan: diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 121dccfb..a013631d 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -16,7 +16,10 @@ func (c *Client) Close() { // closing should be done via the channel // to prevent switching to another RPC node // in the notification loop - close(c.closeChan) + if c.closed.CompareAndSwap(false, true) { + close(c.closeChan) + } + c.wg.Wait() } // ReceiveExecutionNotifications performs subscription for notifications diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index ca503141..4fb92da5 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -96,6 +96,8 @@ type ListenerParams struct { type listener struct { mtx sync.RWMutex + wg sync.WaitGroup + startOnce, stopOnce sync.Once started bool @@ -124,6 +126,12 @@ var ( errNilLogger = errors.New("nil logger") errNilSubscriber = errors.New("nil event subscriber") + + errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated") + + errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated") + + errBlockNotificationChannelClosed = errors.New("new block notification channel is closed") ) // Listen starts the listening for events with registered handlers. @@ -133,6 +141,8 @@ var ( // Returns an error if listener was already started. func (l *listener) Listen(ctx context.Context) { l.startOnce.Do(func() { + l.wg.Add(1) + defer l.wg.Done() if err := l.listen(ctx, nil); err != nil { l.log.Error(logs.EventCouldNotStartListenToEvents, zap.String("error", err.Error()), @@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) { // Returns an error if listener was already started. func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) { l.startOnce.Do(func() { + l.wg.Add(1) + defer l.wg.Done() if err := l.listen(ctx, intError); err != nil { l.log.Error(logs.EventCouldNotStartListenToEvents, zap.String("error", err.Error()), ) - intError <- err + l.sendError(ctx, intError, err) } }) } @@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error { } func (l *listener) subscribe(errCh chan error) { + l.wg.Add(1) + defer l.wg.Done() // create the list of listening contract hashes hashes := make([]util.Uint160, 0) @@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) { } } +func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool { + if intErr == nil { + return false + } + // This select required because were are reading from error channel and closing listener + // in the same routine when shutting down node. + select { + case <-ctx.Done(): + l.log.Info(logs.EventStopEventListenerByContext, + zap.String("reason", ctx.Err().Error()), + ) + return false + case intErr <- err: + return true + } +} + func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { chs := l.subscriber.NotificationChannels() @@ -219,12 +250,9 @@ loop: for { select { case err := <-subErrCh: - if intErr != nil { - intErr <- err - } else { + if !l.sendError(ctx, intErr, err) { l.log.Error(logs.EventStopEventListenerByError, zap.Error(err)) } - break loop case <-ctx.Done(): l.log.Info(logs.EventStopEventListenerByContext, @@ -234,10 +262,7 @@ loop: case notifyEvent, ok := <-chs.NotificationsCh: if !ok { l.log.Warn(logs.EventStopEventListenerByNotificationChannel) - if intErr != nil { - intErr <- errors.New("event subscriber connection has been terminated") - } - + l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated) break loop } else if notifyEvent == nil { l.log.Warn(logs.EventNilNotificationEventWasCaught) @@ -248,10 +273,7 @@ loop: case notaryEvent, ok := <-chs.NotaryRequestsCh: if !ok { l.log.Warn(logs.EventStopEventListenerByNotaryChannel) - if intErr != nil { - intErr <- errors.New("notary event subscriber connection has been terminated") - } - + l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated) break loop } else if notaryEvent == nil { l.log.Warn(logs.EventNilNotaryEventWasCaught) @@ -262,10 +284,7 @@ loop: case b, ok := <-chs.BlockCh: if !ok { l.log.Warn(logs.EventStopEventListenerByBlockChannel) - if intErr != nil { - intErr <- errors.New("new block notification channel is closed") - } - + l.sendError(ctx, intErr, errBlockNotificationChannelClosed) break loop } else if b == nil { l.log.Warn(logs.EventNilBlockWasCaught) @@ -603,6 +622,7 @@ func (l *listener) Stop() { l.stopOnce.Do(func() { l.subscriber.Close() }) + l.wg.Wait() } func (l *listener) RegisterBlockHandler(handler BlockHandler) {