Alex Vanin
363f153eaf
With new revision of tree service protocol, getSubTree requires to use explicit order field. Signed-off-by: Alex Vanin <a.vanin@yadro.com>
755 lines
20 KiB
Go
755 lines
20 KiB
Go
package tree
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Fallback values substituted by fillDefaultInitParams for any InitParameters
// duration that is left non-positive.
const (
	// defaultRebalanceInterval is the default pause between health-check rounds.
	defaultRebalanceInterval = 15 * time.Second
	// defaultHealthcheckTimeout is the default time limit of a single node health check.
	defaultHealthcheckTimeout = 4 * time.Second
	// defaultDialTimeout is the default value of the node dial timeout.
	defaultDialTimeout = 5 * time.Second
	// defaultStreamTimeout is the default value of the node stream timeout.
	defaultStreamTimeout = 10 * time.Second
)
|
|
|
|
// Sentinel errors that tree service failures are mapped onto; match them
// with errors.Is.
var (
	// ErrNodeNotFound is returned when the Tree service reports a "not found" failure.
	ErrNodeNotFound = errors.New("not found")

	// ErrNodeAccessDenied is returned when the Tree service reports an access denial.
	ErrNodeAccessDenied = errors.New("access denied")
)
|
|
|
|
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
|
|
// This interface is expected to have exactly one production implementation - treeClient.
|
|
// Others are expected to be for test purposes only.
|
|
type client interface {
|
|
serviceClient() (grpcService.TreeServiceClient, error)
|
|
endpoint() string
|
|
isHealthy() bool
|
|
setHealthy(bool)
|
|
dial(ctx context.Context) error
|
|
redialIfNecessary(context.Context) (bool, error)
|
|
close() error
|
|
}
|
|
|
|
// InitParameters contains values used to initialize connection Pool.
|
|
type InitParameters struct {
|
|
key *keys.PrivateKey
|
|
logger *zap.Logger
|
|
nodeDialTimeout time.Duration
|
|
nodeStreamTimeout time.Duration
|
|
healthcheckTimeout time.Duration
|
|
clientRebalanceInterval time.Duration
|
|
nodeParams []pool.NodeParam
|
|
dialOptions []grpc.DialOption
|
|
}
|
|
|
|
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
|
// with multiple FrostFS tree services without thinking about switching between servers
|
|
// due to their unavailability.
|
|
//
|
|
// Pool can be created and initialized using NewPool function.
|
|
// Before executing the FrostFS tree operations using the Pool, connection to the
|
|
// servers MUST BE correctly established (see Dial method).
|
|
type Pool struct {
|
|
innerPools []*innerPool
|
|
key *keys.PrivateKey
|
|
cancel context.CancelFunc
|
|
closedCh chan struct{}
|
|
rebalanceParams rebalanceParameters
|
|
dialOptions []grpc.DialOption
|
|
logger *zap.Logger
|
|
|
|
startIndicesMtx sync.RWMutex
|
|
// startIndices points to the client from which the next request will be executed.
|
|
// Since clients are stored in innerPool field we have to use two indices.
|
|
// These indices being changed during:
|
|
// * rebalance procedure (see Pool.startRebalance)
|
|
// * retry in case of request failure (see Pool.requestWithRetry)
|
|
startIndices [2]int
|
|
}
|
|
|
|
type innerPool struct {
|
|
clients []client
|
|
}
|
|
|
|
type rebalanceParameters struct {
|
|
nodesGroup [][]pool.NodeParam
|
|
nodeRequestTimeout time.Duration
|
|
clientRebalanceInterval time.Duration
|
|
}
|
|
|
|
// GetNodesParams groups parameters of Pool.GetNodes operation.
|
|
type GetNodesParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
Path []string
|
|
Meta []string
|
|
PathAttribute string
|
|
LatestOnly bool
|
|
AllAttrs bool
|
|
BearerToken []byte
|
|
}
|
|
|
|
// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
|
|
type GetSubTreeParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
RootID uint64
|
|
Depth uint32
|
|
BearerToken []byte
|
|
}
|
|
|
|
// AddNodeParams groups parameters of Pool.AddNode operation.
|
|
type AddNodeParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
Parent uint64
|
|
Meta map[string]string
|
|
BearerToken []byte
|
|
}
|
|
|
|
// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
|
|
type AddNodeByPathParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
Path []string
|
|
Meta map[string]string
|
|
PathAttribute string
|
|
BearerToken []byte
|
|
}
|
|
|
|
// MoveNodeParams groups parameters of Pool.MoveNode operation.
|
|
type MoveNodeParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
NodeID uint64
|
|
ParentID uint64
|
|
Meta map[string]string
|
|
BearerToken []byte
|
|
}
|
|
|
|
// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
|
|
type RemoveNodeParams struct {
|
|
CID cid.ID
|
|
TreeID string
|
|
NodeID uint64
|
|
BearerToken []byte
|
|
}
|
|
|
|
// NewPool creates connection pool using parameters.
|
|
func NewPool(options InitParameters) (*Pool, error) {
|
|
if options.key == nil {
|
|
return nil, fmt.Errorf("missed required parameter 'Key'")
|
|
}
|
|
|
|
nodesParams, err := adjustNodeParams(options.nodeParams)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fillDefaultInitParams(&options)
|
|
|
|
p := &Pool{
|
|
key: options.key,
|
|
logger: options.logger,
|
|
dialOptions: options.dialOptions,
|
|
rebalanceParams: rebalanceParameters{
|
|
nodesGroup: nodesParams,
|
|
nodeRequestTimeout: options.healthcheckTimeout,
|
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
|
},
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
// Dial establishes a connection to the tree servers from the FrostFS network.
|
|
// It also starts a routine that checks the health of the nodes and
|
|
// updates the weights of the nodes for balancing.
|
|
// Returns an error describing failure reason.
|
|
//
|
|
// If failed, the Pool SHOULD NOT be used.
|
|
//
|
|
// See also InitParameters.SetClientRebalanceInterval.
|
|
func (p *Pool) Dial(ctx context.Context) error {
|
|
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
|
|
var atLeastOneHealthy bool
|
|
|
|
for i, nodes := range p.rebalanceParams.nodesGroup {
|
|
clients := make([]client, len(nodes))
|
|
for j, node := range nodes {
|
|
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
|
if err := clients[j].dial(ctx); err != nil {
|
|
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
atLeastOneHealthy = true
|
|
}
|
|
|
|
inner[i] = &innerPool{
|
|
clients: clients,
|
|
}
|
|
}
|
|
|
|
if !atLeastOneHealthy {
|
|
return fmt.Errorf("at least one node must be healthy")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
p.cancel = cancel
|
|
p.closedCh = make(chan struct{})
|
|
p.innerPools = inner
|
|
|
|
go p.startRebalance(ctx)
|
|
return nil
|
|
}
|
|
|
|
// SetKey specifies default key to be used for the protocol communication by default.
|
|
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// SetLogger specifies logger.
|
|
func (x *InitParameters) SetLogger(logger *zap.Logger) {
|
|
x.logger = logger
|
|
}
|
|
|
|
// SetNodeDialTimeout specifies the timeout for connection to be established.
|
|
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
|
|
x.nodeDialTimeout = timeout
|
|
}
|
|
|
|
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
|
|
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
|
|
x.nodeStreamTimeout = timeout
|
|
}
|
|
|
|
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
|
|
//
|
|
// See also Pool.Dial.
|
|
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
|
|
x.healthcheckTimeout = timeout
|
|
}
|
|
|
|
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
|
|
//
|
|
// See also Pool.Dial.
|
|
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
|
|
x.clientRebalanceInterval = interval
|
|
}
|
|
|
|
// AddNode append information about the node to which you want to connect.
|
|
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
|
|
x.nodeParams = append(x.nodeParams, nodeParam)
|
|
}
|
|
|
|
// SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection.
|
|
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
|
x.dialOptions = opts
|
|
}
|
|
|
|
// GetNodes invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
|
|
request := &grpcService.GetNodeByPathRequest{
|
|
Body: &grpcService.GetNodeByPathRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
Path: prm.Path,
|
|
Attributes: prm.Meta,
|
|
PathAttribute: prm.PathAttribute,
|
|
LatestOnly: prm.LatestOnly,
|
|
AllAttributes: prm.AllAttrs,
|
|
BearerToken: prm.BearerToken,
|
|
},
|
|
}
|
|
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var resp *grpcService.GetNodeByPathResponse
|
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
|
return handleError("failed to get node by path", inErr)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.GetBody().GetNodes(), nil
|
|
}
|
|
|
|
// SubTreeReader is designed to read list of subtree nodes FrostFS tree service.
|
|
//
|
|
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
|
|
type SubTreeReader struct {
|
|
cli grpcService.TreeService_GetSubTreeClient
|
|
}
|
|
|
|
// Read reads another list of the subtree nodes.
|
|
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
|
|
for i := 0; i < len(buf); i++ {
|
|
resp, err := x.cli.Recv()
|
|
if err == io.EOF {
|
|
return i, io.EOF
|
|
} else if err != nil {
|
|
return i, handleError("failed to get sub tree", err)
|
|
}
|
|
buf[i] = resp.Body
|
|
}
|
|
|
|
return len(buf), nil
|
|
}
|
|
|
|
// ReadAll reads all nodes subtree nodes.
|
|
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
|
|
var res []*grpcService.GetSubTreeResponse_Body
|
|
for {
|
|
resp, err := x.cli.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return nil, handleError("failed to get sub tree", err)
|
|
}
|
|
res = append(res, resp.Body)
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// Next gets the next node from subtree.
|
|
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
|
|
resp, err := x.cli.Recv()
|
|
if err == io.EOF {
|
|
return nil, io.EOF
|
|
}
|
|
if err != nil {
|
|
return nil, handleError("failed to get sub tree", err)
|
|
}
|
|
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// GetSubTree invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
|
|
request := &grpcService.GetSubTreeRequest{
|
|
Body: &grpcService.GetSubTreeRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
RootId: prm.RootID,
|
|
Depth: prm.Depth,
|
|
BearerToken: prm.BearerToken,
|
|
OrderBy: &grpcService.GetSubTreeRequest_Body_Order{
|
|
Direction: grpcService.GetSubTreeRequest_Body_Order_Asc,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cli grpcService.TreeService_GetSubTreeClient
|
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
|
cli, inErr = client.GetSubTree(ctx, request)
|
|
return handleError("failed to get sub tree client", inErr)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &SubTreeReader{cli: cli}, nil
|
|
}
|
|
|
|
// AddNode invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
|
request := &grpcService.AddRequest{
|
|
Body: &grpcService.AddRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
ParentId: prm.Parent,
|
|
Meta: metaToKV(prm.Meta),
|
|
BearerToken: prm.BearerToken,
|
|
},
|
|
}
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var resp *grpcService.AddResponse
|
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.Add(ctx, request)
|
|
return handleError("failed to add node", inErr)
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return resp.GetBody().GetNodeId(), nil
|
|
}
|
|
|
|
// AddNodeByPath invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
|
request := &grpcService.AddByPathRequest{
|
|
Body: &grpcService.AddByPathRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
Path: prm.Path,
|
|
Meta: metaToKV(prm.Meta),
|
|
PathAttribute: prm.PathAttribute,
|
|
BearerToken: prm.BearerToken,
|
|
},
|
|
}
|
|
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var resp *grpcService.AddByPathResponse
|
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.AddByPath(ctx, request)
|
|
return handleError("failed to add node by path", inErr)
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
body := resp.GetBody()
|
|
if body == nil {
|
|
return 0, errors.New("nil body in tree service response")
|
|
} else if len(body.Nodes) == 0 {
|
|
return 0, errors.New("empty list of added nodes in tree service response")
|
|
}
|
|
|
|
// The first node is the leaf that we add, according to tree service docs.
|
|
return body.Nodes[0], nil
|
|
}
|
|
|
|
// MoveNode invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
|
request := &grpcService.MoveRequest{
|
|
Body: &grpcService.MoveRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
NodeId: prm.NodeID,
|
|
ParentId: prm.ParentID,
|
|
Meta: metaToKV(prm.Meta),
|
|
BearerToken: prm.BearerToken,
|
|
},
|
|
}
|
|
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
|
if _, err := client.Move(ctx, request); err != nil {
|
|
return handleError("failed to move node", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// RemoveNode invokes eponymous method from TreeServiceClient.
|
|
//
|
|
// Can return predefined errors:
|
|
// * ErrNodeNotFound
|
|
// * ErrNodeAccessDenied.
|
|
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
|
request := &grpcService.RemoveRequest{
|
|
Body: &grpcService.RemoveRequest_Body{
|
|
ContainerId: prm.CID[:],
|
|
TreeId: prm.TreeID,
|
|
NodeId: prm.NodeID,
|
|
BearerToken: prm.BearerToken,
|
|
},
|
|
}
|
|
if err := p.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
|
if _, err := client.Remove(ctx, request); err != nil {
|
|
return handleError("failed to remove node", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Close closes the Pool and releases all the associated resources.
|
|
func (p *Pool) Close() error {
|
|
p.cancel()
|
|
<-p.closedCh
|
|
|
|
var err error
|
|
for _, group := range p.innerPools {
|
|
for _, cl := range group.clients {
|
|
if closeErr := cl.close(); closeErr != nil {
|
|
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
|
|
err = closeErr
|
|
}
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func handleError(msg string, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error())
|
|
} else if strings.Contains(err.Error(), "is denied by") {
|
|
return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error())
|
|
}
|
|
return fmt.Errorf("%s: %w", msg, err)
|
|
}
|
|
|
|
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
|
result := make([]*grpcService.KeyValue, 0, len(meta))
|
|
|
|
for key, value := range meta {
|
|
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
|
|
if len(nodeParams) == 0 {
|
|
return nil, errors.New("no FrostFS peers configured")
|
|
}
|
|
|
|
nodeParamsMap := make(map[int][]pool.NodeParam)
|
|
for _, param := range nodeParams {
|
|
nodes := nodeParamsMap[param.Priority()]
|
|
nodeParamsMap[param.Priority()] = append(nodes, param)
|
|
}
|
|
|
|
res := make([][]pool.NodeParam, 0, len(nodeParamsMap))
|
|
for _, nodes := range nodeParamsMap {
|
|
res = append(res, nodes)
|
|
}
|
|
|
|
sort.Slice(res, func(i, j int) bool {
|
|
return res[i][0].Priority() < res[j][0].Priority()
|
|
})
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func fillDefaultInitParams(params *InitParameters) {
|
|
if params.clientRebalanceInterval <= 0 {
|
|
params.clientRebalanceInterval = defaultRebalanceInterval
|
|
}
|
|
|
|
if params.healthcheckTimeout <= 0 {
|
|
params.healthcheckTimeout = defaultHealthcheckTimeout
|
|
}
|
|
|
|
if params.nodeDialTimeout <= 0 {
|
|
params.nodeDialTimeout = defaultDialTimeout
|
|
}
|
|
|
|
if params.nodeStreamTimeout <= 0 {
|
|
params.nodeStreamTimeout = defaultStreamTimeout
|
|
}
|
|
}
|
|
|
|
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
|
if p.logger == nil {
|
|
return
|
|
}
|
|
|
|
p.logger.Log(level, msg, fields...)
|
|
}
|
|
|
|
// startRebalance runs loop to monitor tree client healthy status.
|
|
func (p *Pool) startRebalance(ctx context.Context) {
|
|
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
|
|
buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
|
|
for i, nodes := range p.rebalanceParams.nodesGroup {
|
|
buffers[i] = make([]bool, len(nodes))
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(p.closedCh)
|
|
return
|
|
case <-ticker.C:
|
|
p.updateNodesHealth(ctx, buffers)
|
|
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
|
|
wg := sync.WaitGroup{}
|
|
for i, inner := range p.innerPools {
|
|
wg.Add(1)
|
|
|
|
go func(i int, innerPool *innerPool) {
|
|
defer wg.Done()
|
|
p.updateInnerNodesHealth(ctx, i, buffers[i])
|
|
}(i, inner)
|
|
}
|
|
wg.Wait()
|
|
|
|
LOOP:
|
|
for i, buffer := range buffers {
|
|
for j, healthy := range buffer {
|
|
if healthy {
|
|
p.setStartIndices(i, j)
|
|
break LOOP
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
|
|
if i > len(p.innerPools)-1 {
|
|
return
|
|
}
|
|
nodesByPriority := p.innerPools[i]
|
|
options := p.rebalanceParams
|
|
|
|
var wg sync.WaitGroup
|
|
for j, cli := range nodesByPriority.clients {
|
|
wg.Add(1)
|
|
go func(j int, cli client) {
|
|
defer wg.Done()
|
|
|
|
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
|
defer c()
|
|
|
|
changed, err := cli.redialIfNecessary(tctx)
|
|
healthy := err == nil
|
|
if changed {
|
|
fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
|
|
if err != nil {
|
|
fields = append(fields, zap.Error(err))
|
|
}
|
|
p.log(zap.DebugLevel, "tree health has changed", fields...)
|
|
} else if err != nil {
|
|
p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
|
|
}
|
|
buffer[j] = healthy
|
|
}(j, cli)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (p *Pool) getStartIndices() (int, int) {
|
|
p.startIndicesMtx.RLock()
|
|
defer p.startIndicesMtx.RUnlock()
|
|
|
|
return p.startIndices[0], p.startIndices[1]
|
|
}
|
|
|
|
func (p *Pool) setStartIndices(i, j int) {
|
|
p.startIndicesMtx.Lock()
|
|
p.startIndices[0] = i
|
|
p.startIndices[1] = j
|
|
p.startIndicesMtx.Unlock()
|
|
}
|
|
|
|
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
|
|
var (
|
|
err error
|
|
cl grpcService.TreeServiceClient
|
|
)
|
|
|
|
startI, startJ := p.getStartIndices()
|
|
groupsLen := len(p.innerPools)
|
|
for i := startI; i < startI+groupsLen; i++ {
|
|
indexI := i % groupsLen
|
|
clientsLen := len(p.innerPools[indexI].clients)
|
|
for j := startJ; j < startJ+clientsLen; j++ {
|
|
indexJ := j % clientsLen
|
|
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
|
err = fn(cl)
|
|
}
|
|
if !shouldTryAgain(err) {
|
|
if startI != indexI || startJ != indexJ {
|
|
p.setStartIndices(indexI, indexJ)
|
|
}
|
|
return err
|
|
}
|
|
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
|
}
|
|
startJ = 0
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func shouldTryAgain(err error) bool {
|
|
return !(err == nil ||
|
|
errors.Is(err, ErrNodeNotFound) ||
|
|
errors.Is(err, ErrNodeAccessDenied))
|
|
}
|