morph: use blocking unlimited pool for notifications #198

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:fix-do-not-drop-epoch into master 2023-04-03 15:58:23 +00:00
3 changed files with 7 additions and 33 deletions

View file

@ -181,14 +181,6 @@ func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
} }
func listenMorphNotifications(ctx context.Context, c *cfg) { func listenMorphNotifications(ctx context.Context, c *cfg) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
// the client cannot make RPC requests if
// the notification channel is not being
// read by another goroutine.
const listenerPoolCap = 10
var ( var (
err error err error
subs subscriber.Subscriber subs subscriber.Subscriber
@ -208,9 +200,8 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
fatalOnErr(err) fatalOnErr(err)
lis, err := event.NewListener(event.ListenerParams{ lis, err := event.NewListener(event.ListenerParams{
Logger: c.log, Logger: c.log,
Subscriber: subs, Subscriber: subs,
WorkerPoolCapacity: listenerPoolCap,
}) })
fatalOnErr(err) fatalOnErr(err)

View file

@ -407,14 +407,6 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
} }
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
// the client cannot make RPC requests if
// the notification channel is not being
// read by another goroutine.
const listenerPoolCap = 10
var ( var (
sub subscriber.Subscriber sub subscriber.Subscriber
err error err error
@ -430,9 +422,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
} }
listener, err := event.NewListener(event.ListenerParams{ listener, err := event.NewListener(event.ListenerParams{
Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))}, Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))},
Subscriber: sub, Subscriber: sub,
WorkerPoolCapacity: listenerPoolCap,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -609,11 +609,6 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) {
// NewListener create the notification event listener instance and returns Listener interface. // NewListener create the notification event listener instance and returns Listener interface.
func NewListener(p ListenerParams) (Listener, error) { func NewListener(p ListenerParams) (Listener, error) {
// defaultPoolCap is a default worker
// pool capacity if it was not specified
// via params
const defaultPoolCap = 10
switch { switch {
case p.Logger == nil: case p.Logger == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
@ -621,12 +616,9 @@ func NewListener(p ListenerParams) (Listener, error) {
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
} }
poolCap := p.WorkerPoolCapacity // The pool here must be blocking, otherwise notifications could be dropped.
if poolCap == 0 { // The default capacity is 0, which means "infinite".
poolCap = defaultPoolCap pool, err := ants.NewPool(p.WorkerPoolCapacity)
}
pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true))
if err != nil { if err != nil {
return nil, fmt.Errorf("could not init worker pool: %w", err) return nil, fmt.Errorf("could not init worker pool: %w", err)
} }