frostfs-node/pkg/morph/client/multi.go
Evgenii Stratonikov 30c7925b3c [#1609] morph/client: Retry connecting to failed endpoint during the switch
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
2022-07-21 16:08:42 +03:00

137 lines
2.7 KiB
Go

package client
import (
"sort"
"go.uber.org/zap"
)
// Endpoint represents morph endpoint together with its priority.
type Endpoint struct {
// Address is the endpoint address; switchRPC passes it to the WS client
// constructor when connecting.
Address string
// Priority orders endpoints: init places endpoints with a greater
// Priority value earlier in the list, so they are tried first.
Priority int
}
// endpoints is a list of morph endpoints together with the index of the
// endpoint that is currently selected.
type endpoints struct {
// curr is an index into list. init resets it to 0; switchRPC sets it to
// the index of the endpoint it is trying to connect to.
curr int
// list holds the endpoints; init sorts it by decreasing Priority.
list []Endpoint
}
// init stores ee as the endpoint list, ordered from the greatest Priority
// value to the smallest, and selects the first endpoint as the current one.
//
// The sort is stable, so endpoints with equal Priority keep their relative
// order. ee is sorted in place and retained as is (not copied).
func (e *endpoints) init(ee []Endpoint) {
	higherFirst := func(a, b int) bool {
		return ee[b].Priority < ee[a].Priority
	}
	sort.SliceStable(ee, higherFirst)

	e.list, e.curr = ee, 0
}
// switchRPC closes the current RPC connection and tries to replace it with
// a connection to one of the configured endpoints.
//
// Endpoints are tried in list order starting from the first element, i.e.
// in the order of decreasing priority when the list was prepared by
// endpoints.init. The whole list is walked, so the endpoint whose connection
// has just been closed is retried as well. An endpoint is accepted only if
// a WS client can be created and initialized for it and the subscriptions
// are successfully restored.
//
// c.endpoints.curr always holds the index of the last endpoint tried.
// c.switchLock is held for the whole duration of the call.
//
// It returns true if the switch succeeded and false if no endpoint could
// be used.
func (c *Client) switchRPC() bool {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	c.client.Close()

	for idx := range c.endpoints.list {
		c.endpoints.curr = idx

		addr := c.endpoints.list[idx].Address
		endpointField := zap.String("endpoint", addr)

		cli, err := newWSClient(c.cfg, addr)
		if err != nil {
			c.logger.Warn("could not establish connection to the switched RPC node",
				endpointField,
				zap.Error(err),
			)
			continue
		}

		if err = cli.Init(); err != nil {
			cli.Close()
			c.logger.Warn("could not init the switched RPC node",
				endpointField,
				zap.Error(err),
			)
			continue
		}

		// The new client must be installed before the subscriptions are
		// restored; the cache is dropped since it was filled via the old
		// connection.
		c.cache.invalidate()
		c.client = cli

		c.logger.Info("connection to the new RPC node has been established",
			endpointField)

		if c.restoreSubscriptions(addr) {
			return true
		}

		// The new WS client does not allow restoring subscriptions, so the
		// client could not work correctly with it: close the connection
		// and move on to the next endpoint.
		cli.Close()
	}

	return false
}
// notificationLoop forwards notifications received from the current WS
// client to c.notifications until the client is stopped.
//
// The loop ends in one of three ways:
//   - the client context is done or closeChan fires: all subscriptions are
//     dropped (best effort) and the client is closed;
//   - the WS notification channel is closed and switchRPC fails for every
//     endpoint: the client is switched to inactive mode.
//
// When the WS notification channel is closed and switchRPC succeeds, the
// loop keeps running and reads from the new client's channel.
func (c *Client) notificationLoop() {
	// shutdown is shared by both stop signals; the unsubscribe error is
	// deliberately ignored because the client is being closed anyway.
	shutdown := func() {
		_ = c.UnsubscribeAll()
		c.close()
	}

	for {
		select {
		case <-c.cfg.ctx.Done():
			shutdown()
			return
		case <-c.closeChan:
			shutdown()
			return
		case ntf, open := <-c.client.Notifications:
			if open {
				c.notifications <- ntf
				continue
			}

			// The notification channel doubles as a connection state
			// indicator: once it is closed, the connection is considered
			// to be lost.
			reason := "unknown"
			if closeErr := c.client.GetError(); closeErr != nil {
				reason = closeErr.Error()
			}

			c.logger.Warn("switching to the next RPC node",
				zap.String("reason", reason),
			)

			if c.switchRPC() {
				// TODO(@carpawell): call here some callback retrieved in constructor
				// of the client to allow checking chain state since during switch
				// process some notification could be lost
				continue
			}

			c.logger.Error("could not establish connection to any RPC node")

			// None of the endpoints is reachable, so the client goes
			// inactive and the loop stops.
			c.inactiveMode()
			return
		}
	}
}
// close closes the notification channel and the wrapped WS client.
//
// notificationLoop calls it when the client context is done or closeChan
// fires. The channel is closed first, then the WS client.
func (c *Client) close() {
close(c.notifications)
c.client.Close()
}