From c65d1d9db94df6b43e4be7e9d07de798aaec982b Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 20 Jan 2021 20:05:52 +0300 Subject: [PATCH] [#324] morph/subscriber: Subscribe on new chain blocks Call SubscribeForNewBlocks in Subscriber's constructor. Provide BlockNotifications interface method that returns block channel. Write new blocks to the channel on notification events with BlockEventID type. Signed-off-by: Leonard Lyubich --- pkg/morph/subscriber/subscriber.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index ed0bc966a7..a607256f89 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/response" @@ -19,6 +20,7 @@ type ( SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error) UnsubscribeForNotification() Close() + BlockNotifications() <-chan *block.Block } subscriber struct { @@ -28,6 +30,8 @@ type ( notify chan *state.NotificationEvent notifyIDs map[util.Uint160]string + + blockChan chan *block.Block } // Params is a group of Subscriber constructor parameters. @@ -99,6 +103,10 @@ func (s *subscriber) Close() { s.client.Close() } +func (s *subscriber) BlockNotifications() <-chan *block.Block { + return s.blockChan +} + func (s *subscriber) routeNotifications(ctx context.Context) { for { select { @@ -108,6 +116,7 @@ func (s *subscriber) routeNotifications(ctx context.Context) { if !ok { s.log.Warn("remote channel has been closed") close(s.notify) + close(s.blockChan) return } @@ -121,6 +130,14 @@ func (s *subscriber) routeNotifications(ctx context.Context) { } s.notify <- notification + case response.BlockEventID: + b, ok := notification.Value.(*block.Block) + if !ok { + s.log.Error("can't cast block event value to block") + continue + } + + s.blockChan <- b default: s.log.Debug("unsupported notification from the chain", zap.Uint8("type", uint8(notification.Type)), @@ -146,12 +163,17 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return nil, err } + if _, err := wsClient.SubscribeForNewBlocks(nil); err != nil { + return nil, err + } + sub := &subscriber{ RWMutex: new(sync.RWMutex), log: p.Log, client: wsClient, notify: make(chan *state.NotificationEvent), notifyIDs: make(map[util.Uint160]string), + blockChan: make(chan *block.Block), } // Worker listens all events from neo-go websocket and puts them