frostfs-node/pkg/morph/event/listener.go

661 lines
17 KiB
Go

package event
import (
"context"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// Listener is an interface of smart contract notification event listener.
type Listener interface {
// Listen must start the event listener.
//
// Must listen to events with the parser installed.
Listen(context.Context)
// ListenWithError must start the event listener.
//
// Must listen to events with the parser installed.
//
// Must send error to channel if subscriber channel has been closed or
// it could not be started.
ListenWithError(context.Context, chan<- error)
// SetNotificationParser must set the parser of particular contract event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
SetNotificationParser(NotificationParserInfo)
// RegisterNotificationHandler must register the event handler for particular notification event of contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
RegisterNotificationHandler(NotificationHandlerInfo)
// EnableNotarySupport enables notary request listening. Passed hash is
// notary mainTX signer. In practise, it means that listener will subscribe
// for only notary requests that are going to be paid with passed hash.
//
// Must not be called after Listen or ListenWithError.
EnableNotarySupport(util.Uint160, client.AlphabetKeys, BlockCounter)
// SetNotaryParser must set the parser of particular notary request event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
SetNotaryParser(NotaryParserInfo)
// RegisterNotaryHandler must register the event handler for particular notification event of contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
RegisterNotaryHandler(NotaryHandlerInfo)
// RegisterBlockHandler must register chain block handler.
//
// The specified handler must be called after each capture and parsing of the new block from chain.
//
// Must ignore nil handlers.
RegisterBlockHandler(BlockHandler)
// Stop must stop the event listener.
Stop()
}
// ListenerParams is a group of parameters
// for Listener constructor.
type ListenerParams struct {
Logger *logger.Logger
Subscriber subscriber.Subscriber
WorkerPoolCapacity int
}
type listener struct {
mtx sync.RWMutex
wg sync.WaitGroup
startOnce, stopOnce sync.Once
started bool
notificationParsers map[scriptHashWithType]NotificationParser
notificationHandlers map[scriptHashWithType][]Handler
listenNotary bool
notaryEventsPreparator NotaryPreparator
notaryParsers map[notaryRequestTypes]NotaryParser
notaryHandlers map[notaryRequestTypes]Handler
notaryMainTXSigner util.Uint160 // filter for notary subscription
log *logger.Logger
subscriber subscriber.Subscriber
blockHandlers []BlockHandler
pool *ants.Pool
}
const newListenerFailMsg = "could not instantiate Listener"
var (
errNilLogger = errors.New("nil logger")
errNilSubscriber = errors.New("nil event subscriber")
errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated")
errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated")
errBlockNotificationChannelClosed = errors.New("new block notification channel is closed")
)
// Listen starts the listening for events with registered handlers.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (l *listener) Listen(ctx context.Context) {
l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, nil); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()),
)
}
})
}
// ListenWithError starts the listening for events with registered handlers and
// passing error message to intError channel if subscriber channel has been closed.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
l.startOnce.Do(func() {
l.wg.Add(1)
defer l.wg.Done()
if err := l.listen(ctx, intError); err != nil {
l.log.Error(logs.EventCouldNotStartListenToEvents,
zap.String("error", err.Error()),
)
l.sendError(ctx, intError, err)
}
})
}
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
// mark listener as started
l.started = true
subErrCh := make(chan error)
go l.subscribe(subErrCh)
l.listenLoop(ctx, intError, subErrCh)
return nil
}
func (l *listener) subscribe(errCh chan error) {
l.wg.Add(1)
defer l.wg.Done()
// create the list of listening contract hashes
hashes := make([]util.Uint160, 0)
// fill the list with the contracts with set event parsers.
l.mtx.RLock()
for hashType := range l.notificationParsers {
scHash := hashType.ScriptHash()
// prevent repetitions
for _, hash := range hashes {
if hash.Equals(scHash) {
continue
}
}
hashes = append(hashes, hashType.ScriptHash())
}
l.mtx.RUnlock()
err := l.subscriber.SubscribeForNotification(hashes...)
if err != nil {
errCh <- fmt.Errorf("could not subscribe for notifications: %w", err)
return
}
if len(l.blockHandlers) > 0 {
if err = l.subscriber.BlockNotifications(); err != nil {
errCh <- fmt.Errorf("could not subscribe for blocks: %w", err)
return
}
}
if l.listenNotary {
if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err)
return
}
}
}
func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool {
if intErr == nil {
return false
}
// This select required because were are reading from error channel and closing listener
// in the same routine when shutting down node.
select {
case <-ctx.Done():
l.log.Info(logs.EventStopEventListenerByContext,
zap.String("reason", ctx.Err().Error()),
)
return false
case intErr <- err:
return true
}
}
func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) {
chs := l.subscriber.NotificationChannels()
loop:
for {
select {
case err := <-subErrCh:
if !l.sendError(ctx, intErr, err) {
l.log.Error(logs.EventStopEventListenerByError, zap.Error(err))
}
break loop
case <-ctx.Done():
l.log.Info(logs.EventStopEventListenerByContext,
zap.String("reason", ctx.Err().Error()),
)
break loop
case notifyEvent, ok := <-chs.NotificationsCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByNotificationChannel)
l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated)
break loop
} else if notifyEvent == nil {
l.log.Warn(logs.EventNilNotificationEventWasCaught)
continue loop
}
l.handleNotifyEvent(notifyEvent)
case notaryEvent, ok := <-chs.NotaryRequestsCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByNotaryChannel)
l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated)
break loop
} else if notaryEvent == nil {
l.log.Warn(logs.EventNilNotaryEventWasCaught)
continue loop
}
l.handleNotaryEvent(notaryEvent)
case b, ok := <-chs.BlockCh:
if !ok {
l.log.Warn(logs.EventStopEventListenerByBlockChannel)
l.sendError(ctx, intErr, errBlockNotificationChannelClosed)
break loop
} else if b == nil {
l.log.Warn(logs.EventNilBlockWasCaught)
continue loop
}
l.handleBlockEvent(b)
}
}
}
func (l *listener) handleBlockEvent(b *block.Block) {
if err := l.pool.Submit(func() {
for i := range l.blockHandlers {
l.blockHandlers[i](b)
}
}); err != nil {
l.log.Warn(logs.EventListenerWorkerPoolDrained,
zap.Int("capacity", l.pool.Cap()))
}
}
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) {
if err := l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent)
}); err != nil {
l.log.Warn(logs.EventListenerWorkerPoolDrained,
zap.Int("capacity", l.pool.Cap()))
}
}
func (l *listener) handleNotifyEvent(notifyEvent *state.ContainedNotificationEvent) {
if err := l.pool.Submit(func() {
l.parseAndHandleNotification(notifyEvent)
}); err != nil {
l.log.Warn(logs.EventListenerWorkerPoolDrained,
zap.Int("capacity", l.pool.Cap()))
}
}
func (l *listener) parseAndHandleNotification(notifyEvent *state.ContainedNotificationEvent) {
log := l.log.With(
zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),
)
// calculate event type from bytes
typEvent := TypeFromString(notifyEvent.Name)
log = log.With(
zap.String("event type", notifyEvent.Name),
)
// get the event parser
keyEvent := scriptHashWithType{}
keyEvent.SetScriptHash(notifyEvent.ScriptHash)
keyEvent.SetType(typEvent)
l.mtx.RLock()
parser, ok := l.notificationParsers[keyEvent]
l.mtx.RUnlock()
if !ok {
log.Debug(logs.EventEventParserNotSet)
return
}
// parse the notification event
event, err := parser(notifyEvent)
if err != nil {
log.Warn(logs.EventCouldNotParseNotificationEvent,
zap.String("error", err.Error()),
)
return
}
// handler the event
l.mtx.RLock()
handlers := l.notificationHandlers[keyEvent]
l.mtx.RUnlock()
if len(handlers) == 0 {
log.Info(logs.EventNotificationHandlersForParsedNotificationEventWereNotRegistered,
zap.Any("event", event),
)
return
}
for _, handler := range handlers {
handler(event)
}
}
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
// prepare the notary event
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
if err != nil {
var expErr *ExpiredTXError
switch {
case errors.Is(err, ErrTXAlreadyHandled):
case errors.As(err, &expErr):
l.log.Warn(logs.EventSkipExpiredMainTXNotaryEvent,
zap.String("error", err.Error()),
zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
)
default:
l.log.Warn(logs.EventCouldNotPrepareAndValidateNotaryEvent,
zap.String("error", err.Error()),
)
}
return
}
log := l.log.With(
zap.String("contract", notaryEvent.ScriptHash().StringLE()),
zap.Stringer("method", notaryEvent.Type()),
)
notaryKey := notaryRequestTypes{}
notaryKey.SetMempoolType(nr.Type)
notaryKey.SetRequestType(notaryEvent.Type())
notaryKey.SetScriptHash(notaryEvent.ScriptHash())
// get notary parser
l.mtx.RLock()
parser, ok := l.notaryParsers[notaryKey]
l.mtx.RUnlock()
if !ok {
log.Debug(logs.EventNotaryParserNotSet)
return
}
// parse the notary event
event, err := parser(notaryEvent)
if err != nil {
log.Warn(logs.EventCouldNotParseNotaryEvent,
zap.String("error", err.Error()),
)
return
}
// handle the event
l.mtx.RLock()
handler, ok := l.notaryHandlers[notaryKey]
l.mtx.RUnlock()
if !ok {
log.Info(logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
zap.Any("event", event),
)
return
}
handler(event)
}
// SetNotificationParser sets the parser of particular contract event.
//
// Ignores nil and already set parsers.
// Ignores the parser if listener is started.
func (l *listener) SetNotificationParser(pi NotificationParserInfo) {
log := l.log.With(
zap.String("contract", pi.ScriptHash().StringLE()),
zap.Stringer("event_type", pi.getType()),
)
parser := pi.parser()
if parser == nil {
log.Info(logs.EventIgnoreNilEventParser)
return
}
l.mtx.Lock()
defer l.mtx.Unlock()
// check if the listener was started
if l.started {
log.Warn(logs.EventListenerHasBeenAlreadyStartedIgnoreParser)
return
}
// add event parser
if _, ok := l.notificationParsers[pi.scriptHashWithType]; !ok {
l.notificationParsers[pi.scriptHashWithType] = pi.parser()
}
log.Debug(logs.EventRegisteredNewEventParser)
}
// RegisterNotificationHandler registers the handler for particular notification event of contract.
//
// Ignores nil handlers.
// Ignores handlers of event without parser.
func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
log := l.log.With(
zap.String("contract", hi.ScriptHash().StringLE()),
zap.Stringer("event_type", hi.GetType()),
)
handler := hi.Handler()
if handler == nil {
log.Warn(logs.EventIgnoreNilEventHandler)
return
}
// check if parser was set
l.mtx.RLock()
_, ok := l.notificationParsers[hi.scriptHashWithType]
l.mtx.RUnlock()
if !ok {
log.Warn(logs.EventIgnoreHandlerOfEventWoParser)
return
}
// add event handler
l.mtx.Lock()
l.notificationHandlers[hi.scriptHashWithType] = append(
l.notificationHandlers[hi.scriptHashWithType],
hi.Handler(),
)
l.mtx.Unlock()
log.Debug(logs.EventRegisteredNewEventHandler)
}
// EnableNotarySupport enables notary request listening. Passed hash is
// notary mainTX signer. In practise, it means that listener will subscribe
// for only notary requests that are going to be paid with passed hash.
//
// Must not be called after Listen or ListenWithError.
func (l *listener) EnableNotarySupport(mainTXSigner util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter) {
l.mtx.Lock()
defer l.mtx.Unlock()
l.listenNotary = true
l.notaryMainTXSigner = mainTXSigner
l.notaryHandlers = make(map[notaryRequestTypes]Handler)
l.notaryParsers = make(map[notaryRequestTypes]NotaryParser)
l.notaryEventsPreparator = notaryPreparator(
PreparatorPrm{
AlphaKeys: alphaKeys,
BlockCounter: bc,
},
)
}
// SetNotaryParser sets the parser of particular notary request event.
//
// Ignores nil and already set parsers.
// Ignores the parser if listener is started.
func (l *listener) SetNotaryParser(pi NotaryParserInfo) {
if !l.listenNotary {
return
}
log := l.log.With(
zap.Stringer("mempool_type", pi.GetMempoolType()),
zap.String("contract", pi.ScriptHash().StringLE()),
zap.Stringer("notary_type", pi.RequestType()),
)
parser := pi.parser()
if parser == nil {
log.Info(logs.EventIgnoreNilNotaryEventParser)
return
}
l.mtx.Lock()
defer l.mtx.Unlock()
// check if the listener was started
if l.started {
log.Warn(logs.EventListenerHasBeenAlreadyStartedIgnoreNotaryParser)
return
}
// add event parser
if _, ok := l.notaryParsers[pi.notaryRequestTypes]; !ok {
l.notaryParsers[pi.notaryRequestTypes] = pi.parser()
}
log.Info(logs.EventRegisteredNewEventParser)
}
// RegisterNotaryHandler registers the handler for particular notification notary request event.
//
// Ignores nil handlers.
// Ignores handlers of event without parser.
func (l *listener) RegisterNotaryHandler(hi NotaryHandlerInfo) {
if !l.listenNotary {
return
}
log := l.log.With(
zap.Stringer("mempool_type", hi.GetMempoolType()),
zap.String("contract", hi.ScriptHash().StringLE()),
zap.Stringer("notary type", hi.RequestType()),
)
handler := hi.Handler()
if handler == nil {
log.Warn(logs.EventIgnoreNilNotaryEventHandler)
return
}
// check if parser was set
l.mtx.RLock()
_, ok := l.notaryParsers[hi.notaryRequestTypes]
l.mtx.RUnlock()
if !ok {
log.Warn(logs.EventIgnoreHandlerOfNotaryEventWoParser)
return
}
// add notary event handler
l.mtx.Lock()
l.notaryHandlers[hi.notaryRequestTypes] = hi.Handler()
l.mtx.Unlock()
log.Info(logs.EventRegisteredNewEventHandler)
}
// Stop closes subscription channel with remote neo node.
func (l *listener) Stop() {
l.stopOnce.Do(func() {
l.subscriber.Close()
})
l.wg.Wait()
}
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
if handler == nil {
l.log.Warn(logs.EventIgnoreNilBlockHandler)
return
}
l.blockHandlers = append(l.blockHandlers, handler)
}
// NewListener create the notification event listener instance and returns Listener interface.
func NewListener(p ListenerParams) (Listener, error) {
switch {
case p.Logger == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
case p.Subscriber == nil:
return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
}
// The pool here must be blocking, otherwise notifications could be dropped.
// The default capacity is 0, which means "infinite".
pool, err := ants.NewPool(p.WorkerPoolCapacity)
if err != nil {
return nil, fmt.Errorf("could not init worker pool: %w", err)
}
return &listener{
notificationParsers: make(map[scriptHashWithType]NotificationParser),
notificationHandlers: make(map[scriptHashWithType][]Handler),
log: p.Logger,
subscriber: p.Subscriber,
pool: pool,
}, nil
}