morph: Fix panic when closing morph client #760
5 changed files with 49 additions and 18 deletions
|
@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
||||||
})
|
})
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
c.onShutdown(func() {
|
||||||
|
lis.Stop()
|
||||||
|
})
|
||||||
|
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
||||||
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
||||||
lis.ListenWithError(lCtx, c.internalErr)
|
lis.ListenWithError(lCtx, c.internalErr)
|
||||||
|
|
|
@ -73,6 +73,8 @@ type Client struct {
|
||||||
|
|
||||||
// channel for internal stop
|
// channel for internal stop
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
closed atomic.Bool
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
|
|
|
@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) closeWaiter(ctx context.Context) {
|
func (c *Client) closeWaiter(ctx context.Context) {
|
||||||
|
c.wg.Add(1)
|
||||||
|
defer c.wg.Done()
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case <-c.closeChan:
|
case <-c.closeChan:
|
||||||
|
|
|
@ -16,8 +16,11 @@ func (c *Client) Close() {
|
||||||
// closing should be done via the channel
|
// closing should be done via the channel
|
||||||
// to prevent switching to another RPC node
|
// to prevent switching to another RPC node
|
||||||
// in the notification loop
|
// in the notification loop
|
||||||
|
if c.closed.CompareAndSwap(false, true) {
|
||||||
close(c.closeChan)
|
close(c.closeChan)
|
||||||
}
|
}
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// ReceiveExecutionNotifications performs subscription for notifications
|
// ReceiveExecutionNotifications performs subscription for notifications
|
||||||
// generated during contract execution. Events are sent to the specified channel.
|
// generated during contract execution. Events are sent to the specified channel.
|
||||||
|
|
|
@ -96,6 +96,8 @@ type ListenerParams struct {
|
||||||
type listener struct {
|
type listener struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
startOnce, stopOnce sync.Once
|
startOnce, stopOnce sync.Once
|
||||||
|
|
||||||
started bool
|
started bool
|
||||||
|
@ -124,6 +126,12 @@ var (
|
||||||
errNilLogger = errors.New("nil logger")
|
errNilLogger = errors.New("nil logger")
|
||||||
|
|
||||||
errNilSubscriber = errors.New("nil event subscriber")
|
errNilSubscriber = errors.New("nil event subscriber")
|
||||||
|
|
||||||
|
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
|
||||||
|
|
||||||
|
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
|
||||||
|
|
||||||
|
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Listen starts the listening for events with registered handlers.
|
// Listen starts the listening for events with registered handlers.
|
||||||
|
@ -133,6 +141,8 @@ var (
|
||||||
// Returns an error if listener was already started.
|
// Returns an error if listener was already started.
|
||||||
func (l *listener) Listen(ctx context.Context) {
|
func (l *listener) Listen(ctx context.Context) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, nil); err != nil {
|
if err := l.listen(ctx, nil); err != nil {
|
||||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) {
|
||||||
// Returns an error if listener was already started.
|
// Returns an error if listener was already started.
|
||||||
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, intError); err != nil {
|
if err := l.listen(ctx, intError); err != nil {
|
||||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
intError <- err
|
l.sendError(ctx, intError, err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) subscribe(errCh chan error) {
|
func (l *listener) subscribe(errCh chan error) {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
// create the list of listening contract hashes
|
// create the list of listening contract hashes
|
||||||
hashes := make([]util.Uint160, 0)
|
hashes := make([]util.Uint160, 0)
|
||||||
|
|
||||||
|
@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
|
||||||
|
if intErr == nil {
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
return false
|
||||||
|
}
|
||||||
|
// This select required because were are reading from error channel and closing listener
|
||||||
|
// in the same routine when shutting down node.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
l.log.Info(logs.EventStopEventListenerByContext,
|
||||||
|
zap.String("reason", ctx.Err().Error()),
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
case intErr <- err:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
||||||
chs := l.subscriber.NotificationChannels()
|
chs := l.subscriber.NotificationChannels()
|
||||||
|
|
||||||
|
@ -219,12 +250,9 @@ loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-subErrCh:
|
case err := <-subErrCh:
|
||||||
if intErr != nil {
|
if !l.sendError(ctx, intErr, err) {
|
||||||
intErr <- err
|
|
||||||
} else {
|
|
||||||
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
|
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
May be also move May be also move `if intErr != nil` logic in function?
acid-ant
commented
The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function? The idea was to avoid allocation of error to pass it to the function which should send it. How about to pass string only and allocate error inside the function?
fyrchik
commented
Ah, yeah, missed that. Ah, yeah, missed that.
But the error could easily be moved to a global varable, it is always the same.
|
|||||||
l.log.Info(logs.EventStopEventListenerByContext,
|
l.log.Info(logs.EventStopEventListenerByContext,
|
||||||
|
@ -234,10 +262,7 @@ loop:
|
||||||
case notifyEvent, ok := <-chs.NotificationsCh:
|
case notifyEvent, ok := <-chs.NotificationsCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
|
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
|
||||||
intErr <- errors.New("event subscriber connection has been terminated")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if notifyEvent == nil {
|
} else if notifyEvent == nil {
|
||||||
l.log.Warn(logs.EventNilNotificationEventWasCaught)
|
l.log.Warn(logs.EventNilNotificationEventWasCaught)
|
||||||
|
@ -248,10 +273,7 @@ loop:
|
||||||
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
|
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
|
||||||
intErr <- errors.New("notary event subscriber connection has been terminated")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if notaryEvent == nil {
|
} else if notaryEvent == nil {
|
||||||
l.log.Warn(logs.EventNilNotaryEventWasCaught)
|
l.log.Warn(logs.EventNilNotaryEventWasCaught)
|
||||||
|
@ -262,10 +284,7 @@ loop:
|
||||||
case b, ok := <-chs.BlockCh:
|
case b, ok := <-chs.BlockCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
|
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
|
||||||
intErr <- errors.New("new block notification channel is closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if b == nil {
|
} else if b == nil {
|
||||||
l.log.Warn(logs.EventNilBlockWasCaught)
|
l.log.Warn(logs.EventNilBlockWasCaught)
|
||||||
|
@ -603,6 +622,7 @@ func (l *listener) Stop() {
|
||||||
l.stopOnce.Do(func() {
|
l.stopOnce.Do(func() {
|
||||||
l.subscriber.Close()
|
l.subscriber.Close()
|
||||||
})
|
})
|
||||||
|
l.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue
nitpick, mut maybe
if intErr == nil { return false }
and reduce indentation for other code?Agree, updated.