morph: Fix panic when closing morph client #760
5 changed files with 49 additions and 18 deletions
|
@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
|||
})
|
||||
fatalOnErr(err)
|
||||
|
||||
c.onShutdown(func() {
|
||||
lis.Stop()
|
||||
})
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
||||
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
||||
lis.ListenWithError(lCtx, c.internalErr)
|
||||
|
|
|
@ -73,6 +73,8 @@ type Client struct {
|
|||
|
||||
// channel for internal stop
|
||||
closeChan chan struct{}
|
||||
closed atomic.Bool
|
||||
wg sync.WaitGroup
|
||||
|
||||
// indicates that Client is not able to
|
||||
// establish connection to any of the
|
||||
|
|
|
@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool {
|
|||
}
|
||||
|
||||
func (c *Client) closeWaiter(ctx context.Context) {
|
||||
c.wg.Add(1)
|
||||
defer c.wg.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.closeChan:
|
||||
|
|
|
@ -16,7 +16,10 @@ func (c *Client) Close() {
|
|||
// closing should be done via the channel
|
||||
// to prevent switching to another RPC node
|
||||
// in the notification loop
|
||||
close(c.closeChan)
|
||||
if c.closed.CompareAndSwap(false, true) {
|
||||
close(c.closeChan)
|
||||
}
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
// ReceiveExecutionNotifications performs subscription for notifications
|
||||
|
|
|
@ -96,6 +96,8 @@ type ListenerParams struct {
|
|||
type listener struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
startOnce, stopOnce sync.Once
|
||||
|
||||
started bool
|
||||
|
@ -124,6 +126,12 @@ var (
|
|||
errNilLogger = errors.New("nil logger")
|
||||
|
||||
errNilSubscriber = errors.New("nil event subscriber")
|
||||
|
||||
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
|
||||
|
||||
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
|
||||
|
||||
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
|
||||
)
|
||||
|
||||
// Listen starts the listening for events with registered handlers.
|
||||
|
@ -133,6 +141,8 @@ var (
|
|||
// Returns an error if listener was already started.
|
||||
func (l *listener) Listen(ctx context.Context) {
|
||||
l.startOnce.Do(func() {
|
||||
l.wg.Add(1)
|
||||
defer l.wg.Done()
|
||||
if err := l.listen(ctx, nil); err != nil {
|
||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) {
|
|||
// Returns an error if listener was already started.
|
||||
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
||||
l.startOnce.Do(func() {
|
||||
l.wg.Add(1)
|
||||
defer l.wg.Done()
|
||||
if err := l.listen(ctx, intError); err != nil {
|
||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
intError <- err
|
||||
l.sendError(ctx, intError, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
|||
}
|
||||
|
||||
func (l *listener) subscribe(errCh chan error) {
|
||||
l.wg.Add(1)
|
||||
defer l.wg.Done()
|
||||
// create the list of listening contract hashes
|
||||
hashes := make([]util.Uint160, 0)
|
||||
|
||||
|
@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
|
||||
if intErr == nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
return false
|
||||
}
|
||||
// This select required because were are reading from error channel and closing listener
|
||||
// in the same routine when shutting down node.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
l.log.Info(logs.EventStopEventListenerByContext,
|
||||
zap.String("reason", ctx.Err().Error()),
|
||||
)
|
||||
return false
|
||||
case intErr <- err:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
||||
chs := l.subscriber.NotificationChannels()
|
||||
|
||||
|
@ -219,12 +250,9 @@ loop:
|
|||
for {
|
||||
select {
|
||||
case err := <-subErrCh:
|
||||
if intErr != nil {
|
||||
intErr <- err
|
||||
} else {
|
||||
if !l.sendError(ctx, intErr, err) {
|
||||
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
|
||||
}
|
||||
|
||||
break loop
|
||||
case <-ctx.Done():
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
May be also move May be also move `if intErr != nil` logic in function?
acid-ant
commented
The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function? The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function?
fyrchik
commented
Ah, yeah, missed that. Ah, yeah, missed that.
But the error could easily be moved to a global varable, it is always the same.
|
||||
l.log.Info(logs.EventStopEventListenerByContext,
|
||||
|
@ -234,10 +262,7 @@ loop:
|
|||
case notifyEvent, ok := <-chs.NotificationsCh:
|
||||
if !ok {
|
||||
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("event subscriber connection has been terminated")
|
||||
}
|
||||
|
||||
l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
|
||||
break loop
|
||||
} else if notifyEvent == nil {
|
||||
l.log.Warn(logs.EventNilNotificationEventWasCaught)
|
||||
|
@ -248,10 +273,7 @@ loop:
|
|||
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
||||
if !ok {
|
||||
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("notary event subscriber connection has been terminated")
|
||||
}
|
||||
|
||||
l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
|
||||
break loop
|
||||
} else if notaryEvent == nil {
|
||||
l.log.Warn(logs.EventNilNotaryEventWasCaught)
|
||||
|
@ -262,10 +284,7 @@ loop:
|
|||
case b, ok := <-chs.BlockCh:
|
||||
if !ok {
|
||||
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("new block notification channel is closed")
|
||||
}
|
||||
|
||||
l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
|
||||
break loop
|
||||
} else if b == nil {
|
||||
l.log.Warn(logs.EventNilBlockWasCaught)
|
||||
|
@ -603,6 +622,7 @@ func (l *listener) Stop() {
|
|||
l.stopOnce.Do(func() {
|
||||
l.subscriber.Close()
|
||||
})
|
||||
l.wg.Wait()
|
||||
}
|
||||
|
||||
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
||||
|
|
Loading…
Reference in a new issue
nitpick, mut maybe
if intErr == nil { return false }
and reduce indentation for other code?Agree, updated.