morph: use blocking unlimited pool for notifications #198
3 changed files with 7 additions and 33 deletions
|
@ -181,14 +181,6 @@ func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
|
|||
}
|
||||
|
||||
func listenMorphNotifications(ctx context.Context, c *cfg) {
|
||||
// listenerPoolCap is a capacity of a
|
||||
// worker pool inside the listener. It
|
||||
// is used to prevent blocking in neo-go:
|
||||
// the client cannot make RPC requests if
|
||||
// the notification channel is not being
|
||||
// read by another goroutine.
|
||||
const listenerPoolCap = 10
|
||||
|
||||
var (
|
||||
err error
|
||||
subs subscriber.Subscriber
|
||||
|
@ -208,9 +200,8 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
|||
fatalOnErr(err)
|
||||
|
||||
lis, err := event.NewListener(event.ListenerParams{
|
||||
Logger: c.log,
|
||||
Subscriber: subs,
|
||||
WorkerPoolCapacity: listenerPoolCap,
|
||||
Logger: c.log,
|
||||
Subscriber: subs,
|
||||
})
|
||||
fatalOnErr(err)
|
||||
|
||||
|
|
|
@ -407,14 +407,6 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
}
|
||||
|
||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
||||
// listenerPoolCap is a capacity of a
|
||||
// worker pool inside the listener. It
|
||||
// is used to prevent blocking in neo-go:
|
||||
// the client cannot make RPC requests if
|
||||
// the notification channel is not being
|
||||
// read by another goroutine.
|
||||
const listenerPoolCap = 10
|
||||
|
||||
var (
|
||||
sub subscriber.Subscriber
|
||||
err error
|
||||
|
@ -430,9 +422,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
|
|||
}
|
||||
|
||||
listener, err := event.NewListener(event.ListenerParams{
|
||||
Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))},
|
||||
Subscriber: sub,
|
||||
WorkerPoolCapacity: listenerPoolCap,
|
||||
Logger: &logger.Logger{Logger: p.log.With(zap.String("chain", p.name))},
|
||||
Subscriber: sub,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -609,11 +609,6 @@ func (l *listener) RegisterBlockHandler(handler BlockHandler) {
|
|||
|
||||
// NewListener create the notification event listener instance and returns Listener interface.
|
||||
func NewListener(p ListenerParams) (Listener, error) {
|
||||
// defaultPoolCap is a default worker
|
||||
// pool capacity if it was not specified
|
||||
// via params
|
||||
const defaultPoolCap = 10
|
||||
|
||||
switch {
|
||||
case p.Logger == nil:
|
||||
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
|
||||
|
@ -621,12 +616,9 @@ func NewListener(p ListenerParams) (Listener, error) {
|
|||
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
|
||||
}
|
||||
|
||||
poolCap := p.WorkerPoolCapacity
|
||||
if poolCap == 0 {
|
||||
poolCap = defaultPoolCap
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true))
|
||||
// The pool here must be blocking, otherwise notifications could be dropped.
|
||||
// The default capacity is 0, which means "infinite".
|
||||
pool, err := ants.NewPool(p.WorkerPoolCapacity)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not init worker pool: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue