frostfs-node/pkg/morph/event/listener.go

390 lines
8.8 KiB
Go
Raw Normal View History

package event
import (
"context"
"errors"
"fmt"
"sync"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
2020-07-24 13:54:03 +00:00
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
"go.uber.org/zap"
)
// Listener is an interface of smart contract notification event listener.
type Listener interface {
// Must start the event listener.
//
// Must listen to events with the parser installed.
Listen(context.Context)
// Must start the event listener.
//
// Must listen to events with the parser installed.
//
// Must send error to channel if subscriber channel has been closed or
// it could not be started.
ListenWithError(context.Context, chan<- error)
// Must set the parser of particular contract event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
SetParser(ParserInfo)
// Must register the event handler for particular notification event of contract.
//
// The specified handler must be called after each capture and parsing of the event
//
// Must ignore nil handlers.
RegisterHandler(HandlerInfo)
2020-07-24 13:54:03 +00:00
// Must stop the event listener.
Stop()
// Must register chain block handler.
//
// The specified handler must be called after each capture and parsing of the new block from chain.
//
// Must ignore nil handlers.
RegisterBlockHandler(BlockHandler)
}
// ListenerParams groups the arguments accepted by NewListener.
//
// Both fields are required: NewListener returns an error when
// either of them is nil.
type ListenerParams struct {
	// Logger receives the diagnostic messages of the listener.
	Logger *zap.Logger

	// Subscriber is the source of notification events and new blocks.
	Subscriber subscriber.Subscriber
}
// listener is the Listener implementation returned by NewListener.
type listener struct {
	// mtx guards parsers, handlers and started.
	mtx *sync.RWMutex

	// once makes sure the listening routine is launched a single time.
	once *sync.Once

	// started is set when listening begins; parsers are rejected afterwards.
	started bool

	// parsers maps a (script hash, event type) pair to its event parser.
	parsers map[scriptHashWithType]Parser

	// handlers maps a (script hash, event type) pair to the handlers
	// invoked for every successfully parsed event of that kind.
	handlers map[scriptHashWithType][]Handler

	log *zap.Logger

	subscriber subscriber.Subscriber

	// blockHandlers are invoked for every new block received from the subscriber.
	blockHandlers []BlockHandler
}
// newListenerFailMsg prefixes every error returned by NewListener.
const newListenerFailMsg = "could not instantiate Listener"

