From d252aa4a3ea63a1123c8d246acd23ac0905e2bc7 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 12 Aug 2021 18:24:17 +0300 Subject: [PATCH] [#770] pkg/morph: Rename all parsers and handlers structs/interfaces Prepare all listening structures for notary events: rename(add prefix/suffix 'notification') all notification specific handlers/parsers. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 4 +- cmd/neofs-node/container.go | 4 +- cmd/neofs-node/morph.go | 10 +- cmd/neofs-node/netmap.go | 4 +- pkg/innerring/bindings.go | 14 +- .../processors/alphabet/processor.go | 10 +- pkg/innerring/processors/audit/processor.go | 10 +- pkg/innerring/processors/balance/processor.go | 18 +-- .../processors/container/processor.go | 18 +-- .../processors/governance/processor.go | 18 +-- pkg/innerring/processors/neofs/processor.go | 18 +-- pkg/innerring/processors/netmap/processor.go | 26 +-- .../processors/reputation/processor.go | 18 +-- pkg/morph/event/{handler.go => handlers.go} | 8 +- pkg/morph/event/listener.go | 151 +++++++++--------- pkg/morph/event/{parser.go => parsers.go} | 18 +-- 16 files changed, 174 insertions(+), 175 deletions(-) rename pkg/morph/event/{handler.go => handlers.go} (68%) rename pkg/morph/event/{parser.go => parsers.go} (60%) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index b0565003d..90cc3002c 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -140,7 +140,7 @@ type cfgAccounting struct { type cfgContainer struct { scriptHash neogoutil.Uint160 - parsers map[event.Type]event.Parser + parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers } @@ -149,7 +149,7 @@ type cfgNetmap struct { scriptHash neogoutil.Uint160 wrapper *nmwrapper.Wrapper - parsers map[event.Type]event.Parser + parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 22e33ffac..8d9f84b79 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -175,11 +175,11 @@ func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) ) } -func setContainerNotificationParser(c *cfg, sTyp string, p event.Parser) { +func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) { typ := event.TypeFromString(sTyp) if c.cfgContainer.parsers == nil { - c.cfgContainer.parsers = make(map[event.Type]event.Parser, 1) + c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1) } c.cfgContainer.parsers[typ] = p diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index dd68d2cbb..ded1efa29 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -139,10 +139,10 @@ func listenMorphNotifications(c *cfg) { }) } -func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.Parser, +func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.NotificationParser, subs map[event.Type][]event.Handler) { for typ, handlers := range subs { - pi := event.ParserInfo{} + pi := event.NotificationParserInfo{} pi.SetType(typ) pi.SetScriptHash(scHash) @@ -153,15 +153,15 @@ func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parse pi.SetParser(p) - lis.SetParser(pi) + lis.SetNotificationParser(pi) for _, h := range handlers { - hi := event.HandlerInfo{} + hi := event.NotificationHandlerInfo{} hi.SetType(typ) hi.SetScriptHash(scHash) hi.SetHandler(h) - lis.RegisterHandler(hi) + lis.RegisterNotificationHandler(hi) } } } diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index a23da6a1c..66c4a3101 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -166,11 +166,11 @@ func addNetmapNotificationHandler(c *cfg, sTyp string, h event.Handler) { c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h) } -func setNetmapNotificationParser(c *cfg, sTyp string, p event.Parser) { +func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser) { typ := event.TypeFromString(sTyp) if c.cfgNetmap.parsers == nil { - c.cfgNetmap.parsers = make(map[event.Type]event.Parser, 1) + c.cfgNetmap.parsers = make(map[event.Type]event.NotificationParser, 1) } c.cfgNetmap.parsers[typ] = p diff --git a/pkg/innerring/bindings.go b/pkg/innerring/bindings.go index 0e5d52226..dccbd0bf0 100644 --- a/pkg/innerring/bindings.go +++ b/pkg/innerring/bindings.go @@ -8,21 +8,21 @@ type ( // ContractProcessor interface defines functions for binding event producers // such as event.Listener and Timers with contract processor. ContractProcessor interface { - ListenerParsers() []event.ParserInfo - ListenerHandlers() []event.HandlerInfo - TimersHandlers() []event.HandlerInfo + ListenerNotificationParsers() []event.NotificationParserInfo + ListenerNotificationHandlers() []event.NotificationHandlerInfo + TimersHandlers() []event.NotificationHandlerInfo } ) func connectListenerWithProcessor(l event.Listener, p ContractProcessor) { // register parsers - for _, parser := range p.ListenerParsers() { - l.SetParser(parser) + for _, parser := range p.ListenerNotificationParsers() { + l.SetNotificationParser(parser) } // register handlers - for _, handler := range p.ListenerHandlers() { - l.RegisterHandler(handler) + for _, handler := range p.ListenerNotificationHandlers() { + l.RegisterNotificationHandler(handler) } } diff --git a/pkg/innerring/processors/alphabet/processor.go b/pkg/innerring/processors/alphabet/processor.go index 823ceca23..e4f0212dd 100644 --- a/pkg/innerring/processors/alphabet/processor.go +++ b/pkg/innerring/processors/alphabet/processor.go @@ -82,17 +82,17 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (np *Processor) ListenerParsers() []event.ParserInfo { +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { return nil } -// ListenerHandlers for the 'event.Listener' event producer. -func (np *Processor) ListenerHandlers() []event.HandlerInfo { +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { return nil } // TimersHandlers for the 'Timers' event producer. -func (np *Processor) TimersHandlers() []event.HandlerInfo { +func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index f10b78267..6bc5d1cdf 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -116,18 +116,18 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (ap *Processor) ListenerParsers() []event.ParserInfo { +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { return nil } -// ListenerHandlers for the 'event.Listener' event producer. -func (ap *Processor) ListenerHandlers() []event.HandlerInfo { +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (ap *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { return nil } // TimersHandlers for the 'Timers' event producer. -func (ap *Processor) TimersHandlers() []event.HandlerInfo { +func (ap *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/balance/processor.go b/pkg/innerring/processors/balance/processor.go index 49712d00f..8574a18f9 100644 --- a/pkg/innerring/processors/balance/processor.go +++ b/pkg/innerring/processors/balance/processor.go @@ -76,12 +76,12 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (bp *Processor) ListenerParsers() []event.ParserInfo { - var parsers []event.ParserInfo +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (bp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { + var parsers []event.NotificationParserInfo // new lock event - lock := event.ParserInfo{} + lock := event.NotificationParserInfo{} lock.SetType(lockNotification) lock.SetScriptHash(bp.balanceContract) lock.SetParser(balanceEvent.ParseLock) @@ -90,12 +90,12 @@ func (bp *Processor) ListenerParsers() []event.ParserInfo { return parsers } -// ListenerHandlers for the 'event.Listener' event producer. -func (bp *Processor) ListenerHandlers() []event.HandlerInfo { - var handlers []event.HandlerInfo +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (bp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { + var handlers []event.NotificationHandlerInfo // lock handler - lock := event.HandlerInfo{} + lock := event.NotificationHandlerInfo{} lock.SetType(lockNotification) lock.SetScriptHash(bp.balanceContract) lock.SetHandler(bp.handleLock) @@ -105,6 +105,6 @@ func (bp *Processor) ListenerHandlers() []event.HandlerInfo { } // TimersHandlers for the 'Timers' event producer. -func (bp *Processor) TimersHandlers() []event.HandlerInfo { +func (bp *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go index c0bbf5399..66be56cad 100644 --- a/pkg/innerring/processors/container/processor.go +++ b/pkg/innerring/processors/container/processor.go @@ -92,12 +92,12 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (cp *Processor) ListenerParsers() []event.ParserInfo { +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (cp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { var ( - parsers = make([]event.ParserInfo, 0, 3) + parsers = make([]event.NotificationParserInfo, 0, 3) - p event.ParserInfo + p event.NotificationParserInfo ) p.SetScriptHash(cp.containerContract) @@ -120,12 +120,12 @@ func (cp *Processor) ListenerParsers() []event.ParserInfo { return parsers } -// ListenerHandlers for the 'event.Listener' event producer. -func (cp *Processor) ListenerHandlers() []event.HandlerInfo { +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (cp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { var ( - handlers = make([]event.HandlerInfo, 0, 3) + handlers = make([]event.NotificationHandlerInfo, 0, 3) - h event.HandlerInfo + h event.NotificationHandlerInfo ) h.SetScriptHash(cp.containerContract) @@ -149,6 +149,6 @@ func (cp *Processor) ListenerHandlers() []event.HandlerInfo { } // TimersHandlers for the 'Timers' event producer. -func (cp *Processor) TimersHandlers() []event.HandlerInfo { +func (cp *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/governance/processor.go b/pkg/innerring/processors/governance/processor.go index 2402f7fee..bb66e4b8a 100644 --- a/pkg/innerring/processors/governance/processor.go +++ b/pkg/innerring/processors/governance/processor.go @@ -128,25 +128,25 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (gp *Processor) ListenerParsers() []event.ParserInfo { - var pi event.ParserInfo +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (gp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { + var pi event.NotificationParserInfo pi.SetScriptHash(gp.designate) pi.SetType(event.TypeFromString(native.DesignationEventName)) pi.SetParser(rolemanagement.ParseDesignate) - return []event.ParserInfo{pi} + return []event.NotificationParserInfo{pi} } -// ListenerHandlers for the 'event.Listener' event producer. -func (gp *Processor) ListenerHandlers() []event.HandlerInfo { - var hi event.HandlerInfo +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (gp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { + var hi event.NotificationHandlerInfo hi.SetScriptHash(gp.designate) hi.SetType(event.TypeFromString(native.DesignationEventName)) hi.SetHandler(gp.HandleAlphabetSync) - return []event.HandlerInfo{hi} + return []event.NotificationHandlerInfo{hi} } // TimersHandlers for the 'Timers' event producer. -func (gp *Processor) TimersHandlers() []event.HandlerInfo { +func (gp *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/neofs/processor.go b/pkg/innerring/processors/neofs/processor.go index 0b97e4f92..ac451a7e1 100644 --- a/pkg/innerring/processors/neofs/processor.go +++ b/pkg/innerring/processors/neofs/processor.go @@ -129,12 +129,12 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (np *Processor) ListenerParsers() []event.ParserInfo { +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { var ( - parsers = make([]event.ParserInfo, 0, 6) + parsers = make([]event.NotificationParserInfo, 0, 6) - p event.ParserInfo + p event.NotificationParserInfo ) p.SetScriptHash(np.neofsContract) @@ -172,12 +172,12 @@ func (np *Processor) ListenerParsers() []event.ParserInfo { return parsers } -// ListenerHandlers for the 'event.Listener' event producer. -func (np *Processor) ListenerHandlers() []event.HandlerInfo { +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { var ( - handlers = make([]event.HandlerInfo, 0, 6) + handlers = make([]event.NotificationHandlerInfo, 0, 6) - h event.HandlerInfo + h event.NotificationHandlerInfo ) h.SetScriptHash(np.neofsContract) @@ -216,6 +216,6 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo { } // TimersHandlers for the 'Timers' event producer. -func (np *Processor) TimersHandlers() []event.HandlerInfo { +func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index 607415bfb..11a646035 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -146,26 +146,26 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (np *Processor) ListenerParsers() []event.ParserInfo { - var parsers []event.ParserInfo +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { + var parsers []event.NotificationParserInfo // new epoch event - newEpoch := event.ParserInfo{} + newEpoch := event.NotificationParserInfo{} newEpoch.SetType(newEpochNotification) newEpoch.SetScriptHash(np.netmapContract) newEpoch.SetParser(netmapEvent.ParseNewEpoch) parsers = append(parsers, newEpoch) // new peer event - addPeer := event.ParserInfo{} + addPeer := event.NotificationParserInfo{} addPeer.SetType(addPeerNotification) addPeer.SetScriptHash(np.netmapContract) addPeer.SetParser(netmapEvent.ParseAddPeer) parsers = append(parsers, addPeer) // update peer event - updatePeer := event.ParserInfo{} + updatePeer := event.NotificationParserInfo{} updatePeer.SetType(updatePeerStateNotification) updatePeer.SetScriptHash(np.netmapContract) updatePeer.SetParser(netmapEvent.ParseUpdatePeer) @@ -174,26 +174,26 @@ func (np *Processor) ListenerParsers() []event.ParserInfo { return parsers } -// ListenerHandlers for the 'event.Listener' event producer. -func (np *Processor) ListenerHandlers() []event.HandlerInfo { - var handlers []event.HandlerInfo +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { + var handlers []event.NotificationHandlerInfo // new epoch handler - newEpoch := event.HandlerInfo{} + newEpoch := event.NotificationHandlerInfo{} newEpoch.SetType(newEpochNotification) newEpoch.SetScriptHash(np.netmapContract) newEpoch.SetHandler(np.handleNewEpoch) handlers = append(handlers, newEpoch) // new peer handler - addPeer := event.HandlerInfo{} + addPeer := event.NotificationHandlerInfo{} addPeer.SetType(addPeerNotification) addPeer.SetScriptHash(np.netmapContract) addPeer.SetHandler(np.handleAddPeer) handlers = append(handlers, addPeer) // update peer handler - updatePeer := event.HandlerInfo{} + updatePeer := event.NotificationHandlerInfo{} updatePeer.SetType(updatePeerStateNotification) updatePeer.SetScriptHash(np.netmapContract) updatePeer.SetHandler(np.handleUpdateState) @@ -203,6 +203,6 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo { } // TimersHandlers for the 'Timers' event producer. -func (np *Processor) TimersHandlers() []event.HandlerInfo { +func (np *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/innerring/processors/reputation/processor.go b/pkg/innerring/processors/reputation/processor.go index 94a7dff37..3ebdc905c 100644 --- a/pkg/innerring/processors/reputation/processor.go +++ b/pkg/innerring/processors/reputation/processor.go @@ -88,12 +88,12 @@ func New(p *Params) (*Processor, error) { }, nil } -// ListenerParsers for the 'event.Listener' event producer. -func (rp *Processor) ListenerParsers() []event.ParserInfo { - var parsers []event.ParserInfo +// ListenerNotificationParsers for the 'event.Listener' event producer. +func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { + var parsers []event.NotificationParserInfo // put reputation event - put := event.ParserInfo{} + put := event.NotificationParserInfo{} put.SetType(putReputationNotification) put.SetScriptHash(rp.reputationContract) put.SetParser(reputationEvent.ParsePut) @@ -102,12 +102,12 @@ func (rp *Processor) ListenerParsers() []event.ParserInfo { return parsers } -// ListenerHandlers for the 'event.Listener' event producer. -func (rp *Processor) ListenerHandlers() []event.HandlerInfo { - var handlers []event.HandlerInfo +// ListenerNotificationHandlers for the 'event.Listener' event producer. +func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { + var handlers []event.NotificationHandlerInfo // put reputation handler - put := event.HandlerInfo{} + put := event.NotificationHandlerInfo{} put.SetType(putReputationNotification) put.SetScriptHash(rp.reputationContract) put.SetHandler(rp.handlePutReputation) @@ -117,6 +117,6 @@ func (rp *Processor) ListenerHandlers() []event.HandlerInfo { } // TimersHandlers for the 'Timers' event producer. -func (rp *Processor) TimersHandlers() []event.HandlerInfo { +func (rp *Processor) TimersHandlers() []event.NotificationHandlerInfo { return nil } diff --git a/pkg/morph/event/handler.go b/pkg/morph/event/handlers.go similarity index 68% rename from pkg/morph/event/handler.go rename to pkg/morph/event/handlers.go index 65088f5a4..81ec1a63d 100644 --- a/pkg/morph/event/handler.go +++ b/pkg/morph/event/handlers.go @@ -10,21 +10,21 @@ type Handler func(Event) // BlockHandler is a chain block processing function. type BlockHandler func(*block.Block) -// HandlerInfo is a structure that groups +// NotificationHandlerInfo is a structure that groups // the parameters of the handler of particular // contract event. -type HandlerInfo struct { +type NotificationHandlerInfo struct { scriptHashWithType h Handler } // SetHandler is an event handler setter. -func (s *HandlerInfo) SetHandler(v Handler) { +func (s *NotificationHandlerInfo) SetHandler(v Handler) { s.h = v } // Handler returns an event handler. -func (s HandlerInfo) Handler() Handler { +func (s NotificationHandlerInfo) Handler() Handler { return s.h } diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 32e9d945a..3d173b5cb 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -29,22 +29,19 @@ type Listener interface { // it could not be started. ListenWithError(context.Context, chan<- error) - // SetParser must set the parser of particular contract event. + // SetNotificationParser must set the parser of particular contract event. // // Parser of each event must be set once. All parsers must be set before Listen call. // // Must ignore nil parsers and all calls after listener has been started. - SetParser(ParserInfo) + SetNotificationParser(NotificationParserInfo) - // RegisterHandler must register the event handler for particular notification event of contract. + // RegisterNotificationHandler must register the event handler for particular notification event of contract. // - // The specified handler must be called after each capture and parsing of the event + // The specified handler must be called after each capture and parsing of the event. // // Must ignore nil handlers. - RegisterHandler(HandlerInfo) - - // Stop must stop the event listener. - Stop() + RegisterNotificationHandler(NotificationHandlerInfo) // RegisterBlockHandler must register chain block handler. // @@ -52,6 +49,9 @@ type Listener interface { // // Must ignore nil handlers. RegisterBlockHandler(BlockHandler) + + // Stop must stop the event listener. + Stop() } // ListenerParams is a group of parameters @@ -69,9 +69,8 @@ type listener struct { started bool - parsers map[scriptHashWithType]Parser - - handlers map[scriptHashWithType][]Handler + notificationParsers map[scriptHashWithType]NotificationParser + notificationHandlers map[scriptHashWithType][]Handler log *zap.Logger @@ -93,10 +92,10 @@ var ( // Executes once, all subsequent calls do nothing. // // Returns an error if listener was already started. -func (s listener) Listen(ctx context.Context) { - s.once.Do(func() { - if err := s.listen(ctx, nil); err != nil { - s.log.Error("could not start listen to events", +func (l listener) Listen(ctx context.Context) { + l.once.Do(func() { + if err := l.listen(ctx, nil); err != nil { + l.log.Error("could not start listen to events", zap.String("error", err.Error()), ) } @@ -109,10 +108,10 @@ func (s listener) Listen(ctx context.Context) { // Executes once, all subsequent calls do nothing. // // Returns an error if listener was already started. -func (s listener) ListenWithError(ctx context.Context, intError chan<- error) { - s.once.Do(func() { - if err := s.listen(ctx, intError); err != nil { - s.log.Error("could not start listen to events", +func (l listener) ListenWithError(ctx context.Context, intError chan<- error) { + l.once.Do(func() { + if err := l.listen(ctx, intError); err != nil { + l.log.Error("could not start listen to events", zap.String("error", err.Error()), ) intError <- err @@ -120,13 +119,13 @@ func (s listener) ListenWithError(ctx context.Context, intError chan<- error) { }) } -func (s listener) listen(ctx context.Context, intError chan<- error) error { +func (l listener) listen(ctx context.Context, intError chan<- error) error { // create the list of listening contract hashes hashes := make([]util.Uint160, 0) // fill the list with the contracts with set event parsers. - s.mtx.RLock() - for hashType := range s.parsers { + l.mtx.RLock() + for hashType := range l.notificationParsers { scHash := hashType.ScriptHash() // prevent repetitions @@ -140,30 +139,30 @@ func (s listener) listen(ctx context.Context, intError chan<- error) error { } // mark listener as started - s.started = true + l.started = true - s.mtx.RUnlock() + l.mtx.RUnlock() - chEvent, err := s.subscriber.SubscribeForNotification(hashes...) + chEvent, err := l.subscriber.SubscribeForNotification(hashes...) if err != nil { return err } - s.listenLoop(ctx, chEvent, intError) + l.listenLoop(ctx, chEvent, intError) return nil } -func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) { +func (l listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) { var blockChan <-chan *block.Block - if len(s.blockHandlers) > 0 { + if len(l.blockHandlers) > 0 { var err error - if blockChan, err = s.subscriber.BlockNotifications(); err != nil { + if blockChan, err = l.subscriber.BlockNotifications(); err != nil { if intErr != nil { intErr <- fmt.Errorf("could not open block notifications channel: %w", err) } else { - s.log.Debug("could not open block notifications channel", + l.log.Debug("could not open block notifications channel", zap.String("error", err.Error()), ) } @@ -178,47 +177,47 @@ loop: for { select { case <-ctx.Done(): - s.log.Info("stop event listener by context", + l.log.Info("stop event listener by context", zap.String("reason", ctx.Err().Error()), ) break loop case notifyEvent, ok := <-chEvent: if !ok { - s.log.Warn("stop event listener by channel") + l.log.Warn("stop event listener by channel") if intErr != nil { intErr <- errors.New("event subscriber connection has been terminated") } break loop } else if notifyEvent == nil { - s.log.Warn("nil notification event was caught") + l.log.Warn("nil notification event was caught") continue loop } - s.parseAndHandle(notifyEvent) + l.parseAndHandleNotification(notifyEvent) case b, ok := <-blockChan: if !ok { - s.log.Warn("stop event listener by block channel") + l.log.Warn("stop event listener by block channel") if intErr != nil { intErr <- errors.New("new block notification channel is closed") } break loop } else if b == nil { - s.log.Warn("nil block was caught") + l.log.Warn("nil block was caught") continue loop } // TODO: consider asynchronous execution - for i := range s.blockHandlers { - s.blockHandlers[i](b) + for i := range l.blockHandlers { + l.blockHandlers[i](b) } } } } -func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) { - log := s.log.With( +func (l listener) parseAndHandleNotification(notifyEvent *state.NotificationEvent) { + log := l.log.With( zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()), ) @@ -247,9 +246,9 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) { keyEvent.SetScriptHash(notifyEvent.ScriptHash) keyEvent.SetType(typEvent) - s.mtx.RLock() - parser, ok := s.parsers[keyEvent] - s.mtx.RUnlock() + l.mtx.RLock() + parser, ok := l.notificationParsers[keyEvent] + l.mtx.RUnlock() if !ok { log.Debug("event parser not set") @@ -268,12 +267,12 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) { } // handler the event - s.mtx.RLock() - handlers := s.handlers[keyEvent] - s.mtx.RUnlock() + l.mtx.RLock() + handlers := l.notificationHandlers[keyEvent] + l.mtx.RUnlock() if len(handlers) == 0 { - log.Info("handlers for parsed notification event were not registered", + log.Info("notification handlers for parsed notification event were not registered", zap.Any("event", event), ) @@ -285,12 +284,12 @@ func (s listener) parseAndHandle(notifyEvent *state.NotificationEvent) { } } -// SetParser sets the parser of particular contract event. +// SetNotificationParser sets the parser of particular contract event. // // Ignores nil and already set parsers. // Ignores the parser if listener is started. -func (s listener) SetParser(p ParserInfo) { - log := s.log.With( +func (l listener) SetNotificationParser(p NotificationParserInfo) { + log := l.log.With( zap.String("script hash LE", p.ScriptHash().StringLE()), zap.Stringer("event type", p.getType()), ) @@ -301,29 +300,29 @@ func (s listener) SetParser(p ParserInfo) { return } - s.mtx.Lock() - defer s.mtx.Unlock() + l.mtx.Lock() + defer l.mtx.Unlock() // check if the listener was started - if s.started { + if l.started { log.Warn("listener has been already started, ignore parser") return } // add event parser - if _, ok := s.parsers[p.scriptHashWithType]; !ok { - s.parsers[p.scriptHashWithType] = p.parser() + if _, ok := l.notificationParsers[p.scriptHashWithType]; !ok { + l.notificationParsers[p.scriptHashWithType] = p.parser() } log.Info("registered new event parser") } -// RegisterHandler registers the handler for particular notification event of contract. +// RegisterNotificationHandler registers the handler for particular notification event of contract. // // Ignores nil handlers. // Ignores handlers of event without parser. -func (s listener) RegisterHandler(p HandlerInfo) { - log := s.log.With( +func (l listener) RegisterNotificationHandler(p NotificationHandlerInfo) { + log := l.log.With( zap.String("script hash LE", p.ScriptHash().StringLE()), zap.Stringer("event type", p.GetType()), ) @@ -335,9 +334,9 @@ func (s listener) RegisterHandler(p HandlerInfo) { } // check if parser was set - s.mtx.RLock() - _, ok := s.parsers[p.scriptHashWithType] - s.mtx.RUnlock() + l.mtx.RLock() + _, ok := l.notificationParsers[p.scriptHashWithType] + l.mtx.RUnlock() if !ok { log.Warn("ignore handler of event w/o parser") @@ -345,28 +344,28 @@ func (s listener) RegisterHandler(p HandlerInfo) { } // add event handler - s.mtx.Lock() - s.handlers[p.scriptHashWithType] = append( - s.handlers[p.scriptHashWithType], + l.mtx.Lock() + l.notificationHandlers[p.scriptHashWithType] = append( + l.notificationHandlers[p.scriptHashWithType], p.Handler(), ) - s.mtx.Unlock() + l.mtx.Unlock() log.Info("registered new event handler") } // Stop closes subscription channel with remote neo node. -func (s listener) Stop() { - s.subscriber.Close() +func (l listener) Stop() { + l.subscriber.Close() } -func (s *listener) RegisterBlockHandler(handler BlockHandler) { +func (l *listener) RegisterBlockHandler(handler BlockHandler) { if handler == nil { - s.log.Warn("ignore nil block handler") + l.log.Warn("ignore nil block handler") return } - s.blockHandlers = append(s.blockHandlers, handler) + l.blockHandlers = append(l.blockHandlers, handler) } // NewListener create the notification event listener instance and returns Listener interface. @@ -379,11 +378,11 @@ func NewListener(p ListenerParams) (Listener, error) { } return &listener{ - mtx: new(sync.RWMutex), - once: new(sync.Once), - parsers: make(map[scriptHashWithType]Parser), - handlers: make(map[scriptHashWithType][]Handler), - log: p.Logger, - subscriber: p.Subscriber, + mtx: new(sync.RWMutex), + once: new(sync.Once), + notificationParsers: make(map[scriptHashWithType]NotificationParser), + notificationHandlers: make(map[scriptHashWithType][]Handler), + log: p.Logger, + subscriber: p.Subscriber, }, nil } diff --git a/pkg/morph/event/parser.go b/pkg/morph/event/parsers.go similarity index 60% rename from pkg/morph/event/parser.go rename to pkg/morph/event/parsers.go index 398496902..6d3f67f06 100644 --- a/pkg/morph/event/parser.go +++ b/pkg/morph/event/parsers.go @@ -6,17 +6,17 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" ) -// Parser is a function that constructs Event +// NotificationParser is a function that constructs Event // from the StackItem list. -type Parser func([]stackitem.Item) (Event, error) +type NotificationParser func([]stackitem.Item) (Event, error) -// ParserInfo is a structure that groups +// NotificationParserInfo is a structure that groups // the parameters of particular contract // notification event parser. -type ParserInfo struct { +type NotificationParserInfo struct { scriptHashWithType - p Parser + p NotificationParser } type wrongPrmNumber struct { @@ -36,19 +36,19 @@ func (s wrongPrmNumber) Error() string { } // SetParser is an event parser setter. -func (s *ParserInfo) SetParser(v Parser) { +func (s *NotificationParserInfo) SetParser(v NotificationParser) { s.p = v } -func (s ParserInfo) parser() Parser { +func (s NotificationParserInfo) parser() NotificationParser { return s.p } // SetType is an event type setter. -func (s *ParserInfo) SetType(v Type) { +func (s *NotificationParserInfo) SetType(v Type) { s.typ = v } -func (s ParserInfo) getType() Type { +func (s NotificationParserInfo) getType() Type { return s.typ }