From 23c220ae28f232128cd0d857201d24e3e3be3e01 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 20 Jan 2021 20:08:06 +0300 Subject: [PATCH] [#324] morph/listener: Register handlers of new chain blocks Extend Listener with RegisterBlockHandler method. All block handlers are called on each block read from Subscriber.BlockNotifications channel. Signed-off-by: Leonard Lyubich --- pkg/morph/event/handler.go | 7 +++++ pkg/morph/event/listener.go | 42 ++++++++++++++++++++++++++++++ pkg/morph/subscriber/subscriber.go | 23 +++++++++------- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/pkg/morph/event/handler.go b/pkg/morph/event/handler.go index 433880ff..65088f5a 100644 --- a/pkg/morph/event/handler.go +++ b/pkg/morph/event/handler.go @@ -1,8 +1,15 @@ package event +import ( + "github.com/nspcc-dev/neo-go/pkg/core/block" +) + // Handler is an Event processing function. type Handler func(Event) +// BlockHandler is a chain block processing function. +type BlockHandler func(*block.Block) + // HandlerInfo is a structure that groups // the parameters of the handler of particular // contract event. diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 74ebb15d..2f6f8da6 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -43,6 +43,13 @@ type Listener interface { // Must stop the event listener. Stop() + + // Must register chain block handler. + // + // The specified handler must be called after each capture and parsing of the new block from chain. + // + // Must ignore nil handlers. + RegisterBlockHandler(BlockHandler) } // ListenerParams is a group of parameters @@ -67,6 +74,8 @@ type listener struct { log *zap.Logger subscriber subscriber.Subscriber + + blockHandlers []BlockHandler } const newListenerFailMsg = "could not instantiate Listener" @@ -144,6 +153,13 @@ func (s listener) listen(ctx context.Context, intError chan<- error) error { } func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) { + blockChan, err := s.subscriber.BlockNotifications() + if err != nil { + intErr <- errors.Wrap(err, "could not open block notifications channel") + + return + } + loop: for { select { @@ -166,6 +182,23 @@ loop: } s.parseAndHandle(notifyEvent) + case b, ok := <-blockChan: + if !ok { + s.log.Warn("stop event listener by block channel") + if intErr != nil { + intErr <- errors.New("new block notification channel is closed") + } + + break loop + } else if b == nil { + s.log.Warn("nil block was caught") + continue loop + } + + // TODO: consider asynchronous execution + for i := range s.blockHandlers { + s.blockHandlers[i](b) + } } } } @@ -313,6 +346,15 @@ func (s listener) Stop() { s.subscriber.Close() } +func (s *listener) RegisterBlockHandler(handler BlockHandler) { + if handler == nil { + s.log.Warn("ignore nil block handler") + return + } + + s.blockHandlers = append(s.blockHandlers, handler) +} + // NewListener create the notification event listener instance and returns Listener interface. func NewListener(p ListenerParams) (Listener, error) { switch { diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index a607256f..6772e0fa 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -2,7 +2,6 @@ package subscriber import ( "context" - "errors" "sync" "time" @@ -11,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -20,7 +20,7 @@ type ( SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error) UnsubscribeForNotification() Close() - BlockNotifications() <-chan *block.Block + BlockNotifications() (<-chan *block.Block, error) } subscriber struct { @@ -103,8 +103,18 @@ func (s *subscriber) Close() { s.client.Close() } -func (s *subscriber) BlockNotifications() <-chan *block.Block { - return s.blockChan +func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { + if err := s.client.Init(); err != nil { + return nil, errors.Wrap(err, "could not init ws client") + } + + if _, err := s.client.SubscribeForNewBlocks(nil); err != nil { + return nil, errors.Wrap(err, "could not subscribe for new block events") + } + + s.blockChan = make(chan *block.Block) + + return s.blockChan, nil } func (s *subscriber) routeNotifications(ctx context.Context) { @@ -163,17 +173,12 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return nil, err } - if _, err := wsClient.SubscribeForNewBlocks(nil); err != nil { - return nil, err - } - sub := &subscriber{ RWMutex: new(sync.RWMutex), log: p.Log, client: wsClient, notify: make(chan *state.NotificationEvent), notifyIDs: make(map[util.Uint160]string), - blockChan: make(chan *block.Block), } // Worker listens all events from neo-go websocket and puts them