package tree

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
	rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
)

const (
	defaultRebalanceInterval  = 15 * time.Second
	defaultHealthcheckTimeout = 4 * time.Second
	defaultDialTimeout        = 5 * time.Second
	defaultStreamTimeout      = 10 * time.Second
)

// SubTreeSort defines an order of nodes returned from GetSubTree RPC.
type SubTreeSort int32

const (
	// NoneOrder does not specify order of nodes returned in GetSubTree RPC.
	NoneOrder SubTreeSort = iota
	// AscendingOrder specifies ascending alphabetical order of nodes based on FilePath attribute.
	AscendingOrder
)

var (
	// ErrNodeNotFound is returned from Tree service in case of not found error.
	ErrNodeNotFound = errors.New("not found")

	// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
	ErrNodeAccessDenied = errors.New("access denied")

	// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
	errNodeEmptyResult = errors.New("empty result")
)

// client represents a virtual connection to a single FrostFS tree service, from which the Pool is formed.
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
	serviceClient() (*rpcclient.Client, error)
	endpoint() string
	isHealthy() bool
	setHealthy(bool)
	dial(ctx context.Context) error
	redialIfNecessary(context.Context) (bool, error)
	close() error
}

// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
	key                     *keys.PrivateKey
	logger                  *zap.Logger
	nodeDialTimeout         time.Duration
	nodeStreamTimeout       time.Duration
	healthcheckTimeout      time.Duration
	clientRebalanceInterval time.Duration
	nodeParams              []pool.NodeParam
	dialOptions             []grpc.DialOption
	maxRequestAttempts      int
	netMapInfoSource        NetMapInfoSource
}

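// NetMapInfoSource provides the current network map and the placement policy
// of a container. The Pool uses it to select tree service connections for a
// request based on container node placement instead of a static node list.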
type NetMapInfoSource interface {
	NetMapSnapshot(ctx context.Context) (netmap.NetMap, error)
	PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error)
}

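// A minimal wiring sketch, assuming a caller-provided NetMapInfoSource
// implementation (newMySource below is hypothetical and not part of this
// package):
//
//	var src NetMapInfoSource = newMySource() // e.g. backed by a netmap contract client
//	var prm InitParameters
//	prm.SetNetMapInfoSource(src)
//	// With the source set, Dial is unnecessary and AddNode has no effect.
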
// Pool represents a virtual connection to the FrostFS tree service network, used to communicate
// with multiple FrostFS tree services without needing to switch between servers manually
// when one of them becomes unavailable.
//
// Pool can be created and initialized using the NewPool function.
// Before executing FrostFS tree operations using a Pool without netMapInfoSource, connection to the
// servers MUST BE correctly established (see Dial method).
type Pool struct {
	innerPools      []*innerPool
	key             *keys.PrivateKey
	cancel          context.CancelFunc
	closedCh        chan struct{}
	rebalanceParams rebalanceParameters
	dialOptions     []grpc.DialOption
	logger          *zap.Logger
	methods         []*pool.MethodStatus

	maxRequestAttempts int
	streamTimeout      time.Duration
	nodeDialTimeout    time.Duration

	netMapInfoSource NetMapInfoSource

	// mutex protects clientMap and startIndices.
	mutex sync.RWMutex
	// clientMap is used only if netMapInfoSource is set.
	clientMap map[uint64]client
	// startIndices points to the client from which the next request will be executed.
	// Since clients are stored in the innerPools field, we have to use two indices.
	// These indices are changed during:
	//  * the rebalance procedure (see Pool.startRebalance)
	//  * a retry in case of request failure (see Pool.requestWithRetry)
	// startIndices is used only if netMapInfoSource is not set.
	startIndices [2]int
}

type innerPool struct {
	clients []client
}

type rebalanceParameters struct {
	nodesGroup              [][]pool.NodeParam
	nodeRequestTimeout      time.Duration
	clientRebalanceInterval time.Duration
}

// GetNodesParams groups parameters of Pool.GetNodes operation.
type GetNodesParams struct {
	CID           cid.ID
	TreeID        string
	Path          []string
	Meta          []string
	PathAttribute string
	LatestOnly    bool
	AllAttrs      bool
	BearerToken   []byte
}

// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
type GetSubTreeParams struct {
	CID         cid.ID
	TreeID      string
	RootID      []uint64
	Depth       uint32
	BearerToken []byte
	Order       SubTreeSort
}

// AddNodeParams groups parameters of Pool.AddNode operation.
type AddNodeParams struct {
	CID         cid.ID
	TreeID      string
	Parent      uint64
	Meta        map[string]string
	BearerToken []byte
}

// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
type AddNodeByPathParams struct {
	CID           cid.ID
	TreeID        string
	Path          []string
	Meta          map[string]string
	PathAttribute string
	BearerToken   []byte
}

// MoveNodeParams groups parameters of Pool.MoveNode operation.
type MoveNodeParams struct {
	CID         cid.ID
	TreeID      string
	NodeID      uint64
	ParentID    uint64
	Meta        map[string]string
	BearerToken []byte
}

// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
type RemoveNodeParams struct {
	CID         cid.ID
	TreeID      string
	NodeID      uint64
	BearerToken []byte
}

// MethodIndex is the index of a method in the list of method statuses in Pool.
type MethodIndex int

const (
	methodGetNodes MethodIndex = iota
	methodGetSubTree
	methodAddNode
	methodAddNodeByPath
	methodMoveNode
	methodRemoveNode
	methodLast
)

// String implements fmt.Stringer.
func (m MethodIndex) String() string {
	switch m {
	case methodGetNodes:
		return "getNodes"
	case methodAddNode:
		return "addNode"
	case methodGetSubTree:
		return "getSubTree"
	case methodAddNodeByPath:
		return "addNodeByPath"
	case methodMoveNode:
		return "moveNode"
	case methodRemoveNode:
		return "removeNode"
	default:
		return "unknown"
	}
}

// NewPool creates a connection pool using the given parameters.
func NewPool(options InitParameters) (*Pool, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missing required parameter 'Key'")
	}

	fillDefaultInitParams(&options)

	methods := make([]*pool.MethodStatus, methodLast)
	for i := methodGetNodes; i < methodLast; i++ {
		methods[i] = pool.NewMethodStatus(i.String())
	}

	p := &Pool{
		key:         options.key,
		logger:      options.logger,
		dialOptions: options.dialOptions,
		rebalanceParams: rebalanceParameters{
			nodeRequestTimeout:      options.healthcheckTimeout,
			clientRebalanceInterval: options.clientRebalanceInterval,
		},
		maxRequestAttempts: options.maxRequestAttempts,
		streamTimeout:      options.nodeStreamTimeout,
		nodeDialTimeout:    options.nodeDialTimeout,
		methods:            methods,
		netMapInfoSource:   options.netMapInfoSource,
		clientMap:          make(map[uint64]client),
	}

	if options.netMapInfoSource == nil {
		nodesParams, err := adjustNodeParams(options.nodeParams)
		if err != nil {
			return nil, err
		}
		p.rebalanceParams.nodesGroup = nodesParams
	}

	return p, nil
}

// Dial need not be called and has no effect if netMapInfoSource is set.
// See also InitParameters.SetNetMapInfoSource.
//
// Otherwise, Dial establishes connections to the tree servers of the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the node weights for balancing.
// Returns an error describing the failure reason.
//
// If Dial fails and netMapInfoSource is not set, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
	if p.netMapInfoSource != nil {
		return nil
	}

	inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
	var atLeastOneHealthy bool

	for i, nodes := range p.rebalanceParams.nodesGroup {
		clients := make([]client, len(nodes))
		for j, node := range nodes {
			clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
			if err := clients[j].dial(ctx); err != nil {
				p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
				continue
			}

			atLeastOneHealthy = true
		}

		inner[i] = &innerPool{
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	ctx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	p.closedCh = make(chan struct{})
	p.innerPools = inner

	go p.startRebalance(ctx)
	return nil
}

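// A minimal initialization sketch for the static node list mode. The endpoint
// and the pool.NodeParam construction are placeholders, not part of this file:
//
//	key, err := keys.NewPrivateKey()
//	if err != nil {
//		// handle error
//	}
//
//	var prm InitParameters
//	prm.SetKey(key)
//	prm.AddNode(nodeParam) // a pool.NodeParam describing, e.g., "grpc://localhost:8080"
//
//	p, err := NewPool(prm)
//	if err != nil {
//		// handle error
//	}
//	if err = p.Dial(context.Background()); err != nil {
//		// handle error
//	}
//	defer p.Close()
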
// SetKey specifies the default key to be used for protocol communication.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
	x.key = key
}

// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
	x.logger = logger
}

// SetNodeDialTimeout specifies the timeout for connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
	x.nodeDialTimeout = timeout
}

// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
	x.nodeStreamTimeout = timeout
}

// SetHealthcheckTimeout specifies the timeout of the request sent to a node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
	x.healthcheckTimeout = timeout
}

// SetClientRebalanceInterval specifies the interval for updating nodes' health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
	x.clientRebalanceInterval = interval
}

// AddNode appends information about a node to connect to.
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
	x.nodeParams = append(x.nodeParams, nodeParam)
}

// SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connections.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
	x.dialOptions = opts
}

// SetMaxRequestAttempts sets the maximum number of attempts to make a successful request.
// The default value is 0, which means the number of attempts equals the number of nodes in the pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
	x.maxRequestAttempts = maxAttempts
}

// SetNetMapInfoSource sets the implementation of the interface used to get the current net map
// and container placement policy. If set, AddNode has no effect.
func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) {
	x.netMapInfoSource = netMapInfoSource
}

// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
	body := new(tree.GetNodeByPathRequestBody)
	body.SetContainerID(prm.CID[:])
	body.SetTreeID(prm.TreeID)
	body.SetPath(prm.Path)
	body.SetAttributes(prm.Meta)
	body.SetPathAttribute(prm.PathAttribute)
	body.SetAllAttributes(prm.AllAttrs)
	body.SetLatestOnly(prm.LatestOnly)
	body.SetBearerToken(prm.BearerToken)

	request := new(tree.GetNodeByPathRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return nil, err
	}

	var resp *tree.GetNodeByPathResponse
	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
		resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
		// The pool retries the 'GetNodeByPath' request if the result is empty.
		// An empty result is expected due to delayed tree service sync.
		// Return an error here to trigger the retry, and ignore it afterwards
		// to keep compatibility with the 'GetNodeByPath' implementation.
		if inErr == nil && len(resp.GetBody().GetNodes()) == 0 {
			return errNodeEmptyResult
		}
		return handleError("failed to get node by path", inErr)
	})
	p.methods[methodGetNodes].IncRequests(time.Since(start))
	if err != nil && !errors.Is(err, errNodeEmptyResult) {
		return nil, err
	}

	return resp.GetBody().GetNodes(), nil
}

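// A minimal call sketch; ctx, p and cnrID are assumed to exist, the tree and
// path values are hypothetical:
//
//	nodes, err := p.GetNodes(ctx, GetNodesParams{
//		CID:           cnrID,
//		TreeID:        "system",
//		Path:          []string{"dir", "object"},
//		PathAttribute: "FilePath",
//		LatestOnly:    true,
//	})
//	if errors.Is(err, ErrNodeNotFound) {
//		// the path does not exist in the tree
//	}
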
// SubTreeReader is designed to read a list of subtree nodes from the FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
	cli *rpcapi.GetSubTreeResponseReader
}

// Read reads the next batch of subtree nodes into buf.
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
	for i := range buf {
		var resp tree.GetSubTreeResponse
		err := x.cli.Read(&resp)
		if err == io.EOF {
			return i, io.EOF
		} else if err != nil {
			return i, handleError("failed to get sub tree", err)
		}
		buf[i] = resp.GetBody()
	}

	return len(buf), nil
}

// ReadAll reads all subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
	var res []*tree.GetSubTreeResponseBody
	for {
		var resp tree.GetSubTreeResponse
		err := x.cli.Read(&resp)
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, handleError("failed to get sub tree", err)
		}
		res = append(res, resp.GetBody())
	}

	return res, nil
}

// Next gets the next node from the subtree.
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
	var resp tree.GetSubTreeResponse
	err := x.cli.Read(&resp)
	if err == io.EOF {
		return nil, io.EOF
	}
	if err != nil {
		return nil, handleError("failed to get sub tree", err)
	}

	return resp.GetBody(), nil
}

// GetSubTree invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
	body := new(tree.GetSubTreeRequestBody)
	body.SetContainerID(prm.CID[:])
	body.SetTreeID(prm.TreeID)
	body.SetBearerToken(prm.BearerToken)
	body.SetDepth(prm.Depth)
	body.SetRootID(prm.RootID)

	orderBy := new(tree.GetSubTreeRequestBodyOrder)
	switch prm.Order {
	case AscendingOrder:
		orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
	default:
		orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
	}
	body.SetOrderBy(orderBy)

	request := new(tree.GetSubTreeRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return nil, err
	}

	var cli *rpcapi.GetSubTreeResponseReader
	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
		cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
		return handleError("failed to get sub tree client", inErr)
	})
	p.methods[methodGetSubTree].IncRequests(time.Since(start))
	if err != nil {
		return nil, err
	}

	return &SubTreeReader{cli: cli}, nil
}

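// A minimal streaming sketch; ctx, p and cnrID are assumed to exist, the tree
// identifier is hypothetical:
//
//	reader, err := p.GetSubTree(ctx, GetSubTreeParams{
//		CID:    cnrID,
//		TreeID: "system",
//		Depth:  2,
//		Order:  AscendingOrder,
//	})
//	if err != nil {
//		// handle error
//	}
//	for {
//		body, err := reader.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		_ = body // process the next subtree node
//	}
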
// AddNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
	body := new(tree.AddRequestBody)
	body.SetTreeID(prm.TreeID)
	body.SetBearerToken(prm.BearerToken)
	body.SetContainerID(prm.CID[:])
	body.SetMeta(metaToKV(prm.Meta))
	body.SetParentID(prm.Parent)

	request := new(tree.AddRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return 0, err
	}

	var resp *tree.AddResponse
	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
		resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
		return handleError("failed to add node", inErr)
	})
	p.methods[methodAddNode].IncRequests(time.Since(start))
	if err != nil {
		return 0, err
	}

	return resp.GetBody().GetNodeID(), nil
}

// AddNodeByPath invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
	body := new(tree.AddByPathRequestBody)
	body.SetTreeID(prm.TreeID)
	body.SetBearerToken(prm.BearerToken)
	body.SetContainerID(prm.CID[:])
	body.SetMeta(metaToKV(prm.Meta))
	body.SetPathAttribute(prm.PathAttribute)
	body.SetPath(prm.Path)

	request := new(tree.AddByPathRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return 0, err
	}

	var resp *tree.AddByPathResponse
	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
		resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
		return handleError("failed to add node by path", inErr)
	})
	p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
	if err != nil {
		return 0, err
	}

	respBody := resp.GetBody()
	if respBody == nil {
		return 0, errors.New("nil body in tree service response")
	} else if len(respBody.GetNodes()) == 0 {
		return 0, errors.New("empty list of added nodes in tree service response")
	}

	// The first node is the leaf that we add, according to tree service docs.
	return respBody.GetNodes()[0], nil
}

// MoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
	body := new(tree.MoveRequestBody)
	body.SetTreeID(prm.TreeID)
	body.SetBearerToken(prm.BearerToken)
	body.SetContainerID(prm.CID[:])
	body.SetMeta(metaToKV(prm.Meta))
	body.SetNodeID(prm.NodeID)
	body.SetParentID(prm.ParentID)

	request := new(tree.MoveRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return err
	}

	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
		if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
			return handleError("failed to move node", err)
		}
		return nil
	})
	p.methods[methodMoveNode].IncRequests(time.Since(start))

	return err
}

// RemoveNode invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
	body := new(tree.RemoveRequestBody)
	body.SetTreeID(prm.TreeID)
	body.SetBearerToken(prm.BearerToken)
	body.SetContainerID(prm.CID[:])
	body.SetNodeID(prm.NodeID)

	request := new(tree.RemoveRequest)
	request.SetBody(body)

	start := time.Now()
	if err := p.signRequest(request); err != nil {
		return err
	}

	err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
		if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
			return handleError("failed to remove node", err)
		}
		return nil
	})
	p.methods[methodRemoveNode].IncRequests(time.Since(start))

	return err
}

// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() error {
	p.cancel()
	<-p.closedCh

	var err error
	for _, group := range p.innerPools {
		for _, cl := range group.clients {
			if closeErr := cl.close(); closeErr != nil {
				p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
				err = closeErr
			}
		}
	}

	if closeErr := p.closeClientMapConnections(); closeErr != nil {
		err = closeErr
	}

	return err
}

func (p *Pool) closeClientMapConnections() (err error) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	for _, cl := range p.clientMap {
		if closeErr := cl.close(); closeErr != nil {
			p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
			err = closeErr
		}
	}

	return err
}

// Statistic returns tree pool statistics.
func (p *Pool) Statistic() Statistic {
	stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}

	for i, method := range p.methods {
		stat.methods[i] = method.Snapshot()
		method.Reset()
	}

	return stat
}

func handleError(msg string, err error) error {
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "not found") {
		return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error())
	} else if strings.Contains(err.Error(), "denied") {
		return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error())
	}
	return fmt.Errorf("%s: %w", msg, err)
}

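// A minimal handling sketch for the predefined errors; ctx, p and prm are
// assumed to exist:
//
//	err := p.RemoveNode(ctx, prm)
//	switch {
//	case errors.Is(err, ErrNodeNotFound):
//		// the node is already gone
//	case errors.Is(err, ErrNodeAccessDenied):
//		// the key or bearer token does not grant this operation
//	case err != nil:
//		// other failure
//	}
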
func metaToKV(meta map[string]string) []*tree.KeyValue {
	result := make([]*tree.KeyValue, 0, len(meta))

	for key, value := range meta {
		kv := new(tree.KeyValue)
		kv.SetKey(key)
		kv.SetValue([]byte(value))
		result = append(result, kv)
	}

	return result
}

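// adjustNodeParams groups the configured nodes by priority and returns the
// groups sorted by ascending priority; for example, nodes with priorities
// 1, 2 and 1 are grouped as [[node1, node3], [node2]].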
func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodeParamsMap := make(map[int][]pool.NodeParam)
	for _, param := range nodeParams {
		nodes := nodeParamsMap[param.Priority()]
		nodeParamsMap[param.Priority()] = append(nodes, param)
	}

	res := make([][]pool.NodeParam, 0, len(nodeParamsMap))
	for _, nodes := range nodeParamsMap {
		res = append(res, nodes)
	}

	sort.Slice(res, func(i, j int) bool {
		return res[i][0].Priority() < res[j][0].Priority()
	})

	return res, nil
}

func fillDefaultInitParams(params *InitParameters) {
	if params.clientRebalanceInterval <= 0 {
		params.clientRebalanceInterval = defaultRebalanceInterval
	}

	if params.healthcheckTimeout <= 0 {
		params.healthcheckTimeout = defaultHealthcheckTimeout
	}

	if params.nodeDialTimeout <= 0 {
		params.nodeDialTimeout = defaultDialTimeout
	}

	if params.nodeStreamTimeout <= 0 {
		params.nodeStreamTimeout = defaultStreamTimeout
	}

	if params.maxRequestAttempts <= 0 {
		params.maxRequestAttempts = len(params.nodeParams)
	}
}

func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if p.logger == nil {
		return
	}

	p.logger.Log(level, msg, fields...)
}

// startRebalance runs a loop that monitors the health status of tree clients.
func (p *Pool) startRebalance(ctx context.Context) {
	ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
	buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
	for i, nodes := range p.rebalanceParams.nodesGroup {
		buffers[i] = make([]bool, len(nodes))
	}

	for {
		select {
		case <-ctx.Done():
			close(p.closedCh)
			return
		case <-ticker.C:
			p.updateNodesHealth(ctx, buffers)
			ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
		}
	}
}

func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
	wg := sync.WaitGroup{}
	for i, inner := range p.innerPools {
		wg.Add(1)

		go func(i int, _ *innerPool) {
			defer wg.Done()
			p.updateInnerNodesHealth(ctx, i, buffers[i])
		}(i, inner)
	}
	wg.Wait()

LOOP:
	for i, buffer := range buffers {
		for j, healthy := range buffer {
			if healthy {
				p.setStartIndices(i, j)
				break LOOP
			}
		}
	}
}

func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
	if i > len(p.innerPools)-1 {
		return
	}
	nodesByPriority := p.innerPools[i]
	options := p.rebalanceParams

	var wg sync.WaitGroup
	for j, cli := range nodesByPriority.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			changed, err := cli.redialIfNecessary(tctx)
			healthy := err == nil
			if changed {
				fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
				if err != nil {
					fields = append(fields, zap.Error(err))
				}
				p.log(zap.DebugLevel, "tree health has changed", fields...)
			} else if err != nil {
				p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
			}
			buffer[j] = healthy
		}(j, cli)
	}
	wg.Wait()
}

func (p *Pool) getStartIndices() (int, int) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()

	return p.startIndices[0], p.startIndices[1]
}

func (p *Pool) setStartIndices(i, j int) {
	p.mutex.Lock()
	p.startIndices[0] = i
	p.startIndices[1] = j
	p.mutex.Unlock()
}

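// requestWithRetry executes fn against the pool's clients, walking the
// priority groups from the remembered start indices and retrying on retriable
// errors until maxRequestAttempts is exhausted. If netMapInfoSource is set,
// the request is routed by container placement instead
// (see requestWithRetryContainerNodes).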
func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
	if p.netMapInfoSource != nil {
		return p.requestWithRetryContainerNodes(ctx, cnrID, fn)
	}

	var (
		err, finErr error
		cl          *rpcclient.Client
	)

	reqID := GetRequestID(ctx)

	startI, startJ := p.getStartIndices()
	groupsLen := len(p.innerPools)
	attempts := p.maxRequestAttempts

LOOP:
	for i := startI; i < startI+groupsLen; i++ {
		indexI := i % groupsLen
		clientsLen := len(p.innerPools[indexI].clients)
		for j := startJ; j < startJ+clientsLen; j++ {
			indexJ := j % clientsLen

			if attempts == 0 {
				if startI != indexI || startJ != indexJ {
					p.setStartIndices(indexI, indexJ)
				}
				break LOOP
			}
			attempts--

			if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
				err = fn(cl)
			}
			if !shouldTryAgain(err) {
				if startI != indexI || startJ != indexJ {
					p.setStartIndices(indexI, indexJ)
				}

				if err != nil {
					err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err)
				}

				return err
			}

			finErr = finalError(finErr, err)
			p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
				zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
		}
		startJ = 0
	}

	return finErr
}

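// requestWithRetryContainerNodes routes the request by container placement:
// it fetches the current net map snapshot and the container placement policy
// from netMapInfoSource, computes the container's placement vectors, and then
// tries the container nodes in placement order, dialing and caching tree
// clients on demand, until fn succeeds or maxRequestAttempts is exhausted.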
func (p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
	var (
		err, finErr error
		cl          *rpcclient.Client
	)

	reqID := GetRequestID(ctx)

	netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx)
	if err != nil {
		return fmt.Errorf("get net map: %w", err)
	}

	policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID)
	if err != nil {
		return fmt.Errorf("get container placement policy: %w", err)
	}

	cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:])
	if err != nil {
		return fmt.Errorf("get container nodes: %w", err)
	}

	cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:])
	if err != nil {
		return fmt.Errorf("get placement vectors: %w", err)
	}

	attempts := p.maxRequestAttempts

LOOP:
	for _, cnrNodeGroup := range cnrNodes {
		for _, cnrNode := range cnrNodeGroup {
			if attempts == 0 {
				break LOOP
			}

			treeCl, ok := p.getClientFromMap(cnrNode.Hash())
			if !ok {
				treeCl, err = p.getNewTreeClient(ctx, cnrNode)
				if err != nil {
					finErr = finalError(finErr, err)
					p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
					continue
				}

				p.addClientToMap(cnrNode.Hash(), treeCl)
			}
			attempts--

			if cl, err = treeCl.serviceClient(); err == nil {
				err = fn(cl)
			}
			if shouldRedial(ctx, err) {
				p.deleteClientFromMap(cnrNode.Hash())
			}
			if !shouldTryAgain(err) {
				if err != nil {
					err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
				}

				return err
			}

			finErr = finalError(finErr, err)
			p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
				zap.String("address", treeCl.endpoint()), zap.Error(err))
		}
	}

	return finErr
}

func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()

	cl, ok := p.clientMap[hash]
	return cl, ok
}

func (p *Pool) addClientToMap(hash uint64, cl client) {
	p.mutex.Lock()
	p.clientMap[hash] = cl
	p.mutex.Unlock()
}

func (p *Pool) deleteClientFromMap(hash uint64) {
	p.mutex.Lock()
	_ = p.clientMap[hash].close()
	delete(p.clientMap, hash)
	p.mutex.Unlock()
}

func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
	var (
		treeCl *treeClient
		err    error
	)

	node.IterateNetworkEndpoints(func(endpoint string) bool {
		var addr network.Address
		if err = addr.FromString(endpoint); err != nil {
			p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
			return false
		}

		newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
		if err = newTreeCl.dial(ctx); err != nil {
			p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
			return false
		}

		treeCl = newTreeCl
		return true
	})

	if treeCl == nil {
		return nil, fmt.Errorf("tree client wasn't initialized")
	}

	return treeCl, nil
}

func shouldTryAgain(err error) bool {
	return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}

func shouldRedial(ctx context.Context, err error) bool {
	if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) {
		return false
	}
	return true
}

func prioErr(err error) int {
	switch {
	case err == nil:
		return -1
	case errors.Is(err, ErrNodeAccessDenied):
		return 100
	case errors.Is(err, ErrNodeNotFound) ||
		errors.Is(err, errNodeEmptyResult):
		return 200
	case errors.Is(err, ErrUnhealthyEndpoint):
		return 300
	default:
		return 500
	}
}

func finalError(current, candidate error) error {
	if current == nil || candidate == nil {
		return candidate
	}

	// The lower-priority error is the more desirable one to report.
	if prioErr(candidate) < prioErr(current) {
		return candidate
	}

	return current
}

type reqKeyType struct{}

// SetRequestID sets a request identifier in the context so that the identifier
// is also logged whenever tree pool operations are logged.
func SetRequestID(ctx context.Context, reqID string) context.Context {
	return context.WithValue(ctx, reqKeyType{}, reqID)
}

// GetRequestID fetches the tree pool request identifier from the context.
func GetRequestID(ctx context.Context) string {
	reqID, _ := ctx.Value(reqKeyType{}).(string)
	return reqID
}
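
// A minimal request-ID sketch; p and prm are assumed to exist, the identifier
// value is arbitrary:
//
//	ctx := SetRequestID(context.Background(), "req-123")
//	_, err := p.GetNodes(ctx, prm) // "req-123" appears as "request_id" in retry logs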