diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 0bc7e89f..3de19932 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" @@ -210,7 +211,6 @@ func (l *listener) subscribe(errCh chan error) { } } -// nolint: funlen, gocognit func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { chs := l.subscriber.NotificationChannels() @@ -243,12 +243,7 @@ loop: continue loop } - if err := l.pool.Submit(func() { - l.parseAndHandleNotification(notifyEvent) - }); err != nil { - l.log.Warn("listener worker pool drained", - zap.Int("capacity", l.pool.Cap())) - } + l.handleNotifyEvent(notifyEvent) case notaryEvent, ok := <-chs.NotaryRequestsCh: if !ok { l.log.Warn("stop event listener by notary channel") @@ -262,12 +257,7 @@ loop: continue loop } - if err := l.pool.Submit(func() { - l.parseAndHandleNotary(notaryEvent) - }); err != nil { - l.log.Warn("listener worker pool drained", - zap.Int("capacity", l.pool.Cap())) - } + l.handleNotaryEvent(notaryEvent) case b, ok := <-chs.BlockCh: if !ok { l.log.Warn("stop event listener by block channel") @@ -281,18 +271,40 @@ loop: continue loop } - if err := l.pool.Submit(func() { - for i := range l.blockHandlers { - l.blockHandlers[i](b) - } - }); err != nil { - l.log.Warn("listener worker pool drained", - zap.Int("capacity", l.pool.Cap())) - } + l.handleBlockEvent(b) } } } +func (l *listener) handleBlockEvent(b *block.Block) { + if err := l.pool.Submit(func() { + for i := range l.blockHandlers { + l.blockHandlers[i](b) + } + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) + } +} + +func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) { + if err := l.pool.Submit(func() { + l.parseAndHandleNotary(notaryEvent) + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) + } +} + +func (l *listener) handleNotifyEvent(notifyEvent *state.ContainedNotificationEvent) { + if err := l.pool.Submit(func() { + l.parseAndHandleNotification(notifyEvent) + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) + } +} + func (l *listener) parseAndHandleNotification(notifyEvent *state.ContainedNotificationEvent) { log := l.log.With( zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),