frostfs-node/pkg/morph/event/listener.go
Pavel Karpy 01e69f2f7a [#1170] pkg/morph: Add worker pool
Add worker pool to the listener to prevent blocking. It is used only for
notary notifications and new block events handling since it uses RPC
calls. That may lead to the deadlock state: neo-go cannot send RPC until
notification channel is read but notification channel cannot be read since
neo-go client cannot send RPC.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
2022-04-08 11:52:18 +03:00

633 lines
16 KiB
Go

package event
import (
"context"
"errors"
"fmt"
"sync"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// Listener is an interface of smart contract notification event listener.
type Listener interface {
// Listen must start the event listener.
//
// Must listen to events with the parser installed.
Listen(context.Context)
// ListenWithError must start the event listener.
//
// Must listen to events with the parser installed.
//
// Must send error to channel if subscriber channel has been closed or
// it could not be started.
ListenWithError(context.Context, chan<- error)
// SetNotificationParser must set the parser of particular contract event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
SetNotificationParser(NotificationParserInfo)
// RegisterNotificationHandler must register the event handler for particular notification event of contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
RegisterNotificationHandler(NotificationHandlerInfo)
// EnableNotarySupport enables notary request listening. Passed hash is
// notary mainTX signer. In practise, it means that listener will subscribe
// for only notary requests that are going to be paid with passed hash.
//
// Must not be called after Listen or ListenWithError.
EnableNotarySupport(util.Uint160, client.AlphabetKeys, BlockCounter)
// SetNotaryParser must set the parser of particular notary request event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
SetNotaryParser(NotaryParserInfo)
// RegisterNotaryHandler must register the event handler for particular notification event of contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
RegisterNotaryHandler(NotaryHandlerInfo)
// RegisterBlockHandler must register chain block handler.
//
// The specified handler must be called after each capture and parsing of the new block from chain.
//
// Must ignore nil handlers.
RegisterBlockHandler(BlockHandler)
// Stop must stop the event listener.
Stop()
}
// ListenerParams is a group of parameters
// for Listener constructor.
type ListenerParams struct {
Logger *zap.Logger
Subscriber subscriber.Subscriber
WorkerPoolCapacity int
}
type listener struct {
mtx sync.RWMutex
startOnce, stopOnce sync.Once
started bool
notificationParsers map[scriptHashWithType]NotificationParser
notificationHandlers map[scriptHashWithType][]Handler
listenNotary bool
notaryEventsPreparator NotaryPreparator
notaryParsers map[notaryRequestTypes]NotaryParser
notaryHandlers map[notaryRequestTypes]Handler
notaryMainTXSigner util.Uint160 // filter for notary subscription
log *zap.Logger
subscriber subscriber.Subscriber
blockHandlers []BlockHandler
pool *ants.Pool
}
const newListenerFailMsg = "could not instantiate Listener"
var (
errNilLogger = errors.New("nil logger")
errNilSubscriber = errors.New("nil event subscriber")
)
// Listen starts the listening for events with registered handlers.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (l *listener) Listen(ctx context.Context) {
l.startOnce.Do(func() {
if err := l.listen(ctx, nil); err != nil {
l.log.Error("could not start listen to events",
zap.String("error", err.Error()),
)
}
})
}
// ListenWithError starts the listening for events with registered handlers and
// passing error message to intError channel if subscriber channel has been closed.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
l.startOnce.Do(func() {
if err := l.listen(ctx, intError); err != nil {
l.log.Error("could not start listen to events",
zap.String("error", err.Error()),
)
intError <- err
}
})
}
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
// create the list of listening contract hashes
hashes := make([]util.Uint160, 0)
// fill the list with the contracts with set event parsers.
l.mtx.RLock()
for hashType := range l.notificationParsers {
scHash := hashType.ScriptHash()
// prevent repetitions
for _, hash := range hashes {
if hash.Equals(scHash) {
continue
}
}
hashes = append(hashes, hashType.ScriptHash())
}
// mark listener as started
l.started = true
l.mtx.RUnlock()
chEvent, err := l.subscriber.SubscribeForNotification(hashes...)
if err != nil {
return err
}
l.listenLoop(ctx, chEvent, intError)
return nil
}
func (l *listener) listenLoop(ctx context.Context, chEvent <-chan *subscriptions.NotificationEvent, intErr chan<- error) {
var (
blockChan <-chan *block.Block
notaryChan <-chan *subscriptions.NotaryRequestEvent
err error
)
if len(l.blockHandlers) > 0 {
if blockChan, err = l.subscriber.BlockNotifications(); err != nil {
if intErr != nil {
intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
} else {
l.log.Debug("could not open block notifications channel",
zap.String("error", err.Error()),
)
}
return
}
} else {
blockChan = make(chan *block.Block)
}
if l.listenNotary {
if notaryChan, err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
if intErr != nil {
intErr <- fmt.Errorf("could not open notary notifications channel: %w", err)
} else {
l.log.Debug("could not open notary notifications channel",
zap.String("error", err.Error()),
)
}
return
}
}
loop:
for {
select {
case <-ctx.Done():
l.log.Info("stop event listener by context",
zap.String("reason", ctx.Err().Error()),
)
break loop
case notifyEvent, ok := <-chEvent:
if !ok {
l.log.Warn("stop event listener by notification channel")
if intErr != nil {
intErr <- errors.New("event subscriber connection has been terminated")
}
break loop
} else if notifyEvent == nil {
l.log.Warn("nil notification event was caught")
continue loop
}
l.parseAndHandleNotification(notifyEvent)
case notaryEvent, ok := <-notaryChan:
if !ok {
l.log.Warn("stop event listener by notary channel")
if intErr != nil {
intErr <- errors.New("notary event subscriber connection has been terminated")
}
break loop
} else if notaryEvent == nil {
l.log.Warn("nil notary event was caught")
continue loop
}
if err = l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent)
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
case b, ok := <-blockChan:
if !ok {
l.log.Warn("stop event listener by block channel")
if intErr != nil {
intErr <- errors.New("new block notification channel is closed")
}
break loop
} else if b == nil {
l.log.Warn("nil block was caught")
continue loop
}
if err = l.pool.Submit(func() {
for i := range l.blockHandlers {
l.blockHandlers[i](b)
}
}); err != nil {
l.log.Warn("listener worker pool drained",
zap.Int("capacity", l.pool.Cap()))
}
}
}
}
func (l *listener) parseAndHandleNotification(notifyEvent *subscriptions.NotificationEvent) {
log := l.log.With(
zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),
)
// calculate event type from bytes
typEvent := TypeFromString(notifyEvent.Name)
log = log.With(
zap.String("event type", notifyEvent.Name),
)
// get the event parser
keyEvent := scriptHashWithType{}
keyEvent.SetScriptHash(notifyEvent.ScriptHash)
keyEvent.SetType(typEvent)
l.mtx.RLock()
parser, ok := l.notificationParsers[keyEvent]
l.mtx.RUnlock()
if !ok {
log.Debug("event parser not set")
return
}
// parse the notification event
event, err := parser(notifyEvent)
if err != nil {
log.Warn("could not parse notification event",
zap.String("error", err.Error()),
)
return
}
// handler the event
l.mtx.RLock()
handlers := l.notificationHandlers[keyEvent]
l.mtx.RUnlock()
if len(handlers) == 0 {
log.Info("notification handlers for parsed notification event were not registered",
zap.Any("event", event),
)
return
}
for _, handler := range handlers {
handler(event)
}
}
func (l *listener) parseAndHandleNotary(nr *subscriptions.NotaryRequestEvent) {
// prepare the notary event
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
if err != nil {
switch {
case errors.Is(err, ErrTXAlreadyHandled):
case errors.Is(err, ErrMainTXExpired):
l.log.Warn("skip expired main TX notary event",
zap.String("error", err.Error()),
)
default:
l.log.Warn("could not prepare and validate notary event",
zap.String("error", err.Error()),
)
}
return
}
log := l.log.With(
zap.String("contract", notaryEvent.ScriptHash().StringLE()),
zap.Stringer("method", notaryEvent.Type()),
)
notaryKey := notaryRequestTypes{}
notaryKey.SetMempoolType(nr.Type)
notaryKey.SetRequestType(notaryEvent.Type())
notaryKey.SetScriptHash(notaryEvent.ScriptHash())
// get notary parser
l.mtx.RLock()
parser, ok := l.notaryParsers[notaryKey]
l.mtx.RUnlock()
if !ok {
log.Debug("notary parser not set")
return
}
// parse the notary event
event, err := parser(notaryEvent)
if err != nil {
log.Warn("could not parse notary event",
zap.String("error", err.Error()),
)
return
}
// handle the event
l.mtx.RLock()
handler, ok := l.notaryHandlers[notaryKey]
l.mtx.RUnlock()
if !ok {
log.Info("notary handlers for parsed notification event were not registered",
zap.Any("event", event),
)
return
}
handler(event)
}
// SetNotificationParser sets the parser of particular contract event.
//
// Ignores nil and already set parsers.
// Ignores the parser if listener is started.
func (l *listener) SetNotificationParser(pi NotificationParserInfo) {
log := l.log.With(
zap.String("contract", pi.ScriptHash().StringLE()),
zap.Stringer("event_type", pi.getType()),
)
parser := pi.parser()
if parser == nil {
log.Info("ignore nil event parser")
return
}
l.mtx.Lock()
defer l.mtx.Unlock()
// check if the listener was started
if l.started {
log.Warn("listener has been already started, ignore parser")
return
}
// add event parser
if _, ok := l.notificationParsers[pi.scriptHashWithType]; !ok {
l.notificationParsers[pi.scriptHashWithType] = pi.parser()
}
log.Debug("registered new event parser")
}
// RegisterNotificationHandler registers the handler for particular notification event of contract.
//
// Ignores nil handlers.
// Ignores handlers of event without parser.
func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
log := l.log.With(
zap.String("contract", hi.ScriptHash().StringLE()),
zap.Stringer("event_type", hi.GetType()),
)
handler := hi.Handler()
if handler == nil {
log.Warn("ignore nil event handler")
return
}
// check if parser was set
l.mtx.RLock()
_, ok := l.notificationParsers[hi.scriptHashWithType]
l.mtx.RUnlock()
if !ok {
log.Warn("ignore handler of event w/o parser")
return
}
// add event handler
l.mtx.Lock()
l.notificationHandlers[hi.scriptHashWithType] = append(
l.notificationHandlers[hi.scriptHashWithType],
hi.Handler(),
)
l.mtx.Unlock()
log.Debug("registered new event handler")
}
// EnableNotarySupport enables notary request listening. Passed hash is
// notary mainTX signer. In practise, it means that listener will subscribe
// for only notary requests that are going to be paid with passed hash.
//
// Must not be called after Listen or ListenWithError.
func (l *listener) EnableNotarySupport(mainTXSigner util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter) {
l.mtx.Lock()
defer l.mtx.Unlock()
l.listenNotary = true
l.notaryMainTXSigner = mainTXSigner
l.notaryHandlers = make(map[notaryRequestTypes]Handler)
l.notaryParsers = make(map[notaryRequestTypes]NotaryParser)
l.notaryEventsPreparator = notaryPreparator(
PreparatorPrm{
AlphaKeys: alphaKeys,
BlockCounter: bc,
},
)
}
// SetNotaryParser sets the parser of particular notary request event.
//
// Ignores nil and already set parsers.
// Ignores the parser if listener is started.
func (l *listener) SetNotaryParser(pi NotaryParserInfo) {
if !l.listenNotary {
return
}
log := l.log.With(
zap.Stringer("mempool_type", pi.GetMempoolType()),
zap.String("contract", pi.ScriptHash().StringLE()),
zap.Stringer("notary_type", pi.RequestType()),
)
parser := pi.parser()
if parser == nil {
log.Info("ignore nil notary event parser")
return
}
l.mtx.Lock()
defer l.mtx.Unlock()
// check if the listener was started
if l.started {
log.Warn("listener has been already started, ignore notary parser")
return
}
// add event parser
if _, ok := l.notaryParsers[pi.notaryRequestTypes]; !ok {
l.notaryParsers[pi.notaryRequestTypes] = pi.parser()
}
log.Info("registered new event parser")
}
// RegisterNotaryHandler registers the handler for particular notification notary request event.
//
// Ignores nil handlers.
// Ignores handlers of event without parser.
func (l *listener) RegisterNotaryHandler(hi NotaryHandlerInfo) {
if !l.listenNotary {
return
}
log := l.log.With(
zap.Stringer("mempool_type", hi.GetMempoolType()),
zap.String("contract", hi.ScriptHash().StringLE()),
zap.Stringer("notary type", hi.RequestType()),
)
handler := hi.Handler()
if handler == nil {
log.Warn("ignore nil notary event handler")
return
}
// check if parser was set
l.mtx.RLock()
_, ok := l.notaryParsers[hi.notaryRequestTypes]
l.mtx.RUnlock()
if !ok {
log.Warn("ignore handler of notary event w/o parser")
return
}
// add notary event handler
l.mtx.Lock()
l.notaryHandlers[hi.notaryRequestTypes] = hi.Handler()
l.mtx.Unlock()
log.Info("registered new event handler")
}
// Stop closes subscription channel with remote neo node.
func (l *listener) Stop() {
l.stopOnce.Do(func() {
l.subscriber.Close()
})
}
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
if handler == nil {
l.log.Warn("ignore nil block handler")
return
}
l.blockHandlers = append(l.blockHandlers, handler)
}
// NewListener create the notification event listener instance and returns Listener interface.
func NewListener(p ListenerParams) (Listener, error) {
// defaultPoolCap is a default worker
// pool capacity if it was not specified
// via params
const defaultPoolCap = 10
switch {
case p.Logger == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
case p.Subscriber == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
}
poolCap := p.WorkerPoolCapacity
if poolCap == 0 {
poolCap = defaultPoolCap
}
pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("could not init worker pool: %w", err)
}
return &listener{
notificationParsers: make(map[scriptHashWithType]NotificationParser),
notificationHandlers: make(map[scriptHashWithType][]Handler),
log: p.Logger,
subscriber: p.Subscriber,
pool: pool,
}, nil
}