[#770] pkg/morph: Rename all parsers and handlers structs/interfaces

Prepare all listening structures for notary events:
rename(add prefix/suffix 'notification') all
notification specific handlers/parsers.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-08-12 18:24:17 +03:00 committed by Pavel Karpy
parent c87bc70536
commit d252aa4a3e
16 changed files with 174 additions and 175 deletions

View file

@ -140,7 +140,7 @@ type cfgAccounting struct {
type cfgContainer struct { type cfgContainer struct {
scriptHash neogoutil.Uint160 scriptHash neogoutil.Uint160
parsers map[event.Type]event.Parser parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers workerPool util.WorkerPool // pool for asynchronous handlers
} }
@ -149,7 +149,7 @@ type cfgNetmap struct {
scriptHash neogoutil.Uint160 scriptHash neogoutil.Uint160
wrapper *nmwrapper.Wrapper wrapper *nmwrapper.Wrapper
parsers map[event.Type]event.Parser parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers workerPool util.WorkerPool // pool for asynchronous handlers

View file

@ -175,11 +175,11 @@ func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler)
) )
} }
func setContainerNotificationParser(c *cfg, sTyp string, p event.Parser) { func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
typ := event.TypeFromString(sTyp) typ := event.TypeFromString(sTyp)
if c.cfgContainer.parsers == nil { if c.cfgContainer.parsers == nil {
c.cfgContainer.parsers = make(map[event.Type]event.Parser, 1) c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
} }
c.cfgContainer.parsers[typ] = p c.cfgContainer.parsers[typ] = p

View file

@ -139,10 +139,10 @@ func listenMorphNotifications(c *cfg) {
}) })
} }
func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.Parser, func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.NotificationParser,
subs map[event.Type][]event.Handler) { subs map[event.Type][]event.Handler) {
for typ, handlers := range subs { for typ, handlers := range subs {
pi := event.ParserInfo{} pi := event.NotificationParserInfo{}
pi.SetType(typ) pi.SetType(typ)
pi.SetScriptHash(scHash) pi.SetScriptHash(scHash)
@ -153,15 +153,15 @@ func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parse
pi.SetParser(p) pi.SetParser(p)
lis.SetParser(pi) lis.SetNotificationParser(pi)
for _, h := range handlers { for _, h := range handlers {
hi := event.HandlerInfo{} hi := event.NotificationHandlerInfo{}
hi.SetType(typ) hi.SetType(typ)
hi.SetScriptHash(scHash) hi.SetScriptHash(scHash)
hi.SetHandler(h) hi.SetHandler(h)
lis.RegisterHandler(hi) lis.RegisterNotificationHandler(hi)
} }
} }
} }

View file

@ -166,11 +166,11 @@ func addNetmapNotificationHandler(c *cfg, sTyp string, h event.Handler) {
c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h) c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h)
} }
func setNetmapNotificationParser(c *cfg, sTyp string, p event.Parser) { func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
typ := event.TypeFromString(sTyp) typ := event.TypeFromString(sTyp)
if c.cfgNetmap.parsers == nil { if c.cfgNetmap.parsers == nil {
c.cfgNetmap.parsers = make(map[event.Type]event.Parser, 1) c.cfgNetmap.parsers = make(map[event.Type]event.NotificationParser, 1)
} }
c.cfgNetmap.parsers[typ] = p c.cfgNetmap.parsers[typ] = p

View file

@ -8,21 +8,21 @@ type (
// ContractProcessor interface defines functions for binding event producers // ContractProcessor interface defines functions for binding event producers
// such as event.Listener and Timers with contract processor. // such as event.Listener and Timers with contract processor.
ContractProcessor interface { ContractProcessor interface {
ListenerParsers() []event.ParserInfo ListenerNotificationParsers() []event.NotificationParserInfo
ListenerHandlers() []event.HandlerInfo ListenerNotificationHandlers() []event.NotificationHandlerInfo
TimersHandlers() []event.HandlerInfo TimersHandlers() []event.NotificationHandlerInfo
} }
) )
func connectListenerWithProcessor(l event.Listener, p ContractProcessor) { func connectListenerWithProcessor(l event.Listener, p ContractProcessor) {
// register parsers // register parsers
for _, parser := range p.ListenerParsers() { for _, parser := range p.ListenerNotificationParsers() {
l.SetParser(parser) l.SetNotificationParser(parser)
} }
// register handlers // register handlers
for _, handler := range p.ListenerHandlers() { for _, handler := range p.ListenerNotificationHandlers() {
l.RegisterHandler(handler) l.RegisterNotificationHandler(handler)
} }
} }

