From 03bef2bc98d1e156be0153ec5a2f6eba5bf9ff5d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 3 Apr 2023 10:44:05 +0300 Subject: [PATCH] [#195] morph: use blocking unlimited pool for notifications With non-blocking pool restricted by 10 in capacity, the probability of dropping events is unexpectedly big. Notifications are an essential part of the FrostFS, we should not drop anything, especially new epochs. ``` Mar 31 07:07:03 vedi neofs-ir[19164]: 2023-03-31T07:07:03.901Z debug subscriber/subscriber.go:154 new notification event from sidechain {"name": "NewEpoch"} Mar 31 07:07:03 vedi neofs-ir[19164]: 2023-03-31T07:07:03.901Z warn event/listener.go:248 listener worker pool drained {"chain": "morph", "capacity": 10} ``` Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/morph.go | 13 ++----------- pkg/innerring/innerring.go | 13 ++----------- pkg/morph/event/listener.go | 14 +++----------- 3 files changed, 7 insertions(+), 33 deletions(-) diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 439de3a9e..2dfbe5c18 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -181,14 +181,6 @@ func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error { } func listenMorphNotifications(ctx context.Context, c *cfg) { - // listenerPoolCap is a capacity of a - // worker pool inside the listener. It - // is used to prevent blocking in neo-go: - // the client cannot make RPC requests if - // the notification channel is not being - // read by another goroutine. - const listenerPoolCap = 10 - var ( err error subs subscriber.Subscriber @@ -208,9 +200,8 @@ func listenMorphNotifications(ctx context.Context, c *cfg) { fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ - Logger: c.log, - Subscriber: subs, - WorkerPoolCapacity: listenerPoolCap, + Logger: c.log, + Subscriber: subs, }) fatalOnErr(err) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 063d0f7cd..49ffdb798 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -407,14 +407,6 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan } func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { - // listenerPoolCap is a capacity of a - // worker pool inside the listener. It - // is used to prevent blocking in neo-go: - // the client cannot make RPC requests if - // the notification channel is not being - // read by another goroutine. - const listenerPoolCap = 10 - var ( sub subscriber.Subscriber err error @@ -430,9 +422,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev } listener, err := event.NewListener(event.ListenerParams{ - Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))}, - Subscriber: sub, - WorkerPoolCapacity: listenerPoolCap, + Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))}, + Subscriber: sub, }) if err != nil { return nil, err diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index ed2b95026..252c895a5 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -609,11 +609,6 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) { // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { - // defaultPoolCap is a default worker - // pool capacity if it was not specified - // via params - const defaultPoolCap = 10 - switch { case p.Logger == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) @@ -621,12 +616,9 @@ func NewListener(p ListenerParams) (Listener, error) { return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) } - poolCap := p.WorkerPoolCapacity - if poolCap == 0 { - poolCap = defaultPoolCap - } - - pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true)) + // The pool here must be blocking, otherwise notifications could be dropped. + // The default capacity is 0, which means "infinite". + pool, err := ants.NewPool(p.WorkerPoolCapacity) if err != nil { return nil, fmt.Errorf("could not init worker pool: %w", err) } -- 2.45.2