package tree import ( "context" "errors" "fmt" "io" "sort" "strings" "sync" "time" rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) const ( defaultRebalanceInterval = 15 * time.Second defaultHealthcheckTimeout = 4 * time.Second defaultDialTimeout = 5 * time.Second defaultStreamTimeout = 10 * time.Second ) // SubTreeSort defines an order of nodes returned from GetSubTree RPC. type SubTreeSort int32 const ( // NoneOrder does not specify order of nodes returned in GetSubTree RPC. NoneOrder SubTreeSort = iota // AscendingOrder specifies ascending alphabetical order of nodes based on FilePath attribute. AscendingOrder ) var ( // ErrNodeNotFound is returned from Tree service in case of not found error. ErrNodeNotFound = errors.New("not found") // ErrNodeAccessDenied is returned from Tree service in case of access denied error. ErrNodeAccessDenied = errors.New("access denied") // errNodeEmpty is used to trigger retry when 'GetNodeByPath' return empty result. errNodeEmptyResult = errors.New("empty result") ) // client represents virtual connection to the single FrostFS tree service from which Pool is formed. // This interface is expected to have exactly one production implementation - treeClient. // Others are expected to be for test purposes only. 
type client interface { serviceClient() (*rpcclient.Client, error) endpoint() string isHealthy() bool setHealthy(bool) dial(ctx context.Context) error redialIfNecessary(context.Context) (bool, error) close() error } // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *keys.PrivateKey logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration nodeParams []pool.NodeParam dialOptions []grpc.DialOption maxRequestAttempts int netMapInfoSource NetMapInfoSource } type NetMapInfoSource interface { NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error) } // Pool represents virtual connection to the FrostFS tree services network to communicate // with multiple FrostFS tree services without thinking about switching between servers // due to their unavailability. // // Pool can be created and initialized using NewPool function. // Before executing the FrostFS tree operations using the Pool without netMapInfoSource, connection to the // servers MUST BE correctly established (see Dial method). type Pool struct { innerPools []*innerPool key *keys.PrivateKey cancel context.CancelFunc closedCh chan struct{} rebalanceParams rebalanceParameters dialOptions []grpc.DialOption logger *zap.Logger methods []*pool.MethodStatus maxRequestAttempts int streamTimeout time.Duration nodeDialTimeout time.Duration netMapInfoSource NetMapInfoSource // mutex protects clientMap and startIndices mutex sync.RWMutex // clientMap will be used if netMapInfoSource is set clientMap map[uint64]client // startIndices points to the client from which the next request will be executed. // Since clients are stored in innerPool field we have to use two indices. 
// These indices being changed during: // * rebalance procedure (see Pool.startRebalance) // * retry in case of request failure (see Pool.requestWithRetry) // startIndices will be used if netMapInfoSource is not set startIndices [2]int } type innerPool struct { clients []client } type rebalanceParameters struct { nodesGroup [][]pool.NodeParam nodeRequestTimeout time.Duration clientRebalanceInterval time.Duration } // GetNodesParams groups parameters of Pool.GetNodes operation. type GetNodesParams struct { CID cid.ID TreeID string Path []string Meta []string PathAttribute string LatestOnly bool AllAttrs bool BearerToken []byte } // GetSubTreeParams groups parameters of Pool.GetSubTree operation. type GetSubTreeParams struct { CID cid.ID TreeID string RootID []uint64 Depth uint32 BearerToken []byte Order SubTreeSort } // AddNodeParams groups parameters of Pool.AddNode operation. type AddNodeParams struct { CID cid.ID TreeID string Parent uint64 Meta map[string]string BearerToken []byte } // AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation. type AddNodeByPathParams struct { CID cid.ID TreeID string Path []string Meta map[string]string PathAttribute string BearerToken []byte } // MoveNodeParams groups parameters of Pool.MoveNode operation. type MoveNodeParams struct { CID cid.ID TreeID string NodeID uint64 ParentID uint64 Meta map[string]string BearerToken []byte } // RemoveNodeParams groups parameters of Pool.RemoveNode operation. type RemoveNodeParams struct { CID cid.ID TreeID string NodeID uint64 BearerToken []byte } // MethodIndex index of method in list of statuses in Pool. type MethodIndex int const ( methodGetNodes MethodIndex = iota methodGetSubTree methodAddNode methodAddNodeByPath methodMoveNode methodRemoveNode methodLast ) // String implements fmt.Stringer. 
func (m MethodIndex) String() string { switch m { case methodGetNodes: return "getNodes" case methodAddNode: return "addNode" case methodGetSubTree: return "getSubTree" case methodAddNodeByPath: return "addNodeByPath" case methodMoveNode: return "moveNode" case methodRemoveNode: return "removeNode" default: return "unknown" } } // NewPool creates connection pool using parameters. func NewPool(options InitParameters) (*Pool, error) { if options.key == nil { return nil, fmt.Errorf("missed required parameter 'Key'") } fillDefaultInitParams(&options) methods := make([]*pool.MethodStatus, methodLast) for i := methodGetNodes; i < methodLast; i++ { methods[i] = pool.NewMethodStatus(i.String()) } p := &Pool{ key: options.key, logger: options.logger, dialOptions: options.dialOptions, rebalanceParams: rebalanceParameters{ nodeRequestTimeout: options.healthcheckTimeout, clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, streamTimeout: options.nodeStreamTimeout, nodeDialTimeout: options.nodeDialTimeout, methods: methods, netMapInfoSource: options.netMapInfoSource, clientMap: make(map[uint64]client), } if options.netMapInfoSource == nil { nodesParams, err := adjustNodeParams(options.nodeParams) if err != nil { return nil, err } p.rebalanceParams.nodesGroup = nodesParams } return p, nil } // Dial may not be called and will have no effect if netMapInfoSource is set. // See also InitParameters.SetNetMapInfoSource // // Otherwise, Dial establishes a connection to the tree servers from the FrostFS network. // It also starts a routine that checks the health of the nodes and // updates the weights of the nodes for balancing. // Returns an error describing failure reason. // // If failed and netMapInfoSource is not set, the Pool SHOULD NOT be used. // // See also InitParameters.SetClientRebalanceInterval. 
func (p *Pool) Dial(ctx context.Context) error { if p.netMapInfoSource != nil { return nil } inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup)) var atLeastOneHealthy bool for i, nodes := range p.rebalanceParams.nodesGroup { clients := make([]client, len(nodes)) for j, node := range nodes { clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout) if err := clients[j].dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err)) continue } atLeastOneHealthy = true } inner[i] = &innerPool{ clients: clients, } } if !atLeastOneHealthy { return fmt.Errorf("at least one node must be healthy") } ctx, cancel := context.WithCancel(ctx) p.cancel = cancel p.closedCh = make(chan struct{}) p.innerPools = inner go p.startRebalance(ctx) return nil } // SetKey specifies default key to be used for the protocol communication by default. func (x *InitParameters) SetKey(key *keys.PrivateKey) { x.key = key } // SetLogger specifies logger. func (x *InitParameters) SetLogger(logger *zap.Logger) { x.logger = logger } // SetNodeDialTimeout specifies the timeout for connection to be established. func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { x.nodeDialTimeout = timeout } // SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { x.nodeStreamTimeout = timeout } // SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. // // See also Pool.Dial. func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. 
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam pool.NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) } // SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection. func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } // SetMaxRequestAttempts sets the max attempt to make successful request. // Default value is 0 that means the number of attempts equals to number of nodes in pool. func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) { x.maxRequestAttempts = maxAttempts } // SetNetMapInfoSource sets implementation of interface to get current net map and container placement policy. // If set, AddNode will have no effect. func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) { x.netMapInfoSource = netMapInfoSource } // GetNodes invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) { body := new(tree.GetNodeByPathRequestBody) body.SetContainerID(prm.CID) body.SetTreeID(prm.TreeID) body.SetPath(prm.Path) body.SetAttributes(prm.Meta) body.SetPathAttribute(prm.PathAttribute) body.SetAllAttributes(prm.AllAttrs) body.SetLatestOnly(prm.LatestOnly) body.SetBearerToken(prm.BearerToken) request := new(tree.GetNodeByPathRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } var resp *tree.GetNodeByPathResponse err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx)) // Pool wants to do retry 'GetNodeByPath' request if result is empty. 
// Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, // to keep compatibility with 'GetNodeByPath' implementation. if inErr == nil && len(resp.GetBody().GetNodes()) == 0 { return errNodeEmptyResult } return handleError("failed to get node by path", inErr) }) p.methods[methodGetNodes].IncRequests(time.Since(start)) if err != nil && !errors.Is(err, errNodeEmptyResult) { return nil, err } return resp.GetBody().GetNodes(), nil } // SubTreeReader is designed to read list of subtree nodes FrostFS tree service. // // Must be initialized using Pool.GetSubTree, any other usage is unsafe. type SubTreeReader struct { cli *rpcapi.GetSubTreeResponseReader } // Read reads another list of the subtree nodes. func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) { for i := range buf { var resp tree.GetSubTreeResponse err := x.cli.Read(&resp) if err == io.EOF { return i, io.EOF } else if err != nil { return i, handleError("failed to get sub tree", err) } buf[i] = resp.GetBody() } return len(buf), nil } // ReadAll reads all nodes subtree nodes. func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) { var res []*tree.GetSubTreeResponseBody for { var resp tree.GetSubTreeResponse err := x.cli.Read(&resp) if err == io.EOF { break } else if err != nil { return nil, handleError("failed to get sub tree", err) } res = append(res, resp.GetBody()) } return res, nil } // Next gets the next node from subtree. func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) { var resp tree.GetSubTreeResponse err := x.cli.Read(&resp) if err == io.EOF { return nil, io.EOF } if err != nil { return nil, handleError("failed to get sub tree", err) } return resp.GetBody(), nil } // GetSubTree invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. 
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) { body := new(tree.GetSubTreeRequestBody) body.SetContainerID(prm.CID[:]) body.SetTreeID(prm.TreeID) body.SetBearerToken(prm.BearerToken) body.SetDepth(prm.Depth) body.SetRootID(prm.RootID) orderBy := new(tree.GetSubTreeRequestBodyOrder) switch prm.Order { case AscendingOrder: orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc) default: orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone) } body.SetOrderBy(orderBy) request := new(tree.GetSubTreeRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } var cli *rpcapi.GetSubTreeResponseReader err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx)) return handleError("failed to get sub tree client", inErr) }) p.methods[methodGetSubTree].IncRequests(time.Since(start)) if err != nil { return nil, err } return &SubTreeReader{cli: cli}, nil } // AddNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. 
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { body := new(tree.AddRequestBody) body.SetTreeID(prm.TreeID) body.SetBearerToken(prm.BearerToken) body.SetContainerID(prm.CID[:]) body.SetMeta(metaToKV(prm.Meta)) body.SetParentID(prm.Parent) request := new(tree.AddRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } var resp *tree.AddResponse err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node", inErr) }) p.methods[methodAddNode].IncRequests(time.Since(start)) if err != nil { return 0, err } return resp.GetBody().GetNodeID(), nil } // AddNodeByPath invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { body := new(tree.AddByPathRequestBody) body.SetTreeID(prm.TreeID) body.SetBearerToken(prm.BearerToken) body.SetContainerID(prm.CID[:]) body.SetMeta(metaToKV(prm.Meta)) body.SetPathAttribute(prm.PathAttribute) body.SetPath(prm.Path) request := new(tree.AddByPathRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } var resp *tree.AddByPathResponse err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node by path", inErr) }) p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) if err != nil { return 0, err } respBody := resp.GetBody() if respBody == nil { return 0, errors.New("nil body in tree service response") } else if len(respBody.GetNodes()) == 0 { return 0, errors.New("empty list of added nodes in tree service response") } // The first node is the leaf that 
we add, according to tree service docs. return respBody.GetNodes()[0], nil } // MoveNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { body := new(tree.MoveRequestBody) body.SetTreeID(prm.TreeID) body.SetBearerToken(prm.BearerToken) body.SetContainerID(prm.CID[:]) body.SetMeta(metaToKV(prm.Meta)) body.SetNodeID(prm.NodeID) body.SetParentID(prm.ParentID) request := new(tree.MoveRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return err } err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error { if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to move node", err) } return nil }) p.methods[methodMoveNode].IncRequests(time.Since(start)) return err } // RemoveNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { body := new(tree.RemoveRequestBody) body.SetTreeID(prm.TreeID) body.SetBearerToken(prm.BearerToken) body.SetContainerID(prm.CID[:]) body.SetNodeID(prm.NodeID) request := new(tree.RemoveRequest) request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return err } err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error { if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to remove node", err) } return nil }) p.methods[methodRemoveNode].IncRequests(time.Since(start)) return err } // Close closes the Pool and releases all the associated resources. 
func (p *Pool) Close() error { p.cancel() <-p.closedCh var err error for _, group := range p.innerPools { for _, cl := range group.clients { if closeErr := cl.close(); closeErr != nil { p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr)) err = closeErr } } } if closeErr := p.closeClientMapConnections(); closeErr != nil { err = closeErr } return err } func (p *Pool) closeClientMapConnections() (err error) { p.mutex.Lock() defer p.mutex.Unlock() for _, cl := range p.clientMap { if closeErr := cl.close(); closeErr != nil { p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr)) err = closeErr } } return err } // Statistic returns tree pool statistics. func (p *Pool) Statistic() Statistic { stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))} for i, method := range p.methods { stat.methods[i] = method.Snapshot() method.Reset() } return stat } func handleError(msg string, err error) error { if err == nil { return nil } if strings.Contains(err.Error(), "not found") { return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error()) } else if strings.Contains(err.Error(), "denied") { return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error()) } return fmt.Errorf("%s: %w", msg, err) } func metaToKV(meta map[string]string) []*tree.KeyValue { result := make([]*tree.KeyValue, 0, len(meta)) for key, value := range meta { kv := new(tree.KeyValue) kv.SetKey(key) kv.SetValue([]byte(value)) result = append(result, kv) } return result } func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) { if len(nodeParams) == 0 { return nil, errors.New("no FrostFS peers configured") } nodeParamsMap := make(map[int][]pool.NodeParam) for _, param := range nodeParams { nodes := nodeParamsMap[param.Priority()] nodeParamsMap[param.Priority()] = append(nodes, param) } res := make([][]pool.NodeParam, 0, len(nodeParamsMap)) for _, nodes := range nodeParamsMap { res = append(res, nodes) } sort.Slice(res, func(i, j int) bool { return 
res[i][0].Priority() < res[j][0].Priority() }) return res, nil } func fillDefaultInitParams(params *InitParameters) { if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } if params.healthcheckTimeout <= 0 { params.healthcheckTimeout = defaultHealthcheckTimeout } if params.nodeDialTimeout <= 0 { params.nodeDialTimeout = defaultDialTimeout } if params.nodeStreamTimeout <= 0 { params.nodeStreamTimeout = defaultStreamTimeout } if params.maxRequestAttempts <= 0 { params.maxRequestAttempts = len(params.nodeParams) } } func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { if p.logger == nil { return } p.logger.Log(level, msg, fields...) } // startRebalance runs loop to monitor tree client healthy status. func (p *Pool) startRebalance(ctx context.Context) { ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) buffers := make([][]bool, len(p.rebalanceParams.nodesGroup)) for i, nodes := range p.rebalanceParams.nodesGroup { buffers[i] = make([]bool, len(nodes)) } for { select { case <-ctx.Done(): close(p.closedCh) return case <-ticker.C: p.updateNodesHealth(ctx, buffers) ticker.Reset(p.rebalanceParams.clientRebalanceInterval) } } } func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) go func(i int, _ *innerPool) { defer wg.Done() p.updateInnerNodesHealth(ctx, i, buffers[i]) }(i, inner) } wg.Wait() LOOP: for i, buffer := range buffers { for j, healthy := range buffer { if healthy { p.setStartIndices(i, j) break LOOP } } } } func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) { if i > len(p.innerPools)-1 { return } nodesByPriority := p.innerPools[i] options := p.rebalanceParams var wg sync.WaitGroup for j, cli := range nodesByPriority.clients { wg.Add(1) go func(j int, cli client) { defer wg.Done() tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) defer c() 
changed, err := cli.redialIfNecessary(tctx) healthy := err == nil if changed { fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)} if err != nil { fields = append(fields, zap.Error(err)) } p.log(zap.DebugLevel, "tree health has changed", fields...) } else if err != nil { p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err)) } buffer[j] = healthy }(j, cli) } wg.Wait() } func (p *Pool) getStartIndices() (int, int) { p.mutex.RLock() defer p.mutex.RUnlock() return p.startIndices[0], p.startIndices[1] } func (p *Pool) setStartIndices(i, j int) { p.mutex.Lock() p.startIndices[0] = i p.startIndices[1] = j p.mutex.Unlock() } func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error { if p.netMapInfoSource != nil { return p.requestWithRetryContainerNodes(ctx, cnrID, fn) } var ( err, finErr error cl *rpcclient.Client ) reqID := GetRequestID(ctx) startI, startJ := p.getStartIndices() groupsLen := len(p.innerPools) attempts := p.maxRequestAttempts LOOP: for i := startI; i < startI+groupsLen; i++ { indexI := i % groupsLen clientsLen := len(p.innerPools[indexI].clients) for j := startJ; j < startJ+clientsLen; j++ { indexJ := j % clientsLen if attempts == 0 { if startI != indexI || startJ != indexJ { p.setStartIndices(indexI, indexJ) } break LOOP } attempts-- if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { err = fn(cl) } if !shouldTryAgain(err) { if startI != indexI || startJ != indexJ { p.setStartIndices(indexI, indexJ) } if err != nil { err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err) } return err } finErr = finalError(finErr, err) p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) } startJ = 0 } return finErr } func 
(p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error { var ( err, finErr error cl *rpcclient.Client ) reqID := GetRequestID(ctx) netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx) if err != nil { return fmt.Errorf("get net map: %w", err) } policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID) if err != nil { return fmt.Errorf("get container placement policy: %w", err) } cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:]) if err != nil { return fmt.Errorf("get container nodes: %w", err) } cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:]) if err != nil { return fmt.Errorf("get placement vectors: %w", err) } attempts := p.maxRequestAttempts LOOP: for _, cnrNodeGroup := range cnrNodes { for _, cnrNode := range cnrNodeGroup { if attempts == 0 { break LOOP } treeCl, ok := p.getClientFromMap(cnrNode.Hash()) if !ok { treeCl, err = p.getNewTreeClient(ctx, cnrNode) if err != nil { finErr = finalError(finErr, err) p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts)) continue } p.addClientToMap(cnrNode.Hash(), treeCl) } attempts-- if cl, err = treeCl.serviceClient(); err == nil { err = fn(cl) } if shouldRedial(ctx, err) { p.deleteClientFromMap(cnrNode.Hash()) } if !shouldTryAgain(err) { if err != nil { err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err) } return err } finErr = finalError(finErr, err) p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts), zap.String("address", treeCl.endpoint()), zap.Error(err)) } } return finErr } func (p *Pool) getClientFromMap(hash uint64) (client, bool) { p.mutex.RLock() defer p.mutex.RUnlock() cl, ok := p.clientMap[hash] return cl, ok } func (p *Pool) addClientToMap(hash uint64, cl client) { p.mutex.Lock() p.clientMap[hash] = cl p.mutex.Unlock() } func (p *Pool) deleteClientFromMap(hash uint64) { 
p.mutex.Lock() _ = p.clientMap[hash].close() delete(p.clientMap, hash) p.mutex.Unlock() } func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) { var ( treeCl *treeClient err error ) node.IterateNetworkEndpoints(func(endpoint string) bool { var addr network.Address if err = addr.FromString(endpoint); err != nil { p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err)) return false } newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout) if err = newTreeCl.dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err)) return false } treeCl = newTreeCl return true }) if treeCl == nil { return nil, fmt.Errorf("tree client wasn't initialized") } return treeCl, nil } func shouldTryAgain(err error) bool { return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) } func shouldRedial(ctx context.Context, err error) bool { if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) { return false } return true } func prioErr(err error) int { switch { case err == nil: return -1 case errors.Is(err, ErrNodeAccessDenied): return 100 case errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult): return 200 case errors.Is(err, ErrUnhealthyEndpoint): return 300 default: return 500 } } func finalError(current, candidate error) error { if current == nil || candidate == nil { return candidate } // lower priority error is more desirable if prioErr(candidate) < prioErr(current) { return candidate } return current } type reqKeyType struct{} // SetRequestID sets request identifier to context so when some operations are logged in tree pool // this identifier also be logged. 
func SetRequestID(ctx context.Context, reqID string) context.Context {
	var key reqKeyType
	return context.WithValue(ctx, key, reqID)
}

// GetRequestID returns the tree pool request identifier stored in ctx,
// or an empty string when ctx carries none.
func GetRequestID(ctx context.Context) string {
	if id, ok := ctx.Value(reqKeyType{}).(string); ok {
		return id
	}
	return ""
}