Compare commits

...

1 commit

Author SHA1 Message Date
7217394620 [#749] morph: Fix panic when closing morph client
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-11-01 08:48:27 +03:00
5 changed files with 49 additions and 18 deletions

View file

@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
}) })
fatalOnErr(err) fatalOnErr(err)
c.onShutdown(func() {
lis.Stop()
})
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) { c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) { runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
lis.ListenWithError(lCtx, c.internalErr) lis.ListenWithError(lCtx, c.internalErr)

View file

@ -73,6 +73,8 @@ type Client struct {
// channel for internal stop // channel for internal stop
closeChan chan struct{} closeChan chan struct{}
closed atomic.Bool
wg sync.WaitGroup
// indicates that Client is not able to // indicates that Client is not able to
// establish connection to any of the // establish connection to any of the

View file

@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool {
} }
func (c *Client) closeWaiter(ctx context.Context) { func (c *Client) closeWaiter(ctx context.Context) {
c.wg.Add(1)
defer c.wg.Done()
select { select {
case <-ctx.Done(): case <-ctx.Done():
case <-c.closeChan: case <-c.closeChan:

View file

@ -16,7 +16,10 @@ func (c *Client) Close() {
// closing should be done via the channel // closing should be done via the channel
// to prevent switching to another RPC node // to prevent switching to another RPC node
// in the notification loop // in the notification loop
if c.closed.CompareAndSwap(false, true) {
close(c.closeChan) close(c.closeChan)
}
c.wg.Wait()
} }
// ReceiveExecutionNotifications performs subscription for notifications // ReceiveExecutionNotifications performs subscription for notifications

View file

@ -96,6 +96,8 @@ type ListenerParams struct {
type listener struct { type listener struct {
mtx sync.RWMutex mtx sync.RWMutex
wg sync.WaitGroup
startOnce, stopOnce sync.Once startOnce, stopOnce sync.Once
started bool started bool
@ -124,6 +126,12 @@ var (
errNilLogger = errors.New("nil logger") errNilLogger = errors.New("nil logger")
errNilSubscriber = errors.New("nil event subscriber") errNilSubscriber = errors.New("nil event subscriber")
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
) )
// Listen starts the listening for events with registered handlers. // Listen starts the listening for events with registered handlers.
@ -133,6 +141,8 @@ var (
// Returns an error if listener was already started. // Returns an error if listener was already started.
func (l *listener) Listen(ctx context.Context) { func (l *listener) Listen(ctx context.Context) {
l.startOnce.Do(func() { l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, nil); err != nil { if err := l.listen(ctx, nil); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents, l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) {
// Returns an error if listener was already started. // Returns an error if listener was already started.
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) { func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
l.startOnce.Do(func() { l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, intError); err != nil { if err := l.listen(ctx, intError); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents, l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
intError <- err l.sendError(ctx, intError, err)
} }
}) })
} }
@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
} }
func (l *listener) subscribe(errCh chan error) { func (l *listener) subscribe(errCh chan error) {
l.wg.Add(1)
defer l.wg.Done()
// create the list of listening contract hashes // create the list of listening contract hashes
hashes := make([]util.Uint160, 0) hashes := make([]util.Uint160, 0)
@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) {
} }
} }
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
if intErr == nil {
return false
}
// This select required because were are reading from error channel and closing listener
// in the same routine when shutting down node.
select {
case <-ctx.Done():
l.log.Info(logs.EventStopEventListenerByContext,
zap.String("reason", ctx.Err().Error()),
)
return false
case intErr <- err:
return true
}
}
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
chs := l.subscriber.NotificationChannels() chs := l.subscriber.NotificationChannels()
@ -219,12 +250,9 @@ loop:
for { for {
select { select {
case err := <-subErrCh: case err := <-subErrCh:
if intErr != nil { if !l.sendError(ctx, intErr, err) {
intErr <- err
} else {
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err)) l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
} }
break loop break loop
case <-ctx.Done(): case <-ctx.Done():
l.log.Info(logs.EventStopEventListenerByContext, l.log.Info(logs.EventStopEventListenerByContext,
@ -234,10 +262,7 @@ loop:
case notifyEvent, ok := <-chs.NotificationsCh: case notifyEvent, ok := <-chs.NotificationsCh:
if !ok { if !ok {
l.log.Warn(logs.EventStopEventListenerByNotificationChannel) l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
if intErr != nil { l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
intErr <- errors.New("event subscriber connection has been terminated")
}
break loop break loop
} else if notifyEvent == nil { } else if notifyEvent == nil {
l.log.Warn(logs.EventNilNotificationEventWasCaught) l.log.Warn(logs.EventNilNotificationEventWasCaught)
@ -248,10 +273,7 @@ loop:
case notaryEvent, ok := <-chs.NotaryRequestsCh: case notaryEvent, ok := <-chs.NotaryRequestsCh:
if !ok { if !ok {
l.log.Warn(logs.EventStopEventListenerByNotaryChannel) l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
if intErr != nil { l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
intErr <- errors.New("notary event subscriber connection has been terminated")
}
break loop break loop
} else if notaryEvent == nil { } else if notaryEvent == nil {
l.log.Warn(logs.EventNilNotaryEventWasCaught) l.log.Warn(logs.EventNilNotaryEventWasCaught)
@ -262,10 +284,7 @@ loop:
case b, ok := <-chs.BlockCh: case b, ok := <-chs.BlockCh:
if !ok { if !ok {
l.log.Warn(logs.EventStopEventListenerByBlockChannel) l.log.Warn(logs.EventStopEventListenerByBlockChannel)
if intErr != nil { l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
intErr <- errors.New("new block notification channel is closed")
}
break loop break loop
} else if b == nil { } else if b == nil {
l.log.Warn(logs.EventNilBlockWasCaught) l.log.Warn(logs.EventNilBlockWasCaught)
@ -603,6 +622,7 @@ func (l *listener) Stop() {
l.stopOnce.Do(func() { l.stopOnce.Do(func() {
l.subscriber.Close() l.subscriber.Close()
}) })
l.wg.Wait()
} }
func (l *listener) RegisterBlockHandler(handler BlockHandler) { func (l *listener) RegisterBlockHandler(handler BlockHandler) {