diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index ef81c0175..dd5ad73fc 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -54,9 +54,11 @@ type Client struct { cfg cfg - endpoints *endpoints + endpoints endpoints - // switching between rpc endpoint lock + // switchLock protects endpoints, inactive, and subscription-related fields. + // It is taken exclusively during endpoint switch and locked in shared mode + // on every normal call. switchLock *sync.RWMutex // channel for ws notifications diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index eff396f95..3d9151a98 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -111,7 +111,7 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) // they will be used in switch process, otherwise // inactive mode will be enabled cli.client = cfg.singleCli - cli.endpoints = newEndpoints(cfg.extraEndpoints) + cli.endpoints.init(cfg.extraEndpoints) } else { ws, err := newWSClient(*cfg, endpoint) if err != nil { @@ -124,7 +124,7 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) } cli.client = ws - cli.endpoints = newEndpoints(append([]string{endpoint}, cfg.extraEndpoints...)) + cli.endpoints.init(append([]string{endpoint}, cfg.extraEndpoints...)) } go cli.notificationLoop() diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 053967dc3..e35ed3c4c 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -9,11 +9,9 @@ type endpoints struct { list []string } -func newEndpoints(ee []string) *endpoints { - return &endpoints{ - curr: 0, - list: ee, - } +func (e *endpoints) init(ee []string) { + e.curr = 0 + e.list = ee } // next returns the next endpoint and its index @@ -90,6 +88,19 @@ func (c *Client) switchRPC() bool { c.cache.invalidate() c.client = cli + c.logger.Info("connection to the new RPC node has been established", + zap.String("endpoint", newEndpoint)) + + if !c.restoreSubscriptions(newEndpoint) { + // new WS client does not allow + // restoring subscription, client + // could not work correctly => + // closing connection to RPC node + // to switch to another one + cli.Close() + continue + } + return true } } @@ -133,21 +144,6 @@ func (c *Client) notificationLoop() { return } - newEndpoint, _ := c.endpoints.current() - - c.logger.Warn("connection to the new RPC node has been established", - zap.String("endpoint", newEndpoint), - ) - - if !c.restoreSubscriptions() { - // new WS client does not allow - // restoring subscription, client - // could not work correctly => - // closing connection to RPC node - // to switch to another one - c.client.Close() - } - // TODO(@carpawell): call here some callback retrieved in constructor // of the client to allow checking chain state since during switch // process some notification could be lost diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 6f5184db7..83a7e2d64 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -204,16 +204,12 @@ func (c *Client) UnsubscribeAll() error { // restoreSubscriptions restores subscriptions according to // cached information about them. -func (c *Client) restoreSubscriptions() bool { +func (c *Client) restoreSubscriptions(endpoint string) bool { var ( - err error - id string - endpoint, _ = c.endpoints.current() + err error + id string ) - c.switchLock.Lock() - defer c.switchLock.Unlock() - // new block events restoration if c.subscribedToNewBlocks { _, err = c.client.SubscribeForNewBlocks(nil)