forked from TrueCloudLab/frostfs-node
[#749] morph: Fix panic when closing morph client
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
c80b46fad3
commit
a26483fc30
5 changed files with 49 additions and 18 deletions
|
@ -207,6 +207,10 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
||||||
})
|
})
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
c.onShutdown(func() {
|
||||||
|
lis.Stop()
|
||||||
|
})
|
||||||
|
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
||||||
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
||||||
lis.ListenWithError(lCtx, c.internalErr)
|
lis.ListenWithError(lCtx, c.internalErr)
|
||||||
|
|
|
@ -73,6 +73,8 @@ type Client struct {
|
||||||
|
|
||||||
// channel for internal stop
|
// channel for internal stop
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
closed atomic.Bool
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
|
|
|
@ -75,6 +75,8 @@ func (c *Client) SwitchRPC(ctx context.Context) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) closeWaiter(ctx context.Context) {
|
func (c *Client) closeWaiter(ctx context.Context) {
|
||||||
|
c.wg.Add(1)
|
||||||
|
defer c.wg.Done()
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case <-c.closeChan:
|
case <-c.closeChan:
|
||||||
|
|
|
@ -16,8 +16,11 @@ func (c *Client) Close() {
|
||||||
// closing should be done via the channel
|
// closing should be done via the channel
|
||||||
// to prevent switching to another RPC node
|
// to prevent switching to another RPC node
|
||||||
// in the notification loop
|
// in the notification loop
|
||||||
|
if c.closed.CompareAndSwap(false, true) {
|
||||||
close(c.closeChan)
|
close(c.closeChan)
|
||||||
}
|
}
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// ReceiveExecutionNotifications performs subscription for notifications
|
// ReceiveExecutionNotifications performs subscription for notifications
|
||||||
// generated during contract execution. Events are sent to the specified channel.
|
// generated during contract execution. Events are sent to the specified channel.
|
||||||
|
|
|
@ -96,6 +96,8 @@ type ListenerParams struct {
|
||||||
type listener struct {
|
type listener struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
startOnce, stopOnce sync.Once
|
startOnce, stopOnce sync.Once
|
||||||
|
|
||||||
started bool
|
started bool
|
||||||
|
@ -124,6 +126,12 @@ var (
|
||||||
errNilLogger = errors.New("nil logger")
|
errNilLogger = errors.New("nil logger")
|
||||||
|
|
||||||
errNilSubscriber = errors.New("nil event subscriber")
|
errNilSubscriber = errors.New("nil event subscriber")
|
||||||
|
|
||||||
|
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
|
||||||
|
|
||||||
|
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
|
||||||
|
|
||||||
|
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Listen starts the listening for events with registered handlers.
|
// Listen starts the listening for events with registered handlers.
|
||||||
|
@ -133,6 +141,8 @@ var (
|
||||||
// Returns an error if listener was already started.
|
// Returns an error if listener was already started.
|
||||||
func (l *listener) Listen(ctx context.Context) {
|
func (l *listener) Listen(ctx context.Context) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, nil); err != nil {
|
if err := l.listen(ctx, nil); err != nil {
|
||||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -149,11 +159,13 @@ func (l *listener) Listen(ctx context.Context) {
|
||||||
// Returns an error if listener was already started.
|
// Returns an error if listener was already started.
|
||||||
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, intError); err != nil {
|
if err := l.listen(ctx, intError); err != nil {
|
||||||
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
l.log.Error(logs.EventCouldNotStartListenToEvents,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
intError <- err
|
l.sendError(ctx, intError, err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -172,6 +184,8 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) subscribe(errCh chan error) {
|
func (l *listener) subscribe(errCh chan error) {
|
||||||
|
l.wg.Add(1)
|
||||||
|
defer l.wg.Done()
|
||||||
// create the list of listening contract hashes
|
// create the list of listening contract hashes
|
||||||
hashes := make([]util.Uint160, 0)
|
hashes := make([]util.Uint160, 0)
|
||||||
|
|
||||||
|
@ -212,6 +226,23 @@ func (l *listener) subscribe(errCh chan error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
|
||||||
|
if intErr == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// This select required because were are reading from error channel and closing listener
|
||||||
|
// in the same routine when shutting down node.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
l.log.Info(logs.EventStopEventListenerByContext,
|
||||||
|
zap.String("reason", ctx.Err().Error()),
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
case intErr <- err:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
||||||
chs := l.subscriber.NotificationChannels()
|
chs := l.subscriber.NotificationChannels()
|
||||||
|
|
||||||
|
@ -219,12 +250,9 @@ loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-subErrCh:
|
case err := <-subErrCh:
|
||||||
if intErr != nil {
|
if !l.sendError(ctx, intErr, err) {
|
||||||
intErr <- err
|
|
||||||
} else {
|
|
||||||
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
|
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
l.log.Info(logs.EventStopEventListenerByContext,
|
l.log.Info(logs.EventStopEventListenerByContext,
|
||||||
|
@ -234,10 +262,7 @@ loop:
|
||||||
case notifyEvent, ok := <-chs.NotificationsCh:
|
case notifyEvent, ok := <-chs.NotificationsCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
|
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
|
||||||
intErr <- errors.New("event subscriber connection has been terminated")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if notifyEvent == nil {
|
} else if notifyEvent == nil {
|
||||||
l.log.Warn(logs.EventNilNotificationEventWasCaught)
|
l.log.Warn(logs.EventNilNotificationEventWasCaught)
|
||||||
|
@ -248,10 +273,7 @@ loop:
|
||||||
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
|
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
|
||||||
intErr <- errors.New("notary event subscriber connection has been terminated")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if notaryEvent == nil {
|
} else if notaryEvent == nil {
|
||||||
l.log.Warn(logs.EventNilNotaryEventWasCaught)
|
l.log.Warn(logs.EventNilNotaryEventWasCaught)
|
||||||
|
@ -262,10 +284,7 @@ loop:
|
||||||
case b, ok := <-chs.BlockCh:
|
case b, ok := <-chs.BlockCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
|
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
|
||||||
if intErr != nil {
|
l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
|
||||||
intErr <- errors.New("new block notification channel is closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
} else if b == nil {
|
} else if b == nil {
|
||||||
l.log.Warn(logs.EventNilBlockWasCaught)
|
l.log.Warn(logs.EventNilBlockWasCaught)
|
||||||
|
@ -603,6 +622,7 @@ func (l *listener) Stop() {
|
||||||
l.stopOnce.Do(func() {
|
l.stopOnce.Do(func() {
|
||||||
l.subscriber.Close()
|
l.subscriber.Close()
|
||||||
})
|
})
|
||||||
|
l.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
||||||
|
|
Loading…
Reference in a new issue