Upd/neo-go subscriptions #73

Merged
carpawell merged 2 commits from carpawell/upd/neo-go-subs into master 2023-03-24 09:42:31 +00:00
4 changed files with 210 additions and 117 deletions

View file

@ -69,17 +69,12 @@ type Client struct {
// on every normal call. // on every normal call.
switchLock *sync.RWMutex switchLock *sync.RWMutex
// channel for ws notifications
notifications chan rpcclient.Notification notifications chan rpcclient.Notification
subsInfo // protected with switchLock
// channel for internal stop // channel for internal stop
closeChan chan struct{} closeChan chan struct{}
// cached subscription information
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
subscribedToNewBlocks bool
// indicates that Client is not able to // indicates that Client is not able to
// establish connection to any of the // establish connection to any of the
// provided RPC endpoints // provided RPC endpoints
@ -419,43 +414,43 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
// //
// Wraps any error to frostfsError. // Wraps any error to frostfsError.
func toStackParameter(value any) (sc.Parameter, error) { func toStackParameter(value any) (sc.Parameter, error) {
var result = sc.Parameter{ var res = sc.Parameter{
Value: value, Value: value,
} }
switch v := value.(type) { switch v := value.(type) {
case []byte: case []byte:
result.Type = sc.ByteArrayType res.Type = sc.ByteArrayType
case int: case int:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(int64(v)) res.Value = big.NewInt(int64(v))
case int64: case int64:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(v) res.Value = big.NewInt(v)
case uint64: case uint64:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = new(big.Int).SetUint64(v) res.Value = new(big.Int).SetUint64(v)
case [][]byte: case [][]byte:
arr := make([]sc.Parameter, 0, len(v)) arr := make([]sc.Parameter, 0, len(v))
for i := range v { for i := range v {
elem, err := toStackParameter(v[i]) elem, err := toStackParameter(v[i])
if err != nil { if err != nil {
return result, err return res, err
} }
arr = append(arr, elem) arr = append(arr, elem)
} }
result.Type = sc.ArrayType res.Type = sc.ArrayType
result.Value = arr res.Value = arr
case string: case string:
result.Type = sc.StringType res.Type = sc.StringType
case util.Uint160: case util.Uint160:
result.Type = sc.ByteArrayType res.Type = sc.ByteArrayType
result.Value = v.BytesBE() res.Value = v.BytesBE()
case noderoles.Role: case noderoles.Role:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(int64(v)) res.Value = big.NewInt(int64(v))
case keys.PublicKeys: case keys.PublicKeys:
arr := make([][]byte, 0, len(v)) arr := make([][]byte, 0, len(v))
for i := range v { for i := range v {
@ -464,13 +459,13 @@ func toStackParameter(value any) (sc.Parameter, error) {
return toStackParameter(arr) return toStackParameter(arr)
case bool: case bool:
result.Type = sc.BoolType res.Type = sc.BoolType
result.Value = v res.Value = v
default: default:
return result, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value)) return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
} }
return result, nil return res, nil
} }
// MagicNumber returns the magic number of the network // MagicNumber returns the magic number of the network
@ -514,7 +509,7 @@ func (c *Client) MsPerBlock() (res int64, err error) {
} }
// IsValidScript returns true if invocation script executes with HALT state. // IsValidScript returns true if invocation script executes with HALT state.
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) { func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
c.switchLock.RLock() c.switchLock.RLock()
defer c.switchLock.RUnlock() defer c.switchLock.RUnlock()
@ -522,12 +517,12 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res
return false, ErrConnectionLost return false, ErrConnectionLost
} }
result, err := c.client.InvokeScript(script, signers) res, err := c.client.InvokeScript(script, signers)
if err != nil { if err != nil {
return false, fmt.Errorf("invokeScript: %w", err) return false, fmt.Errorf("invokeScript: %w", err)
} }
return result.State == vmstate.Halt.String(), nil return res.State == vmstate.Halt.String(), nil
} }
// NotificationChannel returns channel than receives subscribed // NotificationChannel returns channel than receives subscribed

View file

@ -9,8 +9,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -102,17 +105,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
} }
cli := &Client{ cli := &Client{
cache: newClientCache(), cache: newClientCache(),
logger: cfg.logger, logger: cfg.logger,
acc: acc, acc: acc,
accAddr: accAddr, accAddr: accAddr,
signer: cfg.signer, signer: cfg.signer,
cfg: *cfg, cfg: *cfg,
switchLock: &sync.RWMutex{}, switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification), notifications: make(chan rpcclient.Notification),
subscribedEvents: make(map[util.Uint160]string), subsInfo: subsInfo{
subscribedNotaryEvents: make(map[util.Uint160]string), blockRcv: make(chan *block.Block),
closeChan: make(chan struct{}), notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
},
closeChan: make(chan struct{}),
} }
cli.endpoints.init(cfg.endpoints) cli.endpoints.init(cfg.endpoints)

View file

