frostfs-node/pkg/morph/client/multi.go
Evgenii Stratonikov 0e31c12e63 [#240] logs: Move log messages to constants
Drop duplicate entities.
Format entities.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-14 05:06:09 +00:00

260 lines
5.6 KiB
Go

package client
import (
"context"
"sort"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap"
)
// Endpoint represents morph endpoint together with its priority.
type Endpoint struct {
Address string
Priority int
}
type endpoints struct {
curr int
list []Endpoint
}
func (e *endpoints) init(ee []Endpoint) {
sort.SliceStable(ee, func(i, j int) bool {
return ee[i].Priority < ee[j].Priority
})
e.curr = 0
e.list = ee
}
func (c *Client) switchRPC(ctx context.Context) bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()
c.client.Close()
// Iterate endpoints in the order of decreasing priority.
for c.endpoints.curr = range c.endpoints.list {
newEndpoint := c.endpoints.list[c.endpoints.curr].Address
cli, act, err := c.newCli(ctx, newEndpoint)
if err != nil {
c.logger.Warn(logs.ClientCouldNotEstablishConnectionToTheSwitchedRPCNode,
zap.String("endpoint", newEndpoint),
zap.Error(err),
)
continue
}
c.cache.invalidate()
c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished,
zap.String("endpoint", newEndpoint))
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
c.client = cli
c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
c.switchIsActive.Store(true)
go c.switchToMostPrioritized(ctx)
}
return true
}
return false
}
func (c *Client) notificationLoop(ctx context.Context) {
var e any
var ok bool
for {
c.switchLock.RLock()
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(ctx, e)
continue
}
if !c.reconnect(ctx) {
return
}
}
}
func (c *Client) routeEvent(ctx context.Context, e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select {
case c.notifications <- typedNotification:
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect(ctx context.Context) bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn(logs.ClientSwitchingToTheNextRPCNode,
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC(ctx) {
c.logger.Error(logs.ClientCouldNotEstablishConnectionToAnyRPCNode)
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
}
func (c *Client) switchToMostPrioritized(ctx context.Context) {
t := time.NewTicker(c.cfg.switchInterval)
defer t.Stop()
defer c.switchIsActive.Store(false)
mainLoop:
for {
select {
case <-ctx.Done():
return
case <-t.C:
c.switchLock.RLock()
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list)
currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority
c.switchLock.RUnlock()
if currPriority == highestPriority {
// already connected to
// the most prioritized
return
}
for i, e := range endpointsCopy {
if currPriority == e.Priority {
// a switch will not increase the priority
continue mainLoop
}
tryE := e.Address
cli, act, err := c.newCli(ctx, tryE)
if err != nil {
c.logger.Warn(logs.ClientCouldNotCreateClientToTheHigherPriorityNode,
zap.String("endpoint", tryE),
zap.Error(err),
)
continue
}
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i
c.switchLock.Unlock()
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
zap.String("endpoint", tryE))
return
}
c.logger.Warn(logs.ClientCouldNotRestoreSideChainSubscriptionsUsingNode,
zap.String("endpoint", tryE),
zap.Error(err),
)
}
}
}
}
// close closes notification channel and wrapped WS client.
func (c *Client) close() {
close(c.notifications)
c.client.Close()
}