[#1615] morph: Switch to a more prioritized RPC node

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
support/v0.34
Pavel Karpy 2022-08-22 19:36:41 +03:00 committed by fyrchik
parent 7c0aa69d11
commit 17f7d0a2ee
4 changed files with 115 additions and 16 deletions

View File

@ -58,6 +58,7 @@ command.
- `neofs-adm morph set-config` now supports well-known `MaintenanceModeAllowed` key (#1892) - `neofs-adm morph set-config` now supports well-known `MaintenanceModeAllowed` key (#1892)
- `add`, `get-by-path` and `add-by-path` tree service CLI commands (#1332) - `add`, `get-by-path` and `add-by-path` tree service CLI commands (#1332)
- Tree synchronisation on startup (#1329) - Tree synchronisation on startup (#1329)
- Morph client returns to the highest priority endpoint after the switch (#1615)
### Changed ### Changed
- Allow to evacuate shard data with `EvacuateShard` control RPC (#1800) - Allow to evacuate shard data with `EvacuateShard` control RPC (#1800)

View File

@ -24,6 +24,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -81,6 +82,11 @@ type Client struct {
// establish connection to any of the // establish connection to any of the
// provided RPC endpoints // provided RPC endpoints
inactive bool inactive bool
// indicates that Client has already started
// goroutine that tries to switch to the higher
// priority RPC node
switchIsActive atomic.Bool
} }
type cache struct { type cache struct {

View File

@ -2,6 +2,7 @@ package client
import ( import (
"sort" "sort"
"time"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -46,14 +47,11 @@ func (c *Client) switchRPC() bool {
} }
c.cache.invalidate() c.cache.invalidate()
c.client = cli
c.rpcActor = act
c.gasToken = gas
c.logger.Info("connection to the new RPC node has been established", c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint)) zap.String("endpoint", newEndpoint))
if !c.restoreSubscriptions(newEndpoint) { if !c.restoreSubscriptions(cli, newEndpoint) {
// new WS client does not allow // new WS client does not allow
// restoring subscription, client // restoring subscription, client
// could not work correctly => // could not work correctly =>
@ -63,6 +61,16 @@ func (c *Client) switchRPC() bool {
continue continue
} }
c.client = cli
c.rpcActor = act
c.gasToken = gas
if !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
c.switchIsActive.Store(true)
go c.switchToMostPrioritized()
}
return true return true
} }
@ -71,6 +79,10 @@ func (c *Client) switchRPC() bool {
func (c *Client) notificationLoop() { func (c *Client) notificationLoop() {
for { for {
c.switchLock.RLock()
nChan := c.client.Notifications
c.switchLock.RUnlock()
select { select {
case <-c.cfg.ctx.Done(): case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll() _ = c.UnsubscribeAll()
@ -82,22 +94,22 @@ func (c *Client) notificationLoop() {
c.close() c.close()
return return
case n, ok := <-c.client.Notifications: case n, ok := <-nChan:
// notification channel is used as a connection // notification channel is used as a connection
// state: if it is closed, the connection is // state: if it is closed, the connection is
// considered to be lost // considered to be lost
if !ok { if !ok {
var closeReason string
if closeErr := c.client.GetError(); closeErr != nil { if closeErr := c.client.GetError(); closeErr != nil {
closeReason = closeErr.Error() c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else { } else {
closeReason = "unknown" // neo-go client was closed by calling `Close`
// method that happens only when the client has
// switched to the more prioritized RPC
continue
} }
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeReason),
)
if !c.switchRPC() { if !c.switchRPC() {
c.logger.Error("could not establish connection to any RPC node") c.logger.Error("could not establish connection to any RPC node")
@ -120,6 +132,85 @@ func (c *Client) notificationLoop() {
} }
} }
func (c *Client) switchToMostPrioritized() {
const period = 2 * time.Minute
t := time.NewTicker(period)
defer t.Stop()
defer c.switchIsActive.Store(false)
mainLoop:
for {
select {
case <-c.cfg.ctx.Done():
return
case <-t.C:
c.switchLock.RLock()
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list)
currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority
c.switchLock.RUnlock()
if currPriority == highestPriority {
// already connected to
// the most prioritized
return
}
for i, e := range endpointsCopy {
if currPriority == e.Priority {
// a switch will not increase the priority
continue mainLoop
}
tryE := e.Address
cli, act, gas, err := c.newCli(tryE)
if err != nil {
c.logger.Warn("could not create client to the higher priority node",
zap.String("endpoint", tryE),
zap.Error(err),
)
continue
}
if c.restoreSubscriptions(cli, tryE) {
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}
c.client.Close()
c.cache.invalidate()
c.client = cli
c.rpcActor = act
c.gasToken = gas
c.endpoints.curr = i
c.switchLock.Unlock()
c.logger.Info("switched to the higher priority RPC",
zap.String("endpoint", tryE))
return
}
c.logger.Warn("could not restore side chain subscriptions using node",
zap.String("endpoint", tryE),
zap.Error(err),
)
}
}
}
}
// close closes notification channel and wrapped WS client. // close closes notification channel and wrapped WS client.
func (c *Client) close() { func (c *Client) close() {
close(c.notifications) close(c.notifications)

View File

@ -1,6 +1,7 @@
package client package client
import ( import (
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -204,7 +205,7 @@ func (c *Client) UnsubscribeAll() error {
// restoreSubscriptions restores subscriptions according to // restoreSubscriptions restores subscriptions according to
// cached information about them. // cached information about them.
func (c *Client) restoreSubscriptions(endpoint string) bool { func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
var ( var (
err error err error
id string id string
@ -212,7 +213,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool {
// new block events restoration // new block events restoration
if c.subscribedToNewBlocks { if c.subscribedToNewBlocks {
_, err = c.client.SubscribeForNewBlocks(nil) _, err = cli.SubscribeForNewBlocks(nil)
if err != nil { if err != nil {
c.logger.Error("could not restore block subscription after RPC switch", c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
@ -225,7 +226,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool {
// notification events restoration // notification events restoration
for contract := range c.subscribedEvents { for contract := range c.subscribedEvents {
id, err = c.client.SubscribeForExecutionNotifications(&contract, nil) id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
if err != nil { if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch", c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),
@ -241,7 +242,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool {
// notary notification events restoration // notary notification events restoration
if c.notary != nil { if c.notary != nil {
for signer := range c.subscribedNotaryEvents { for signer := range c.subscribedNotaryEvents {
id, err = c.client.SubscribeForNotaryRequests(nil, &signer) id, err = cli.SubscribeForNotaryRequests(nil, &signer)
if err != nil { if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch", c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint), zap.String("endpoint", endpoint),