[#1560] morph/client: Perform RPC switch and restore in one step

Otherwise we could switch infinitely if subscription restore has failed.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-06-29 16:02:07 +03:00 committed by fyrchik
parent 6358f4d746
commit 0ccea802e9
4 changed files with 25 additions and 31 deletions

View file

@ -54,9 +54,11 @@ type Client struct {
cfg cfg cfg cfg
endpoints *endpoints endpoints endpoints
// switching between rpc endpoint lock // switchLock protects endpoints, inactive, and subscription-related fields.
// It is taken exclusively during endpoint switch and locked in shared mode
// on every normal call.
switchLock *sync.RWMutex switchLock *sync.RWMutex
// channel for ws notifications // channel for ws notifications

View file

@ -111,7 +111,7 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
// they will be used in switch process, otherwise // they will be used in switch process, otherwise
// inactive mode will be enabled // inactive mode will be enabled
cli.client = cfg.singleCli cli.client = cfg.singleCli
cli.endpoints = newEndpoints(cfg.extraEndpoints) cli.endpoints.init(cfg.extraEndpoints)
} else { } else {
ws, err := newWSClient(*cfg, endpoint) ws, err := newWSClient(*cfg, endpoint)
if err != nil { if err != nil {
@ -124,7 +124,7 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
} }
cli.client = ws cli.client = ws
cli.endpoints = newEndpoints(append([]string{endpoint}, cfg.extraEndpoints...)) cli.endpoints.init(append([]string{endpoint}, cfg.extraEndpoints...))
} }
go cli.notificationLoop() go cli.notificationLoop()

View file

@ -9,11 +9,9 @@ type endpoints struct {
list []string list []string
} }
func newEndpoints(ee []string) *endpoints { func (e *endpoints) init(ee []string) {
return &endpoints{ e.curr = 0
curr: 0, e.list = ee
list: ee,
}
} }
// next returns the next endpoint and its index // next returns the next endpoint and its index
@ -90,6 +88,19 @@ func (c *Client) switchRPC() bool {
c.cache.invalidate() c.cache.invalidate()
c.client = cli c.client = cli
c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint))
if !c.restoreSubscriptions(newEndpoint) {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
return true return true
} }
} }
@ -133,21 +144,6 @@ func (c *Client) notificationLoop() {
return return
} }
newEndpoint, _ := c.endpoints.current()
c.logger.Warn("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint),
)
if !c.restoreSubscriptions() {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
c.client.Close()
}
// TODO(@carpawell): call here some callback retrieved in constructor // TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch // of the client to allow checking chain state since during switch
// process some notification could be lost // process some notification could be lost

View file

@ -204,16 +204,12 @@ func (c *Client) UnsubscribeAll() error {
// restoreSubscriptions restores subscriptions according to // restoreSubscriptions restores subscriptions according to
// cached information about them. // cached information about them.
func (c *Client) restoreSubscriptions() bool { func (c *Client) restoreSubscriptions(endpoint string) bool {
var ( var (
err error err error
id string id string
endpoint, _ = c.endpoints.current()
) )
c.switchLock.Lock()
defer c.switchLock.Unlock()
// new block events restoration // new block events restoration
if c.subscribedToNewBlocks { if c.subscribedToNewBlocks {
_, err = c.client.SubscribeForNewBlocks(nil) _, err = c.client.SubscribeForNewBlocks(nil)