frostfs-sdk-go/pool/tree/pool.go

1100 lines
29 KiB
Go
Raw Permalink Normal View History

package tree
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
)
// SubTreeSort defines an order of nodes returned from GetSubTree RPC.
type SubTreeSort int32
const (
// NoneOrder does not specify order of nodes returned in GetSubTree RPC.
NoneOrder SubTreeSort = iota
// AscendingOrder specifies ascending alphabetical order of nodes based on FilePath attribute.
AscendingOrder
)
var (
// ErrNodeNotFound is returned from Tree service in case of not found error.
ErrNodeNotFound = errors.New("not found")
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmpty is used to trigger retry when 'GetNodeByPath' return empty result.
errNodeEmptyResult = errors.New("empty result")
)
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
serviceClient() (*rpcclient.Client, error)
endpoint() string
isHealthy() bool
setHealthy(bool)
dial(ctx context.Context) error
redialIfNecessary(context.Context) (bool, error)
close() error
}
// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
key *keys.PrivateKey
logger *zap.Logger
nodeDialTimeout time.Duration
nodeStreamTimeout time.Duration
healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam
dialOptions []grpc.DialOption
maxRequestAttempts int
netMapInfoSource NetMapInfoSource
}
type NetMapInfoSource interface {
NetMapSnapshot(ctx context.Context) (netmap.NetMap, error)
PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error)
}
// Pool represents virtual connection to the FrostFS tree services network to communicate
// with multiple FrostFS tree services without thinking about switching between servers
// due to their unavailability.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS tree operations using the Pool without netMapInfoSource, connection to the
// servers MUST BE correctly established (see Dial method).
type Pool struct {
innerPools []*innerPool
key *keys.PrivateKey
cancel context.CancelFunc
closedCh chan struct{}
rebalanceParams rebalanceParameters
dialOptions []grpc.DialOption
logger *zap.Logger
methods []*pool.MethodStatus
maxRequestAttempts int
streamTimeout time.Duration
nodeDialTimeout time.Duration
netMapInfoSource NetMapInfoSource
// mutex protects clientMap and startIndices
mutex sync.RWMutex
// clientMap will be used if netMapInfoSource is set
clientMap map[uint64]client
// startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices.
// These indices being changed during:
// * rebalance procedure (see Pool.startRebalance)
// * retry in case of request failure (see Pool.requestWithRetry)
// startIndices will be used if netMapInfoSource is not set
startIndices [2]int
}
type innerPool struct {
clients []client
}
type rebalanceParameters struct {
nodesGroup [][]pool.NodeParam
nodeRequestTimeout time.Duration
clientRebalanceInterval time.Duration
}
// GetNodesParams groups parameters of Pool.GetNodes operation.
type GetNodesParams struct {
CID cid.ID
TreeID string
Path []string
Meta []string
PathAttribute string
LatestOnly bool
AllAttrs bool
BearerToken []byte
}
// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
type GetSubTreeParams struct {
CID cid.ID
TreeID string
RootID []uint64
Depth uint32
BearerToken []byte
Order SubTreeSort
}
// AddNodeParams groups parameters of Pool.AddNode operation.
type AddNodeParams struct {
CID cid.ID
TreeID string
Parent uint64
Meta map[string]string
BearerToken []byte
}
// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
type AddNodeByPathParams struct {
CID cid.ID
TreeID string
Path []string
Meta map[string]string
PathAttribute string
BearerToken []byte
}
// MoveNodeParams groups parameters of Pool.MoveNode operation.
type MoveNodeParams struct {
CID cid.ID
TreeID string
NodeID uint64
ParentID uint64
Meta map[string]string
BearerToken []byte
}
// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
type RemoveNodeParams struct {
CID cid.ID
TreeID string
NodeID uint64
BearerToken []byte
}
// MethodIndex index of method in list of statuses in Pool.
type MethodIndex int
const (
methodGetNodes MethodIndex = iota
methodGetSubTree
methodAddNode
methodAddNodeByPath
methodMoveNode
methodRemoveNode
methodLast
)
// String implements fmt.Stringer.
func (m MethodIndex) String() string {
switch m {
case methodGetNodes:
return "getNodes"
case methodAddNode:
return "addNode"
case methodGetSubTree:
return "getSubTree"
case methodAddNodeByPath:
return "addNodeByPath"
case methodMoveNode:
return "moveNode"
case methodRemoveNode:
return "removeNode"
default:
return "unknown"
}
}
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
fillDefaultInitParams(&options)
methods := make([]*pool.MethodStatus, methodLast)
for i := methodGetNodes; i < methodLast; i++ {
methods[i] = pool.NewMethodStatus(i.String())
}
p := &Pool{
key: options.key,
logger: options.logger,
dialOptions: options.dialOptions,
rebalanceParams: rebalanceParameters{
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
streamTimeout: options.nodeStreamTimeout,
nodeDialTimeout: options.nodeDialTimeout,
methods: methods,
netMapInfoSource: options.netMapInfoSource,
clientMap: make(map[uint64]client),
}
if options.netMapInfoSource == nil {
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
p.rebalanceParams.nodesGroup = nodesParams
}
return p, nil
}
// Dial may not be called and will have no effect if netMapInfoSource is set.
// See also InitParameters.SetNetMapInfoSource
//
// Otherwise, Dial establishes a connection to the tree servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed and netMapInfoSource is not set, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
if p.netMapInfoSource != nil {
return nil
}
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
var atLeastOneHealthy bool
for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes))
for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue
}
atLeastOneHealthy = true
}
inner[i] = &innerPool{
clients: clients,
}
}
if !atLeastOneHealthy {
return fmt.Errorf("at least one node must be healthy")
}
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
go p.startRebalance(ctx)
return nil
}
// SetKey specifies default key to be used for the protocol communication by default.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
x.key = key
}
// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
x.logger = logger
}
// SetNodeDialTimeout specifies the timeout for connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
x.nodeStreamTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// AddNode append information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam)
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// SetMaxRequestAttempts sets the max attempt to make successful request.
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// SetNetMapInfoSource sets implementation of interface to get current net map and container placement policy.
// If set, AddNode will have no effect.
func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) {
x.netMapInfoSource = netMapInfoSource
}
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
body := new(tree.GetNodeByPathRequestBody)
body.SetContainerID(prm.CID)
body.SetTreeID(prm.TreeID)
body.SetPath(prm.Path)
body.SetAttributes(prm.Meta)
body.SetPathAttribute(prm.PathAttribute)
body.SetAllAttributes(prm.AllAttrs)
body.SetLatestOnly(prm.LatestOnly)
body.SetBearerToken(prm.BearerToken)
request := new(tree.GetNodeByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
// to keep compatibility with 'GetNodeByPath' implementation.
if inErr == nil && len(resp.GetBody().GetNodes()) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
})
p.methods[methodGetNodes].IncRequests(time.Since(start))
if err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err
}
return resp.GetBody().GetNodes(), nil
}
// SubTreeReader is designed to read list of subtree nodes FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
cli *rpcapi.GetSubTreeResponseReader
}
// Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return i, io.EOF
} else if err != nil {
return i, handleError("failed to get sub tree", err)
}
buf[i] = resp.GetBody()
}
return len(buf), nil
}
// ReadAll reads all nodes subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
for {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
break
} else if err != nil {
return nil, handleError("failed to get sub tree", err)
}
res = append(res, resp.GetBody())
}
return res, nil
}
// Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return nil, io.EOF
}
if err != nil {
return nil, handleError("failed to get sub tree", err)
}
return resp.GetBody(), nil
}
// GetSubTree invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
body := new(tree.GetSubTreeRequestBody)
body.SetContainerID(prm.CID[:])
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetDepth(prm.Depth)
body.SetRootID(prm.RootID)
orderBy := new(tree.GetSubTreeRequestBodyOrder)
switch prm.Order {
case AscendingOrder:
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
default:
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
}
body.SetOrderBy(orderBy)
request := new(tree.GetSubTreeRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err
}
return &SubTreeReader{cli: cli}, nil
}
// AddNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
body := new(tree.AddRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetParentID(prm.Parent)
request := new(tree.AddRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *tree.AddResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node", inErr)
})
p.methods[methodAddNode].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
return resp.GetBody().GetNodeID(), nil
}
// AddNodeByPath invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
body := new(tree.AddByPathRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetPathAttribute(prm.PathAttribute)
body.SetPath(prm.Path)
request := new(tree.AddByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
respBody := resp.GetBody()
if respBody == nil {
return 0, errors.New("nil body in tree service response")
} else if len(respBody.GetNodes()) == 0 {
return 0, errors.New("empty list of added nodes in tree service response")
}
// The first node is the leaf that we add, according to tree service docs.
return respBody.GetNodes()[0], nil
}
// MoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
body := new(tree.MoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetNodeID(prm.NodeID)
body.SetParentID(prm.ParentID)
request := new(tree.MoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to move node", err)
}
return nil
})
p.methods[methodMoveNode].IncRequests(time.Since(start))
return err
}
// RemoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
body := new(tree.RemoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetNodeID(prm.NodeID)
request := new(tree.RemoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
p.methods[methodRemoveNode].IncRequests(time.Since(start))
return err
}
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() error {
p.cancel()
<-p.closedCh
var err error
for _, group := range p.innerPools {
for _, cl := range group.clients {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
}
}
if closeErr := p.closeClientMapConnections(); closeErr != nil {
err = closeErr
}
return err
}
func (p *Pool) closeClientMapConnections() (err error) {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, cl := range p.clientMap {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
}
return err
}
// Statistic returns tree pool statistics.
func (p *Pool) Statistic() Statistic {
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
for i, method := range p.methods {
stat.methods[i] = method.Snapshot()
method.Reset()
}
return stat
}
func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "denied") {
return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error())
}
return fmt.Errorf("%s: %w", msg, err)
}
func metaToKV(meta map[string]string) []*tree.KeyValue {
result := make([]*tree.KeyValue, 0, len(meta))
for key, value := range meta {
kv := new(tree.KeyValue)
kv.SetKey(key)
kv.SetValue([]byte(value))
result = append(result, kv)
}
return result
}
func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
}
nodeParamsMap := make(map[int][]pool.NodeParam)
for _, param := range nodeParams {
nodes := nodeParamsMap[param.Priority()]
nodeParamsMap[param.Priority()] = append(nodes, param)
}
res := make([][]pool.NodeParam, 0, len(nodeParamsMap))
for _, nodes := range nodeParamsMap {
res = append(res, nodes)
}
sort.Slice(res, func(i, j int) bool {
return res[i][0].Priority() < res[j][0].Priority()
})
return res, nil
}
func fillDefaultInitParams(params *InitParameters) {
if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout
}
if params.nodeDialTimeout <= 0 {
params.nodeDialTimeout = defaultDialTimeout
}
if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout
}
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
if p.logger == nil {
return
}
p.logger.Log(level, msg, fields...)
}
// startRebalance runs loop to monitor tree client healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
for i, nodes := range p.rebalanceParams.nodesGroup {
buffers[i] = make([]bool, len(nodes))
}
for {
select {
case <-ctx.Done():
close(p.closedCh)
return
case <-ticker.C:
p.updateNodesHealth(ctx, buffers)
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
}
}
}
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
go func(i int, _ *innerPool) {
defer wg.Done()
p.updateInnerNodesHealth(ctx, i, buffers[i])
}(i, inner)
}
wg.Wait()
LOOP:
for i, buffer := range buffers {
for j, healthy := range buffer {
if healthy {
p.setStartIndices(i, j)
break LOOP
}
}
}
}
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
if i > len(p.innerPools)-1 {
return
}
nodesByPriority := p.innerPools[i]
options := p.rebalanceParams
var wg sync.WaitGroup
for j, cli := range nodesByPriority.clients {
wg.Add(1)
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
changed, err := cli.redialIfNecessary(tctx)
healthy := err == nil
if changed {
fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
if err != nil {
fields = append(fields, zap.Error(err))
}
p.log(zap.DebugLevel, "tree health has changed", fields...)
} else if err != nil {
p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
}
buffer[j] = healthy
}(j, cli)
}
wg.Wait()
}
func (p *Pool) getStartIndices() (int, int) {
p.mutex.RLock()
defer p.mutex.RUnlock()
return p.startIndices[0], p.startIndices[1]
}
func (p *Pool) setStartIndices(i, j int) {
p.mutex.Lock()
p.startIndices[0] = i
p.startIndices[1] = j
p.mutex.Unlock()
}
func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
if p.netMapInfoSource != nil {
return p.requestWithRetryContainerNodes(ctx, cnrID, fn)
}
var (
err, finErr error
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools)
attempts := p.maxRequestAttempts
LOOP:
for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen
if attempts == 0 {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
break LOOP
}
attempts--
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
if err != nil {
err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
startJ = 0
}
return finErr
}
func (p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
var (
err, finErr error
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)
netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx)
if err != nil {
return fmt.Errorf("get net map: %w", err)
}
policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID)
if err != nil {
return fmt.Errorf("get container placement policy: %w", err)
}
cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:])
if err != nil {
return fmt.Errorf("get container nodes: %w", err)
}
cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:])
if err != nil {
return fmt.Errorf("get placement vectors: %w", err)
}
attempts := p.maxRequestAttempts
LOOP:
for _, cnrNodeGroup := range cnrNodes {
for _, cnrNode := range cnrNodeGroup {
if attempts == 0 {
break LOOP
}
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
if !ok {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
if err != nil {
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
continue
}
p.addClientToMap(cnrNode.Hash(), treeCl)
}
attempts--
if cl, err = treeCl.serviceClient(); err == nil {
err = fn(cl)
}
if shouldRedial(ctx, err) {
p.deleteClientFromMap(cnrNode.Hash())
}
if !shouldTryAgain(err) {
if err != nil {
err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", treeCl.endpoint()), zap.Error(err))
}
}
return finErr
}
func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
p.mutex.RLock()
defer p.mutex.RUnlock()
cl, ok := p.clientMap[hash]
return cl, ok
}
func (p *Pool) addClientToMap(hash uint64, cl client) {
p.mutex.Lock()
p.clientMap[hash] = cl
p.mutex.Unlock()
}
func (p *Pool) deleteClientFromMap(hash uint64) {
p.mutex.Lock()
_ = p.clientMap[hash].close()
delete(p.clientMap, hash)
p.mutex.Unlock()
}
func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
var (
treeCl *treeClient
err error
)
node.IterateNetworkEndpoints(func(endpoint string) bool {
var addr network.Address
if err = addr.FromString(endpoint); err != nil {
p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
return false
}
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err = newTreeCl.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
return false
}
treeCl = newTreeCl
return true
})
if treeCl == nil {
return nil, fmt.Errorf("tree client wasn't initialized")
}
return treeCl, nil
}
func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}
func shouldRedial(ctx context.Context, err error) bool {
if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) {
return false
}
return true
}
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
}
type reqKeyType struct{}
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
// this identifier also be logged.
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqKeyType{}, reqID)
}
// GetRequestID fetch tree pool request identifier from context.
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqKeyType{}).(string)
return reqID
}