forked from TrueCloudLab/frostfs-node
[#2272] morph: Do not subscribe to events without listening
It led to a neo-go dead-lock in the `subscriber` component. Subscribing to notifications is the same RPC as any others, so it could also be blocked forever if no async listening (reading the notification channel) routine exists. If a number of subscriptions is big enough (or a caller is lucky enough) subscribing loop might have not finished subscribing before the first notification is received and then: subscribing RPC is blocked by received notification (non)handling and listening notifications routine is blocked by not finished subscription loop. That commit starts listening notification channel _before_ any subscription actions. Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
parent
2bdf7126b8
commit
a69c6d1ec9
3 changed files with 69 additions and 61 deletions
|
@ -44,7 +44,7 @@ Changelog for FrostFS Node
|
|||
- Possible deadlock in write-cache (#2239)
|
||||
- Fix `*_req_count` and `*_req_count_success` metric values (#2241)
|
||||
- Storage ID update by write-cache (#2244)
|
||||
- `neo-go` client deadlock on subscription restoration (#2244)
|
||||
- `neo-go` client deadlock on subscription (#2244, #2272)
|
||||
- Possible panic during write-cache initialization (#2234)
|
||||
- Do not fetch an object if `meta` is missing it (#61)
|
||||
- Create contract wallet only by `init` and `update-config` command (#63)
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
|
@ -158,6 +157,19 @@ func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
|||
}
|
||||
|
||||
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
||||
// mark listener as started
|
||||
l.started = true
|
||||
|
||||
subErrCh := make(chan error)
|
||||
|
||||
go l.subscribe(subErrCh)
|
||||
|
||||
l.listenLoop(ctx, intError, subErrCh)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *listener) subscribe(errCh chan error) {
|
||||
// create the list of listening contract hashes
|
||||
hashes := make([]util.Uint160, 0)
|
||||
|
||||
|
@ -175,71 +187,50 @@ func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
|||
|
||||
hashes = append(hashes, hashType.ScriptHash())
|
||||
}
|
||||
|
||||
// mark listener as started
|
||||
l.started = true
|
||||
|
||||
l.mtx.RUnlock()
|
||||
|
||||
chEvent, err := l.subscriber.SubscribeForNotification(hashes...)
|
||||
err := l.subscriber.SubscribeForNotification(hashes...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.listenLoop(ctx, chEvent, intError)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func (l *listener) listenLoop(ctx context.Context, chEvent <-chan *state.ContainedNotificationEvent, intErr chan<- error) {
|
||||
var (
|
||||
blockChan <-chan *block.Block
|
||||
|
||||
notaryChan <-chan *result.NotaryRequestEvent
|
||||
|
||||
err error
|
||||
)
|
||||
|
||||
if len(l.blockHandlers) > 0 {
|
||||
if blockChan, err = l.subscriber.BlockNotifications(); err != nil {
|
||||
if intErr != nil {
|
||||
intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
|
||||
} else {
|
||||
l.log.Debug("could not open block notifications channel",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
errCh <- fmt.Errorf("could not subscribe for notifications: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(l.blockHandlers) > 0 {
|
||||
if err = l.subscriber.BlockNotifications(); err != nil {
|
||||
errCh <- fmt.Errorf("could not subscribe for blocks: %w", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
blockChan = make(chan *block.Block)
|
||||
}
|
||||
|
||||
if l.listenNotary {
|
||||
if notaryChan, err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
|
||||
if intErr != nil {
|
||||
intErr <- fmt.Errorf("could not open notary notifications channel: %w", err)
|
||||
} else {
|
||||
l.log.Debug("could not open notary notifications channel",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
|
||||
errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
|
||||
chs := l.subscriber.NotificationChannels()
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case err := <-subErrCh:
|
||||
if intErr != nil {
|
||||
intErr <- err
|
||||
} else {
|
||||
l.log.Error("stop event listener by error", zap.Error(err))
|
||||
}
|
||||
|
||||
break loop
|
||||
case <-ctx.Done():
|
||||
l.log.Info("stop event listener by context",
|
||||
zap.String("reason", ctx.Err().Error()),
|
||||
)
|
||||
break loop
|
||||
case notifyEvent, ok := <-chEvent:
|
||||
case notifyEvent, ok := <-chs.NotificationsCh:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by notification channel")
|
||||
if intErr != nil {
|
||||
|
@ -252,13 +243,13 @@ loop:
|
|||
continue loop
|
||||
}
|
||||
|
||||
if err = l.pool.Submit(func() {
|
||||
if err := l.pool.Submit(func() {
|
||||
l.parseAndHandleNotification(notifyEvent)
|
||||
}); err != nil {
|
||||
l.log.Warn("listener worker pool drained",
|
||||
zap.Int("capacity", l.pool.Cap()))
|
||||
}
|
||||
case notaryEvent, ok := <-notaryChan:
|
||||
case notaryEvent, ok := <-chs.NotaryRequestsCh:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by notary channel")
|
||||
if intErr != nil {
|
||||
|
@ -271,13 +262,13 @@ loop:
|
|||
continue loop
|
||||
}
|
||||
|
||||
if err = l.pool.Submit(func() {
|
||||
if err := l.pool.Submit(func() {
|
||||
l.parseAndHandleNotary(notaryEvent)
|
||||
}); err != nil {
|
||||
l.log.Warn("listener worker pool drained",
|
||||
zap.Int("capacity", l.pool.Cap()))
|
||||
}
|
||||
case b, ok := <-blockChan:
|
||||
case b, ok := <-chs.BlockCh:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by block channel")
|
||||
if intErr != nil {
|
||||
|
@ -290,7 +281,7 @@ loop:
|
|||
continue loop
|
||||
}
|
||||
|
||||
if err = l.pool.Submit(func() {
|
||||
if err := l.pool.Submit(func() {
|
||||
for i := range l.blockHandlers {
|
||||
l.blockHandlers[i](b)
|
||||
}
|
||||
|
|
|
@ -17,12 +17,21 @@ import (
|
|||
)
|
||||
|
||||
type (
|
||||
NotificationChannels struct {
|
||||
BlockCh <-chan *block.Block
|
||||
NotificationsCh <-chan *state.ContainedNotificationEvent
|
||||
NotaryRequestsCh <-chan *result.NotaryRequestEvent
|
||||
}
|
||||
|
||||
// Subscriber is an interface of the NotificationEvent listener.
|
||||
Subscriber interface {
|
||||
SubscribeForNotification(...util.Uint160) (<-chan *state.ContainedNotificationEvent, error)
|
||||
SubscribeForNotification(...util.Uint160) error
|
||||
UnsubscribeForNotification()
|
||||
BlockNotifications() (<-chan *block.Block, error)
|
||||
SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error)
|
||||
BlockNotifications() error
|
||||
SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
|
||||
|
||||
NotificationChannels() NotificationChannels
|
||||
|
||||
Close()
|
||||
}
|
||||
|
||||
|
@ -46,6 +55,14 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func (s *subscriber) NotificationChannels() NotificationChannels {
|
||||
return NotificationChannels{
|
||||
BlockCh: s.blockChan,
|
||||
NotificationsCh: s.notifyChan,
|
||||
NotaryRequestsCh: s.notaryChan,
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
|
||||
|
||||
|
@ -54,7 +71,7 @@ var (
|
|||
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
|
||||
)
|
||||
|
||||
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *state.ContainedNotificationEvent, error) {
|
||||
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
@ -69,14 +86,14 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan
|
|||
_ = s.client.UnsubscribeContract(hash)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// save notification id
|
||||
notifyIDs[contracts[i]] = struct{}{}
|
||||
}
|
||||
|
||||
return s.notifyChan, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) UnsubscribeForNotification() {
|
||||
|
@ -91,20 +108,20 @@ func (s *subscriber) Close() {
|
|||
s.client.Close()
|
||||
}
|
||||
|
||||
func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) {
|
||||
func (s *subscriber) BlockNotifications() error {
|
||||
if err := s.client.SubscribeForNewBlocks(); err != nil {
|
||||
return nil, fmt.Errorf("could not subscribe for new block events: %w", err)
|
||||
return fmt.Errorf("could not subscribe for new block events: %w", err)
|
||||
}
|
||||
|
||||
return s.blockChan, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *result.NotaryRequestEvent, error) {
|
||||
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
|
||||
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
|
||||
return nil, fmt.Errorf("could not subscribe for notary request events: %w", err)
|
||||
return fmt.Errorf("could not subscribe for notary request events: %w", err)
|
||||
}
|
||||
|
||||
return s.notaryChan, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) routeNotifications(ctx context.Context) {
|
||||
|
|
Loading…
Reference in a new issue