forked from TrueCloudLab/frostfs-node
[#324] morph/listener: Register handlers of new chain blocks
Extend Listener with RegisterBlockHandler method. All block handlers are called on each block read from Subscriber.BlockNotifications channel. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
c65d1d9db9
commit
23c220ae28
3 changed files with 63 additions and 9 deletions
|
@ -1,8 +1,15 @@
|
||||||
package event
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
)
|
||||||
|
|
||||||
// Handler is an Event processing function.
|
// Handler is an Event processing function.
|
||||||
type Handler func(Event)
|
type Handler func(Event)
|
||||||
|
|
||||||
|
// BlockHandler is a chain block processing function.
|
||||||
|
type BlockHandler func(*block.Block)
|
||||||
|
|
||||||
// HandlerInfo is a structure that groups
|
// HandlerInfo is a structure that groups
|
||||||
// the parameters of the handler of particular
|
// the parameters of the handler of particular
|
||||||
// contract event.
|
// contract event.
|
||||||
|
|
|
@ -43,6 +43,13 @@ type Listener interface {
|
||||||
|
|
||||||
// Must stop the event listener.
|
// Must stop the event listener.
|
||||||
Stop()
|
Stop()
|
||||||
|
|
||||||
|
// Must register chain block handler.
|
||||||
|
//
|
||||||
|
// The specified handler must be called after each capture and parsing of the new block from chain.
|
||||||
|
//
|
||||||
|
// Must ignore nil handlers.
|
||||||
|
RegisterBlockHandler(BlockHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenerParams is a group of parameters
|
// ListenerParams is a group of parameters
|
||||||
|
@ -67,6 +74,8 @@ type listener struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
|
||||||
subscriber subscriber.Subscriber
|
subscriber subscriber.Subscriber
|
||||||
|
|
||||||
|
blockHandlers []BlockHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
const newListenerFailMsg = "could not instantiate Listener"
|
const newListenerFailMsg = "could not instantiate Listener"
|
||||||
|
@ -144,6 +153,13 @@ func (s listener) listen(ctx context.Context, intError chan<- error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) {
|
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) {
|
||||||
|
blockChan, err := s.subscriber.BlockNotifications()
|
||||||
|
if err != nil {
|
||||||
|
intErr <- errors.Wrap(err, "could not open block notifications channel")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -166,6 +182,23 @@ loop:
|
||||||
}
|
}
|
||||||
|
|
||||||
s.parseAndHandle(notifyEvent)
|
s.parseAndHandle(notifyEvent)
|
||||||
|
case b, ok := <-blockChan:
|
||||||
|
if !ok {
|
||||||
|
s.log.Warn("stop event listener by block channel")
|
||||||
|
if intErr != nil {
|
||||||
|
intErr <- errors.New("new block notification channel is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
break loop
|
||||||
|
} else if b == nil {
|
||||||
|
s.log.Warn("nil block was caught")
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: consider asynchronous execution
|
||||||
|
for i := range s.blockHandlers {
|
||||||
|
s.blockHandlers[i](b)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -313,6 +346,15 @@ func (s listener) Stop() {
|
||||||
s.subscriber.Close()
|
s.subscriber.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *listener) RegisterBlockHandler(handler BlockHandler) {
|
||||||
|
if handler == nil {
|
||||||
|
s.log.Warn("ignore nil block handler")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.blockHandlers = append(s.blockHandlers, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// NewListener create the notification event listener instance and returns Listener interface.
|
// NewListener create the notification event listener instance and returns Listener interface.
|
||||||
func NewListener(p ListenerParams) (Listener, error) {
|
func NewListener(p ListenerParams) (Listener, error) {
|
||||||
switch {
|
switch {
|
||||||
|
|
|
@ -2,7 +2,6 @@ package subscriber
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -11,6 +10,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
|
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
|
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ type (
|
||||||
SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error)
|
SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error)
|
||||||
UnsubscribeForNotification()
|
UnsubscribeForNotification()
|
||||||
Close()
|
Close()
|
||||||
BlockNotifications() <-chan *block.Block
|
BlockNotifications() (<-chan *block.Block, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriber struct {
|
subscriber struct {
|
||||||
|
@ -103,8 +103,18 @@ func (s *subscriber) Close() {
|
||||||
s.client.Close()
|
s.client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) BlockNotifications() <-chan *block.Block {
|
func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) {
|
||||||
return s.blockChan
|
if err := s.client.Init(); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not init ws client")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.client.SubscribeForNewBlocks(nil); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not subscribe for new block events")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.blockChan = make(chan *block.Block)
|
||||||
|
|
||||||
|
return s.blockChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) routeNotifications(ctx context.Context) {
|
func (s *subscriber) routeNotifications(ctx context.Context) {
|
||||||
|
@ -163,17 +173,12 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := wsClient.SubscribeForNewBlocks(nil); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sub := &subscriber{
|
sub := &subscriber{
|
||||||
RWMutex: new(sync.RWMutex),
|
RWMutex: new(sync.RWMutex),
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
client: wsClient,
|
client: wsClient,
|
||||||
notify: make(chan *state.NotificationEvent),
|
notify: make(chan *state.NotificationEvent),
|
||||||
notifyIDs: make(map[util.Uint160]string),
|
notifyIDs: make(map[util.Uint160]string),
|
||||||
blockChan: make(chan *block.Block),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Worker listens all events from neo-go websocket and puts them
|
// Worker listens all events from neo-go websocket and puts them
|
||||||
|
|
Loading…
Reference in a new issue