View file

@ -82,17 +82,17 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (np *Processor) ListenerParsers() []event.ParserInfo { func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil return nil
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (np *Processor) ListenerHandlers() []event.HandlerInfo { func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (np *Processor) TimersHandlers() []event.HandlerInfo { func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -116,18 +116,18 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerParsers() []event.ParserInfo { func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil return nil
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (ap *Processor) ListenerHandlers() []event.HandlerInfo { func (ap *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (ap *Processor) TimersHandlers() []event.HandlerInfo { func (ap *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -76,12 +76,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (bp *Processor) ListenerParsers() []event.ParserInfo { func (bp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var parsers []event.ParserInfo var parsers []event.NotificationParserInfo
// new lock event // new lock event
lock := event.ParserInfo{} lock := event.NotificationParserInfo{}
lock.SetType(lockNotification) lock.SetType(lockNotification)
lock.SetScriptHash(bp.balanceContract) lock.SetScriptHash(bp.balanceContract)
lock.SetParser(balanceEvent.ParseLock) lock.SetParser(balanceEvent.ParseLock)
@ -90,12 +90,12 @@ func (bp *Processor) ListenerParsers() []event.ParserInfo {
return parsers return parsers
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (bp *Processor) ListenerHandlers() []event.HandlerInfo { func (bp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var handlers []event.HandlerInfo var handlers []event.NotificationHandlerInfo
// lock handler // lock handler
lock := event.HandlerInfo{} lock := event.NotificationHandlerInfo{}
lock.SetType(lockNotification) lock.SetType(lockNotification)
lock.SetScriptHash(bp.balanceContract) lock.SetScriptHash(bp.balanceContract)
lock.SetHandler(bp.handleLock) lock.SetHandler(bp.handleLock)
@ -105,6 +105,6 @@ func (bp *Processor) ListenerHandlers() []event.HandlerInfo {
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (bp *Processor) TimersHandlers() []event.HandlerInfo { func (bp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -92,12 +92,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (cp *Processor) ListenerParsers() []event.ParserInfo { func (cp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var ( var (
parsers = make([]event.ParserInfo, 0, 3) parsers = make([]event.NotificationParserInfo, 0, 3)
p event.ParserInfo p event.NotificationParserInfo
) )
p.SetScriptHash(cp.containerContract) p.SetScriptHash(cp.containerContract)
@ -120,12 +120,12 @@ func (cp *Processor) ListenerParsers() []event.ParserInfo {
return parsers return parsers
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (cp *Processor) ListenerHandlers() []event.HandlerInfo { func (cp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var ( var (
handlers = make([]event.HandlerInfo, 0, 3) handlers = make([]event.NotificationHandlerInfo, 0, 3)
h event.HandlerInfo h event.NotificationHandlerInfo
) )
h.SetScriptHash(cp.containerContract) h.SetScriptHash(cp.containerContract)
@ -149,6 +149,6 @@ func (cp *Processor) ListenerHandlers() []event.HandlerInfo {
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (cp *Processor) TimersHandlers() []event.HandlerInfo { func (cp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -128,25 +128,25 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (gp *Processor) ListenerParsers() []event.ParserInfo { func (gp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var pi event.ParserInfo var pi event.NotificationParserInfo
pi.SetScriptHash(gp.designate) pi.SetScriptHash(gp.designate)
pi.SetType(event.TypeFromString(native.DesignationEventName)) pi.SetType(event.TypeFromString(native.DesignationEventName))
pi.SetParser(rolemanagement.ParseDesignate) pi.SetParser(rolemanagement.ParseDesignate)
return []event.ParserInfo{pi} return []event.NotificationParserInfo{pi}
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (gp *Processor) ListenerHandlers() []event.HandlerInfo { func (gp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var hi event.HandlerInfo var hi event.NotificationHandlerInfo
hi.SetScriptHash(gp.designate) hi.SetScriptHash(gp.designate)
hi.SetType(event.TypeFromString(native.DesignationEventName)) hi.SetType(event.TypeFromString(native.DesignationEventName))
hi.SetHandler(gp.HandleAlphabetSync) hi.SetHandler(gp.HandleAlphabetSync)
return []event.HandlerInfo{hi} return []event.NotificationHandlerInfo{hi}
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (gp *Processor) TimersHandlers() []event.HandlerInfo { func (gp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -129,12 +129,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (np *Processor) ListenerParsers() []event.ParserInfo { func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var ( var (
parsers = make([]event.ParserInfo, 0, 6) parsers = make([]event.NotificationParserInfo, 0, 6)
p event.ParserInfo p event.NotificationParserInfo
) )
p.SetScriptHash(np.neofsContract) p.SetScriptHash(np.neofsContract)
@ -172,12 +172,12 @@ func (np *Processor) ListenerParsers() []event.ParserInfo {
return parsers return parsers
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (np *Processor) ListenerHandlers() []event.HandlerInfo { func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var ( var (
handlers = make([]event.HandlerInfo, 0, 6) handlers = make([]event.NotificationHandlerInfo, 0, 6)
h event.HandlerInfo h event.NotificationHandlerInfo
) )
h.SetScriptHash(np.neofsContract) h.SetScriptHash(np.neofsContract)
@ -216,6 +216,6 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo {
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (np *Processor) TimersHandlers() []event.HandlerInfo { func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -146,26 +146,26 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (np *Processor) ListenerParsers() []event.ParserInfo { func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var parsers []event.ParserInfo var parsers []event.NotificationParserInfo
// new epoch event // new epoch event
newEpoch := event.ParserInfo{} newEpoch := event.NotificationParserInfo{}
newEpoch.SetType(newEpochNotification) newEpoch.SetType(newEpochNotification)
newEpoch.SetScriptHash(np.netmapContract) newEpoch.SetScriptHash(np.netmapContract)
newEpoch.SetParser(netmapEvent.ParseNewEpoch) newEpoch.SetParser(netmapEvent.ParseNewEpoch)
parsers = append(parsers, newEpoch) parsers = append(parsers, newEpoch)
// new peer event // new peer event
addPeer := event.ParserInfo{} addPeer := event.NotificationParserInfo{}
addPeer.SetType(addPeerNotification) addPeer.SetType(addPeerNotification)
addPeer.SetScriptHash(np.netmapContract) addPeer.SetScriptHash(np.netmapContract)
addPeer.SetParser(netmapEvent.ParseAddPeer) addPeer.SetParser(netmapEvent.ParseAddPeer)
parsers = append(parsers, addPeer) parsers = append(parsers, addPeer)
// update peer event // update peer event
updatePeer := event.ParserInfo{} updatePeer := event.NotificationParserInfo{}
updatePeer.SetType(updatePeerStateNotification) updatePeer.SetType(updatePeerStateNotification)
updatePeer.SetScriptHash(np.netmapContract) updatePeer.SetScriptHash(np.netmapContract)
updatePeer.SetParser(netmapEvent.ParseUpdatePeer) updatePeer.SetParser(netmapEvent.ParseUpdatePeer)
@ -174,26 +174,26 @@ func (np *Processor) ListenerParsers() []event.ParserInfo {
return parsers return parsers
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (np *Processor) ListenerHandlers() []event.HandlerInfo { func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var handlers []event.HandlerInfo var handlers []event.NotificationHandlerInfo
// new epoch handler // new epoch handler
newEpoch := event.HandlerInfo{} newEpoch := event.NotificationHandlerInfo{}
newEpoch.SetType(newEpochNotification) newEpoch.SetType(newEpochNotification)
newEpoch.SetScriptHash(np.netmapContract) newEpoch.SetScriptHash(np.netmapContract)
newEpoch.SetHandler(np.handleNewEpoch) newEpoch.SetHandler(np.handleNewEpoch)
handlers = append(handlers, newEpoch) handlers = append(handlers, newEpoch)
// new peer handler // new peer handler
addPeer := event.HandlerInfo{} addPeer := event.NotificationHandlerInfo{}
addPeer.SetType(addPeerNotification) addPeer.SetType(addPeerNotification)
addPeer.SetScriptHash(np.netmapContract) addPeer.SetScriptHash(np.netmapContract)
addPeer.SetHandler(np.handleAddPeer) addPeer.SetHandler(np.handleAddPeer)
handlers = append(handlers, addPeer) handlers = append(handlers, addPeer)
// update peer handler // update peer handler
updatePeer := event.HandlerInfo{} updatePeer := event.NotificationHandlerInfo{}
updatePeer.SetType(updatePeerStateNotification) updatePeer.SetType(updatePeerStateNotification)
updatePeer.SetScriptHash(np.netmapContract) updatePeer.SetScriptHash(np.netmapContract)
updatePeer.SetHandler(np.handleUpdateState) updatePeer.SetHandler(np.handleUpdateState)
@ -203,6 +203,6 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo {
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (np *Processor) TimersHandlers() []event.HandlerInfo { func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -88,12 +88,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
// ListenerParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (rp *Processor) ListenerParsers() []event.ParserInfo { func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
var parsers []event.ParserInfo var parsers []event.NotificationParserInfo
// put reputation event // put reputation event
put := event.ParserInfo{} put := event.NotificationParserInfo{}
put.SetType(putReputationNotification) put.SetType(putReputationNotification)
put.SetScriptHash(rp.reputationContract) put.SetScriptHash(rp.reputationContract)
put.SetParser(reputationEvent.ParsePut) put.SetParser(reputationEvent.ParsePut)
@ -102,12 +102,12 @@ func (rp *Processor) ListenerParsers() []event.ParserInfo {
return parsers return parsers
} }
// ListenerHandlers for the 'event.Listener' event producer. // ListenerNotificationHandlers for the 'event.Listener' event producer.
func (rp *Processor) ListenerHandlers() []event.HandlerInfo { func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
var handlers []event.HandlerInfo var handlers []event.NotificationHandlerInfo
// put reputation handler // put reputation handler
put := event.HandlerInfo{} put := event.NotificationHandlerInfo{}
put.SetType(putReputationNotification) put.SetType(putReputationNotification)
put.SetScriptHash(rp.reputationContract) put.SetScriptHash(rp.reputationContract)
put.SetHandler(rp.handlePutReputation) put.SetHandler(rp.handlePutReputation)
@ -117,6 +117,6 @@ func (rp *Processor) ListenerHandlers() []event.HandlerInfo {
} }
// TimersHandlers for the 'Timers' event producer. // TimersHandlers for the 'Timers' event producer.
func (rp *Processor) TimersHandlers() []event.HandlerInfo { func (rp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil return nil
} }

View file

@ -10,21 +10,21 @@ type Handler func(Event)
// BlockHandler is a chain block processing function. // BlockHandler is a chain block processing function.
type BlockHandler func(*block.Block) type BlockHandler func(*block.Block)
// HandlerInfo is a structure that groups // NotificationHandlerInfo is a structure that groups
// the parameters of the handler of particular // the parameters of the handler of particular
// contract event. // contract event.
type HandlerInfo struct { type NotificationHandlerInfo struct {
scriptHashWithType scriptHashWithType
h Handler h Handler
} }
// SetHandler is an event handler setter. // SetHandler is an event handler setter.
func (s *HandlerInfo) SetHandler(v Handler) { func (s *NotificationHandlerInfo) SetHandler(v Handler) {
s.h = v s.h = v
} }
// Handler returns an event handler. // Handler returns an event handler.
func (s HandlerInfo) Handler() Handler { func (s NotificationHandlerInfo) Handler() Handler {
return s.h return s.h
} }

View file

@ -29,22 +29,19 @@ type Listener interface {
// it could not be started. // it could not be started.
ListenWithError(context.Context, chan<- error) ListenWithError(context.Context, chan<- error)
// SetParser must set the parser of particular contract event. // SetNotificationParser must set the parser of particular contract event.
// //
// Parser of each event must be set once. All parsers must be set before Listen call. // Parser of each event must be set once. All parsers must be set before Listen call.
// //
// Must ignore nil parsers and all calls after listener has been started. // Must ignore nil parsers and all calls after listener has been started.
SetParser(ParserInfo) SetNotificationParser(NotificationParserInfo)
// RegisterHandler must register the event handler for particular notification event of contract. // RegisterNotificationHandler must register the event handler for particular notification event of contract.
// //
// The specified handler must be called after each capture and parsing of the event // The specified handler must be called after each capture and parsing of the event.
// //
// Must ignore nil handlers. // Must ignore nil handlers.
RegisterHandler(HandlerInfo) RegisterNotificationHandler(NotificationHandlerInfo)
// Stop must stop the event listener.
Stop()
// RegisterBlockHandler must register chain block handler. // RegisterBlockHandler must register chain block handler.
// //
@ -52,6 +49,9 @@ type Listener interface {
// //
// Must ignore nil handlers. // Must ignore nil handlers.
RegisterBlockHandler(BlockHandler) RegisterBlockHandler(BlockHandler)
// Stop must stop the event listener.
Stop()
} }
// ListenerParams is a group of parameters // ListenerParams is a group of parameters
@ -69,9 +69,8 @@ type listener struct {
started bool started bool
parsers map[scriptHashWithType]Parser notificationParsers map[scriptHashWithType]NotificationParser
notificationHandlers map[scriptHashWithType][]Handler
handlers map[scriptHashWithType][]Handler
log *zap.Logger log *zap.Logger
@ -93,10 +92,10 @@ var (
// Executes once, all subsequent calls do nothing. // Executes once, all subsequent calls do nothing.
// //
// Returns an error if listener was already started. // Returns an error if listener was already started.
func (s listener) Listen(ctx context.Context) { func (l listener) Listen(ctx context.Context) {
s.once.Do(func() { l.once.Do(func() {
if err := s.listen(ctx, nil); err != nil { if err := l.listen(ctx, nil); err != nil {
s.log.Error("could not start listen to events", l.log.Error("could not start listen to events",
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }
@ -109,10 +108,10 @@ func (s listener) Listen(ctx context.Context) {
// Executes once, all subsequent calls do nothing. // Executes once, all subsequent calls do nothing.
// //
// Returns an error if listener was already started. // Returns an error if listener was already started.
func (s listener) ListenWithError(ctx context.Context, intError chan<- error) { func (l listener) ListenWithError(ctx context.Context, intError chan<- error) {
s.once.Do(func() { l.once.Do(func() {
if err := s.listen(ctx, intError); err != nil { if err := l.listen(ctx, intError); err != nil {
s.log.Error("could not start listen to events", l.log.Error("could not start listen to events",
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
intError <- err intError <- err
@ -120,13 +119,13 @@ func (s listener) ListenWithError(ctx context.Context, intError chan<- error) {
}) })
} }
func (s listener) listen(ctx context.Context, intError chan<- error) error { func (l listener) listen(ctx context.Context, intError chan<- error) error {
// create the list of listening contract hashes // create the list of listening contract hashes
hashes := make([]util.Uint160, 0) hashes := make([]util.Uint160, 0)
// fill the list with the contracts with set event parsers. // fill the list with the contracts with set event parsers.
s.mtx.RLock() l.mtx.RLock()
for hashType := range s.parsers { for hashType := range l.notificationParsers {
scHash := hashType.ScriptHash() scHash := hashType.ScriptHash()
// prevent repetitions // prevent repetitions
@ -140,30 +139,30 @@ func (s listener) listen(ctx context.Context, intError chan<- error) error {
} }
// mark listener as started // mark listener as started
s.started = true l.started = true
s.mtx.RUnlock() l.mtx.RUnlock()
chEvent, err := s.subscriber.SubscribeForNotification(hashes...) chEvent, err := l.subscriber.SubscribeForNotification(hashes...)
if err != nil { if err != nil {
return err return err
} }
s.listenLoop(ctx, chEvent, intError) l.listenLoop(ctx, chEvent, intError)
return nil return nil
} }
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) { func (l listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) {
var blockChan <-chan *block.Block var blockChan <-chan *block.Block
if len(s.blockHandlers) > 0 { if len(l.blockHandlers) > 0 {
var err error var err error
if blockChan, err = s.subscriber.BlockNotifications(); err != nil { if blockChan, err = l.subscriber.BlockNotifications(); err != nil {
if intErr != nil { if intErr != nil {
intErr <- fmt.Errorf("could not open block notifications channel: %w", err) intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
} else { } else {
s.log.Debug("could not open block notifications channel", l.log.Debug("could not open block notifications channel",
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }
@ -178,47 +177,47 @@ loop:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
s.log.Info("stop event listener by context", l.log.Info("stop event listener by context",
zap.String("reason", ctx.Err().Error()), zap.String("reason", ctx.Err().Error()),
) )
break loop break loop
case notifyEvent, ok := <-chEvent: case notifyEvent, ok := <-chEvent:
if !ok { if !ok {
s.log.Warn("stop event listener by channel") l.log.Warn("stop event listener by channel")
if intErr != nil { if intErr != nil {
intErr <- errors.New("event subscriber connection has been terminated") intErr <- errors.New("event subscriber connection has been terminated")
} }
break loop break loop
} else if notifyEvent == nil { } else if notifyEvent == nil {
s.log.Warn("nil notification event was caught") l.log.Warn("nil notification event was caught")
continue loop continue loop
} }
s.parseAndHandle(notifyEvent) l.parseAndHandleNotification(notifyEvent)
case b, ok := <-blockChan: case b, ok := <-blockChan:
if !ok { if !ok {
s.log.Warn("stop event listener by block channel") l.log.Warn("stop event listener by block channel")
if intErr != nil { if intErr != nil {
intErr <- errors.New("new block notification channel is closed") intErr <- errors.New("new block notification channel is closed")
} }
break loop break loop
} else if b == nil { } else if b == nil {
s.log.Warn("nil block was caught") l.log.Warn("nil block was caught")
continue loop continue loop
} }
// TODO: consider asynchronous execution // TODO: consider asynchronous execution
for i := range s.blockHandlers { for i := range l.blockHandlers {
s.blockHandlers[i](b) l.blockHandlers[i](b)
} }
} }
} }
} }
func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) { func (l listener) parseAndHandleNotification(notifyEvent *state.NotificationEvent) {
log := s.log.With( log := l.log.With(
zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()), zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),
) )
@ -247,9 +246,9 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) {
keyEvent.SetScriptHash(notifyEvent.ScriptHash) keyEvent.SetScriptHash(notifyEvent.ScriptHash)
keyEvent.SetType(typEvent) keyEvent.SetType(typEvent)
s.mtx.RLock() l.mtx.RLock()
parser, ok := s.parsers[keyEvent] parser, ok := l.notificationParsers[keyEvent]
s.mtx.RUnlock() l.mtx.RUnlock()
if !ok { if !ok {
log.Debug("event parser not set") log.Debug("event parser not set")
@ -268,12 +267,12 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) {
} }
// handler the event // handler the event
s.mtx.RLock() l.mtx.RLock()
handlers := s.handlers[keyEvent] handlers := l.notificationHandlers[keyEvent]
s.mtx.RUnlock() l.mtx.RUnlock()
if len(handlers) == 0 { if len(handlers) == 0 {
log.Info("handlers for parsed notification event were not registered", log.Info("notification handlers for parsed notification event were not registered",
zap.Any("event", event), zap.Any("event", event),
) )
@ -285,12 +284,12 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) {
} }
} }
// SetParser sets the parser of particular contract event. // SetNotificationParser sets the parser of particular contract event.
// //
// Ignores nil and already set parsers. // Ignores nil and already set parsers.
// Ignores the parser if listener is started. // Ignores the parser if listener is started.
func (s listener) SetParser(p ParserInfo) { func (l listener) SetNotificationParser(p NotificationParserInfo) {
log := s.log.With( log := l.log.With(
zap.String("script hash LE", p.ScriptHash().StringLE()), zap.String("script hash LE", p.ScriptHash().StringLE()),
zap.Stringer("event type", p.getType()), zap.Stringer("event type", p.getType()),
) )
@ -301,29 +300,29 @@ func (s listener) SetParser(p ParserInfo) {
return return
} }
s.mtx.Lock() l.mtx.Lock()
defer s.mtx.Unlock() defer l.mtx.Unlock()
// check if the listener was started // check if the listener was started
if s.started { if l.started {
log.Warn("listener has been already started, ignore parser") log.Warn("listener has been already started, ignore parser")
return return
} }
// add event parser // add event parser
if _, ok := s.parsers[p.scriptHashWithType]; !ok { if _, ok := l.notificationParsers[p.scriptHashWithType]; !ok {
s.parsers[p.scriptHashWithType] = p.parser() l.notificationParsers[p.scriptHashWithType] = p.parser()
} }
log.Info("registered new event parser") log.Info("registered new event parser")
} }
// RegisterHandler registers the handler for particular notification event of contract. // RegisterNotificationHandler registers the handler for particular notification event of contract.
// //
// Ignores nil handlers. // Ignores nil handlers.
// Ignores handlers of event without parser. // Ignores handlers of event without parser.
func (s listener) RegisterHandler(p HandlerInfo) { func (l listener) RegisterNotificationHandler(p NotificationHandlerInfo) {
log := s.log.With( log := l.log.With(
zap.String("script hash LE", p.ScriptHash().StringLE()), zap.String("script hash LE", p.ScriptHash().StringLE()),
zap.Stringer("event type", p.GetType()), zap.Stringer("event type", p.GetType()),
) )
@ -335,9 +334,9 @@ func (s listener) RegisterHandler(p HandlerInfo) {
} }
// check if parser was set // check if parser was set
s.mtx.RLock() l.mtx.RLock()
_, ok := s.parsers[p.scriptHashWithType] _, ok := l.notificationParsers[p.scriptHashWithType]
s.mtx.RUnlock() l.mtx.RUnlock()
if !ok { if !ok {
log.Warn("ignore handler of event w/o parser") log.Warn("ignore handler of event w/o parser")
@ -345,28 +344,28 @@ func (s listener) RegisterHandler(p HandlerInfo) {
} }
// add event handler // add event handler
s.mtx.Lock() l.mtx.Lock()
s.handlers[p.scriptHashWithType] = append( l.notificationHandlers[p.scriptHashWithType] = append(
s.handlers[p.scriptHashWithType], l.notificationHandlers[p.scriptHashWithType],
p.Handler(), p.Handler(),
) )
s.mtx.Unlock() l.mtx.Unlock()
log.Info("registered new event handler") log.Info("registered new event handler")
} }
// Stop closes subscription channel with remote neo node. // Stop closes subscription channel with remote neo node.
func (s listener) Stop() { func (l listener) Stop() {
s.subscriber.Close() l.subscriber.Close()
} }
func (s *listener) RegisterBlockHandler(handler BlockHandler) { func (l *listener) RegisterBlockHandler(handler BlockHandler) {
if handler == nil { if handler == nil {
s.log.Warn("ignore nil block handler") l.log.Warn("ignore nil block handler")
return return
} }
s.blockHandlers = append(s.blockHandlers, handler) l.blockHandlers = append(l.blockHandlers, handler)
} }
// NewListener create the notification event listener instance and returns Listener interface. // NewListener create the notification event listener instance and returns Listener interface.
@ -381,8 +380,8 @@ func NewListener(p ListenerParams) (Listener, error) {
return &listener{ return &listener{
mtx: new(sync.RWMutex), mtx: new(sync.RWMutex),
once: new(sync.Once), once: new(sync.Once),
parsers: make(map[scriptHashWithType]Parser), notificationParsers: make(map[scriptHashWithType]NotificationParser),
handlers: make(map[scriptHashWithType][]Handler), notificationHandlers: make(map[scriptHashWithType][]Handler),
log: p.Logger, log: p.Logger,
subscriber: p.Subscriber, subscriber: p.Subscriber,
}, nil }, nil

View file

@ -6,17 +6,17 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
) )
// Parser is a function that constructs Event // NotificationParser is a function that constructs Event
// from the StackItem list. // from the StackItem list.
type Parser func([]stackitem.Item) (Event, error) type NotificationParser func([]stackitem.Item) (Event, error)
// ParserInfo is a structure that groups // NotificationParserInfo is a structure that groups
// the parameters of particular contract // the parameters of particular contract
// notification event parser. // notification event parser.
type ParserInfo struct { type NotificationParserInfo struct {
scriptHashWithType scriptHashWithType
p Parser p NotificationParser
} }
type wrongPrmNumber struct { type wrongPrmNumber struct {
@ -36,19 +36,19 @@ func (s wrongPrmNumber) Error() string {
} }
// SetParser is an event parser setter. // SetParser is an event parser setter.
func (s *ParserInfo) SetParser(v Parser) { func (s *NotificationParserInfo) SetParser(v NotificationParser) {
s.p = v s.p = v
} }
func (s ParserInfo) parser() Parser { func (s NotificationParserInfo) parser() NotificationParser {
return s.p return s.p
} }
// SetType is an event type setter. // SetType is an event type setter.
func (s *ParserInfo) SetType(v Type) { func (s *NotificationParserInfo) SetType(v Type) {
s.typ = v s.typ = v
} }
func (s ParserInfo) getType() Type { func (s NotificationParserInfo) getType() Type {
return s.typ return s.typ
} }