diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index ed0bc966a..a607256f8 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/response" @@ -19,6 +20,7 @@ type ( SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error) UnsubscribeForNotification() Close() + BlockNotifications() <-chan *block.Block } subscriber struct { @@ -28,6 +30,8 @@ type ( notify chan *state.NotificationEvent notifyIDs map[util.Uint160]string + + blockChan chan *block.Block } // Params is a group of Subscriber constructor parameters. @@ -99,6 +103,10 @@ func (s *subscriber) Close() { s.client.Close() } +func (s *subscriber) BlockNotifications() <-chan *block.Block { + return s.blockChan +} + func (s *subscriber) routeNotifications(ctx context.Context) { for { select { @@ -108,6 +116,7 @@ func (s *subscriber) routeNotifications(ctx context.Context) { if !ok { s.log.Warn("remote channel has been closed") close(s.notify) + close(s.blockChan) return } @@ -121,6 +130,14 @@ func (s *subscriber) routeNotifications(ctx context.Context) { } s.notify <- notification + case response.BlockEventID: + b, ok := notification.Value.(*block.Block) + if !ok { + s.log.Error("can't cast block event value to block") + continue + } + + s.blockChan <- b default: s.log.Debug("unsupported notification from the chain", zap.Uint8("type", uint8(notification.Type)), @@ -146,12 +163,17 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return nil, err } + if _, err := wsClient.SubscribeForNewBlocks(nil); err != nil { + return nil, err + } + sub := &subscriber{ RWMutex: new(sync.RWMutex), log: p.Log, client: wsClient, notify: make(chan *state.NotificationEvent), notifyIDs: make(map[util.Uint160]string), + blockChan: make(chan *block.Block), } // Worker listens all events from neo-go websocket and puts them