Upd/neo-go
subscriptions #73
4 changed files with 210 additions and 117 deletions
|
@ -69,17 +69,12 @@ type Client struct {
|
||||||
// on every normal call.
|
// on every normal call.
|
||||||
switchLock *sync.RWMutex
|
switchLock *sync.RWMutex
|
||||||
|
|
||||||
// channel for ws notifications
|
|
||||||
notifications chan rpcclient.Notification
|
notifications chan rpcclient.Notification
|
||||||
|
subsInfo // protected with switchLock
|
||||||
|
|
||||||
// channel for internal stop
|
// channel for internal stop
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
|
||||||
// cached subscription information
|
|
||||||
subscribedEvents map[util.Uint160]string
|
|
||||||
subscribedNotaryEvents map[util.Uint160]string
|
|
||||||
subscribedToNewBlocks bool
|
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
// provided RPC endpoints
|
// provided RPC endpoints
|
||||||
|
@ -419,43 +414,43 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
|
||||||
//
|
//
|
||||||
// Wraps any error to frostfsError.
|
// Wraps any error to frostfsError.
|
||||||
func toStackParameter(value any) (sc.Parameter, error) {
|
func toStackParameter(value any) (sc.Parameter, error) {
|
||||||
var result = sc.Parameter{
|
var res = sc.Parameter{
|
||||||
Value: value,
|
Value: value,
|
||||||
}
|
}
|
||||||
|
|
||||||
switch v := value.(type) {
|
switch v := value.(type) {
|
||||||
case []byte:
|
case []byte:
|
||||||
result.Type = sc.ByteArrayType
|
res.Type = sc.ByteArrayType
|
||||||
case int:
|
case int:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(int64(v))
|
res.Value = big.NewInt(int64(v))
|
||||||
case int64:
|
case int64:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(v)
|
res.Value = big.NewInt(v)
|
||||||
case uint64:
|
case uint64:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = new(big.Int).SetUint64(v)
|
res.Value = new(big.Int).SetUint64(v)
|
||||||
case [][]byte:
|
case [][]byte:
|
||||||
arr := make([]sc.Parameter, 0, len(v))
|
arr := make([]sc.Parameter, 0, len(v))
|
||||||
for i := range v {
|
for i := range v {
|
||||||
elem, err := toStackParameter(v[i])
|
elem, err := toStackParameter(v[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
arr = append(arr, elem)
|
arr = append(arr, elem)
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Type = sc.ArrayType
|
res.Type = sc.ArrayType
|
||||||
result.Value = arr
|
res.Value = arr
|
||||||
case string:
|
case string:
|
||||||
result.Type = sc.StringType
|
res.Type = sc.StringType
|
||||||
case util.Uint160:
|
case util.Uint160:
|
||||||
result.Type = sc.ByteArrayType
|
res.Type = sc.ByteArrayType
|
||||||
result.Value = v.BytesBE()
|
res.Value = v.BytesBE()
|
||||||
case noderoles.Role:
|
case noderoles.Role:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(int64(v))
|
res.Value = big.NewInt(int64(v))
|
||||||
case keys.PublicKeys:
|
case keys.PublicKeys:
|
||||||
arr := make([][]byte, 0, len(v))
|
arr := make([][]byte, 0, len(v))
|
||||||
for i := range v {
|
for i := range v {
|
||||||
|
@ -464,13 +459,13 @@ func toStackParameter(value any) (sc.Parameter, error) {
|
||||||
|
|
||||||
return toStackParameter(arr)
|
return toStackParameter(arr)
|
||||||
case bool:
|
case bool:
|
||||||
result.Type = sc.BoolType
|
res.Type = sc.BoolType
|
||||||
result.Value = v
|
res.Value = v
|
||||||
default:
|
default:
|
||||||
return result, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
|
return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MagicNumber returns the magic number of the network
|
// MagicNumber returns the magic number of the network
|
||||||
|
@ -514,7 +509,7 @@ func (c *Client) MsPerBlock() (res int64, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsValidScript returns true if invocation script executes with HALT state.
|
// IsValidScript returns true if invocation script executes with HALT state.
|
||||||
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) {
|
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
defer c.switchLock.RUnlock()
|
defer c.switchLock.RUnlock()
|
||||||
|
|
||||||
|
@ -522,12 +517,12 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res
|
||||||
return false, ErrConnectionLost
|
return false, ErrConnectionLost
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := c.client.InvokeScript(script, signers)
|
res, err := c.client.InvokeScript(script, signers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("invokeScript: %w", err)
|
return false, fmt.Errorf("invokeScript: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result.State == vmstate.Halt.String(), nil
|
return res.State == vmstate.Halt.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotificationChannel returns channel than receives subscribed
|
// NotificationChannel returns channel than receives subscribed
|
||||||
|
|
|
@ -9,8 +9,11 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -102,17 +105,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cli := &Client{
|
cli := &Client{
|
||||||
cache: newClientCache(),
|
cache: newClientCache(),
|
||||||
logger: cfg.logger,
|
logger: cfg.logger,
|
||||||
acc: acc,
|
acc: acc,
|
||||||
accAddr: accAddr,
|
accAddr: accAddr,
|
||||||
signer: cfg.signer,
|
signer: cfg.signer,
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
switchLock: &sync.RWMutex{},
|
switchLock: &sync.RWMutex{},
|
||||||
notifications: make(chan rpcclient.Notification),
|
notifications: make(chan rpcclient.Notification),
|
||||||
subscribedEvents: make(map[util.Uint160]string),
|
subsInfo: subsInfo{
|
||||||
subscribedNotaryEvents: make(map[util.Uint160]string),
|
blockRcv: make(chan *block.Block),
|
||||||
closeChan: make(chan struct{}),
|
notificationRcv: make(chan *state.ContainedNotificationEvent),
|
||||||
|
notaryReqRcv: make(chan *result.NotaryRequestEvent),
|
||||||
|
subscribedEvents: make(map[util.Uint160]string),
|
||||||
|
subscribedNotaryEvents: make(map[util.Uint160]string),
|
||||||
|
},
|
||||||
|
closeChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
cli.endpoints.init(cfg.endpoints)
|
cli.endpoints.init(cfg.endpoints)
|
||||||
|
|
|
@ -4,6 +4,11 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
|
||||||
c.logger.Info("connection to the new RPC node has been established",
|
c.logger.Info("connection to the new RPC node has been established",
|
||||||
zap.String("endpoint", newEndpoint))
|
zap.String("endpoint", newEndpoint))
|
||||||
|
|
||||||
if !c.restoreSubscriptions(cli, newEndpoint) {
|
subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
|
||||||
|
if !ok {
|
||||||
// new WS client does not allow
|
// new WS client does not allow
|
||||||
// restoring subscription, client
|
// restoring subscription, client
|
||||||
// could not work correctly =>
|
// could not work correctly =>
|
||||||
|
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
|
||||||
|
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.subsInfo = subs
|
||||||
|
|
||||||
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
||||||
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
||||||
|
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notificationLoop() {
|
func (c *Client) notificationLoop() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
nChan := c.client.Notifications
|
bChan := c.blockRcv
|
||||||
|
nChan := c.notificationRcv
|
||||||
|
nrChan := c.notaryReqRcv
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
|
||||||
c.close()
|
c.close()
|
||||||
|
|
||||||
return
|
return
|
||||||
case n, ok := <-nChan:
|
case e, ok = <-bChan:
|
||||||
// notification channel is used as a connection
|
case e, ok = <-nChan:
|
||||||
// state: if it is closed, the connection is
|
case e, ok = <-nrChan:
|
||||||
// considered to be lost
|
}
|
||||||
if !ok {
|
|
||||||
if closeErr := c.client.GetError(); closeErr != nil {
|
|
||||||
c.logger.Warn("switching to the next RPC node",
|
|
||||||
zap.String("reason", closeErr.Error()),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// neo-go client was closed by calling `Close`
|
|
||||||
// method that happens only when the client has
|
|
||||||
// switched to the more prioritized RPC
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !c.switchRPC() {
|
if ok {
|
||||||
c.logger.Error("could not establish connection to any RPC node")
|
c.routeEvent(e)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// could not connect to all endpoints =>
|
if !c.reconnect() {
|
||||||
// switch client to inactive mode
|
return
|
||||||
c.inactiveMode()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
|
||||||
// of the client to allow checking chain state since during switch
|
|
||||||
// process some notification could be lost
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case c.notifications <- n:
|
|
||||||
continue
|
|
||||||
case <-c.cfg.ctx.Done():
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
case <-c.closeChan:
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) routeEvent(e any) {
|
||||||
|
typedNotification := rpcclient.Notification{Value: e}
|
||||||
|
|
||||||
|
switch e.(type) {
|
||||||
|
case *block.Block:
|
||||||
|
typedNotification.Type = neorpc.BlockEventID
|
||||||
|
case *state.ContainedNotificationEvent:
|
||||||
|
typedNotification.Type = neorpc.NotificationEventID
|
||||||
|
case *result.NotaryRequestEvent:
|
||||||
|
typedNotification.Type = neorpc.NotaryRequestEventID
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.notifications <- typedNotification:
|
||||||
|
case <-c.cfg.ctx.Done():
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
case <-c.closeChan:
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) reconnect() bool {
|
||||||
|
if closeErr := c.client.GetError(); closeErr != nil {
|
||||||
|
c.logger.Warn("switching to the next RPC node",
|
||||||
|
zap.String("reason", closeErr.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// neo-go client was closed by calling `Close`
|
||||||
|
// method, that happens only when a client has
|
||||||
|
// switched to the more prioritized RPC
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !c.switchRPC() {
|
||||||
|
c.logger.Error("could not establish connection to any RPC node")
|
||||||
|
|
||||||
|
// could not connect to all endpoints =>
|
||||||
|
// switch client to inactive mode
|
||||||
|
c.inactiveMode()
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||||
|
// of the client to allow checking chain state since during switch
|
||||||
|
// process some notification could be lost
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) switchToMostPrioritized() {
|
func (c *Client) switchToMostPrioritized() {
|
||||||
t := time.NewTicker(c.cfg.switchInterval)
|
t := time.NewTicker(c.cfg.switchInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
@ -156,11 +185,12 @@ mainLoop:
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
|
|
||||||
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
||||||
copy(endpointsCopy, c.endpoints.list)
|
copy(endpointsCopy, c.endpoints.list)
|
||||||
|
|
||||||
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
||||||
highestPriority := c.endpoints.list[0].Priority
|
highestPriority := c.endpoints.list[0].Priority
|
||||||
|
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
if currPriority == highestPriority {
|
if currPriority == highestPriority {
|
||||||
|
@ -186,7 +216,7 @@ mainLoop:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.restoreSubscriptions(cli, tryE) {
|
if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
|
||||||
c.switchLock.Lock()
|
c.switchLock.Lock()
|
||||||
|
|
||||||
// higher priority node could have been
|
// higher priority node could have been
|
||||||
|
@ -201,6 +231,7 @@ mainLoop:
|
||||||
c.cache.invalidate()
|
c.cache.invalidate()
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.subsInfo = subs
|
||||||
c.endpoints.curr = i
|
c.endpoints.curr = i
|
||||||
|
|
||||||
c.switchLock.Unlock()
|
c.switchLock.Unlock()
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
|
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -59,17 +63,17 @@ func (c *Client) SubscribeForNewBlocks() error {
|
||||||
return ErrConnectionLost
|
return ErrConnectionLost
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.subscribedToNewBlocks {
|
if c.subscribedToBlocks {
|
||||||
// no need to subscribe one more time
|
// no need to subscribe one more time
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.client.SubscribeForNewBlocks(nil)
|
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedToNewBlocks = true
|
c.subscribedToBlocks = true
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
|
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -187,7 +191,7 @@ func (c *Client) UnsubscribeAll() error {
|
||||||
// no need to unsubscribe if there are
|
// no need to unsubscribe if there are
|
||||||
// no active subscriptions
|
// no active subscriptions
|
||||||
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
|
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
|
||||||
!c.subscribedToNewBlocks {
|
!c.subscribedToBlocks {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,14 +202,32 @@ func (c *Client) UnsubscribeAll() error {
|
||||||
|
|
||||||
c.subscribedEvents = make(map[util.Uint160]string)
|
c.subscribedEvents = make(map[util.Uint160]string)
|
||||||
c.subscribedNotaryEvents = make(map[util.Uint160]string)
|
c.subscribedNotaryEvents = make(map[util.Uint160]string)
|
||||||
c.subscribedToNewBlocks = false
|
c.subscribedToBlocks = false
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreSubscriptions restores subscriptions according to
|
// subsInfo includes channels for ws notifications;
|
||||||
// cached information about them.
|
// cached subscription information.
|
||||||
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
|
type subsInfo struct {
|
||||||
|
blockRcv chan *block.Block
|
||||||
|
notificationRcv chan *state.ContainedNotificationEvent
|
||||||
|
notaryReqRcv chan *result.NotaryRequestEvent
|
||||||
|
|
||||||
|
subscribedToBlocks bool
|
||||||
|
subscribedEvents map[util.Uint160]string
|
||||||
|
subscribedNotaryEvents map[util.Uint160]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// restoreSubscriptions restores subscriptions according to cached
|
||||||
|
// information about them.
|
||||||
|
//
|
||||||
|
// If it is NOT a background operation switchLock MUST be held.
|
||||||
|
// Returns a pair: the second is a restoration status and the first
|
||||||
|
// one contains subscription information applied to the passed cli
|
||||||
|
// and receivers for the updated subscriptions.
|
||||||
|
// Does not change Client instance.
|
||||||
|
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
id string
|
id string
|
||||||
|
@ -214,72 +236,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
|
blockRcv := make(chan *block.Block)
|
||||||
|
notificationRcv := make(chan *state.ContainedNotificationEvent)
|
||||||
|
notaryReqRcv := make(chan *result.NotaryRequestEvent)
|
||||||
|
|
||||||
// neo-go WS client says to _always_ read notifications
|
// neo-go WS client says to _always_ read notifications
|
||||||
// from its channel. Subscribing to any notification
|
// from its channel. Subscribing to any notification
|
||||||
// while not reading them in another goroutine may
|
// while not reading them in another goroutine may
|
||||||
// lead to a dead-lock, thus that async side notification
|
// lead to a dead-lock, thus that async side notification
|
||||||
// listening while restoring subscriptions
|
// listening while restoring subscriptions
|
||||||
go func() {
|
go func() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
case n, ok := <-cli.Notifications:
|
case e, ok = <-blockRcv:
|
||||||
if !ok {
|
case e, ok = <-notificationRcv:
|
||||||
return
|
case e, ok = <-notaryReqRcv:
|
||||||
}
|
|
||||||
|
|
||||||
c.notifications <- n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if background {
|
||||||
|
// background client (test) switch, no need to send
|
||||||
|
// any notification, just preventing dead-lock
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.routeEvent(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if background {
|
||||||
|
c.switchLock.RLock()
|
||||||
|
defer c.switchLock.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
si.subscribedToBlocks = c.subscribedToBlocks
|
||||||
|
si.subscribedEvents = copySubsMap(c.subscribedEvents)
|
||||||
|
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
|
||||||
|
si.blockRcv = blockRcv
|
||||||
|
si.notificationRcv = notificationRcv
|
||||||
|
si.notaryReqRcv = notaryReqRcv
|
||||||
|
|
||||||
// new block events restoration
|
// new block events restoration
|
||||||
if c.subscribedToNewBlocks {
|
if si.subscribedToBlocks {
|
||||||
_, err = cli.SubscribeForNewBlocks(nil)
|
_, err = cli.ReceiveBlocks(nil, blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore block subscription after RPC switch",
|
c.logger.Error("could not restore block subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notification events restoration
|
// notification events restoration
|
||||||
for contract := range c.subscribedEvents {
|
for contract := range si.subscribedEvents {
|
||||||
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
|
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notification subscription after RPC switch",
|
c.logger.Error("could not restore notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedEvents[contract] = id
|
si.subscribedEvents[contract] = id
|
||||||
}
|
}
|
||||||
|
|
||||||
// notary notification events restoration
|
// notary notification events restoration
|
||||||
if c.notary != nil {
|
if c.notary != nil {
|
||||||
for signer := range c.subscribedNotaryEvents {
|
for signer := range si.subscribedNotaryEvents {
|
||||||
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
|
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedNotaryEvents[signer] = id
|
si.subscribedNotaryEvents[signer] = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return si, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
|
||||||
|
newM := make(map[util.Uint160]string, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
newM[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return newM
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue