frostfs-sdk-go/pool/tree/pool.go
Alex Vanin 37350dbb1e [#326] pool: Fix panic that causes mutex deadlock
Two concurrent 'deleteClientFromMap' calls for
the same client could cause a panic and a deadlock.

The first goroutine acquires the lock, removes the
element from the map, and releases the lock.

The second goroutine acquires the lock and panics
while trying to call 'close()' on an empty struct.
The lock is never released, which deadlocks all
other goroutines.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-01-29 20:14:55 +03:00


package tree
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
)
// SubTreeSort defines the order of nodes returned from the GetSubTree RPC.
type SubTreeSort int32
const (
// NoneOrder does not specify order of nodes returned in GetSubTree RPC.
NoneOrder SubTreeSort = iota
// AscendingOrder specifies ascending alphabetical order of nodes based on FilePath attribute.
AscendingOrder
)
var (
// ErrNodeNotFound is returned from Tree service in case of not found error.
ErrNodeNotFound = errors.New("not found")
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
errNodeEmptyResult = errors.New("empty result")
)
// client represents a virtual connection to a single FrostFS tree service, from which the Pool is formed.
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
serviceClient() (*rpcclient.Client, error)
endpoint() string
isHealthy() bool
setHealthy(bool)
dial(ctx context.Context) error
redialIfNecessary(context.Context) (bool, error)
close() error
}
// InitParameters contains values used to initialize the connection Pool.
type InitParameters struct {
key *keys.PrivateKey
logger *zap.Logger
nodeDialTimeout time.Duration
nodeStreamTimeout time.Duration
healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam
dialOptions []grpc.DialOption
maxRequestAttempts int
netMapInfoSource NetMapInfoSource
}
// NetMapInfoSource provides the current network map and the placement policy
// of a container; the Pool uses it to route tree requests to container nodes.
type NetMapInfoSource interface {
NetMapSnapshot(ctx context.Context) (netmap.NetMap, error)
PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error)
}
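// A minimal sketch of a NetMapInfoSource implementation backed by pre-fetched
// data; in practice the snapshot and policy would come from a netmap/container
// service client. The staticNetMapInfo type is hypothetical, not part of this
// package:
//
//	type staticNetMapInfo struct {
//		nm     netmap.NetMap
//		policy netmap.PlacementPolicy
//	}
//
//	func (s staticNetMapInfo) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
//		return s.nm, nil
//	}
//
//	func (s staticNetMapInfo) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
//		return s.policy, nil
//	}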
// Pool represents a virtual connection to the FrostFS tree services network. It lets
// the caller communicate with multiple FrostFS tree services without having to handle
// switching between servers when they become unavailable.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS tree operations using the Pool without netMapInfoSource, connection to the
// servers MUST BE correctly established (see Dial method).
type Pool struct {
innerPools []*innerPool
key *keys.PrivateKey
cancel context.CancelFunc
closedCh chan struct{}
rebalanceParams rebalanceParameters
dialOptions []grpc.DialOption
logger *zap.Logger
methods []*pool.MethodStatus
maxRequestAttempts int
streamTimeout time.Duration
nodeDialTimeout time.Duration
netMapInfoSource NetMapInfoSource
// mutex protects clientMap and startIndices
mutex sync.RWMutex
// clientMap will be used if netMapInfoSource is set
clientMap map[uint64]client
// startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices.
// These indices are changed during:
// * rebalance procedure (see Pool.startRebalance)
// * retry in case of request failure (see Pool.requestWithRetry)
// startIndices will be used if netMapInfoSource is not set
startIndices [2]int
}
type innerPool struct {
clients []client
}
type rebalanceParameters struct {
nodesGroup [][]pool.NodeParam
nodeRequestTimeout time.Duration
clientRebalanceInterval time.Duration
}
// GetNodesParams groups parameters of Pool.GetNodes operation.
type GetNodesParams struct {
CID cid.ID
TreeID string
Path []string
Meta []string
PathAttribute string
LatestOnly bool
AllAttrs bool
BearerToken []byte
}
// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
type GetSubTreeParams struct {
CID cid.ID
TreeID string
RootID []uint64
Depth uint32
BearerToken []byte
Order SubTreeSort
}
// AddNodeParams groups parameters of Pool.AddNode operation.
type AddNodeParams struct {
CID cid.ID
TreeID string
Parent uint64
Meta map[string]string
BearerToken []byte
}
// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
type AddNodeByPathParams struct {
CID cid.ID
TreeID string
Path []string
Meta map[string]string
PathAttribute string
BearerToken []byte
}
// MoveNodeParams groups parameters of Pool.MoveNode operation.
type MoveNodeParams struct {
CID cid.ID
TreeID string
NodeID uint64
ParentID uint64
Meta map[string]string
BearerToken []byte
}
// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
type RemoveNodeParams struct {
CID cid.ID
TreeID string
NodeID uint64
BearerToken []byte
}
// MethodIndex is the index of a method in the list of statuses in Pool.
type MethodIndex int
const (
methodGetNodes MethodIndex = iota
methodGetSubTree
methodAddNode
methodAddNodeByPath
methodMoveNode
methodRemoveNode
methodLast
)
// String implements fmt.Stringer.
func (m MethodIndex) String() string {
switch m {
case methodGetNodes:
return "getNodes"
case methodAddNode:
return "addNode"
case methodGetSubTree:
return "getSubTree"
case methodAddNodeByPath:
return "addNodeByPath"
case methodMoveNode:
return "moveNode"
case methodRemoveNode:
return "removeNode"
default:
return "unknown"
}
}
// NewPool creates a connection pool using the given parameters.
func NewPool(options InitParameters) (*Pool, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
fillDefaultInitParams(&options)
methods := make([]*pool.MethodStatus, methodLast)
for i := methodGetNodes; i < methodLast; i++ {
methods[i] = pool.NewMethodStatus(i.String())
}
p := &Pool{
key: options.key,
logger: options.logger,
dialOptions: options.dialOptions,
rebalanceParams: rebalanceParameters{
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
streamTimeout: options.nodeStreamTimeout,
nodeDialTimeout: options.nodeDialTimeout,
methods: methods,
netMapInfoSource: options.netMapInfoSource,
clientMap: make(map[uint64]client),
}
if options.netMapInfoSource == nil {
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
p.rebalanceParams.nodesGroup = nodesParams
}
return p, nil
}
// Dial need not be called and has no effect if netMapInfoSource is set.
// See also InitParameters.SetNetMapInfoSource.
//
// Otherwise, Dial establishes a connection to the tree servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed and netMapInfoSource is not set, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
if p.netMapInfoSource != nil {
return nil
}
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
var atLeastOneHealthy bool
for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes))
for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue
}
atLeastOneHealthy = true
}
inner[i] = &innerPool{
clients: clients,
}
}
if !atLeastOneHealthy {
return fmt.Errorf("at least one node must be healthy")
}
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
go p.startRebalance(ctx)
return nil
}
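// A minimal usage sketch for constructing and dialing a pool, assuming a
// hypothetical endpoint address; error handling is abbreviated:
//
//	key, err := keys.NewPrivateKey()
//	if err != nil {
//		panic(err)
//	}
//
//	var prm tree.InitParameters
//	prm.SetKey(key)
//	prm.AddNode(pool.NewNodeParam(1, "grpc://tree.example.com:8082", 1)) // hypothetical address
//	prm.SetMaxRequestAttempts(2)
//
//	p, err := tree.NewPool(prm)
//	if err != nil {
//		panic(err)
//	}
//	if err = p.Dial(context.Background()); err != nil {
//		panic(err) // no healthy node, the pool must not be used
//	}
//	defer p.Close()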
// SetKey specifies the default key to be used for protocol communication.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
x.key = key
}
// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
x.logger = logger
}
// SetNodeDialTimeout specifies the timeout for connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
x.nodeStreamTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for a request to a node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// AddNode appends information about a node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam)
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// SetMaxRequestAttempts sets the maximum number of attempts to make a successful request.
// The default value is 0, which means the number of attempts equals the number of nodes in the pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// SetNetMapInfoSource sets an implementation of the interface used to get the current net map and container placement policy.
// If set, AddNode will have no effect.
func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) {
x.netMapInfoSource = netMapInfoSource
}
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
body := new(tree.GetNodeByPathRequestBody)
body.SetContainerID(prm.CID)
body.SetTreeID(prm.TreeID)
body.SetPath(prm.Path)
body.SetAttributes(prm.Meta)
body.SetPathAttribute(prm.PathAttribute)
body.SetAllAttributes(prm.AllAttrs)
body.SetLatestOnly(prm.LatestOnly)
body.SetBearerToken(prm.BearerToken)
request := new(tree.GetNodeByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
// The Pool retries the 'GetNodeByPath' request if the result is empty.
// An empty result is expected due to delayed tree service sync.
// Return an error here to trigger a retry, and ignore it afterwards
// to keep compatibility with the 'GetNodeByPath' implementation.
if inErr == nil && len(resp.GetBody().GetNodes()) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
})
p.methods[methodGetNodes].IncRequests(time.Since(start))
if err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err
}
return resp.GetBody().GetNodes(), nil
}
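// A sketch of a typical GetNodes call, assuming an initialized pool p, a
// container ID cnrID and a tree named "system" (all hypothetical); the
// sentinel errors above are matched with errors.Is:
//
//	nodes, err := p.GetNodes(ctx, tree.GetNodesParams{
//		CID:           cnrID,
//		TreeID:        "system",
//		Path:          []string{"dir", "file.txt"},
//		PathAttribute: "FileName",
//		LatestOnly:    true,
//	})
//	switch {
//	case errors.Is(err, tree.ErrNodeNotFound):
//		// the path does not exist in the tree
//	case errors.Is(err, tree.ErrNodeAccessDenied):
//		// the bearer token or key lacks permissions
//	case err != nil:
//		// transport or service error
//	}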
// SubTreeReader is designed to read a list of subtree nodes from the FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
cli *rpcapi.GetSubTreeResponseReader
}
// Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return i, io.EOF
} else if err != nil {
return i, handleError("failed to get sub tree", err)
}
buf[i] = resp.GetBody()
}
return len(buf), nil
}
// ReadAll reads all subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
for {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
break
} else if err != nil {
return nil, handleError("failed to get sub tree", err)
}
res = append(res, resp.GetBody())
}
return res, nil
}
// Next gets the next node from the subtree.
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return nil, io.EOF
}
if err != nil {
return nil, handleError("failed to get sub tree", err)
}
return resp.GetBody(), nil
}
// GetSubTree invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
body := new(tree.GetSubTreeRequestBody)
body.SetContainerID(prm.CID[:])
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetDepth(prm.Depth)
body.SetRootID(prm.RootID)
orderBy := new(tree.GetSubTreeRequestBodyOrder)
switch prm.Order {
case AscendingOrder:
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
default:
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
}
body.SetOrderBy(orderBy)
request := new(tree.GetSubTreeRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err
}
return &SubTreeReader{cli: cli}, nil
}
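// A sketch of streaming a subtree and draining the reader, assuming an
// initialized pool p and container ID cnrID (both hypothetical):
//
//	r, err := p.GetSubTree(ctx, tree.GetSubTreeParams{
//		CID:    cnrID,
//		TreeID: "system",
//		Depth:  2,
//		Order:  tree.AscendingOrder,
//	})
//	if err != nil {
//		return err
//	}
//
//	bodies, err := r.ReadAll() // or call r.Next() in a loop until io.EOF
//	if err != nil {
//		return err
//	}
//	for _, body := range bodies {
//		// each body carries the node data of one response message
//		_ = body
//	}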
// AddNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
body := new(tree.AddRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetParentID(prm.Parent)
request := new(tree.AddRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *tree.AddResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node", inErr)
})
p.methods[methodAddNode].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
return resp.GetBody().GetNodeID(), nil
}
// AddNodeByPath invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
body := new(tree.AddByPathRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetPathAttribute(prm.PathAttribute)
body.SetPath(prm.Path)
request := new(tree.AddByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
respBody := resp.GetBody()
if respBody == nil {
return 0, errors.New("nil body in tree service response")
} else if len(respBody.GetNodes()) == 0 {
return 0, errors.New("empty list of added nodes in tree service response")
}
// The first node is the leaf that we add, according to tree service docs.
return respBody.GetNodes()[0], nil
}
// MoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
body := new(tree.MoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetNodeID(prm.NodeID)
body.SetParentID(prm.ParentID)
request := new(tree.MoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to move node", err)
}
return nil
})
p.methods[methodMoveNode].IncRequests(time.Since(start))
return err
}
// RemoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
body := new(tree.RemoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetNodeID(prm.NodeID)
request := new(tree.RemoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
p.methods[methodRemoveNode].IncRequests(time.Since(start))
return err
}
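// A sketch of a node lifecycle using the write operations above: add a node,
// re-parent it, then remove it. Pool p, container cnrID and the parent IDs
// are hypothetical; error checks are elided:
//
//	nodeID, _ := p.AddNode(ctx, tree.AddNodeParams{
//		CID:    cnrID,
//		TreeID: "system",
//		Parent: 0, // assuming 0 is the root node ID
//		Meta:   map[string]string{"FileName": "draft.txt"},
//	})
//
//	_ = p.MoveNode(ctx, tree.MoveNodeParams{
//		CID: cnrID, TreeID: "system", NodeID: nodeID, ParentID: 42, // hypothetical new parent
//	})
//
//	_ = p.RemoveNode(ctx, tree.RemoveNodeParams{
//		CID: cnrID, TreeID: "system", NodeID: nodeID,
//	})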
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() error {
p.cancel()
<-p.closedCh
var err error
for _, group := range p.innerPools {
for _, cl := range group.clients {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
}
}
if closeErr := p.closeClientMapConnections(); closeErr != nil {
err = closeErr
}
return err
}
func (p *Pool) closeClientMapConnections() (err error) {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, cl := range p.clientMap {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
}
return err
}
// Statistic returns tree pool statistics.
func (p *Pool) Statistic() Statistic {
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
for i, method := range p.methods {
stat.methods[i] = method.Snapshot()
method.Reset()
}
return stat
}
func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "denied") {
return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error())
}
return fmt.Errorf("%s: %w", msg, err)
}
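// For example, a server-side "tree not found" message is surfaced to callers
// as the sentinel error:
//
//	err := handleError("failed to get sub tree", errors.New("tree not found"))
//	errors.Is(err, ErrNodeNotFound) // true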
func metaToKV(meta map[string]string) []*tree.KeyValue {
result := make([]*tree.KeyValue, 0, len(meta))
for key, value := range meta {
kv := new(tree.KeyValue)
kv.SetKey(key)
kv.SetValue([]byte(value))
result = append(result, kv)
}
return result
}
func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
}
nodeParamsMap := make(map[int][]pool.NodeParam)
for _, param := range nodeParams {
nodes := nodeParamsMap[param.Priority()]
nodeParamsMap[param.Priority()] = append(nodes, param)
}
res := make([][]pool.NodeParam, 0, len(nodeParamsMap))
for _, nodes := range nodeParamsMap {
res = append(res, nodes)
}
sort.Slice(res, func(i, j int) bool {
return res[i][0].Priority() < res[j][0].Priority()
})
return res, nil
}
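// For example, three nodes with priorities 2, 1 and 1 are split into two
// groups ordered by ascending priority (addresses are hypothetical):
//
//	params := []pool.NodeParam{
//		pool.NewNodeParam(2, "grpc://backup.example.com:8082", 1),
//		pool.NewNodeParam(1, "grpc://main1.example.com:8082", 1),
//		pool.NewNodeParam(1, "grpc://main2.example.com:8082", 1),
//	}
//	groups, _ := adjustNodeParams(params)
//	// groups[0] holds the two priority-1 nodes, groups[1] the priority-2 node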
func fillDefaultInitParams(params *InitParameters) {
if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout
}
if params.nodeDialTimeout <= 0 {
params.nodeDialTimeout = defaultDialTimeout
}
if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout
}
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
if p.logger == nil {
return
}
p.logger.Log(level, msg, fields...)
}
// startRebalance runs a loop that monitors tree client health status.
func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
for i, nodes := range p.rebalanceParams.nodesGroup {
buffers[i] = make([]bool, len(nodes))
}
for {
select {
case <-ctx.Done():
close(p.closedCh)
return
case <-ticker.C:
p.updateNodesHealth(ctx, buffers)
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
}
}
}
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
go func(i int, _ *innerPool) {
defer wg.Done()
p.updateInnerNodesHealth(ctx, i, buffers[i])
}(i, inner)
}
wg.Wait()
LOOP:
for i, buffer := range buffers {
for j, healthy := range buffer {
if healthy {
p.setStartIndices(i, j)
break LOOP
}
}
}
}
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
if i > len(p.innerPools)-1 {
return
}
nodesByPriority := p.innerPools[i]
options := p.rebalanceParams
var wg sync.WaitGroup
for j, cli := range nodesByPriority.clients {
wg.Add(1)
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
changed, err := cli.redialIfNecessary(tctx)
healthy := err == nil
if changed {
fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
if err != nil {
fields = append(fields, zap.Error(err))
}
p.log(zap.DebugLevel, "tree health has changed", fields...)
} else if err != nil {
p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
}
buffer[j] = healthy
}(j, cli)
}
wg.Wait()
}
func (p *Pool) getStartIndices() (int, int) {
p.mutex.RLock()
defer p.mutex.RUnlock()
return p.startIndices[0], p.startIndices[1]
}
func (p *Pool) setStartIndices(i, j int) {
p.mutex.Lock()
p.startIndices[0] = i
p.startIndices[1] = j
p.mutex.Unlock()
}
func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
if p.netMapInfoSource != nil {
return p.requestWithRetryContainerNodes(ctx, cnrID, fn)
}
var (
err, finErr error
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools)
attempts := p.maxRequestAttempts
LOOP:
for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen
if attempts == 0 {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
break LOOP
}
attempts--
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
if err != nil {
err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
startJ = 0
}
return finErr
}
func (p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
var (
err, finErr error
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)
netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx)
if err != nil {
return fmt.Errorf("get net map: %w", err)
}
policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID)
if err != nil {
return fmt.Errorf("get container placement policy: %w", err)
}
cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:])
if err != nil {
return fmt.Errorf("get container nodes: %w", err)
}
cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:])
if err != nil {
return fmt.Errorf("get placement vectors: %w", err)
}
attempts := p.maxRequestAttempts
LOOP:
for _, cnrNodeGroup := range cnrNodes {
for _, cnrNode := range cnrNodeGroup {
if attempts == 0 {
break LOOP
}
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
if !ok {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
if err != nil {
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
continue
}
p.addClientToMap(cnrNode.Hash(), treeCl)
}
attempts--
if cl, err = treeCl.serviceClient(); err == nil {
err = fn(cl)
}
if shouldRedial(ctx, err) {
p.deleteClientFromMap(cnrNode.Hash())
}
if !shouldTryAgain(err) {
if err != nil {
err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", treeCl.endpoint()), zap.Error(err))
}
}
return finErr
}
func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
p.mutex.RLock()
defer p.mutex.RUnlock()
cl, ok := p.clientMap[hash]
return cl, ok
}
func (p *Pool) addClientToMap(hash uint64, cl client) {
p.mutex.Lock()
p.clientMap[hash] = cl
p.mutex.Unlock()
}
func (p *Pool) deleteClientFromMap(hash uint64) {
p.mutex.Lock()
// Check presence before closing: a concurrent call may have already closed
// this client and removed it from the map, and calling 'close()' again would
// panic while the mutex is held, deadlocking all other map users.
if cli, ok := p.clientMap[hash]; ok {
_ = cli.close()
delete(p.clientMap, hash)
}
p.mutex.Unlock()
}
func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
var (
treeCl *treeClient
err error
)
node.IterateNetworkEndpoints(func(endpoint string) bool {
var addr network.Address
if err = addr.FromString(endpoint); err != nil {
p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
return false
}
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err = newTreeCl.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
return false
}
treeCl = newTreeCl
return true
})
if treeCl == nil {
return nil, fmt.Errorf("tree client wasn't initialized")
}
return treeCl, nil
}
func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}
func shouldRedial(ctx context.Context, err error) bool {
if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) {
return false
}
return true
}
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
}
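// For example, if one attempt failed with ErrUnhealthyEndpoint and another
// with ErrNodeAccessDenied, the latter is kept as the final error because it
// has the lower priority value:
//
//	err := finalError(ErrUnhealthyEndpoint, ErrNodeAccessDenied)
//	errors.Is(err, ErrNodeAccessDenied) // true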
type reqKeyType struct{}
// SetRequestID sets a request identifier in the context so that when operations are logged
// in the tree pool, this identifier is logged as well.
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqKeyType{}, reqID)
}
// GetRequestID fetches the tree pool request identifier from the context.
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqKeyType{}).(string)
return reqID
}
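// A sketch of correlating pool retry logs with a caller-side request, using a
// hypothetical identifier and an initialized pool p:
//
//	ctx := tree.SetRequestID(context.Background(), "req-12345")
//	// any retried operation on ctx now logs "request_id": "req-12345"
//	_, err := p.GetNodes(ctx, prm)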