Evgenii Stratonikov
c98357606b
gopatch:

```
@@
var from, to expression
@@
+import "bytes"
-to := make([]byte, len(from))
-copy(to, from)
+to := bytes.Clone(from)

@@
var from, to expression
@@
+import "bytes"
-to = make([]byte, len(from))
-copy(to, from)
+to = bytes.Clone(from)

@@
var from, to, typ expression
@@
+import "slices"
-to := make([]typ, len(from))
-copy(to, from)
+to := slices.Clone(from)

@@
var from, to, typ expression
@@
+import "slices"
-to = make([]typ, len(from))
-copy(to, from)
+to = slices.Clone(from)
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
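For reference, the patch mechanically rewrites the manual make-plus-copy idiom into the standard-library clone helpers. A minimal sketch of the equivalence (the variable names here are illustrative, not taken from the codebase):

```go
package main

import (
	"bytes"
	"fmt"
	"slices"
)

func main() {
	from := []byte("frostfs")

	// Before the patch: allocate and copy by hand.
	to := make([]byte, len(from))
	copy(to, from)

	// After the patch: bytes.Clone for byte slices (Go 1.20+)...
	to2 := bytes.Clone(from)

	// ...and slices.Clone for slices of any element type (Go 1.21+).
	ints := []int{1, 2, 3}
	ints2 := slices.Clone(ints)

	fmt.Println(string(to), string(to2), ints2)
}
```

One subtle behavioral difference worth noting: for a nil input, make-plus-copy yields an empty non-nil slice, whereas bytes.Clone is documented to return nil, and slices.Clone likewise preserves nilness.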
164 lines
3.3 KiB
Go
package client

import (
	"context"
	"slices"
	"sort"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"go.uber.org/zap"
)

// Endpoint represents a morph endpoint together with its priority.
type Endpoint struct {
	Address    string
	Priority   int
	MTLSConfig *MTLSConfig
}

// endpoints is a priority-ordered list of morph endpoints together
// with the index of the currently used one.
type endpoints struct {
	curr int
	list []Endpoint
}

// init sorts the given endpoints by ascending priority value (a lower
// value means a higher priority) and resets the current endpoint to
// the most prioritized one.
func (e *endpoints) init(ee []Endpoint) {
	sort.SliceStable(ee, func(i, j int) bool {
		return ee[i].Priority < ee[j].Priority
	})

	e.curr = 0
	e.list = ee
}

// SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC(ctx context.Context) bool {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	c.client.Close()

	// Iterate endpoints in the order of decreasing priority.
	for c.endpoints.curr = range c.endpoints.list {
		newEndpoint := c.endpoints.list[c.endpoints.curr]
		cli, act, err := c.newCli(ctx, newEndpoint)
		if err != nil {
			c.logger.Warn(ctx, logs.ClientCouldNotEstablishConnectionToTheSwitchedRPCNode,
				zap.String("endpoint", newEndpoint.Address),
				zap.Error(err),
			)

			continue
		}

		c.cache.invalidate()

		c.logger.Info(ctx, logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished,
			zap.String("endpoint", newEndpoint.Address))

		c.client = cli
		c.setActor(act)

		// If the connected endpoint is not the most prioritized one,
		// start a background routine that periodically tries to
		// switch back to it.
		if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
			c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
			c.switchIsActive.Store(true)
			go c.switchToMostPrioritized(ctx)
		}

		return true
	}

	c.inactive = true

	if c.cfg.inactiveModeCb != nil {
		c.cfg.inactiveModeCb()
	}
	return false
}

// closeWaiter blocks until the context is canceled or the client is
// closed, then unsubscribes from all notifications and releases the
// underlying connection.
func (c *Client) closeWaiter(ctx context.Context) {
	c.wg.Add(1)
	defer c.wg.Done()
	select {
	case <-ctx.Done():
	case <-c.closeChan:
	}
	_ = c.UnsubscribeAll()
	c.close()
}

// switchToMostPrioritized periodically tries to reconnect to an
// endpoint with a higher priority than the current one and returns
// as soon as no further improvement is possible.
func (c *Client) switchToMostPrioritized(ctx context.Context) {
	t := time.NewTicker(c.cfg.switchInterval)
	defer t.Stop()
	defer c.switchIsActive.Store(false)

mainLoop:
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			c.switchLock.RLock()

			endpointsCopy := slices.Clone(c.endpoints.list)
			currPriority := c.endpoints.list[c.endpoints.curr].Priority
			highestPriority := c.endpoints.list[0].Priority

			c.switchLock.RUnlock()

			if currPriority == highestPriority {
				// already connected to
				// the most prioritized
				return
			}

			for i, e := range endpointsCopy {
				if currPriority == e.Priority {
					// a switch will not increase the priority
					continue mainLoop
				}

				tryE := e.Address

				cli, act, err := c.newCli(ctx, e)
				if err != nil {
					c.logger.Warn(ctx, logs.ClientCouldNotCreateClientToTheHigherPriorityNode,
						zap.String("endpoint", tryE),
						zap.Error(err),
					)
					continue
				}

				c.switchLock.Lock()

				// higher priority node could have been
				// connected in the other goroutine
				if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
					cli.Close()
					c.switchLock.Unlock()
					return
				}

				c.client.Close()
				c.cache.invalidate()
				c.client = cli
				c.setActor(act)
				c.endpoints.curr = i

				c.switchLock.Unlock()

				c.logger.Info(ctx, logs.ClientSwitchedToTheHigherPriorityRPC,
					zap.String("endpoint", tryE))

				return
			}
		}
	}
}

// close closes notification channel and wrapped WS client.
func (c *Client) close() {
	c.switchLock.RLock()
	defer c.switchLock.RUnlock()
	c.client.Close()
}
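Taken together, the file implements a simple failover policy: keep the endpoint list sorted by priority and walk it until a connection sticks. A standalone toy sketch of that policy (the endpoint type, connect stub, and addresses below are hypothetical stand-ins, not part of this package):

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// endpoint is a stripped-down stand-in for client.Endpoint.
type endpoint struct {
	Address  string
	Priority int
}

// connect is a stub playing the role of Client.newCli: here only
// the priority-2 node happens to be reachable.
func connect(e endpoint) error {
	if e.Priority == 2 {
		return nil
	}
	return errors.New("unreachable")
}

func main() {
	list := []endpoint{
		{"wss://c:30333/ws", 3},
		{"wss://a:30333/ws", 1},
		{"wss://b:30333/ws", 2},
	}

	// Mirror endpoints.init: stable sort by ascending priority value,
	// so list[0] is always the most prioritized endpoint.
	sort.SliceStable(list, func(i, j int) bool {
		return list[i].Priority < list[j].Priority
	})

	// Mirror SwitchRPC: try endpoints in priority order until one works.
	for _, e := range list {
		if err := connect(e); err != nil {
			fmt.Println("skip", e.Address, "-", err)
			continue
		}
		fmt.Println("connected to", e.Address)
		return
	}
	fmt.Println("all endpoints failed; client goes inactive")
}
```

Sorting once in init is what lets the rest of the code treat list[0] as the most prioritized endpoint, which is exactly the invariant switchToMostPrioritized compares against.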