From 17f7d0a2ee82be6a1e51a7cf84771b835161ea1b Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 22 Aug 2022 19:36:41 +0300 Subject: [PATCH] [#1615] morph: Switch to a more prioritized RPC node Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/morph/client/client.go | 6 ++ pkg/morph/client/multi.go | 115 ++++++++++++++++++++++++++---- pkg/morph/client/notifications.go | 9 +-- 4 files changed, 115 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae617c42..153a85fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ command. - `neofs-adm morph set-config` now supports well-known `MaintenanceModeAllowed` key (#1892) - `add`, `get-by-path` and `add-by-path` tree service CLI commands (#1332) - Tree synchronisation on startup (#1329) +- Morph client returns to the highest priority endpoint after the switch (#1615) ### Changed - Allow to evacuate shard data with `EvacuateShard` control RPC (#1800) diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 24cc54c7..272cb254 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -24,6 +24,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -81,6 +82,11 @@ type Client struct { // establish connection to any of the // provided RPC endpoints inactive bool + + // indicates that Client has already started + // goroutine that tries to switch to the higher + // priority RPC node + switchIsActive atomic.Bool } type cache struct { diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 16fef685..711c73f9 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -2,6 +2,7 @@ package client import ( "sort" + "time" "go.uber.org/zap" ) @@ -46,14 +47,11 @@ func (c *Client) switchRPC() bool { } c.cache.invalidate() - c.client = cli - c.rpcActor = act - c.gasToken = gas c.logger.Info("connection to the new RPC node has been established", zap.String("endpoint", newEndpoint)) - if !c.restoreSubscriptions(newEndpoint) { + if !c.restoreSubscriptions(cli, newEndpoint) { // new WS client does not allow // restoring subscription, client // could not work correctly => @@ -63,6 +61,16 @@ func (c *Client) switchRPC() bool { continue } + c.client = cli + c.rpcActor = act + c.gasToken = gas + + if !c.switchIsActive.Load() && + c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { + c.switchIsActive.Store(true) + go c.switchToMostPrioritized() + } + return true } @@ -71,6 +79,10 @@ func (c *Client) switchRPC() bool { func (c *Client) notificationLoop() { for { + c.switchLock.RLock() + nChan := c.client.Notifications + c.switchLock.RUnlock() + select { case <-c.cfg.ctx.Done(): _ = c.UnsubscribeAll() @@ -82,22 +94,22 @@ func (c *Client) notificationLoop() { c.close() return - case n, ok := <-c.client.Notifications: + case n, ok := <-nChan: // notification channel is used as a connection // state: if it is closed, the connection is // considered to be lost if !ok { - var closeReason string if closeErr := c.client.GetError(); closeErr != nil { - closeReason = closeErr.Error() + c.logger.Warn("switching to the next RPC node", + zap.String("reason", closeErr.Error()), + ) } else { - closeReason = "unknown" + // neo-go client was closed by calling `Close` + // method that happens only when the client has + // switched to the more prioritized RPC + continue } - c.logger.Warn("switching to the next RPC node", - zap.String("reason", closeReason), - ) - if !c.switchRPC() { c.logger.Error("could not establish connection to any RPC node") @@ -120,6 +132,85 @@ func (c *Client) notificationLoop() { } } +func (c *Client) switchToMostPrioritized() { + const period = 2 * time.Minute + + t := time.NewTicker(period) + defer t.Stop() + defer c.switchIsActive.Store(false) + +mainLoop: + for { + select { + case <-c.cfg.ctx.Done(): + return + case <-t.C: + c.switchLock.RLock() + endpointsCopy := make([]Endpoint, len(c.endpoints.list)) + copy(endpointsCopy, c.endpoints.list) + + currPriority := c.endpoints.list[c.endpoints.curr].Priority + highestPriority := c.endpoints.list[0].Priority + c.switchLock.RUnlock() + + if currPriority == highestPriority { + // already connected to + // the most prioritized + return + } + + for i, e := range endpointsCopy { + if currPriority == e.Priority { + // a switch will not increase the priority + continue mainLoop + } + + tryE := e.Address + + cli, act, gas, err := c.newCli(tryE) + if err != nil { + c.logger.Warn("could not create client to the higher priority node", + zap.String("endpoint", tryE), + zap.Error(err), + ) + continue + } + + if c.restoreSubscriptions(cli, tryE) { + c.switchLock.Lock() + + // higher priority node could have been + // connected in the other goroutine + if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority { + cli.Close() + c.switchLock.Unlock() + return + } + + c.client.Close() + c.cache.invalidate() + c.client = cli + c.rpcActor = act + c.gasToken = gas + c.endpoints.curr = i + + c.switchLock.Unlock() + + c.logger.Info("switched to the higher priority RPC", + zap.String("endpoint", tryE)) + + return + } + + c.logger.Warn("could not restore side chain subscriptions using node", + zap.String("endpoint", tryE), + zap.Error(err), + ) + } + } + } +} + // close closes notification channel and wrapped WS client. func (c *Client) close() { close(c.notifications) diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 83a7e2d6..34f87f39 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,6 +1,7 @@ package client import ( + "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -204,7 +205,7 @@ func (c *Client) UnsubscribeAll() error { // restoreSubscriptions restores subscriptions according to // cached information about them. -func (c *Client) restoreSubscriptions(endpoint string) bool { +func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool { var ( err error id string @@ -212,7 +213,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool { // new block events restoration if c.subscribedToNewBlocks { - _, err = c.client.SubscribeForNewBlocks(nil) + _, err = cli.SubscribeForNewBlocks(nil) if err != nil { c.logger.Error("could not restore block subscription after RPC switch", zap.String("endpoint", endpoint), @@ -225,7 +226,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool { // notification events restoration for contract := range c.subscribedEvents { - id, err = c.client.SubscribeForExecutionNotifications(&contract, nil) + id, err = cli.SubscribeForExecutionNotifications(&contract, nil) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", zap.String("endpoint", endpoint), @@ -241,7 +242,7 @@ func (c *Client) restoreSubscriptions(endpoint string) bool { // notary notification events restoration if c.notary != nil { for signer := range c.subscribedNotaryEvents { - id, err = c.client.SubscribeForNotaryRequests(nil, &signer) + id, err = cli.SubscribeForNotaryRequests(nil, &signer) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.String("endpoint", endpoint),