morph: use blocking unlimited pool for notifications #198

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:fix-do-not-drop-epoch into master 2023-04-03 15:58:23 +00:00
3 changed files with 7 additions and 33 deletions

View file

@ -181,14 +181,6 @@ func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
}
func listenMorphNotifications(ctx context.Context, c *cfg) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
// the client cannot make RPC requests if
// the notification channel is not being
// read by another goroutine.
const listenerPoolCap = 10
var (
err error
subs subscriber.Subscriber
@ -208,9 +200,8 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
fatalOnErr(err)
lis, err := event.NewListener(event.ListenerParams{
Logger: c.log,
Subscriber: subs,
WorkerPoolCapacity: listenerPoolCap,
Logger: c.log,
Subscriber: subs,
})
fatalOnErr(err)

View file

@ -407,14 +407,6 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
}
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
// the client cannot make RPC requests if
// the notification channel is not being
// read by another goroutine.
const listenerPoolCap = 10
var (
sub subscriber.Subscriber
err error
@ -430,9 +422,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
}
listener, err := event.NewListener(event.ListenerParams{
Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))},
Subscriber: sub,
WorkerPoolCapacity: listenerPoolCap,
Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))},
Subscriber: sub,
})
if err != nil {
return nil, err

View file

@ -609,11 +609,6 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) {
// NewListener create the notification event listener instance and returns Listener interface.
func NewListener(p ListenerParams) (Listener, error) {
// defaultPoolCap is a default worker
// pool capacity if it was not specified
// via params
const defaultPoolCap = 10
switch {
case p.Logger == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
@ -621,12 +616,9 @@ func NewListener(p ListenerParams) (Listener, error) {
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
}
poolCap := p.WorkerPoolCapacity
if poolCap == 0 {
poolCap = defaultPoolCap
}
pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true))
// The pool here must be blocking, otherwise notifications could be dropped.
// The default capacity is 0, which means "infinite".
pool, err := ants.NewPool(p.WorkerPoolCapacity)
if err != nil {
return nil, fmt.Errorf("could not init worker pool: %w", err)
}