frostfs-sdk-go/pool/tree/pool.go

794 lines
21 KiB
Go
Raw Permalink Normal View History

package tree
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
// Fallback values applied by fillDefaultInitParams when the corresponding
// InitParameters field is zero or negative.
const (
// defaultRebalanceInterval is the fallback for clientRebalanceInterval.
defaultRebalanceInterval = 15 * time.Second
// defaultHealthcheckTimeout is the fallback for healthcheckTimeout.
defaultHealthcheckTimeout = 4 * time.Second
// defaultDialTimeout is the fallback for nodeDialTimeout.
defaultDialTimeout = 5 * time.Second
// defaultStreamTimeout is the fallback for nodeStreamTimeout.
defaultStreamTimeout = 10 * time.Second
)
var (
// ErrNodeNotFound is returned from Tree service in case of not found error.
// handleError wraps it whenever the underlying error text contains "not found".
ErrNodeNotFound = errors.New("not found")
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
// handleError wraps it whenever the underlying error text contains "is denied by".
ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
// Pool.GetNodes swallows it before returning to the caller.
errNodeEmptyResult = errors.New("empty result")
)
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
// serviceClient returns the gRPC client used to issue tree requests.
// Pool.requestWithRetry treats an error returned here like a failed request.
serviceClient() (grpcService.TreeServiceClient, error)
// endpoint returns the node address; the Pool uses it as the "address" log field.
endpoint() string
// isHealthy reports the health flag of the client.
// NOTE(review): not called in the visible part of this file — confirm semantics in the implementation.
isHealthy() bool
// setHealthy updates the health flag of the client.
// NOTE(review): not called in the visible part of this file — confirm semantics in the implementation.
setHealthy(bool)
// dial is called once per node by Pool.Dial; a non-nil error is logged and the node is skipped
// when deciding whether at least one node is healthy.
dial(ctx context.Context) error
// redialIfNecessary is called by the periodic health check. The boolean result is logged as
// "tree health has changed"; a nil error marks the node healthy for the current round.
redialIfNecessary(context.Context) (bool, error)
// close is called for every client by Pool.Close.
close() error
}
// InitParameters contains values used to initialize connection Pool.
// Fields are populated through the Set*/AddNode methods and consumed by NewPool.
type InitParameters struct {
// key is required: NewPool fails when it is nil.
key *keys.PrivateKey
// logger is optional: Pool.log is a no-op when it is nil.
logger *zap.Logger
// nodeDialTimeout defaults to defaultDialTimeout when non-positive.
// NOTE(review): NewPool, as visible here, does not copy this value into the Pool — confirm where it is used.
nodeDialTimeout time.Duration
// nodeStreamTimeout defaults to defaultStreamTimeout when non-positive.
// NOTE(review): NewPool, as visible here, does not copy this value into the Pool — confirm where it is used.
nodeStreamTimeout time.Duration
// healthcheckTimeout becomes rebalanceParameters.nodeRequestTimeout; defaults to defaultHealthcheckTimeout.
healthcheckTimeout time.Duration
// clientRebalanceInterval is the period of the health-check loop; defaults to defaultRebalanceInterval.
clientRebalanceInterval time.Duration
// nodeParams lists the nodes to connect to; NewPool groups them by priority and fails when the list is empty.
nodeParams []pool.NodeParam
// dialOptions are passed to newTreeClient for every node in Pool.Dial.
dialOptions []grpc.DialOption
}
// Pool represents virtual connection to the FrostFS tree services network to communicate
// with multiple FrostFS tree services without thinking about switching between servers
// due to their unavailability.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS tree operations using the Pool, connection to the
// servers MUST BE correctly established (see Dial method).
type Pool struct {
// innerPools holds one group of clients per priority level, in ascending priority order
// (the order produced by adjustNodeParams). It is assigned by Dial.
innerPools []*innerPool
// key is the private key supplied through InitParameters.
key *keys.PrivateKey
// cancel stops the rebalance goroutine; it is assigned by Dial and invoked by Close.
cancel context.CancelFunc
// closedCh is closed by startRebalance once its context is cancelled; Close waits on it.
closedCh chan struct{}
// rebalanceParams carries the node groups and health-check timing.
rebalanceParams rebalanceParameters
// dialOptions are forwarded to newTreeClient for every node.
dialOptions []grpc.DialOption
// logger may be nil, in which case Pool.log does nothing.
logger *zap.Logger
// startIndicesMtx guards startIndices.
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
// Since clients are stored in the innerPools field we have to use two indices:
// the group index and the client index inside that group.
// These indices are changed during:
// * rebalance procedure (see Pool.startRebalance)
// * retry in case of request failure (see Pool.requestWithRetry)
startIndices [2]int
}
// innerPool groups the clients of the nodes that share one priority level.
type innerPool struct {
// clients has one entry per node of the group, in the order the nodes were configured.
clients []client
}
// rebalanceParameters holds the settings used by Dial and by the periodic health check.
type rebalanceParameters struct {
// nodesGroup contains the configured nodes grouped by priority, lowest priority value first.
nodesGroup [][]pool.NodeParam
// nodeRequestTimeout bounds a single redialIfNecessary call during the health check.
nodeRequestTimeout time.Duration
// clientRebalanceInterval is the delay between two consecutive health checks.
clientRebalanceInterval time.Duration
}
// GetNodesParams groups parameters of Pool.GetNodes operation.
// Every field is forwarded into the body of the GetNodeByPath request.
type GetNodesParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// Path is sent as the request's Path.
Path []string
// Meta is sent as the request's Attributes.
Meta []string
// PathAttribute is sent as the request's PathAttribute.
PathAttribute string
// LatestOnly is sent as the request's LatestOnly flag.
LatestOnly bool
// AllAttrs is sent as the request's AllAttributes flag.
AllAttrs bool
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
// Every field is forwarded into the body of the GetSubTree request.
type GetSubTreeParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// RootID is sent as the request's RootId.
RootID uint64
// Depth is sent as the request's Depth.
Depth uint32
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// AddNodeParams groups parameters of Pool.AddNode operation.
// Every field is forwarded into the body of the Add request.
type AddNodeParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// Parent is sent as the request's ParentId.
Parent uint64
// Meta is converted to key-value pairs by metaToKV and sent as the request's Meta.
Meta map[string]string
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
// Every field is forwarded into the body of the AddByPath request.
type AddNodeByPathParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// Path is sent as the request's Path.
Path []string
// Meta is converted to key-value pairs by metaToKV and sent as the request's Meta.
Meta map[string]string
// PathAttribute is sent as the request's PathAttribute.
PathAttribute string
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// MoveNodeParams groups parameters of Pool.MoveNode operation.
// Every field is forwarded into the body of the Move request.
type MoveNodeParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// NodeID is sent as the request's NodeId.
NodeID uint64
// ParentID is sent as the request's ParentId.
ParentID uint64
// Meta is converted to key-value pairs by metaToKV and sent as the request's Meta.
Meta map[string]string
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
// Every field is forwarded into the body of the Remove request.
type RemoveNodeParams struct {
// CID is sent as the request's ContainerId.
CID cid.ID
// TreeID is sent as the request's TreeId.
TreeID string
// NodeID is sent as the request's NodeId.
NodeID uint64
// BearerToken is sent as the request's BearerToken.
BearerToken []byte
}
// NewPool builds a Pool from the supplied parameters. No connection is opened
// here; call Pool.Dial afterwards.
//
// An error is returned when the private key is missing or when no node has
// been configured. Non-positive timeouts and intervals are replaced with the
// package defaults.
func NewPool(options InitParameters) (*Pool, error) {
	if options.key == nil {
		return nil, errors.New("missed required parameter 'Key'")
	}

	groups, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	fillDefaultInitParams(&options)

	rebalance := rebalanceParameters{
		nodesGroup:              groups,
		nodeRequestTimeout:      options.healthcheckTimeout,
		clientRebalanceInterval: options.clientRebalanceInterval,
	}

	return &Pool{
		key:             options.key,
		logger:          options.logger,
		dialOptions:     options.dialOptions,
		rebalanceParams: rebalance,
	}, nil
}
// Dial creates a tree client for every configured node and tries to connect
// each of them. Nodes that fail to connect are logged and kept in the pool so
// that the health check can retry them later.
//
// On success a background routine is started that periodically re-checks node
// health and updates the indices of the client used for the next request.
// An error is returned when not a single node could be dialed.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
	groups := p.rebalanceParams.nodesGroup
	pools := make([]*innerPool, 0, len(groups))
	healthyNodes := 0

	for _, group := range groups {
		clients := make([]client, 0, len(group))
		for _, node := range group {
			addr := node.Address()

			var cl client = newTreeClient(addr, p.dialOptions...)
			clients = append(clients, cl)

			err := cl.dial(ctx)
			if err == nil {
				healthyNodes++
				continue
			}
			p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr), zap.Error(err))
		}
		pools = append(pools, &innerPool{clients: clients})
	}

	if healthyNodes == 0 {
		return errors.New("at least one node must be healthy")
	}

	rebalanceCtx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	p.closedCh = make(chan struct{})
	p.innerPools = pools

	go p.startRebalance(rebalanceCtx)

	return nil
}
// SetKey specifies the private key to be used for the protocol communication.
// The key is mandatory: NewPool returns an error when it is nil.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
x.key = key
}
// SetLogger specifies the logger used by the Pool.
// A nil logger is allowed; the Pool then emits no log records.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
x.logger = logger
}
// SetNodeDialTimeout specifies the timeout for connection to be established.
// A non-positive value is replaced with defaultDialTimeout (5s) by NewPool.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
// A non-positive value is replaced with defaultStreamTimeout (10s) by NewPool.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
x.nodeStreamTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
// A non-positive value is replaced with defaultHealthcheckTimeout (4s) by NewPool.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
// A non-positive value is replaced with defaultRebalanceInterval (15s) by NewPool.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// AddNode appends information about the node to which you want to connect.
// It may be called several times; NewPool requires at least one node.
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam)
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection.
// The given options replace any previously set ones; they are not appended.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// GetNodes invokes eponymous method from TreeServiceClient.
//
// The request is signed once and then sent through Pool.requestWithRetry.
// An empty node list is treated as a retryable condition, because it is
// expected while the tree service is still syncing; if every node answers
// with an empty list the call still succeeds and returns no nodes.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
	request := &grpcService.GetNodeByPathRequest{
		Body: &grpcService.GetNodeByPathRequest_Body{
			ContainerId:   prm.CID[:],
			TreeId:        prm.TreeID,
			Path:          prm.Path,
			Attributes:    prm.Meta,
			PathAttribute: prm.PathAttribute,
			LatestOnly:    prm.LatestOnly,
			AllAttributes: prm.AllAttrs,
			BearerToken:   prm.BearerToken,
		},
	}

	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return nil, err
	}

	var resp *grpcService.GetNodeByPathResponse
	if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
		resp, inErr = client.GetNodeByPath(ctx, request)
		// Pool wants to do retry 'GetNodeByPath' request if result is empty.
		// Empty result is expected due to delayed tree service sync.
		// Return an error there to trigger retry and ignore it after,
		// to keep compatibility with 'GetNodeByPath' implementation.
		//
		// The generated getters are nil-safe, so a response without a body is
		// handled as an empty result instead of causing a nil dereference.
		if inErr == nil && len(resp.GetBody().GetNodes()) == 0 {
			return errNodeEmptyResult
		}
		return handleError("failed to get node by path", inErr)
	}); err != nil && !errors.Is(err, errNodeEmptyResult) {
		return nil, err
	}

	return resp.GetBody().GetNodes(), nil
}
// SubTreeReader is designed to read the list of subtree nodes from the FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
// cli is the server stream returned by the GetSubTree call; every read method pulls from it via Recv.
cli grpcService.TreeService_GetSubTreeClient
}
// Read fills buf with the next subtree nodes received from the stream and
// returns how many entries were written.
//
// io.EOF is returned together with the number of entries read so far when the
// stream ends before buf is full. Any other stream error is converted by
// handleError.
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
	for filled := range buf {
		resp, err := x.cli.Recv()
		switch {
		case err == io.EOF:
			return filled, io.EOF
		case err != nil:
			return filled, handleError("failed to get sub tree", err)
		}
		buf[filled] = resp.Body
	}
	return len(buf), nil
}
// ReadAll drains the stream and returns every remaining subtree node.
//
// On a stream error other than io.EOF the nodes collected so far are dropped
// and the error, converted by handleError, is returned.
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
	var nodes []*grpcService.GetSubTreeResponse_Body
	for {
		resp, err := x.cli.Recv()
		if err != nil {
			if err == io.EOF {
				return nodes, nil
			}
			return nil, handleError("failed to get sub tree", err)
		}
		nodes = append(nodes, resp.Body)
	}
}
// Next returns the next subtree node from the stream.
//
// io.EOF signals the end of the stream; any other error is converted by
// handleError.
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
	resp, err := x.cli.Recv()
	switch {
	case err == io.EOF:
		return nil, io.EOF
	case err != nil:
		return nil, handleError("failed to get sub tree", err)
	default:
		return resp.Body, nil
	}
}
// GetSubTree invokes eponymous method from TreeServiceClient.
//
// The request always asks for ascending order. On success the returned
// SubTreeReader wraps the opened server stream.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
	body := &grpcService.GetSubTreeRequest_Body{
		ContainerId: prm.CID[:],
		TreeId:      prm.TreeID,
		RootId:      prm.RootID,
		Depth:       prm.Depth,
		BearerToken: prm.BearerToken,
		OrderBy: &grpcService.GetSubTreeRequest_Body_Order{
			Direction: grpcService.GetSubTreeRequest_Body_Order_Asc,
		},
	}
	request := &grpcService.GetSubTreeRequest{Body: body}

	attachSignature := func(key, sign []byte) {
		request.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := p.signRequest(request.Body, attachSignature); err != nil {
		return nil, err
	}

	var stream grpcService.TreeService_GetSubTreeClient
	err := p.requestWithRetry(func(svc grpcService.TreeServiceClient) error {
		var callErr error
		stream, callErr = svc.GetSubTree(ctx, request)
		return handleError("failed to get sub tree client", callErr)
	})
	if err != nil {
		return nil, err
	}

	return &SubTreeReader{cli: stream}, nil
}
// AddNode invokes eponymous method from TreeServiceClient and returns the
// identifier of the created node as reported in the response body.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
	body := &grpcService.AddRequest_Body{
		ContainerId: prm.CID[:],
		TreeId:      prm.TreeID,
		ParentId:    prm.Parent,
		Meta:        metaToKV(prm.Meta),
		BearerToken: prm.BearerToken,
	}
	request := &grpcService.AddRequest{Body: body}

	attachSignature := func(key, sign []byte) {
		request.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := p.signRequest(request.Body, attachSignature); err != nil {
		return 0, err
	}

	var resp *grpcService.AddResponse
	err := p.requestWithRetry(func(svc grpcService.TreeServiceClient) error {
		var callErr error
		resp, callErr = svc.Add(ctx, request)
		return handleError("failed to add node", callErr)
	})
	if err != nil {
		return 0, err
	}

	return resp.GetBody().GetNodeId(), nil
}
// AddNodeByPath invokes eponymous method from TreeServiceClient and returns
// the identifier of the added leaf node.
//
// An error is returned when the response carries no body or an empty list of
// added nodes.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
	reqBody := &grpcService.AddByPathRequest_Body{
		ContainerId:   prm.CID[:],
		TreeId:        prm.TreeID,
		Path:          prm.Path,
		Meta:          metaToKV(prm.Meta),
		PathAttribute: prm.PathAttribute,
		BearerToken:   prm.BearerToken,
	}
	request := &grpcService.AddByPathRequest{Body: reqBody}

	attachSignature := func(key, sign []byte) {
		request.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := p.signRequest(request.Body, attachSignature); err != nil {
		return 0, err
	}

	var resp *grpcService.AddByPathResponse
	err := p.requestWithRetry(func(svc grpcService.TreeServiceClient) error {
		var callErr error
		resp, callErr = svc.AddByPath(ctx, request)
		return handleError("failed to add node by path", callErr)
	})
	if err != nil {
		return 0, err
	}

	respBody := resp.GetBody()
	if respBody == nil {
		return 0, errors.New("nil body in tree service response")
	}
	if len(respBody.Nodes) == 0 {
		return 0, errors.New("empty list of added nodes in tree service response")
	}

	// The first node is the leaf that we add, according to tree service docs.
	return respBody.Nodes[0], nil
}
// MoveNode invokes eponymous method from TreeServiceClient.
// The response payload is ignored; only the error is reported.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
	body := &grpcService.MoveRequest_Body{
		ContainerId: prm.CID[:],
		TreeId:      prm.TreeID,
		NodeId:      prm.NodeID,
		ParentId:    prm.ParentID,
		Meta:        metaToKV(prm.Meta),
		BearerToken: prm.BearerToken,
	}
	request := &grpcService.MoveRequest{Body: body}

	attachSignature := func(key, sign []byte) {
		request.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := p.signRequest(request.Body, attachSignature); err != nil {
		return err
	}

	return p.requestWithRetry(func(svc grpcService.TreeServiceClient) error {
		// handleError maps a nil error to nil, so no separate success branch is needed.
		_, callErr := svc.Move(ctx, request)
		return handleError("failed to move node", callErr)
	})
}
// RemoveNode invokes eponymous method from TreeServiceClient.
// The response payload is ignored; only the error is reported.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
	body := &grpcService.RemoveRequest_Body{
		ContainerId: prm.CID[:],
		TreeId:      prm.TreeID,
		NodeId:      prm.NodeID,
		BearerToken: prm.BearerToken,
	}
	request := &grpcService.RemoveRequest{Body: body}

	attachSignature := func(key, sign []byte) {
		request.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := p.signRequest(request.Body, attachSignature); err != nil {
		return err
	}

	return p.requestWithRetry(func(svc grpcService.TreeServiceClient) error {
		// handleError maps a nil error to nil, so no separate success branch is needed.
		_, callErr := svc.Remove(ctx, request)
		return handleError("failed to remove node", callErr)
	})
}
// Close closes the Pool and releases all the associated resources.
//
// It stops the rebalance routine, waits for it to finish and then closes
// every client. Each close failure is logged; the last one is returned.
//
// Close is safe to call on a Pool that was never successfully dialed: in that
// case there is no routine to stop and no client to close, so it returns nil.
func (p *Pool) Close() error {
	// cancel and closedCh are assigned together at the end of a successful
	// Dial. Without this guard an undialed Pool would panic on the nil cancel
	// function (or block forever on the nil channel).
	if p.cancel != nil {
		p.cancel()
		<-p.closedCh
	}

	var err error
	for _, group := range p.innerPools {
		for _, cl := range group.clients {
			if closeErr := cl.close(); closeErr != nil {
				p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
				err = closeErr
			}
		}
	}

	return err
}
// handleError converts an error received from the tree service into one of
// the package errors.
//
// A nil error stays nil. If the error text contains "not found" the result
// wraps ErrNodeNotFound; otherwise, if it contains "is denied by", the result
// wraps ErrNodeAccessDenied. In both cases the original text is appended but
// the original error is not wrapped. Any other error is wrapped and prefixed
// with msg.
func handleError(msg string, err error) error {
	if err == nil {
		return nil
	}

	text := err.Error()
	switch {
	case strings.Contains(text, "not found"):
		return fmt.Errorf("%w: %s", ErrNodeNotFound, text)
	case strings.Contains(text, "is denied by"):
		return fmt.Errorf("%w: %s", ErrNodeAccessDenied, text)
	default:
		return fmt.Errorf("%s: %w", msg, err)
	}
}
// metaToKV converts a string map into the key-value list used by tree service
// requests. Values are converted to byte slices. The order of the resulting
// entries follows map iteration and is therefore unspecified. The result is
// never nil, even for an empty or nil map.
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
	pairs := make([]*grpcService.KeyValue, len(meta))
	idx := 0
	for k, v := range meta {
		pairs[idx] = &grpcService.KeyValue{Key: k, Value: []byte(v)}
		idx++
	}
	return pairs
}
// adjustNodeParams groups the configured nodes by priority and returns the
// groups ordered by ascending priority value. Inside a group the nodes keep
// the order in which they were configured.
//
// An error is returned when no node is configured.
func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	byPriority := make(map[int][]pool.NodeParam)
	for _, np := range nodeParams {
		prio := np.Priority()
		byPriority[prio] = append(byPriority[prio], np)
	}

	priorities := make([]int, 0, len(byPriority))
	for prio := range byPriority {
		priorities = append(priorities, prio)
	}
	sort.Ints(priorities)

	groups := make([][]pool.NodeParam, 0, len(priorities))
	for _, prio := range priorities {
		groups = append(groups, byPriority[prio])
	}

	return groups, nil
}
// fillDefaultInitParams replaces every non-positive duration in params with
// the corresponding package default.
func fillDefaultInitParams(params *InitParameters) {
	defaults := []struct {
		value    *time.Duration
		fallback time.Duration
	}{
		{&params.clientRebalanceInterval, defaultRebalanceInterval},
		{&params.healthcheckTimeout, defaultHealthcheckTimeout},
		{&params.nodeDialTimeout, defaultDialTimeout},
		{&params.nodeStreamTimeout, defaultStreamTimeout},
	}

	for _, d := range defaults {
		if *d.value <= 0 {
			*d.value = d.fallback
		}
	}
}
// log writes a record through the Pool logger and does nothing when no logger
// has been configured.
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if logger := p.logger; logger != nil {
		logger.Log(level, msg, fields...)
	}
}
// startRebalance runs the loop that monitors tree client health.
//
// A health check is performed after every clientRebalanceInterval; the next
// interval starts counting only once the previous check has finished. The loop
// ends when ctx is cancelled, at which point closedCh is closed to notify
// Pool.Close.
func (p *Pool) startRebalance(ctx context.Context) {
	interval := p.rebalanceParams.clientRebalanceInterval

	// A one-shot timer re-armed after each check is used instead of a ticker
	// so that slow checks never pile up.
	timer := time.NewTimer(interval)
	// Release the timer when the loop exits instead of leaving it pending.
	defer timer.Stop()

	// One reusable health buffer per priority group, sized to its node count.
	buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
	for i, nodes := range p.rebalanceParams.nodesGroup {
		buffers[i] = make([]bool, len(nodes))
	}

	for {
		select {
		case <-ctx.Done():
			close(p.closedCh)
			return
		case <-timer.C:
			p.updateNodesHealth(ctx, buffers)
			timer.Reset(interval)
		}
	}
}
// updateNodesHealth checks all priority groups concurrently, storing the
// health of every node in buffers, and then points the start indices at the
// first healthy node in priority order. When no node is healthy the start
// indices are left untouched.
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
	var wg sync.WaitGroup
	wg.Add(len(p.innerPools))
	for i := range p.innerPools {
		go func(group int) {
			defer wg.Done()
			p.updateInnerNodesHealth(ctx, group, buffers[group])
		}(i)
	}
	wg.Wait()

	for i, buffer := range buffers {
		for j, healthy := range buffer {
			if healthy {
				p.setStartIndices(i, j)
				return
			}
		}
	}
}
// updateInnerNodesHealth checks every client of the i-th priority group
// concurrently and records the outcome in buffer, one entry per client.
//
// Each check calls redialIfNecessary with a timeout of nodeRequestTimeout; a
// node counts as healthy when that call returns no error. Health changes and
// redial errors are logged at debug level. The call returns once all checks
// are done, and is a no-op when i is beyond the last group.
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
	if i >= len(p.innerPools) {
		return
	}

	timeout := p.rebalanceParams.nodeRequestTimeout
	clients := p.innerPools[i].clients

	var wg sync.WaitGroup
	wg.Add(len(clients))
	for idx, node := range clients {
		go func(idx int, node client) {
			defer wg.Done()

			checkCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			changed, err := node.redialIfNecessary(checkCtx)
			isHealthy := err == nil

			switch {
			case changed:
				fields := []zap.Field{zap.String("address", node.endpoint()), zap.Bool("healthy", isHealthy)}
				if err != nil {
					fields = append(fields, zap.Error(err))
				}
				p.log(zap.DebugLevel, "tree health has changed", fields...)
			case err != nil:
				p.log(zap.DebugLevel, "tree redial error", zap.String("address", node.endpoint()), zap.Error(err))
			}

			buffer[idx] = isHealthy
		}(idx, node)
	}
	wg.Wait()
}
// getStartIndices returns the group index and the client index from which the
// next request starts, read under the read lock.
func (p *Pool) getStartIndices() (int, int) {
	p.startIndicesMtx.RLock()
	group, node := p.startIndices[0], p.startIndices[1]
	p.startIndicesMtx.RUnlock()

	return group, node
}
// setStartIndices stores the group index i and the client index j from which
// the next request starts, written under the write lock.
func (p *Pool) setStartIndices(i, j int) {
	p.startIndicesMtx.Lock()
	defer p.startIndicesMtx.Unlock()

	p.startIndices = [2]int{i, j}
}
// requestWithRetry executes fn against the pool clients until one attempt does
// not need a retry (see shouldTryAgain), visiting every client at most once.
//
// Iteration starts at the current start indices, walks the rest of that group
// with wrap-around, and then continues with the following groups (also with
// wrap-around), each from its first client. When an attempt ends the loop on a
// client other than the starting one, the start indices are moved to it.
//
// If every attempt asked for a retry, the most preferable error according to
// finalError is returned.
// NOTE(review): when the Pool has no inner pools (Dial was not called) the
// loop body never runs and nil is returned without fn being invoked — confirm
// that callers cannot reach this state.
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
)
startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools)
for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen
// A failure to obtain the service client is handled like a failed request.
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) {
// Remember the client that answered so the next request starts from it.
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
// Only the starting group is entered in the middle; the others start at client 0.
startJ = 0
}
return finErr
}
// shouldTryAgain reports whether a request must be retried on another client:
// any error except ErrNodeAccessDenied triggers a retry, success does not.
func shouldTryAgain(err error) bool {
	if err == nil {
		return false
	}
	return !errors.Is(err, ErrNodeAccessDenied)
}
// prioErr ranks an error for finalError; a lower value is more desirable.
//
// nil ranks -1, ErrNodeAccessDenied 100, ErrNodeNotFound and
// errNodeEmptyResult 200, ErrUnhealthyEndpoint 300 and any other error 500.
// Sentinels are matched with errors.Is in that order.
func prioErr(err error) int {
	if err == nil {
		return -1
	}

	ranked := []struct {
		target error
		prio   int
	}{
		{ErrNodeAccessDenied, 100},
		{ErrNodeNotFound, 200},
		{errNodeEmptyResult, 200},
		{ErrUnhealthyEndpoint, 300},
	}
	for _, r := range ranked {
		if errors.Is(err, r.target) {
			return r.prio
		}
	}

	return 500
}
// finalError picks which of two errors should be reported after all retries.
//
// If either argument is nil the candidate is returned. Otherwise the candidate
// replaces the current error only when its prioErr rank is strictly lower
// (lower rank is more desirable); on a tie the current error is kept.
func finalError(current, candidate error) error {
	switch {
	case current == nil, candidate == nil:
		return candidate
	case prioErr(candidate) < prioErr(current):
		return candidate
	default:
		return current
	}
}