From 01e69f2f7a63725534c62dd77a9a60cb1039a935 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 1 Apr 2022 17:18:14 +0300 Subject: [PATCH] [#1170] pkg/morph: Add worker pool Add worker pool to the listener to prevent blocking. It is used only for notary notifications and new block events handling since it uses RPC calls. That may lead to the deadlock state: neo-go cannot send RPC until notification channel is read but notification channel cannot be read since neo-go client cannot send RPC. Signed-off-by: Pavel Karpy --- cmd/neofs-node/morph.go | 13 +++++++++++-- pkg/innerring/innerring.go | 13 +++++++++++-- pkg/morph/event/listener.go | 37 ++++++++++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index ded9832be0..ec00a1646e 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -173,6 +173,14 @@ func waitNotaryDeposit(c *cfg, tx util.Uint256) error { } func listenMorphNotifications(c *cfg) { + // listenerPoolCap is a capacity of a + // worker pool inside the listener. It + // is used to prevent blocking in neo-go: + // the client cannot make RPC requests if + // the notification channel is not being + // read by another goroutine. + const listenerPoolCap = 10 + var ( err error subs subscriber.Subscriber @@ -192,8 +200,9 @@ func listenMorphNotifications(c *cfg) { fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ - Logger: c.log, - Subscriber: subs, + Logger: c.log, + Subscriber: subs, + WorkerPoolCapacity: listenerPoolCap, }) fatalOnErr(err) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index d10b4f52b6..34a36ca3e2 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -921,6 +921,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { + // listenerPoolCap is a capacity of a + // worker pool inside the listener. It + // is used to prevent blocking in neo-go: + // the client cannot make RPC requests if + // the notification channel is not being + // read by another goroutine. + const listenerPoolCap = 10 + var ( sub subscriber.Subscriber err error @@ -936,8 +944,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev } listener, err := event.NewListener(event.ListenerParams{ - Logger: p.log, - Subscriber: sub, + Logger: p.log.With(zap.String("chain", p.name)), + Subscriber: sub, + WorkerPoolCapacity: listenerPoolCap, }) if err != nil { return nil, err diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index e3a9a012f7..0a5f96af1c 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -85,6 +86,8 @@ type ListenerParams struct { Logger *zap.Logger Subscriber subscriber.Subscriber + + WorkerPoolCapacity int } type listener struct { @@ -108,6 +111,8 @@ type listener struct { subscriber subscriber.Subscriber blockHandlers []BlockHandler + + pool *ants.Pool } const newListenerFailMsg = "could not instantiate Listener" @@ -258,7 +263,12 @@ loop: continue loop } - l.parseAndHandleNotary(notaryEvent) + if err = l.pool.Submit(func() { + l.parseAndHandleNotary(notaryEvent) + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) + } case b, ok := <-blockChan: if !ok { l.log.Warn("stop event listener by block channel") @@ -272,8 +282,13 @@ loop: continue loop } - for i := range l.blockHandlers { - l.blockHandlers[i](b) + if err = l.pool.Submit(func() { + for i := range l.blockHandlers { + l.blockHandlers[i](b) + } + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) } } } @@ -586,6 +601,11 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) { // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { + // defaultPoolCap is a default worker + // pool capacity if it was not specified + // via params + const defaultPoolCap = 10 + switch { case p.Logger == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) @@ -593,10 +613,21 @@ func NewListener(p ListenerParams) (Listener, error) { return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) } + poolCap := p.WorkerPoolCapacity + if poolCap == 0 { + poolCap = defaultPoolCap + } + + pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true)) + if err != nil { + return nil, fmt.Errorf("could not init worker pool: %w", err) + } + return &listener{ notificationParsers: make(map[scriptHashWithType]NotificationParser), notificationHandlers: make(map[scriptHashWithType][]Handler), log: p.Logger, subscriber: p.Subscriber, + pool: pool, }, nil }