[#219] morph: Refactor moprh event listener

Resolve funlen and gocognit linters for listener.listenLoop method.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-04-05 18:11:56 +03:00
parent 775179f823
commit d07e40d6fe

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -210,7 +211,6 @@ func (l *listener) subscribe(errCh chan error) {
} }
} }
// nolint: funlen, gocognit
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
chs := l.subscriber.NotificationChannels() chs := l.subscriber.NotificationChannels()
@ -243,12 +243,7 @@ loop:
continue loop continue loop
} }
if err := l.pool.Submit(func() { l.handleNotifyEvent(notifyEvent)
l.parseAndHandleNotification(notifyEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
case notaryEvent, ok := <-chs.NotaryRequestsCh: case notaryEvent, ok := <-chs.NotaryRequestsCh:
if !ok { if !ok {
l.log.Warn("stop event listener by notary channel") l.log.Warn("stop event listener by notary channel")
@ -262,12 +257,7 @@ loop:
continue loop continue loop
} }
if err := l.pool.Submit(func() { l.handleNotaryEvent(notaryEvent)
l.parseAndHandleNotary(notaryEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
case b, ok := <-chs.BlockCh: case b, ok := <-chs.BlockCh:
if !ok { if !ok {
l.log.Warn("stop event listener by block channel") l.log.Warn("stop event listener by block channel")
@ -281,6 +271,12 @@ loop:
continue loop continue loop
} }
l.handleBlockEvent(b)
}
}
}
func (l *listener) handleBlockEvent(b *block.Block) {
if err := l.pool.Submit(func() { if err := l.pool.Submit(func() {
for i := range l.blockHandlers { for i := range l.blockHandlers {
l.blockHandlers[i](b) l.blockHandlers[i](b)
@ -289,7 +285,23 @@ loop:
l.log.Warn("listener worker pool drained", l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap())) zap.Int("capacity", l.pool.Cap()))
} }
}
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) {
if err := l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
} }
}
func (l *listener) handleNotifyEvent(notifyEvent *state.ContainedNotificationEvent) {
if err := l.pool.Submit(func() {
l.parseAndHandleNotification(notifyEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
} }
} }