From 533e9f8b750288fc0bf7f7a9cdfe9ef68dfcb233 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Wed, 22 Feb 2023 16:04:58 +0300
Subject: [PATCH] [#59] morph: Adopt updated `neo-go` client API for subs
It does not use deprecated methods anymore but also adds more code that
removes. Future refactor that will affect more components will optimize
usage of the updated API.
Signed-off-by: Pavel Karpy
---
pkg/morph/client/client.go | 7 +-
pkg/morph/client/constructor.go | 30 ++++---
pkg/morph/client/multi.go | 129 ++++++++++++++++++------------
pkg/morph/client/notifications.go | 115 +++++++++++++++++++-------
4 files changed, 187 insertions(+), 94 deletions(-)
diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go
index 51a030e6..c840a34c 100644
--- a/pkg/morph/client/client.go
+++ b/pkg/morph/client/client.go
@@ -69,17 +69,12 @@ type Client struct {
// on every normal call.
switchLock *sync.RWMutex
- // channel for ws notifications
notifications chan rpcclient.Notification
+ subsInfo // protected with switchLock
// channel for internal stop
closeChan chan struct{}
- // cached subscription information
- subscribedEvents map[util.Uint160]string
- subscribedNotaryEvents map[util.Uint160]string
- subscribedToNewBlocks bool
-
// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go
index efb3d0bf..e4569ad0 100644
--- a/pkg/morph/client/constructor.go
+++ b/pkg/morph/client/constructor.go
@@ -9,8 +9,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
+ "github.com/nspcc-dev/neo-go/pkg/core/block"
+ "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+ "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
@@ -102,17 +105,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
}
cli := &Client{
- cache: newClientCache(),
- logger: cfg.logger,
- acc: acc,
- accAddr: accAddr,
- signer: cfg.signer,
- cfg: *cfg,
- switchLock: &sync.RWMutex{},
- notifications: make(chan rpcclient.Notification),
- subscribedEvents: make(map[util.Uint160]string),
- subscribedNotaryEvents: make(map[util.Uint160]string),
- closeChan: make(chan struct{}),
+ cache: newClientCache(),
+ logger: cfg.logger,
+ acc: acc,
+ accAddr: accAddr,
+ signer: cfg.signer,
+ cfg: *cfg,
+ switchLock: &sync.RWMutex{},
+ notifications: make(chan rpcclient.Notification),
+ subsInfo: subsInfo{
+ blockRcv: make(chan *block.Block),
+ notificationRcv: make(chan *state.ContainedNotificationEvent),
+ notaryReqRcv: make(chan *result.NotaryRequestEvent),
+ subscribedEvents: make(map[util.Uint160]string),
+ subscribedNotaryEvents: make(map[util.Uint160]string),
+ },
+ closeChan: make(chan struct{}),
}
cli.endpoints.init(cfg.endpoints)
diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go
index e0eecd92..54af56b2 100644
--- a/pkg/morph/client/multi.go
+++ b/pkg/morph/client/multi.go
@@ -4,6 +4,11 @@ import (
"sort"
"time"
+ "github.com/nspcc-dev/neo-go/pkg/core/block"
+ "github.com/nspcc-dev/neo-go/pkg/core/state"
+ "github.com/nspcc-dev/neo-go/pkg/neorpc"
+ "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
+ "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap"
)
@@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint))
- if !c.restoreSubscriptions(cli, newEndpoint) {
+ subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
+ if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
@@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
c.client = cli
c.setActor(act)
+ c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
}
func (c *Client) notificationLoop() {
+ var e any
+ var ok bool
+
for {
c.switchLock.RLock()
- nChan := c.client.Notifications
+ bChan := c.blockRcv
+ nChan := c.notificationRcv
+ nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
@@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
c.close()
return
- case n, ok := <-nChan:
- // notification channel is used as a connection
- // state: if it is closed, the connection is
- // considered to be lost
- if !ok {
- if closeErr := c.client.GetError(); closeErr != nil {
- c.logger.Warn("switching to the next RPC node",
- zap.String("reason", closeErr.Error()),
- )
- } else {
- // neo-go client was closed by calling `Close`
- // method that happens only when the client has
- // switched to the more prioritized RPC
- continue
- }
+ case e, ok = <-bChan:
+ case e, ok = <-nChan:
+ case e, ok = <-nrChan:
+ }
- if !c.switchRPC() {
- c.logger.Error("could not establish connection to any RPC node")
+ if ok {
+ c.routeEvent(e)
+ continue
+ }
- // could not connect to all endpoints =>
- // switch client to inactive mode
- c.inactiveMode()
-
- return
- }
-
- // TODO(@carpawell): call here some callback retrieved in constructor
- // of the client to allow checking chain state since during switch
- // process some notification could be lost
-
- continue
- }
-
- select {
- case c.notifications <- n:
- continue
- case <-c.cfg.ctx.Done():
- _ = c.UnsubscribeAll()
- c.close()
-
- return
- case <-c.closeChan:
- _ = c.UnsubscribeAll()
- c.close()
-
- return
- }
+ if !c.reconnect() {
+ return
}
}
}
+func (c *Client) routeEvent(e any) {
+ typedNotification := rpcclient.Notification{Value: e}
+
+ switch e.(type) {
+ case *block.Block:
+ typedNotification.Type = neorpc.BlockEventID
+ case *state.ContainedNotificationEvent:
+ typedNotification.Type = neorpc.NotificationEventID
+ case *result.NotaryRequestEvent:
+ typedNotification.Type = neorpc.NotaryRequestEventID
+ }
+
+ select {
+ case c.notifications <- typedNotification:
+ case <-c.cfg.ctx.Done():
+ _ = c.UnsubscribeAll()
+ c.close()
+ case <-c.closeChan:
+ _ = c.UnsubscribeAll()
+ c.close()
+ }
+}
+
+func (c *Client) reconnect() bool {
+ if closeErr := c.client.GetError(); closeErr != nil {
+ c.logger.Warn("switching to the next RPC node",
+ zap.String("reason", closeErr.Error()),
+ )
+ } else {
+ // neo-go client was closed by calling `Close`
+ // method, that happens only when a client has
+ // switched to the more prioritized RPC
+ return true
+ }
+
+ if !c.switchRPC() {
+ c.logger.Error("could not establish connection to any RPC node")
+
+ // could not connect to all endpoints =>
+ // switch client to inactive mode
+ c.inactiveMode()
+
+ return false
+ }
+
+ // TODO(@carpawell): call here some callback retrieved in constructor
+ // of the client to allow checking chain state since during switch
+ // process some notification could be lost
+
+ return true
+}
+
func (c *Client) switchToMostPrioritized() {
t := time.NewTicker(c.cfg.switchInterval)
defer t.Stop()
@@ -156,11 +185,12 @@ mainLoop:
return
case <-t.C:
c.switchLock.RLock()
+
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list)
-
currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority
+
c.switchLock.RUnlock()
if currPriority == highestPriority {
@@ -186,7 +216,7 @@ mainLoop:
continue
}
- if c.restoreSubscriptions(cli, tryE) {
+ if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
c.switchLock.Lock()
// higher priority node could have been
@@ -201,6 +231,7 @@ mainLoop:
c.cache.invalidate()
c.client = cli
c.setActor(act)
+ c.subsInfo = subs
c.endpoints.curr = i
c.switchLock.Unlock()
diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go
index 2afeebb8..1d287527 100644
--- a/pkg/morph/client/notifications.go
+++ b/pkg/morph/client/notifications.go
@@ -1,6 +1,10 @@
package client
import (
+ "github.com/nspcc-dev/neo-go/pkg/core/block"
+ "github.com/nspcc-dev/neo-go/pkg/core/state"
+ "github.com/nspcc-dev/neo-go/pkg/neorpc"
+ "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
@@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
return nil
}
- id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
+ id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
@@ -59,17 +63,17 @@ func (c *Client) SubscribeForNewBlocks() error {
return ErrConnectionLost
}
- if c.subscribedToNewBlocks {
+ if c.subscribedToBlocks {
// no need to subscribe one more time
return nil
}
- _, err := c.client.SubscribeForNewBlocks(nil)
+ _, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
- c.subscribedToNewBlocks = true
+ c.subscribedToBlocks = true
return nil
}
@@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
return nil
}
- id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
+ id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
@@ -187,7 +191,7 @@ func (c *Client) UnsubscribeAll() error {
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
- !c.subscribedToNewBlocks {
+ !c.subscribedToBlocks {
return nil
}
@@ -198,14 +202,32 @@ func (c *Client) UnsubscribeAll() error {
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
- c.subscribedToNewBlocks = false
+ c.subscribedToBlocks = false
return nil
}
-// restoreSubscriptions restores subscriptions according to
-// cached information about them.
-func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
+// subsInfo includes channels for ws notifications;
+// cached subscription information.
+type subsInfo struct {
+ blockRcv chan *block.Block
+ notificationRcv chan *state.ContainedNotificationEvent
+ notaryReqRcv chan *result.NotaryRequestEvent
+
+ subscribedToBlocks bool
+ subscribedEvents map[util.Uint160]string
+ subscribedNotaryEvents map[util.Uint160]string
+}
+
+// restoreSubscriptions restores subscriptions according to cached
+// information about them.
+//
+// If it is NOT a background operation switchLock MUST be held.
+// Returns a pair: the second is a restoration status and the first
+// one contains subscription information applied to the passed cli
+// and receivers for the updated subscriptions.
+// Does not change Client instance.
+func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
@@ -214,72 +236,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
stopCh := make(chan struct{})
defer close(stopCh)
+ blockRcv := make(chan *block.Block)
+ notificationRcv := make(chan *state.ContainedNotificationEvent)
+ notaryReqRcv := make(chan *result.NotaryRequestEvent)
+
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
+ var e any
+ var ok bool
+
for {
select {
case <-stopCh:
return
- case n, ok := <-cli.Notifications:
- if !ok {
- return
- }
-
- c.notifications <- n
+ case e, ok = <-blockRcv:
+ case e, ok = <-notificationRcv:
+ case e, ok = <-notaryReqRcv:
}
+
+ if !ok {
+ return
+ }
+
+ if background {
+ // background client (test) switch, no need to send
+ // any notification, just preventing dead-lock
+ continue
+ }
+
+ c.routeEvent(e)
}
}()
+ if background {
+ c.switchLock.RLock()
+ defer c.switchLock.RUnlock()
+ }
+
+ si.subscribedToBlocks = c.subscribedToBlocks
+ si.subscribedEvents = copySubsMap(c.subscribedEvents)
+ si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
+ si.blockRcv = blockRcv
+ si.notificationRcv = notificationRcv
+ si.notaryReqRcv = notaryReqRcv
+
// new block events restoration
- if c.subscribedToNewBlocks {
- _, err = cli.SubscribeForNewBlocks(nil)
+ if si.subscribedToBlocks {
+ _, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
- return false
+ return
}
}
// notification events restoration
- for contract := range c.subscribedEvents {
+ for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
- id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
+ id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
- return false
+ return
}
- c.subscribedEvents[contract] = id
+ si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
- for signer := range c.subscribedNotaryEvents {
+ for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
- id, err = cli.SubscribeForNotaryRequests(nil, &signer)
+ id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
- return false
+ return
}
- c.subscribedNotaryEvents[signer] = id
+ si.subscribedNotaryEvents[signer] = id
}
}
- return true
+ return si, true
+}
+
+func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
+ newM := make(map[util.Uint160]string, len(m))
+ for k, v := range m {
+ newM[k] = v
+ }
+
+ return newM
}