forked from TrueCloudLab/frostfs-node
[#59] morph: Adopt updated neo-go
client API for subs
It does not use deprecated methods anymore but also adds more code that removes. Future refactor that will affect more components will optimize usage of the updated API. Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
parent
73bb590cb1
commit
8579ed4fff
4 changed files with 189 additions and 78 deletions
|
@ -10,10 +10,13 @@ import (
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
|
||||||
|
@ -69,17 +72,20 @@ type Client struct {
|
||||||
// on every normal call.
|
// on every normal call.
|
||||||
switchLock *sync.RWMutex
|
switchLock *sync.RWMutex
|
||||||
|
|
||||||
// channel for ws notifications
|
// channels for ws notifications; protected with switchLock
|
||||||
notifications chan rpcclient.Notification
|
notifications chan rpcclient.Notification
|
||||||
|
blockRcv chan *block.Block
|
||||||
// channel for internal stop
|
notificationRcv chan *state.ContainedNotificationEvent
|
||||||
closeChan chan struct{}
|
notaryReqRcv chan *result.NotaryRequestEvent
|
||||||
|
|
||||||
// cached subscription information
|
// cached subscription information
|
||||||
subscribedEvents map[util.Uint160]string
|
subscribedEvents map[util.Uint160]string
|
||||||
subscribedNotaryEvents map[util.Uint160]string
|
subscribedNotaryEvents map[util.Uint160]string
|
||||||
subscribedToNewBlocks bool
|
subscribedToNewBlocks bool
|
||||||
|
|
||||||
|
// channel for internal stop
|
||||||
|
closeChan chan struct{}
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
// provided RPC endpoints
|
// provided RPC endpoints
|
||||||
|
@ -525,3 +531,14 @@ func (c *Client) setActor(act *actor.Actor) {
|
||||||
c.gasToken = nep17.New(act, gas.Hash)
|
c.gasToken = nep17.New(act, gas.Hash)
|
||||||
c.rolemgmt = rolemgmt.New(act)
|
c.rolemgmt = rolemgmt.New(act)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateSubs updates subscription information, must be
|
||||||
|
// protected with switchLock.
|
||||||
|
func (c *Client) updateSubs(si subsInfo) {
|
||||||
|
c.blockRcv = si.blockRcv
|
||||||
|
c.notificationRcv = si.notificationRcv
|
||||||
|
c.notaryReqRcv = si.notaryReqRcv
|
||||||
|
|
||||||
|
c.subscribedEvents = si.subscribedEvents
|
||||||
|
c.subscribedNotaryEvents = si.subscribedNotaryEvents
|
||||||
|
}
|
||||||
|
|
|
@ -9,8 +9,11 @@ import (
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -109,6 +112,9 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
switchLock: &sync.RWMutex{},
|
switchLock: &sync.RWMutex{},
|
||||||
notifications: make(chan rpcclient.Notification),
|
notifications: make(chan rpcclient.Notification),
|
||||||
|
blockRcv: make(chan *block.Block),
|
||||||
|
notificationRcv: make(chan *state.ContainedNotificationEvent),
|
||||||
|
notaryReqRcv: make(chan *result.NotaryRequestEvent),
|
||||||
subscribedEvents: make(map[util.Uint160]string),
|
subscribedEvents: make(map[util.Uint160]string),
|
||||||
subscribedNotaryEvents: make(map[util.Uint160]string),
|
subscribedNotaryEvents: make(map[util.Uint160]string),
|
||||||
closeChan: make(chan struct{}),
|
closeChan: make(chan struct{}),
|
||||||
|
|
|
@ -4,6 +4,11 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
|
||||||
c.logger.Info("connection to the new RPC node has been established",
|
c.logger.Info("connection to the new RPC node has been established",
|
||||||
zap.String("endpoint", newEndpoint))
|
zap.String("endpoint", newEndpoint))
|
||||||
|
|
||||||
if !c.restoreSubscriptions(cli, newEndpoint) {
|
subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
|
||||||
|
if !ok {
|
||||||
// new WS client does not allow
|
// new WS client does not allow
|
||||||
// restoring subscription, client
|
// restoring subscription, client
|
||||||
// could not work correctly =>
|
// could not work correctly =>
|
||||||
|
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
|
||||||
|
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.updateSubs(subs)
|
||||||
|
|
||||||
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
||||||
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
||||||
|
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notificationLoop() {
|
func (c *Client) notificationLoop() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
nChan := c.client.Notifications
|
bChan := c.blockRcv
|
||||||
|
nChan := c.notificationRcv
|
||||||
|
nrChan := c.notaryReqRcv
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
|
||||||
c.close()
|
c.close()
|
||||||
|
|
||||||
return
|
return
|
||||||
case n, ok := <-nChan:
|
case e, ok = <-bChan:
|
||||||
// notification channel is used as a connection
|
case e, ok = <-nChan:
|
||||||
// state: if it is closed, the connection is
|
case e, ok = <-nrChan:
|
||||||
// considered to be lost
|
}
|
||||||
if !ok {
|
|
||||||
if closeErr := c.client.GetError(); closeErr != nil {
|
|
||||||
c.logger.Warn("switching to the next RPC node",
|
|
||||||
zap.String("reason", closeErr.Error()),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// neo-go client was closed by calling `Close`
|
|
||||||
// method that happens only when the client has
|
|
||||||
// switched to the more prioritized RPC
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !c.switchRPC() {
|
if ok {
|
||||||
c.logger.Error("could not establish connection to any RPC node")
|
c.routeEvent(e)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// could not connect to all endpoints =>
|
if !c.reconnect() {
|
||||||
// switch client to inactive mode
|
return
|
||||||
c.inactiveMode()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
|
||||||
// of the client to allow checking chain state since during switch
|
|
||||||
// process some notification could be lost
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case c.notifications <- n:
|
|
||||||
continue
|
|
||||||
case <-c.cfg.ctx.Done():
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
case <-c.closeChan:
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) routeEvent(e any) {
|
||||||
|
typedNotification := rpcclient.Notification{Value: e}
|
||||||
|
|
||||||
|
switch e.(type) {
|
||||||
|
case *block.Block:
|
||||||
|
typedNotification.Type = neorpc.BlockEventID
|
||||||
|
case *state.ContainedNotificationEvent:
|
||||||
|
typedNotification.Type = neorpc.NotificationEventID
|
||||||
|
case *result.NotaryRequestEvent:
|
||||||
|
typedNotification.Type = neorpc.NotaryRequestEventID
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.notifications <- typedNotification:
|
||||||
|
case <-c.cfg.ctx.Done():
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
case <-c.closeChan:
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) reconnect() bool {
|
||||||
|
if closeErr := c.client.GetError(); closeErr != nil {
|
||||||
|
c.logger.Warn("switching to the next RPC node",
|
||||||
|
zap.String("reason", closeErr.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// neo-go client was closed by calling `Close`
|
||||||
|
// method, that happens only when a client has
|
||||||
|
// switched to the more prioritized RPC
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !c.switchRPC() {
|
||||||
|
c.logger.Error("could not establish connection to any RPC node")
|
||||||
|
|
||||||
|
// could not connect to all endpoints =>
|
||||||
|
// switch client to inactive mode
|
||||||
|
c.inactiveMode()
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||||
|
// of the client to allow checking chain state since during switch
|
||||||
|
// process some notification could be lost
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) switchToMostPrioritized() {
|
func (c *Client) switchToMostPrioritized() {
|
||||||
t := time.NewTicker(c.cfg.switchInterval)
|
t := time.NewTicker(c.cfg.switchInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
@ -156,11 +185,12 @@ mainLoop:
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
|
|
||||||
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
||||||
copy(endpointsCopy, c.endpoints.list)
|
copy(endpointsCopy, c.endpoints.list)
|
||||||
|
|
||||||
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
||||||
highestPriority := c.endpoints.list[0].Priority
|
highestPriority := c.endpoints.list[0].Priority
|
||||||
|
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
if currPriority == highestPriority {
|
if currPriority == highestPriority {
|
||||||
|
@ -186,7 +216,7 @@ mainLoop:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.restoreSubscriptions(cli, tryE) {
|
if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
|
||||||
c.switchLock.Lock()
|
c.switchLock.Lock()
|
||||||
|
|
||||||
// higher priority node could have been
|
// higher priority node could have been
|
||||||
|
@ -201,6 +231,7 @@ mainLoop:
|
||||||
c.cache.invalidate()
|
c.cache.invalidate()
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.updateSubs(subs)
|
||||||
c.endpoints.curr = i
|
c.endpoints.curr = i
|
||||||
|
|
||||||
c.switchLock.Unlock()
|
c.switchLock.Unlock()
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
|
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -64,7 +68,7 @@ func (c *Client) SubscribeForNewBlocks() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.client.SubscribeForNewBlocks(nil)
|
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
|
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -203,9 +207,25 @@ func (c *Client) UnsubscribeAll() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreSubscriptions restores subscriptions according to
|
type subsInfo struct {
|
||||||
// cached information about them.
|
blockRcv chan *block.Block
|
||||||
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
|
notificationRcv chan *state.ContainedNotificationEvent
|
||||||
|
notaryReqRcv chan *result.NotaryRequestEvent
|
||||||
|
|
||||||
|
subscribedToBlocks bool
|
||||||
|
subscribedEvents map[util.Uint160]string
|
||||||
|
subscribedNotaryEvents map[util.Uint160]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// restoreSubscriptions restores subscriptions according to cached
|
||||||
|
// information about them.
|
||||||
|
//
|
||||||
|
// If it is NOT a background operation switchLock MUST be held.
|
||||||
|
// Returns a pair: the second is a restoration status and the first
|
||||||
|
// one contains subscription information applied to the passed cli
|
||||||
|
// and receivers for the updated subscriptions.
|
||||||
|
// Does not change Client instance.
|
||||||
|
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
id string
|
id string
|
||||||
|
@ -214,72 +234,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
|
blockRcv := make(chan *block.Block)
|
||||||
|
notificationRcv := make(chan *state.ContainedNotificationEvent)
|
||||||
|
notaryReqRcv := make(chan *result.NotaryRequestEvent)
|
||||||
|
|
||||||
// neo-go WS client says to _always_ read notifications
|
// neo-go WS client says to _always_ read notifications
|
||||||
// from its channel. Subscribing to any notification
|
// from its channel. Subscribing to any notification
|
||||||
// while not reading them in another goroutine may
|
// while not reading them in another goroutine may
|
||||||
// lead to a dead-lock, thus that async side notification
|
// lead to a dead-lock, thus that async side notification
|
||||||
// listening while restoring subscriptions
|
// listening while restoring subscriptions
|
||||||
go func() {
|
go func() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
case n, ok := <-cli.Notifications:
|
case e, ok = <-blockRcv:
|
||||||
if !ok {
|
case e, ok = <-notificationRcv:
|
||||||
return
|
case e, ok = <-notaryReqRcv:
|
||||||
}
|
|
||||||
|
|
||||||
c.notifications <- n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if background {
|
||||||
|
// background client (test) switch, no need to send
|
||||||
|
// any notification, just preventing dead-lock
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.routeEvent(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if background {
|
||||||
|
c.switchLock.RLock()
|
||||||
|
defer c.switchLock.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
si.subscribedToBlocks = c.subscribedToNewBlocks
|
||||||
|
si.subscribedEvents = copySubsMap(c.subscribedEvents)
|
||||||
|
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
|
||||||
|
si.blockRcv = blockRcv
|
||||||
|
si.notificationRcv = notificationRcv
|
||||||
|
si.notaryReqRcv = notaryReqRcv
|
||||||
|
|
||||||
// new block events restoration
|
// new block events restoration
|
||||||
if c.subscribedToNewBlocks {
|
if si.subscribedToBlocks {
|
||||||
_, err = cli.SubscribeForNewBlocks(nil)
|
_, err = cli.ReceiveBlocks(nil, blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore block subscription after RPC switch",
|
c.logger.Error("could not restore block subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notification events restoration
|
// notification events restoration
|
||||||
for contract := range c.subscribedEvents {
|
for contract := range si.subscribedEvents {
|
||||||
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
|
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notification subscription after RPC switch",
|
c.logger.Error("could not restore notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedEvents[contract] = id
|
si.subscribedEvents[contract] = id
|
||||||
}
|
}
|
||||||
|
|
||||||
// notary notification events restoration
|
// notary notification events restoration
|
||||||
if c.notary != nil {
|
if c.notary != nil {
|
||||||
for signer := range c.subscribedNotaryEvents {
|
for signer := range si.subscribedNotaryEvents {
|
||||||
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
|
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedNotaryEvents[signer] = id
|
si.subscribedNotaryEvents[signer] = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return si, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
|
||||||
|
newM := make(map[util.Uint160]string, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
newM[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return newM
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue