Compare commits

...

2 commits

Author SHA1 Message Date
Pavel Karpy
543d1dca85 [#73] morph: Rename vars that collide with package names
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-02-22 19:28:36 +03:00
Pavel Karpy
8579ed4fff [#59] morph: Adopt updated neo-go client API for subs
It does not use deprecated methods anymore but also adds more code that
removes. Future refactor that will affect more components will optimize
usage of the updated API.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-02-22 19:28:33 +03:00
4 changed files with 212 additions and 101 deletions

View file

@ -10,10 +10,13 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
@ -69,17 +72,20 @@ type Client struct {
// on every normal call. // on every normal call.
switchLock *sync.RWMutex switchLock *sync.RWMutex
// channel for ws notifications // channels for ws notifications; protected with switchLock
notifications chan rpcclient.Notification notifications chan rpcclient.Notification
blockRcv chan *block.Block
// channel for internal stop notificationRcv chan *state.ContainedNotificationEvent
closeChan chan struct{} notaryReqRcv chan *result.NotaryRequestEvent
// cached subscription information // cached subscription information
subscribedEvents map[util.Uint160]string subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string subscribedNotaryEvents map[util.Uint160]string
subscribedToNewBlocks bool subscribedToNewBlocks bool
// channel for internal stop
closeChan chan struct{}
// indicates that Client is not able to // indicates that Client is not able to
// establish connection to any of the // establish connection to any of the
// provided RPC endpoints // provided RPC endpoints
@ -385,43 +391,43 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
// //
// Wraps any error to frostfsError. // Wraps any error to frostfsError.
func toStackParameter(value any) (sc.Parameter, error) { func toStackParameter(value any) (sc.Parameter, error) {
var result = sc.Parameter{ var res = sc.Parameter{
Value: value, Value: value,
} }
switch v := value.(type) { switch v := value.(type) {
case []byte: case []byte:
result.Type = sc.ByteArrayType res.Type = sc.ByteArrayType
case int: case int:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(int64(v)) res.Value = big.NewInt(int64(v))
case int64: case int64:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(v) res.Value = big.NewInt(v)
case uint64: case uint64:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = new(big.Int).SetUint64(v) res.Value = new(big.Int).SetUint64(v)
case [][]byte: case [][]byte:
arr := make([]sc.Parameter, 0, len(v)) arr := make([]sc.Parameter, 0, len(v))
for i := range v { for i := range v {
elem, err := toStackParameter(v[i]) elem, err := toStackParameter(v[i])
if err != nil { if err != nil {
return result, err return res, err
} }
arr = append(arr, elem) arr = append(arr, elem)
} }
result.Type = sc.ArrayType res.Type = sc.ArrayType
result.Value = arr res.Value = arr
case string: case string:
result.Type = sc.StringType res.Type = sc.StringType
case util.Uint160: case util.Uint160:
result.Type = sc.ByteArrayType res.Type = sc.ByteArrayType
result.Value = v.BytesBE() res.Value = v.BytesBE()
case noderoles.Role: case noderoles.Role:
result.Type = sc.IntegerType res.Type = sc.IntegerType
result.Value = big.NewInt(int64(v)) res.Value = big.NewInt(int64(v))
case keys.PublicKeys: case keys.PublicKeys:
arr := make([][]byte, 0, len(v)) arr := make([][]byte, 0, len(v))
for i := range v { for i := range v {
@ -430,13 +436,13 @@ func toStackParameter(value any) (sc.Parameter, error) {
return toStackParameter(arr) return toStackParameter(arr)
case bool: case bool:
result.Type = sc.BoolType res.Type = sc.BoolType
result.Value = v res.Value = v
default: default:
return result, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value)) return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
} }
return result, nil return res, nil
} }
// MagicNumber returns the magic number of the network // MagicNumber returns the magic number of the network
@ -480,7 +486,7 @@ func (c *Client) MsPerBlock() (res int64, err error) {
} }
// IsValidScript returns true if invocation script executes with HALT state. // IsValidScript returns true if invocation script executes with HALT state.
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) { func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
c.switchLock.RLock() c.switchLock.RLock()
defer c.switchLock.RUnlock() defer c.switchLock.RUnlock()
@ -488,12 +494,12 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res
return false, ErrConnectionLost return false, ErrConnectionLost
} }
result, err := c.client.InvokeScript(script, signers) res, err := c.client.InvokeScript(script, signers)
if err != nil { if err != nil {
return false, fmt.Errorf("invokeScript: %w", err) return false, fmt.Errorf("invokeScript: %w", err)
} }
return result.State == vmstate.Halt.String(), nil return res.State == vmstate.Halt.String(), nil
} }
// NotificationChannel returns channel than receives subscribed // NotificationChannel returns channel than receives subscribed
@ -525,3 +531,14 @@ func (c *Client) setActor(act *actor.Actor) {
c.gasToken = nep17.New(act, gas.Hash) c.gasToken = nep17.New(act, gas.Hash)
c.rolemgmt = rolemgmt.New(act) c.rolemgmt = rolemgmt.New(act)
} }
// updateSubs updates subscription information, must be
// protected with switchLock.
func (c *Client) updateSubs(si subsInfo) {
c.blockRcv = si.blockRcv
c.notificationRcv = si.notificationRcv
c.notaryReqRcv = si.notaryReqRcv
c.subscribedEvents = si.subscribedEvents
c.subscribedNotaryEvents = si.subscribedNotaryEvents
}

View file

@ -9,8 +9,11 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -109,6 +112,9 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
cfg: *cfg, cfg: *cfg,
switchLock: &sync.RWMutex{}, switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification), notifications: make(chan rpcclient.Notification),
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string), subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string), subscribedNotaryEvents: make(map[util.Uint160]string),
closeChan: make(chan struct{}), closeChan: make(chan struct{}),

View file

@ -4,6 +4,11 @@ import (
"sort" "sort"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
c.logger.Info("connection to the new RPC node has been established", c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint)) zap.String("endpoint", newEndpoint))
if !c.restoreSubscriptions(cli, newEndpoint) { subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
if !ok {
// new WS client does not allow // new WS client does not allow
// restoring subscription, client // restoring subscription, client
// could not work correctly => // could not work correctly =>
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.updateSubs(subs)
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
} }
func (c *Client) notificationLoop() { func (c *Client) notificationLoop() {
var e any
var ok bool
for { for {
c.switchLock.RLock() c.switchLock.RLock()
nChan := c.client.Notifications bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock() c.switchLock.RUnlock()
select { select {
@ -93,20 +105,55 @@ func (c *Client) notificationLoop() {
c.close() c.close()
return return
case n, ok := <-nChan: case e, ok = <-bChan:
// notification channel is used as a connection case e, ok = <-nChan:
// state: if it is closed, the connection is case e, ok = <-nrChan:
// considered to be lost }
if !ok {
if ok {
c.routeEvent(e)
continue
}
if !c.reconnect() {
return
}
}
}
func (c *Client) routeEvent(e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select {
case c.notifications <- typedNotification:
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect() bool {
if closeErr := c.client.GetError(); closeErr != nil { if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node", c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()), zap.String("reason", closeErr.Error()),
) )
} else { } else {
// neo-go client was closed by calling `Close` // neo-go client was closed by calling `Close`
// method that happens only when the client has // method, that happens only when a client has
// switched to the more prioritized RPC // switched to the more prioritized RPC
continue return true
} }
if !c.switchRPC() { if !c.switchRPC() {
@ -116,32 +163,14 @@ func (c *Client) notificationLoop() {
// switch client to inactive mode // switch client to inactive mode
c.inactiveMode() c.inactiveMode()
return return false
} }
// TODO(@carpawell): call here some callback retrieved in constructor // TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch // of the client to allow checking chain state since during switch
// process some notification could be lost // process some notification could be lost
continue return true
}
select {
case c.notifications <- n:
continue
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
}
}
}
} }
func (c *Client) switchToMostPrioritized() { func (c *Client) switchToMostPrioritized() {
@ -156,11 +185,12 @@ mainLoop:
return return
case <-t.C: case <-t.C:
c.switchLock.RLock() c.switchLock.RLock()
endpointsCopy := make([]Endpoint, len(c.endpoints.list)) endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list) copy(endpointsCopy, c.endpoints.list)
currPriority := c.endpoints.list[c.endpoints.curr].Priority currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority highestPriority := c.endpoints.list[0].Priority
c.switchLock.RUnlock() c.switchLock.RUnlock()
if currPriority == highestPriority { if currPriority == highestPriority {
@ -186,7 +216,7 @@ mainLoop:
continue continue
} }
if c.restoreSubscriptions(cli, tryE) { if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
c.switchLock.Lock() c.switchLock.Lock()
// higher priority node could have been // higher priority node could have been
@ -201,6 +231,7 @@ mainLoop:
c.cache.invalidate() c.cache.invalidate()
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.updateSubs(subs)
c.endpoints.curr = i c.endpoints.curr = i
c.switchLock.Unlock() c.switchLock.Unlock()

View file

@ -1,6 +1,10 @@
package client package client
import ( import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
return nil return nil
} }
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil) id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil { if err != nil {
return err return err
} }
@ -64,7 +68,7 @@ func (c *Client) SubscribeForNewBlocks() error {
return nil return nil
} }
_, err := c.client.SubscribeForNewBlocks(nil) _, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil { if err != nil {
return err return err
} }
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
return nil return nil
} }
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner) id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil { if err != nil {
return err return err
} }
@ -203,9 +207,25 @@ func (c *Client) UnsubscribeAll() error {
return nil return nil
} }
// restoreSubscriptions restores subscriptions according to type subsInfo struct {
// cached information about them. blockRcv chan *block.Block
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool { notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var ( var (
err error err error
id string id string
@ -214,72 +234,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
// neo-go WS client says to _always_ read notifications // neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification // from its channel. Subscribing to any notification
// while not reading them in another goroutine may // while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification // lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions // listening while restoring subscriptions
go func() { go func() {
var e any
var ok bool
for { for {
select { select {
case <-stopCh: case <-stopCh:
return return
case n, ok := <-cli.Notifications: case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok { if !ok {
return return
} }
c.notifications <- n if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
} }
c.routeEvent(e)
} }
}() }()
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToNewBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration // new block events restoration
if c.subscribedToNewBlocks { if si.subscribedToBlocks {
_, err = cli.SubscribeForNewBlocks(nil) _, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore block subscription after RPC switch", c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
} }
// notification events restoration // notification events restoration
for contract := range c.subscribedEvents { for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForExecutionNotifications(&contract, nil) id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch", c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
c.subscribedEvents[contract] = id si.subscribedEvents[contract] = id
} }
// notary notification events restoration // notary notification events restoration
if c.notary != nil { if c.notary != nil {
for signer := range c.subscribedNotaryEvents { for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForNotaryRequests(nil, &signer) id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil { if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch", c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
zap.Error(err), zap.Error(err),
) )
return false return
} }
c.subscribedNotaryEvents[signer] = id si.subscribedNotaryEvents[signer] = id
} }
} }
return true return si, true
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
} }