var (
	// errNilLogger is wrapped by NewListener when ListenerParams.Logger is nil.
	errNilLogger = errors.New("nil logger")

	// errNilSubscriber is wrapped by NewListener when ListenerParams.Subscriber is nil.
	errNilSubscriber = errors.New("nil event subscriber")
)
// Listen starts the listening for events with registered handlers.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (s listener) Listen(ctx context.Context) {
s.once.Do(func() {
if err := s.listen(ctx, nil); err != nil {
s.log.Error("could not start listen to events",
zap.String("error", err.Error()),
)
}
})
}
// ListenWithError starts the listening for events with registered handlers and
// passing error message to intError channel if subscriber channel has been closed.
//
// Executes once, all subsequent calls do nothing.
//
// Returns an error if listener was already started.
func (s listener) ListenWithError(ctx context.Context, intError chan<- error) {
s.once.Do(func() {
if err := s.listen(ctx, intError); err != nil {
s.log.Error("could not start listen to events",
zap.String("error", err.Error()),
)
intError <- err
}
})
}
func (s listener) listen(ctx context.Context, intError chan<- error) error {
// create the list of listening contract hashes
hashes := make([]util.Uint160, 0)
// fill the list with the contracts with set event parsers.
s.mtx.RLock()
for hashType := range s.parsers {
2020-07-24 13:54:03 +00:00
scHash := hashType.ScriptHash()
// prevent repetitions
for _, hash := range hashes {
if hash.Equals(scHash) {
continue
}
}
2020-07-24 13:54:03 +00:00
hashes = append(hashes, hashType.ScriptHash())
}
// mark listener as started
s.started = true
s.mtx.RUnlock()
chEvent, err := s.subscriber.SubscribeForNotification(hashes...)
if err != nil {
return err
}
s.listenLoop(ctx, chEvent, intError)
return nil
}
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) {
var blockChan <-chan *block.Block
if len(s.blockHandlers) > 0 {
var err error
if blockChan, err = s.subscriber.BlockNotifications(); err != nil {
if intErr != nil {
intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
} else {
s.log.Debug("could not open block notifications channel",
zap.String("error", err.Error()),
)
}
return
}
} else {
blockChan = make(chan *block.Block)
}
loop:
for {
select {
case <-ctx.Done():
2020-07-24 13:54:03 +00:00
s.log.Info("stop event listener by context",
zap.String("reason", ctx.Err().Error()),
)
break loop
case notifyEvent, ok := <-chEvent:
if !ok {
s.log.Warn("stop event listener by channel")
if intErr != nil {
intErr <- errors.New("event subscriber connection has been terminated")
}
break loop
} else if notifyEvent == nil {
s.log.Warn("nil notification event was caught")
continue loop
}
s.parseAndHandle(notifyEvent)
case b, ok := <-blockChan:
if !ok {
s.log.Warn("stop event listener by block channel")
if intErr != nil {
intErr <- errors.New("new block notification channel is closed")
}
break loop
} else if b == nil {
s.log.Warn("nil block was caught")
continue loop
}
// TODO: consider asynchronous execution
for i := range s.blockHandlers {
s.blockHandlers[i](b)
}
}
}
}
// parseAndHandle converts the notification into an event with the parser set
// for its (script hash, event name) pair and passes the result to every
// handler registered for that pair, in registration order.
//
// The notification is dropped with a log record if its stack item is not a
// non-empty array, if no parser is set, if parsing fails or if no handler is
// registered.
func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) {
	logger := s.log.With(
		zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),
	)

	// the notification payload has to be a non-empty array of stack items
	items, err := client.ArrayFromStackItem(notifyEvent.Item)
	switch {
	case err != nil:
		logger.Warn("stack item is not an array type",
			zap.String("error", err.Error()),
		)

		return
	case len(items) == 0:
		logger.Warn("stack item array is empty")
		return
	}

	// derive the event type from the notification name
	evType := TypeFromString(notifyEvent.Name)

	logger = logger.With(
		zap.String("event type", notifyEvent.Name),
	)

	// build the lookup key shared by parsers and handlers
	var key scriptHashWithType

	key.SetScriptHash(notifyEvent.ScriptHash)
	key.SetType(evType)

	s.mtx.RLock()
	parse, found := s.parsers[key]
	s.mtx.RUnlock()

	if !found {
		logger.Debug("event parser not set")
		return
	}

	parsed, err := parse(items)
	if err != nil {
		logger.Warn("could not parse notification event",
			zap.String("error", err.Error()),
		)

		return
	}

	s.mtx.RLock()
	registered := s.handlers[key]
	s.mtx.RUnlock()

	if len(registered) == 0 {
		logger.Info("handlers for parsed notification event were not registered",
			zap.Any("event", parsed),
		)

		return
	}

	// handlers are called synchronously, outside of the lock
	for i := range registered {
		registered[i](parsed)
	}
}
// SetParser sets the parser of particular contract event.
//
// Ignores nil and already set parsers.
// Ignores the parser if listener is started.
func (s listener) SetParser(p ParserInfo) {
log := s.log.With(
2020-07-24 13:54:03 +00:00
zap.String("script hash LE", p.ScriptHash().StringLE()),
zap.Stringer("event type", p.getType()),
)
parser := p.parser()
if parser == nil {
log.Info("ignore nil event parser")
return
}
s.mtx.Lock()
defer s.mtx.Unlock()
// check if the listener was started
if s.started {
log.Warn("listener has been already started, ignore parser")
return
}
// add event parser
if _, ok := s.parsers[p.scriptHashWithType]; !ok {
s.parsers[p.scriptHashWithType] = p.parser()
}
log.Info("registered new event parser")
}
// RegisterHandler registers the handler for particular notification event of contract.
//
// Ignores nil handlers.
// Ignores handlers of event without parser.
func (s listener) RegisterHandler(p HandlerInfo) {
log := s.log.With(
2020-07-24 13:54:03 +00:00
zap.String("script hash LE", p.ScriptHash().StringLE()),
zap.Stringer("event type", p.GetType()),
)
2020-07-24 13:54:03 +00:00
handler := p.Handler()
if handler == nil {
log.Warn("ignore nil event handler")
return
}
// check if parser was set
s.mtx.RLock()
_, ok := s.parsers[p.scriptHashWithType]
s.mtx.RUnlock()
if !ok {
log.Warn("ignore handler of event w/o parser")
return
}
// add event handler
s.mtx.Lock()
s.handlers[p.scriptHashWithType] = append(
s.handlers[p.scriptHashWithType],
2020-07-24 13:54:03 +00:00
p.Handler(),
)
s.mtx.Unlock()
log.Info("registered new event handler")
}
2020-07-24 13:54:03 +00:00
// Stop closes subscription channel with remote neo node.
func (s listener) Stop() {
s.subscriber.Close()
}
// RegisterBlockHandler adds the handler to the list of handlers that are
// called for every new block.
//
// Ignores nil handlers.
//
// The list is not guarded by the listener mutex here.
func (s *listener) RegisterBlockHandler(handler BlockHandler) {
	if handler != nil {
		s.blockHandlers = append(s.blockHandlers, handler)
		return
	}

	s.log.Warn("ignore nil block handler")
}
// NewListener creates the notification event listener instance and returns
// it as the Listener interface.
//
// Returns an error wrapping errNilLogger or errNilSubscriber if the
// corresponding parameter is nil. The logger is checked first.
func NewListener(p ListenerParams) (Listener, error) {
	if p.Logger == nil {
		return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
	}

	if p.Subscriber == nil {
		return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
	}

	l := new(listener)

	// synchronization primitives are stored by pointer
	l.mtx = new(sync.RWMutex)
	l.once = new(sync.Once)

	l.parsers = make(map[scriptHashWithType]Parser)
	l.handlers = make(map[scriptHashWithType][]Handler)

	l.log = p.Logger
	l.subscriber = p.Subscriber

	return l, nil
}