package event import ( "context" "errors" "fmt" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) // Listener is an interface of smart contract notification event listener. type Listener interface { // Listen must start the event listener. // // Must listen to events with the parser installed. Listen(context.Context) // ListenWithError must start the event listener. // // Must listen to events with the parser installed. // // Must send error to channel if subscriber channel has been closed or // it could not be started. ListenWithError(context.Context, chan<- error) // SetNotificationParser must set the parser of particular contract event. // // Parser of each event must be set once. All parsers must be set before Listen call. // // Must ignore nil parsers and all calls after listener has been started. SetNotificationParser(NotificationParserInfo) // RegisterNotificationHandler must register the event handler for particular notification event of contract. // // The specified handler must be called after each capture and parsing of the event. // // Must ignore nil handlers. RegisterNotificationHandler(NotificationHandlerInfo) // EnableNotarySupport enables notary request listening. Passed hash is // notary mainTX signer. In practise, it means that listener will subscribe // for only notary requests that are going to be paid with passed hash. // // Must not be called after Listen or ListenWithError. EnableNotarySupport(util.Uint160, client.AlphabetKeys, BlockCounter) // SetNotaryParser must set the parser of particular notary request event. // // Parser of each event must be set once. All parsers must be set before Listen call. // // Must ignore nil parsers and all calls after listener has been started. // // Has no effect if EnableNotarySupport was not called before Listen or ListenWithError. SetNotaryParser(NotaryParserInfo) // RegisterNotaryHandler must register the event handler for particular notification event of contract. // // The specified handler must be called after each capture and parsing of the event. // // Must ignore nil handlers. // // Has no effect if EnableNotarySupport was not called before Listen or ListenWithError. RegisterNotaryHandler(NotaryHandlerInfo) // RegisterBlockHandler must register chain block handler. // // The specified handler must be called after each capture and parsing of the new block from chain. // // Must ignore nil handlers. RegisterBlockHandler(BlockHandler) // Stop must stop the event listener. Stop() } // ListenerParams is a group of parameters // for Listener constructor. type ListenerParams struct { Logger *logger.Logger Subscriber subscriber.Subscriber WorkerPoolCapacity int } type listener struct { mtx sync.RWMutex wg sync.WaitGroup startOnce, stopOnce sync.Once started bool notificationParsers map[scriptHashWithType]NotificationParser notificationHandlers map[scriptHashWithType][]Handler listenNotary bool notaryEventsPreparator NotaryPreparator notaryParsers map[notaryRequestTypes]NotaryParser notaryHandlers map[notaryRequestTypes]Handler notaryMainTXSigner util.Uint160 // filter for notary subscription log *logger.Logger subscriber subscriber.Subscriber blockHandlers []BlockHandler pool *ants.Pool } const newListenerFailMsg = "could not instantiate Listener" var ( errNilLogger = errors.New("nil logger") errNilSubscriber = errors.New("nil event subscriber") errNotificationSubscrConnectionTerminated = errors.New("event subscriber connection has been terminated") errNotarySubscrConnectionTerminated = errors.New("notary event subscriber connection has been terminated") errBlockNotificationChannelClosed = errors.New("new block notification channel is closed") ) // Listen starts the listening for events with registered handlers. // // Executes once, all subsequent calls do nothing. // // Returns an error if listener was already started. func (l *listener) Listen(ctx context.Context) { l.startOnce.Do(func() { l.wg.Add(1) defer l.wg.Done() if err := l.listen(ctx, nil); err != nil { l.log.Error(logs.EventCouldNotStartListenToEvents, zap.String("error", err.Error()), ) } }) } // ListenWithError starts the listening for events with registered handlers and // passing error message to intError channel if subscriber channel has been closed. // // Executes once, all subsequent calls do nothing. // // Returns an error if listener was already started. func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) { l.startOnce.Do(func() { l.wg.Add(1) defer l.wg.Done() if err := l.listen(ctx, intError); err != nil { l.log.Error(logs.EventCouldNotStartListenToEvents, zap.String("error", err.Error()), ) l.sendError(ctx, intError, err) } }) } func (l *listener) listen(ctx context.Context, intError chan<- error) error { // mark listener as started l.started = true subErrCh := make(chan error) go l.subscribe(subErrCh) l.listenLoop(ctx, intError, subErrCh) return nil } func (l *listener) subscribe(errCh chan error) { l.wg.Add(1) defer l.wg.Done() // create the list of listening contract hashes hashes := make([]util.Uint160, 0) // fill the list with the contracts with set event parsers. l.mtx.RLock() for hashType := range l.notificationParsers { scHash := hashType.ScriptHash() // prevent repetitions for _, hash := range hashes { if hash.Equals(scHash) { continue } } hashes = append(hashes, hashType.ScriptHash()) } l.mtx.RUnlock() err := l.subscriber.SubscribeForNotification(hashes...) if err != nil { errCh <- fmt.Errorf("could not subscribe for notifications: %w", err) return } if len(l.blockHandlers) > 0 { if err = l.subscriber.BlockNotifications(); err != nil { errCh <- fmt.Errorf("could not subscribe for blocks: %w", err) return } } if l.listenNotary { if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil { errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err) return } } } func (l *listener) sendError(ctx context.Context, intErr chan<- error, err error) bool { if intErr == nil { return false } // This select required because were are reading from error channel and closing listener // in the same routine when shutting down node. select { case <-ctx.Done(): l.log.Info(logs.EventStopEventListenerByContext, zap.String("reason", ctx.Err().Error()), ) return false case intErr <- err: return true } } func (l *listener) listenLoop(ctx context.Context, intErr chan<- error, subErrCh chan error) { chs := l.subscriber.NotificationChannels() loop: for { select { case err := <-subErrCh: if !l.sendError(ctx, intErr, err) { l.log.Error(logs.EventStopEventListenerByError, zap.Error(err)) } break loop case <-ctx.Done(): l.log.Info(logs.EventStopEventListenerByContext, zap.String("reason", ctx.Err().Error()), ) break loop case notifyEvent, ok := <-chs.NotificationsCh: if !ok { l.log.Warn(logs.EventStopEventListenerByNotificationChannel) l.sendError(ctx, intErr, errNotificationSubscrConnectionTerminated) break loop } else if notifyEvent == nil { l.log.Warn(logs.EventNilNotificationEventWasCaught) continue loop } l.handleNotifyEvent(notifyEvent) case notaryEvent, ok := <-chs.NotaryRequestsCh: if !ok { l.log.Warn(logs.EventStopEventListenerByNotaryChannel) l.sendError(ctx, intErr, errNotarySubscrConnectionTerminated) break loop } else if notaryEvent == nil { l.log.Warn(logs.EventNilNotaryEventWasCaught) continue loop } l.handleNotaryEvent(notaryEvent) case b, ok := <-chs.BlockCh: if !ok { l.log.Warn(logs.EventStopEventListenerByBlockChannel) l.sendError(ctx, intErr, errBlockNotificationChannelClosed) break loop } else if b == nil { l.log.Warn(logs.EventNilBlockWasCaught) continue loop } l.handleBlockEvent(b) } } } func (l *listener) handleBlockEvent(b *block.Block) { if err := l.pool.Submit(func() { for i := range l.blockHandlers { l.blockHandlers[i](b) } }); err != nil { l.log.Warn(logs.EventListenerWorkerPoolDrained, zap.Int("capacity", l.pool.Cap())) } } func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) { if err := l.pool.Submit(func() { l.parseAndHandleNotary(notaryEvent) }); err != nil { l.log.Warn(logs.EventListenerWorkerPoolDrained, zap.Int("capacity", l.pool.Cap())) } } func (l *listener) handleNotifyEvent(notifyEvent *state.ContainedNotificationEvent) { if err := l.pool.Submit(func() { l.parseAndHandleNotification(notifyEvent) }); err != nil { l.log.Warn(logs.EventListenerWorkerPoolDrained, zap.Int("capacity", l.pool.Cap())) } } func (l *listener) parseAndHandleNotification(notifyEvent *state.ContainedNotificationEvent) { log := l.log.With( zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()), ) // calculate event type from bytes typEvent := TypeFromString(notifyEvent.Name) log = log.With( zap.String("event type", notifyEvent.Name), ) // get the event parser keyEvent := scriptHashWithType{} keyEvent.SetScriptHash(notifyEvent.ScriptHash) keyEvent.SetType(typEvent) l.mtx.RLock() parser, ok := l.notificationParsers[keyEvent] l.mtx.RUnlock() if !ok { log.Debug(logs.EventEventParserNotSet) return } // parse the notification event event, err := parser(notifyEvent) if err != nil { log.Warn(logs.EventCouldNotParseNotificationEvent, zap.String("error", err.Error()), ) return } // handler the event l.mtx.RLock() handlers := l.notificationHandlers[keyEvent] l.mtx.RUnlock() if len(handlers) == 0 { log.Info(logs.EventNotificationHandlersForParsedNotificationEventWereNotRegistered, zap.Any("event", event), ) return } for _, handler := range handlers { handler(event) } } func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { // prepare the notary event notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest) if err != nil { var expErr *ExpiredTXError switch { case errors.Is(err, ErrTXAlreadyHandled): case errors.As(err, &expErr): l.log.Warn(logs.EventSkipExpiredMainTXNotaryEvent, zap.String("error", err.Error()), zap.Uint32("current_block_height", expErr.CurrentBlockHeight), zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight), ) default: l.log.Warn(logs.EventCouldNotPrepareAndValidateNotaryEvent, zap.String("error", err.Error()), ) } return } log := l.log.With( zap.String("contract", notaryEvent.ScriptHash().StringLE()), zap.Stringer("method", notaryEvent.Type()), ) notaryKey := notaryRequestTypes{} notaryKey.SetMempoolType(nr.Type) notaryKey.SetRequestType(notaryEvent.Type()) notaryKey.SetScriptHash(notaryEvent.ScriptHash()) // get notary parser l.mtx.RLock() parser, ok := l.notaryParsers[notaryKey] l.mtx.RUnlock() if !ok { log.Debug(logs.EventNotaryParserNotSet) return } // parse the notary event event, err := parser(notaryEvent) if err != nil { log.Warn(logs.EventCouldNotParseNotaryEvent, zap.String("error", err.Error()), ) return } // handle the event l.mtx.RLock() handler, ok := l.notaryHandlers[notaryKey] l.mtx.RUnlock() if !ok { log.Info(logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered, zap.Any("event", event), ) return } handler(event) } // SetNotificationParser sets the parser of particular contract event. // // Ignores nil and already set parsers. // Ignores the parser if listener is started. func (l *listener) SetNotificationParser(pi NotificationParserInfo) { log := l.log.With( zap.String("contract", pi.ScriptHash().StringLE()), zap.Stringer("event_type", pi.getType()), ) parser := pi.parser() if parser == nil { log.Info(logs.EventIgnoreNilEventParser) return } l.mtx.Lock() defer l.mtx.Unlock() // check if the listener was started if l.started { log.Warn(logs.EventListenerHasBeenAlreadyStartedIgnoreParser) return } // add event parser if _, ok := l.notificationParsers[pi.scriptHashWithType]; !ok { l.notificationParsers[pi.scriptHashWithType] = pi.parser() } log.Debug(logs.EventRegisteredNewEventParser) } // RegisterNotificationHandler registers the handler for particular notification event of contract. // // Ignores nil handlers. // Ignores handlers of event without parser. func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) { log := l.log.With( zap.String("contract", hi.ScriptHash().StringLE()), zap.Stringer("event_type", hi.GetType()), ) handler := hi.Handler() if handler == nil { log.Warn(logs.EventIgnoreNilEventHandler) return } // check if parser was set l.mtx.RLock() _, ok := l.notificationParsers[hi.scriptHashWithType] l.mtx.RUnlock() if !ok { log.Warn(logs.EventIgnoreHandlerOfEventWoParser) return } // add event handler l.mtx.Lock() l.notificationHandlers[hi.scriptHashWithType] = append( l.notificationHandlers[hi.scriptHashWithType], hi.Handler(), ) l.mtx.Unlock() log.Debug(logs.EventRegisteredNewEventHandler) } // EnableNotarySupport enables notary request listening. Passed hash is // notary mainTX signer. In practise, it means that listener will subscribe // for only notary requests that are going to be paid with passed hash. // // Must not be called after Listen or ListenWithError. func (l *listener) EnableNotarySupport(mainTXSigner util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter) { l.mtx.Lock() defer l.mtx.Unlock() l.listenNotary = true l.notaryMainTXSigner = mainTXSigner l.notaryHandlers = make(map[notaryRequestTypes]Handler) l.notaryParsers = make(map[notaryRequestTypes]NotaryParser) l.notaryEventsPreparator = notaryPreparator( PreparatorPrm{ AlphaKeys: alphaKeys, BlockCounter: bc, }, ) } // SetNotaryParser sets the parser of particular notary request event. // // Ignores nil and already set parsers. // Ignores the parser if listener is started. func (l *listener) SetNotaryParser(pi NotaryParserInfo) { if !l.listenNotary { return } log := l.log.With( zap.Stringer("mempool_type", pi.GetMempoolType()), zap.String("contract", pi.ScriptHash().StringLE()), zap.Stringer("notary_type", pi.RequestType()), ) parser := pi.parser() if parser == nil { log.Info(logs.EventIgnoreNilNotaryEventParser) return } l.mtx.Lock() defer l.mtx.Unlock() // check if the listener was started if l.started { log.Warn(logs.EventListenerHasBeenAlreadyStartedIgnoreNotaryParser) return } // add event parser if _, ok := l.notaryParsers[pi.notaryRequestTypes]; !ok { l.notaryParsers[pi.notaryRequestTypes] = pi.parser() } log.Info(logs.EventRegisteredNewEventParser) } // RegisterNotaryHandler registers the handler for particular notification notary request event. // // Ignores nil handlers. // Ignores handlers of event without parser. func (l *listener) RegisterNotaryHandler(hi NotaryHandlerInfo) { if !l.listenNotary { return } log := l.log.With( zap.Stringer("mempool_type", hi.GetMempoolType()), zap.String("contract", hi.ScriptHash().StringLE()), zap.Stringer("notary type", hi.RequestType()), ) handler := hi.Handler() if handler == nil { log.Warn(logs.EventIgnoreNilNotaryEventHandler) return } // check if parser was set l.mtx.RLock() _, ok := l.notaryParsers[hi.notaryRequestTypes] l.mtx.RUnlock() if !ok { log.Warn(logs.EventIgnoreHandlerOfNotaryEventWoParser) return } // add notary event handler l.mtx.Lock() l.notaryHandlers[hi.notaryRequestTypes] = hi.Handler() l.mtx.Unlock() log.Info(logs.EventRegisteredNewEventHandler) } // Stop closes subscription channel with remote neo node. func (l *listener) Stop() { l.stopOnce.Do(func() { l.subscriber.Close() }) l.wg.Wait() } func (l *listener) RegisterBlockHandler(handler BlockHandler) { if handler == nil { l.log.Warn(logs.EventIgnoreNilBlockHandler) return } l.blockHandlers = append(l.blockHandlers, handler) } // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { switch { case p.Logger == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger) case p.Subscriber == nil: return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber) } // The pool here must be blocking, otherwise notifications could be dropped. // The default capacity is 0, which means "infinite". pool, err := ants.NewPool(p.WorkerPoolCapacity) if err != nil { return nil, fmt.Errorf("could not init worker pool: %w", err) } return &listener{ notificationParsers: make(map[scriptHashWithType]NotificationParser), notificationHandlers: make(map[scriptHashWithType][]Handler), log: p.Logger, subscriber: p.Subscriber, pool: pool, }, nil }