diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index ded9832b..ec00a164 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -173,6 +173,14 @@ func waitNotaryDeposit(c *cfg, tx util.Uint256) error { } func listenMorphNotifications(c *cfg) { + // listenerPoolCap is a capacity of a + // worker pool inside the listener. It + // is used to prevent blocking in neo-go: + // the client cannot make RPC requests if + // the notification channel is not being + // read by another goroutine. + const listenerPoolCap = 10 + var ( err error subs subscriber.Subscriber @@ -192,8 +200,9 @@ func listenMorphNotifications(c *cfg) { fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ - Logger: c.log, - Subscriber: subs, + Logger: c.log, + Subscriber: subs, + WorkerPoolCapacity: listenerPoolCap, }) fatalOnErr(err) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index d10b4f52..34a36ca3 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -921,6 +921,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { + // listenerPoolCap is a capacity of a + // worker pool inside the listener. It + // is used to prevent blocking in neo-go: + // the client cannot make RPC requests if + // the notification channel is not being + // read by another goroutine. + const listenerPoolCap = 10 + var ( sub subscriber.Subscriber err error @@ -936,8 +944,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev } listener, err := event.NewListener(event.ListenerParams{ - Logger: p.log, - Subscriber: sub, + Logger: p.log.With(zap.String("chain", p.name)), + Subscriber: sub, + WorkerPoolCapacity: listenerPoolCap, }) if err != nil { return nil, err diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index e3a9a012..0a5f96af 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -85,6 +86,8 @@ type ListenerParams struct { Logger *zap.Logger Subscriber subscriber.Subscriber + + WorkerPoolCapacity int } type listener struct { @@ -108,6 +111,8 @@ type listener struct { subscriber subscriber.Subscriber blockHandlers []BlockHandler + + pool *ants.Pool } const newListenerFailMsg = "could not instantiate Listener" @@ -258,7 +263,12 @@ loop: continue loop } - l.parseAndHandleNotary(notaryEvent) + if err = l.pool.Submit(func() { + l.parseAndHandleNotary(notaryEvent) + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) + } case b, ok := <-blockChan: if !ok { l.log.Warn("stop event listener by block channel") @@ -272,8 +282,13 @@ loop: continue loop } - for i := range l.blockHandlers { - l.blockHandlers[i](b) + if err = l.pool.Submit(func() { + for i := range l.blockHandlers { + l.blockHandlers[i](b) + } + }); err != nil { + l.log.Warn("listener worker pool drained", + zap.Int("capacity", l.pool.Cap())) } } } @@ -586,6 +601,11 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) { // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { + // defaultPoolCap is a default worker + // pool capacity if it was not specified + // via params + const defaultPoolCap = 10 + switch { case p.Logger == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) @@ -593,10 +613,21 @@ func NewListener(p ListenerParams) (Listener, error) { return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) } + poolCap := p.WorkerPoolCapacity + if poolCap == 0 { + poolCap = defaultPoolCap + } + + pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true)) + if err != nil { + return nil, fmt.Errorf("could not init worker pool: %w", err) + } + return &listener{ notificationParsers: make(map[scriptHashWithType]NotificationParser), notificationHandlers: make(map[scriptHashWithType][]Handler), log: p.Logger, subscriber: p.Subscriber, + pool: pool, }, nil }