From a69c6d1ec9e43f49e6e588d8770ea05ccbcef050 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Mon, 27 Feb 2023 17:17:55 +0300
Subject: [PATCH] [#2272] morph: Do not subscribe to events without listening
Subscribing without listening led to a neo-go deadlock in the `subscriber`
component. Subscribing to notifications is an RPC like any other, so it can
also block forever if no asynchronous listening routine (one that reads the
notification channel) exists. If the number of subscriptions is big enough
(or the caller is unlucky enough), the subscribing loop might not have
finished before the first notification is received, and then the subscribing
RPC is blocked by the unhandled notification while the notification
listening routine is blocked by the unfinished subscription loop.
This commit starts listening to the notification channel _before_ any
subscription actions.
Signed-off-by: Pavel Karpy
---
CHANGELOG.md | 2 +-
pkg/morph/event/listener.go | 87 ++++++++++++++----------------
pkg/morph/subscriber/subscriber.go | 41 +++++++++-----
3 files changed, 69 insertions(+), 61 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ed92f3a53..6793ed340 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,7 +44,7 @@ Changelog for FrostFS Node
- Possible deadlock in write-cache (#2239)
- Fix `*_req_count` and `*_req_count_success` metric values (#2241)
- Storage ID update by write-cache (#2244)
-- `neo-go` client deadlock on subscription restoration (#2244)
+- `neo-go` client deadlock on subscription (#2244, #2272)
- Possible panic during write-cache initialization (#2234)
- Do not fetch an object if `meta` is missing it (#61)
- Create contract wallet only by `init` and `update-config` command (#63)
diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go
index ed2b95026..64fdc3df3 100644
--- a/pkg/morph/event/listener.go
+++ b/pkg/morph/event/listener.go
@@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
- "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
@@ -158,6 +157,19 @@ func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
}
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
+ // mark listener as started
+ l.started = true
+
+ subErrCh := make(chan error)
+
+ go l.subscribe(subErrCh)
+
+ l.listenLoop(ctx, intError, subErrCh)
+
+ return nil
+}
+
+func (l *listener) subscribe(errCh chan error) {
// create the list of listening contract hashes
hashes := make([]util.Uint160, 0)
@@ -175,71 +187,50 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
hashes = append(hashes, hashType.ScriptHash())
}
-
- // mark listener as started
- l.started = true
-
l.mtx.RUnlock()
- chEvent, err := l.subscriber.SubscribeForNotification(hashes...)
+ err := l.subscriber.SubscribeForNotification(hashes...)
if err != nil {
- return err
+ errCh <- fmt.Errorf("could not subscribe for notifications: %w", err)
+ return
}
- l.listenLoop(ctx, chEvent, intError)
-
- return nil
-}
-
-// nolint: funlen, gocognit
-func (l *listener) listenLoop(ctx context.Context, chEvent <-chan *state.ContainedNotificationEvent, intErr chan<- error) {
- var (
- blockChan <-chan *block.Block
-
- notaryChan <-chan *result.NotaryRequestEvent
-
- err error
- )
-
if len(l.blockHandlers) > 0 {
- if blockChan, err = l.subscriber.BlockNotifications(); err != nil {
- if intErr != nil {
- intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
- } else {
- l.log.Debug("could not open block notifications channel",
- zap.String("error", err.Error()),
- )
- }
-
+ if err = l.subscriber.BlockNotifications(); err != nil {
+ errCh <- fmt.Errorf("could not subscribe for blocks: %w", err)
return
}
- } else {
- blockChan = make(chan *block.Block)
}
if l.listenNotary {
- if notaryChan, err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
- if intErr != nil {
- intErr <- fmt.Errorf("could not open notary notifications channel: %w", err)
- } else {
- l.log.Debug("could not open notary notifications channel",
- zap.String("error", err.Error()),
- )
- }
-
+ if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
+ errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err)
return
}
}
+}
+
+// nolint: funlen, gocognit
+func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
+ chs := l.subscriber.NotificationChannels()
loop:
for {
select {
+ case err := <-subErrCh:
+ if intErr != nil {
+ intErr <- err
+ } else {
+ l.log.Error("stop event listener by error", zap.Error(err))
+ }
+
+ break loop
case <-ctx.Done():
l.log.Info("stop event listener by context",
zap.String("reason", ctx.Err().Error()),
)
break loop
- case notifyEvent, ok := <-chEvent:
+ case notifyEvent, ok := <-chs.NotificationsCh:
if !ok {
l.log.Warn("stop event listener by notification channel")
if intErr != nil {
@@ -252,13 +243,13 @@ loop:
continue loop
}
- if err = l.pool.Submit(func() {
+ if err := l.pool.Submit(func() {
l.parseAndHandleNotification(notifyEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
- case notaryEvent, ok := <-notaryChan:
+ case notaryEvent, ok := <-chs.NotaryRequestsCh:
if !ok {
l.log.Warn("stop event listener by notary channel")
if intErr != nil {
@@ -271,13 +262,13 @@ loop:
continue loop
}
- if err = l.pool.Submit(func() {
+ if err := l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
- case b, ok := <-blockChan:
+ case b, ok := <-chs.BlockCh:
if !ok {
l.log.Warn("stop event listener by block channel")
if intErr != nil {
@@ -290,7 +281,7 @@ loop:
continue loop
}
- if err = l.pool.Submit(func() {
+ if err := l.pool.Submit(func() {
for i := range l.blockHandlers {
l.blockHandlers[i](b)
}
diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go
index 6229e6f30..17bed5b2d 100644
--- a/pkg/morph/subscriber/subscriber.go
+++ b/pkg/morph/subscriber/subscriber.go
@@ -17,12 +17,21 @@ import (
)
type (
+ NotificationChannels struct {
+ BlockCh <-chan *block.Block
+ NotificationsCh <-chan *state.ContainedNotificationEvent
+ NotaryRequestsCh <-chan *result.NotaryRequestEvent
+ }
+
// Subscriber is an interface of the NotificationEvent listener.
Subscriber interface {
- SubscribeForNotification(...util.Uint160) (<-chan *state.ContainedNotificationEvent, error)
+ SubscribeForNotification(...util.Uint160) error
UnsubscribeForNotification()
- BlockNotifications() (<-chan *block.Block, error)
- SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error)
+ BlockNotifications() error
+ SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
+
+ NotificationChannels() NotificationChannels
+
Close()
}
@@ -46,6 +55,14 @@ type (
}
)
+func (s *subscriber) NotificationChannels() NotificationChannels {
+ return NotificationChannels{
+ BlockCh: s.blockChan,
+ NotificationsCh: s.notifyChan,
+ NotaryRequestsCh: s.notaryChan,
+ }
+}
+
var (
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
@@ -54,7 +71,7 @@ var (
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
)
-func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *state.ContainedNotificationEvent, error) {
+func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock()
defer s.Unlock()
@@ -69,14 +86,14 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan
_ = s.client.UnsubscribeContract(hash)
}
- return nil, err
+ return err
}
// save notification id
notifyIDs[contracts[i]] = struct{}{}
}
- return s.notifyChan, nil
+ return nil
}
func (s *subscriber) UnsubscribeForNotification() {
@@ -91,20 +108,20 @@ func (s *subscriber) Close() {
s.client.Close()
}
-func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) {
+func (s *subscriber) BlockNotifications() error {
if err := s.client.SubscribeForNewBlocks(); err != nil {
- return nil, fmt.Errorf("could not subscribe for new block events: %w", err)
+ return fmt.Errorf("could not subscribe for new block events: %w", err)
}
- return s.blockChan, nil
+ return nil
}
-func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error) {
+func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
- return nil, fmt.Errorf("could not subscribe for notary request events: %w", err)
+ return fmt.Errorf("could not subscribe for notary request events: %w", err)
}
- return s.notaryChan, nil
+ return nil
}
func (s *subscriber) routeNotifications(ctx context.Context) {