diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 439de3a9e2..2dfbe5c18c 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -181,14 +181,6 @@ func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error { } func listenMorphNotifications(ctx context.Context, c *cfg) { - // listenerPoolCap is a capacity of a - // worker pool inside the listener. It - // is used to prevent blocking in neo-go: - // the client cannot make RPC requests if - // the notification channel is not being - // read by another goroutine. - const listenerPoolCap = 10 - var ( err error subs subscriber.Subscriber @@ -208,9 +200,8 @@ func listenMorphNotifications(ctx context.Context, c *cfg) { fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ - Logger: c.log, - Subscriber: subs, - WorkerPoolCapacity: listenerPoolCap, + Logger: c.log, + Subscriber: subs, }) fatalOnErr(err) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 38023932f4..3b42a5853f 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -409,14 +409,6 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan } func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { - // listenerPoolCap is a capacity of a - // worker pool inside the listener. It - // is used to prevent blocking in neo-go: - // the client cannot make RPC requests if - // the notification channel is not being - // read by another goroutine. - const listenerPoolCap = 10 - var ( sub subscriber.Subscriber err error @@ -432,9 +424,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev } listener, err := event.NewListener(event.ListenerParams{ - Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))}, - Subscriber: sub, - WorkerPoolCapacity: listenerPoolCap, + Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))}, + Subscriber: sub, }) if err != nil { return nil, err diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 64fdc3df3b..0bc7e89f88 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -600,11 +600,6 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) { // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { - // defaultPoolCap is a default worker - // pool capacity if it was not specified - // via params - const defaultPoolCap = 10 - switch { case p.Logger == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) @@ -612,12 +607,9 @@ func NewListener(p ListenerParams) (Listener, error) { return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) } - poolCap := p.WorkerPoolCapacity - if poolCap == 0 { - poolCap = defaultPoolCap - } - - pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true)) + // The pool here must be blocking, otherwise notifications could be dropped. + // The default capacity is 0, which means "infinite". + pool, err := ants.NewPool(p.WorkerPoolCapacity) if err != nil { return nil, fmt.Errorf("could not init worker pool: %w", err) }