@ -4,6 +4,11 @@ import (
"sort" "sort"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
c.logger.Info("connection to the new RPC node has been established", c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint)) zap.String("endpoint", newEndpoint))
if !c.restoreSubscriptions(cli, newEndpoint) { subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
if !ok {
// new WS client does not allow // new WS client does not allow
// restoring subscription, client // restoring subscription, client
// could not work correctly => // could not work correctly =>
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
} }
func (c *Client) notificationLoop() { func (c *Client) notificationLoop() {
var e any
var ok bool
for { for {
c.switchLock.RLock() c.switchLock.RLock()
nChan := c.client.Notifications bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock() c.switchLock.RUnlock()
select { select {
@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
c.close() c.close()
return return
case n, ok := <-nChan: case e, ok = <-bChan:
// notification channel is used as a connection case e, ok = <-nChan:
// state: if it is closed, the connection is case e, ok = <-nrChan:
// considered to be lost }
if !ok {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method that happens only when the client has
// switched to the more prioritized RPC
continue
}
if !c.switchRPC() { if ok {
c.logger.Error("could not establish connection to any RPC node") c.routeEvent(e)
continue
}
// could not connect to all endpoints => if !c.reconnect() {
// switch client to inactive mode return
c.inactiveMode()
return
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
continue
}
select {
case c.notifications <- n:
continue
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
}
} }
} }
} }
func (c *Client) routeEvent(e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select {
case c.notifications <- typedNotification:
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect() bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC() {
c.logger.Error("could not establish connection to any RPC node")
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
}
func (c *Client) switchToMostPrioritized() { func (c *Client) switchToMostPrioritized() {
t := time.NewTicker(c.cfg.switchInterval) t := time.NewTicker(c.cfg.switchInterval)
defer t.Stop() defer t.Stop()
@ -156,11 +185,12 @@ mainLoop:
return return
case <-t.C: case <-t.C:
c.switchLock.RLock() c.switchLock.RLock()
endpointsCopy := make([]Endpoint, len(c.endpoints.list)) endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list) copy(endpointsCopy, c.endpoints.list)
currPriority := c.endpoints.list[c.endpoints.curr].Priority currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority highestPriority := c.endpoints.list[0].Priority
c.switchLock.RUnlock() c.switchLock.RUnlock()
if currPriority == highestPriority { if currPriority == highestPriority {
@ -186,7 +216,7 @@ mainLoop:
continue continue
} }
if c.restoreSubscriptions(cli, tryE) { if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
c.switchLock.Lock() c.switchLock.Lock()
// higher priority node could have been // higher priority node could have been
@ -201,6 +231,7 @@ mainLoop:
c.cache.invalidate() c.cache.invalidate()
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i c.endpoints.curr = i
c.switchLock.Unlock() c.switchLock.Unlock()

View file

@ -1,6 +1,10 @@
package client package client
import ( import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
return nil return nil
} }
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil) id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil { if err != nil {
return err return err
} }
@ -59,17 +63,17 @@ func (c *Client) SubscribeForNewBlocks() error {
return ErrConnectionLost return ErrConnectionLost
} }
if c.subscribedToNewBlocks { if c.subscribedToBlocks {
// no need to subscribe one more time // no need to subscribe one more time
return nil return nil
} }
_, err := c.client.SubscribeForNewBlocks(nil) _, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil { if err != nil {
return err return err
} }
c.subscribedToNewBlocks = true c.subscribedToBlocks = true
return nil return nil
} }
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
return nil return nil
} }
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner) id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil { if err != nil {
return err return err
} }
@ -187,7 +191,7 @@ func (c *Client) UnsubscribeAll() error {
// no need to unsubscribe if there are // no need to unsubscribe if there are
// no active subscriptions // no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 && if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToNewBlocks { !c.subscribedToBlocks {
return nil return nil
} }
@ -198,14 +202,32 @@ func (c *Client) UnsubscribeAll() error {
c.subscribedEvents = make(map[util.Uint160]string) c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string) c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToNewBlocks = false c.subscribedToBlocks = false
return nil return nil
} }
// restoreSubscriptions restores subscriptions according to // subsInfo includes channels for ws notifications;
// cached information about them. // cached subscription information.
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool { type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var ( var (
err error err error
id string id string
@ -214,72 +236,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
// neo-go WS client says to _always_ read notifications // neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification // from its channel. Subscribing to any notification
// while not reading them in another goroutine may // while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification // lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions // listening while restoring subscriptions
go func() { go func() {
var e any
var ok bool
for { for {
select { select {
case <-stopCh: case <-stopCh:
return return
case n, ok := <-cli.Notifications: case e, ok = <-blockRcv:
if !ok { case e, ok = <-notificationRcv:
return case e, ok = <-notaryReqRcv:
}
c.notifications <- n
} }
if !ok {
return
}
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(e)
} }
}() }()
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration // new block events restoration
if c.subscribedToNewBlocks { if si.subscribedToBlocks {
_, err = cli.SubscribeForNewBlocks(nil) _, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore block subscription after RPC switch", c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
} }
// notification events restoration // notification events restoration
for contract := range c.subscribedEvents { for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForExecutionNotifications(&contract, nil) id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch", c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
c.subscribedEvents[contract] = id si.subscribedEvents[contract] = id
} }
// notary notification events restoration // notary notification events restoration
if c.notary != nil { if c.notary != nil {
for signer := range c.subscribedNotaryEvents { for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForNotaryRequests(nil, &signer) id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch", c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
c.subscribedNotaryEvents[signer] = id si.subscribedNotaryEvents[signer] = id
} }
} }
return true return si, true
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
} }