morph: Fix panic when closing morph client #760

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/749-morph-client-panic into master 2023-11-01 10:48:15 +00:00
5 changed files with 49 additions and 18 deletions

View file

@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
})
fatalOnErr(err)
c.onShutdown(func() {
lis.Stop()
})
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
lis.ListenWithError(lCtx, c.internalErr)

View file

@ -73,6 +73,8 @@ type Client struct {
// channel for internal stop
closeChan chan struct{}
closed atomic.Bool
wg sync.WaitGroup
// indicates that Client is not able to
// establish connection to any of the

View file

@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool {
}
func (c *Client) closeWaiter(ctx context.Context) {
c.wg.Add(1)
defer c.wg.Done()
select {
case <-ctx.Done():
case <-c.closeChan:

View file

@ -16,7 +16,10 @@ func (c *Client) Close() {
// closing should be done via the channel
// to prevent switching to another RPC node
// in the notification loop
if c.closed.CompareAndSwap(false, true) {
close(c.closeChan)
}
c.wg.Wait()
}
// ReceiveExecutionNotifications performs subscription for notifications

View file

@ -96,6 +96,8 @@ type ListenerParams struct {
type listener struct {
mtx sync.RWMutex
wg sync.WaitGroup
startOnce, stopOnce sync.Once
started bool
@ -124,6 +126,12 @@ var (
errNilLogger = errors.New("nil logger")
errNilSubscriber = errors.New("nil event subscriber")
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
)
// Listen starts the listening for events with registered handlers.
@ -133,6 +141,8 @@ var (
// Returns an error if listener was already started.
func (l *listener) Listen(ctx context.Context) {
l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, nil); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()),
@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) {
// Returns an error if listener was already started.
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, intError); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()),
)
intError <- err
l.sendError(ctx, intError, err)
}
})
}
@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
}
func (l *listener) subscribe(errCh chan error) {
l.wg.Add(1)
defer l.wg.Done()
// create the list of listening contract hashes
hashes := make([]util.Uint160, 0)
@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) {
}
}
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
if intErr == nil {
fyrchik marked this conversation as resolved Outdated

nitpick, mut maybe if intErr == nil { return false } and reduce indentation for other code?

nitpick, mut maybe `if intErr == nil { return false }` and reduce indentation for other code?

Agree, updated.

Agree, updated.
return false
}
// This select required because were are reading from error channel and closing listener
// in the same routine when shutting down node.
select {
case <-ctx.Done():
l.log.Info(logs.EventStopEventListenerByContext,
zap.String("reason", ctx.Err().Error()),
)
return false
case intErr <- err:
return true
}
}
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
chs := l.subscriber.NotificationChannels()
@ -219,12 +250,9 @@ loop:
for {
select {
case err := <-subErrCh:
if intErr != nil {
intErr <- err
} else {
if !l.sendError(ctx, intErr, err) {
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
}
break loop
case <-ctx.Done():
fyrchik marked this conversation as resolved Outdated

May be also move if intErr != nil logic in function?

May be also move `if intErr != nil` logic in function?

The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function?

The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function?

Ah, yeah, missed that.
But the error could easily be moved to a global varable, it is always the same.

Ah, yeah, missed that. But the error could easily be moved to a global varable, it is always the same.
l.log.Info(logs.EventStopEventListenerByContext,
@ -234,10 +262,7 @@ loop:
case notifyEvent, ok := <-chs.NotificationsCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
if intErr != nil {
intErr <- errors.New("event subscriber connection has been terminated")
}
l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
break loop
} else if notifyEvent == nil {
l.log.Warn(logs.EventNilNotificationEventWasCaught)
@ -248,10 +273,7 @@ loop:
case notaryEvent, ok := <-chs.NotaryRequestsCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
if intErr != nil {
intErr <- errors.New("notary event subscriber connection has been terminated")
}
l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
break loop
} else if notaryEvent == nil {
l.log.Warn(logs.EventNilNotaryEventWasCaught)
@ -262,10 +284,7 @@ loop:
case b, ok := <-chs.BlockCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
if intErr != nil {
intErr <- errors.New("new block notification channel is closed")
}
l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
break loop
} else if b == nil {
l.log.Warn(logs.EventNilBlockWasCaught)
@ -603,6 +622,7 @@ func (l *listener) Stop() {
l.stopOnce.Do(func() {
l.subscriber.Close()
})
l.wg.Wait()
}
func (l *listener) RegisterBlockHandler(handler BlockHandler) {