diff --git a/CHANGELOG.md b/CHANGELOG.md index ed92f3a5..6793ed34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,7 @@ Changelog for FrostFS Node - Possible deadlock in write-cache (#2239) - Fix `*_req_count` and `*_req_count_success` metric values (#2241) - Storage ID update by write-cache (#2244) -- `neo-go` client deadlock on subscription restoration (#2244) +- `neo-go` client deadlock on subscription (#2244, #2272) - Possible panic during write-cache initialization (#2234) - Do not fetch an object if `meta` is missing it (#61) - Create contract wallet only by `init` and `update-config` command (#63) diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index ed2b9502..64fdc3df 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -9,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" @@ -158,6 +157,19 @@ func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) { } func (l *listener) listen(ctx context.Context, intError chan<- error) error { + // mark listener as started + l.started = true + + subErrCh := make(chan error) + + go l.subscribe(subErrCh) + + l.listenLoop(ctx, intError, subErrCh) + + return nil +} + +func (l *listener) subscribe(errCh chan error) { // create the list of listening contract hashes hashes := make([]util.Uint160, 0) @@ -175,71 +187,50 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error { hashes = append(hashes, hashType.ScriptHash()) } - - // mark listener as started - l.started = true - l.mtx.RUnlock() - chEvent, err := l.subscriber.SubscribeForNotification(hashes...) + err := l.subscriber.SubscribeForNotification(hashes...) if err != nil { - return err + errCh <- fmt.Errorf("could not subscribe for notifications: %w", err) + return } - l.listenLoop(ctx, chEvent, intError) - - return nil -} - -// nolint: funlen, gocognit -func (l *listener) listenLoop(ctx context.Context, chEvent <-chan *state.ContainedNotificationEvent, intErr chan<- error) { - var ( - blockChan <-chan *block.Block - - notaryChan <-chan *result.NotaryRequestEvent - - err error - ) - if len(l.blockHandlers) > 0 { - if blockChan, err = l.subscriber.BlockNotifications(); err != nil { - if intErr != nil { - intErr <- fmt.Errorf("could not open block notifications channel: %w", err) - } else { - l.log.Debug("could not open block notifications channel", - zap.String("error", err.Error()), - ) - } - + if err = l.subscriber.BlockNotifications(); err != nil { + errCh <- fmt.Errorf("could not subscribe for blocks: %w", err) return } - } else { - blockChan = make(chan *block.Block) } if l.listenNotary { - if notaryChan, err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil { - if intErr != nil { - intErr <- fmt.Errorf("could not open notary notifications channel: %w", err) - } else { - l.log.Debug("could not open notary notifications channel", - zap.String("error", err.Error()), - ) - } - + if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil { + errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err) return } } +} + +// nolint: funlen, gocognit +func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { + chs := l.subscriber.NotificationChannels() loop: for { select { + case err := <-subErrCh: + if intErr != nil { + intErr <- err + } else { + l.log.Error("stop event listener by error", zap.Error(err)) + } + + break loop case <-ctx.Done(): l.log.Info("stop event listener by context", zap.String("reason", ctx.Err().Error()), ) break loop - case notifyEvent, ok := <-chEvent: + case notifyEvent, ok := <-chs.NotificationsCh: if !ok { l.log.Warn("stop event listener by notification channel") if intErr != nil { @@ -252,13 +243,13 @@ loop: continue loop } - if err = l.pool.Submit(func() { + if err := l.pool.Submit(func() { l.parseAndHandleNotification(notifyEvent) }); err != nil { l.log.Warn("listener worker pool drained", zap.Int("capacity", l.pool.Cap())) } - case notaryEvent, ok := <-notaryChan: + case notaryEvent, ok := <-chs.NotaryRequestsCh: if !ok { l.log.Warn("stop event listener by notary channel") if intErr != nil { @@ -271,13 +262,13 @@ loop: continue loop } - if err = l.pool.Submit(func() { + if err := l.pool.Submit(func() { l.parseAndHandleNotary(notaryEvent) }); err != nil { l.log.Warn("listener worker pool drained", zap.Int("capacity", l.pool.Cap())) } - case b, ok := <-blockChan: + case b, ok := <-chs.BlockCh: if !ok { l.log.Warn("stop event listener by block channel") if intErr != nil { @@ -290,7 +281,7 @@ loop: continue loop } - if err = l.pool.Submit(func() { + if err := l.pool.Submit(func() { for i := range l.blockHandlers { l.blockHandlers[i](b) } diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index 6229e6f3..17bed5b2 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -17,12 +17,21 @@ import ( ) type ( + NotificationChannels struct { + BlockCh <-chan *block.Block + NotificationsCh <-chan *state.ContainedNotificationEvent + NotaryRequestsCh <-chan *result.NotaryRequestEvent + } + // Subscriber is an interface of the NotificationEvent listener. Subscriber interface { - SubscribeForNotification(...util.Uint160) (<-chan *state.ContainedNotificationEvent, error) + SubscribeForNotification(...util.Uint160) error UnsubscribeForNotification() - BlockNotifications() (<-chan *block.Block, error) - SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error) + BlockNotifications() error + SubscribeForNotaryRequests(mainTXSigner util.Uint160) error + + NotificationChannels() NotificationChannels + Close() } @@ -46,6 +55,14 @@ type ( } ) +func (s *subscriber) NotificationChannels() NotificationChannels { + return NotificationChannels{ + BlockCh: s.blockChan, + NotificationsCh: s.notifyChan, + NotaryRequestsCh: s.notaryChan, + } +} + var ( errNilParams = errors.New("chain/subscriber: config was not provided to the constructor") @@ -54,7 +71,7 @@ var ( errNilClient = errors.New("chain/subscriber: client was not provided to the constructor") ) -func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *state.ContainedNotificationEvent, error) { +func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error { s.Lock() defer s.Unlock() @@ -69,14 +86,14 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan _ = s.client.UnsubscribeContract(hash) } - return nil, err + return err } // save notification id notifyIDs[contracts[i]] = struct{}{} } - return s.notifyChan, nil + return nil } func (s *subscriber) UnsubscribeForNotification() { @@ -91,20 +108,20 @@ func (s *subscriber) Close() { s.client.Close() } -func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { +func (s *subscriber) BlockNotifications() error { if err := s.client.SubscribeForNewBlocks(); err != nil { - return nil, fmt.Errorf("could not subscribe for new block events: %w", err) + return fmt.Errorf("could not subscribe for new block events: %w", err) } - return s.blockChan, nil + return nil } -func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error) { +func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error { if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil { - return nil, fmt.Errorf("could not subscribe for notary request events: %w", err) + return fmt.Errorf("could not subscribe for notary request events: %w", err) } - return s.notaryChan, nil + return nil } func (s *subscriber) routeNotifications(ctx context.Context) {