From f0cbf2e99df3108a43805d8090c4dcb2d462997a Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 9 Aug 2021 18:27:04 +0300 Subject: [PATCH] [#770] pkg/morph/subscriber: Add subscription for notary Signed-off-by: Pavel Karpy --- pkg/morph/subscriber/subscriber.go | 49 +++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index 56f61fbc..7b5576b1 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -20,8 +20,9 @@ type ( Subscriber interface { SubscribeForNotification(...util.Uint160) (<-chan *state.NotificationEvent, error) UnsubscribeForNotification() - Close() BlockNotifications() (<-chan *block.Block, error) + SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *response.NotaryRequestEvent, error) + Close() } subscriber struct { @@ -29,10 +30,12 @@ type ( log *zap.Logger client *client.WSClient - notify chan *state.NotificationEvent - notifyIDs map[util.Uint160]string + notifyChan chan *state.NotificationEvent + notifyIDs map[util.Uint160]string blockChan chan *block.Block + + notaryChan chan *response.NotaryRequestEvent } // Params is a group of Subscriber constructor parameters. @@ -81,7 +84,7 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan s.notifyIDs[contract] = id } - return s.notify, nil + return s.notifyChan, nil } func (s *subscriber) UnsubscribeForNotification() { @@ -112,6 +115,14 @@ func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { return s.blockChan, nil } +func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *response.NotaryRequestEvent, error) { + if _, err := s.client.SubscribeForNotaryRequests(nil, &mainTXSigner); err != nil { + return nil, fmt.Errorf("could not subscribe for notary request events: %w", err) + } + + return s.notaryChan, nil +} + func (s *subscriber) routeNotifications(ctx context.Context) { for { select { @@ -119,9 +130,10 @@ func (s *subscriber) routeNotifications(ctx context.Context) { return case notification, ok := <-s.client.Notifications: if !ok { - s.log.Warn("remote channel has been closed") - close(s.notify) + s.log.Warn("remote notification channel has been closed") + close(s.notifyChan) close(s.blockChan) + close(s.notaryChan) return } @@ -130,11 +142,11 @@ func (s *subscriber) routeNotifications(ctx context.Context) { case response.NotificationEventID: notification, ok := notification.Value.(*state.NotificationEvent) if !ok { - s.log.Error("can't cast notify event to the notify struct") + s.log.Error("can't cast notify event value to the notify struct") continue } - s.notify <- notification + s.notifyChan <- notification case response.BlockEventID: b, ok := notification.Value.(*block.Block) if !ok { @@ -143,6 +155,14 @@ func (s *subscriber) routeNotifications(ctx context.Context) { } s.blockChan <- b + case response.NotaryRequestEventID: + notaryRequest, ok := notification.Value.(*response.NotaryRequestEvent) + if !ok { + s.log.Error("can't cast notify event value to the notary request struct") + continue + } + + s.notaryChan <- notaryRequest default: s.log.Debug("unsupported notification from the chain", zap.Uint8("type", uint8(notification.Type)), @@ -173,12 +193,13 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { } sub := &subscriber{ - RWMutex: new(sync.RWMutex), - log: p.Log, - client: wsClient, - notify: make(chan *state.NotificationEvent), - notifyIDs: make(map[util.Uint160]string), - blockChan: make(chan *block.Block), + RWMutex: new(sync.RWMutex), + log: p.Log, + client: wsClient, + notifyChan: make(chan *state.NotificationEvent), + notifyIDs: make(map[util.Uint160]string), + blockChan: make(chan *block.Block), + notaryChan: make(chan *response.NotaryRequestEvent), } // Worker listens all events from neo-go websocket and puts them