package tree import ( "context" "errors" "fmt" "io" "sort" "strings" "sync" "time" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) const ( defaultRebalanceInterval = 15 * time.Second defaultHealthcheckTimeout = 4 * time.Second defaultDialTimeout = 5 * time.Second defaultStreamTimeout = 10 * time.Second ) var ( // ErrNodeNotFound is returned from Tree service in case of not found error. ErrNodeNotFound = errors.New("not found") // ErrNodeAccessDenied is returned from Tree service in case of access denied error. ErrNodeAccessDenied = errors.New("access denied") ) // client represents virtual connection to the single FrostFS tree service from which Pool is formed. // This interface is expected to have exactly one production implementation - treeClient. // Others are expected to be for test purposes only. type client interface { serviceClient() (grpcService.TreeServiceClient, error) endpoint() string isHealthy() bool setHealthy(bool) dial(ctx context.Context) error redialIfNecessary(context.Context) (bool, error) close() error } // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *keys.PrivateKey logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration nodeParams []pool.NodeParam dialOptions []grpc.DialOption } // Pool represents virtual connection to the FrostFS tree services network to communicate // with multiple FrostFS tree services without thinking about switching between servers // due to their unavailability. // // Pool can be created and initialized using NewPool function. // Before executing the FrostFS tree operations using the Pool, connection to the // servers MUST BE correctly established (see Dial method). type Pool struct { innerPools []*innerPool key *keys.PrivateKey cancel context.CancelFunc closedCh chan struct{} rebalanceParams rebalanceParameters dialOptions []grpc.DialOption logger *zap.Logger startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. // Since clients are stored in innerPool field we have to use two indices. // These indices being changed during: // * rebalance procedure (see Pool.startRebalance) // * retry in case of request failure (see Pool.requestWithRetry) startIndices [2]int } type innerPool struct { clients []client } type rebalanceParameters struct { nodesGroup [][]pool.NodeParam nodeRequestTimeout time.Duration clientRebalanceInterval time.Duration } // GetNodesParams groups parameters of Pool.GetNodes operation. type GetNodesParams struct { CID cid.ID TreeID string Path []string Meta []string PathAttribute string LatestOnly bool AllAttrs bool BearerToken []byte } // GetSubTreeParams groups parameters of Pool.GetSubTree operation. type GetSubTreeParams struct { CID cid.ID TreeID string RootID uint64 Depth uint32 BearerToken []byte } // AddNodeParams groups parameters of Pool.AddNode operation. type AddNodeParams struct { CID cid.ID TreeID string Parent uint64 Meta map[string]string BearerToken []byte } // AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation. type AddNodeByPathParams struct { CID cid.ID TreeID string Path []string Meta map[string]string PathAttribute string BearerToken []byte } // MoveNodeParams groups parameters of Pool.MoveNode operation. type MoveNodeParams struct { CID cid.ID TreeID string NodeID uint64 ParentID uint64 Meta map[string]string BearerToken []byte } // RemoveNodeParams groups parameters of Pool.RemoveNode operation. type RemoveNodeParams struct { CID cid.ID TreeID string NodeID uint64 BearerToken []byte } // NewPool creates connection pool using parameters. func NewPool(options InitParameters) (*Pool, error) { if options.key == nil { return nil, fmt.Errorf("missed required parameter 'Key'") } nodesParams, err := adjustNodeParams(options.nodeParams) if err != nil { return nil, err } fillDefaultInitParams(&options) p := &Pool{ key: options.key, logger: options.logger, dialOptions: options.dialOptions, rebalanceParams: rebalanceParameters{ nodesGroup: nodesParams, nodeRequestTimeout: options.healthcheckTimeout, clientRebalanceInterval: options.clientRebalanceInterval, }, } return p, nil } // Dial establishes a connection to the tree servers from the FrostFS network. // It also starts a routine that checks the health of the nodes and // updates the weights of the nodes for balancing. // Returns an error describing failure reason. // // If failed, the Pool SHOULD NOT be used. // // See also InitParameters.SetClientRebalanceInterval. func (p *Pool) Dial(ctx context.Context) error { inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup)) var atLeastOneHealthy bool for i, nodes := range p.rebalanceParams.nodesGroup { clients := make([]client, len(nodes)) for j, node := range nodes { clients[j] = newTreeClient(node.Address(), p.dialOptions...) if err := clients[j].dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to build client", zap.String("address", node.Address()), zap.Error(err)) continue } atLeastOneHealthy = true } inner[i] = &innerPool{ clients: clients, } } if !atLeastOneHealthy { return fmt.Errorf("at least one node must be healthy") } ctx, cancel := context.WithCancel(ctx) p.cancel = cancel p.closedCh = make(chan struct{}) p.innerPools = inner go p.startRebalance(ctx) return nil } // SetKey specifies default key to be used for the protocol communication by default. func (x *InitParameters) SetKey(key *keys.PrivateKey) { x.key = key } // SetLogger specifies logger. func (x *InitParameters) SetLogger(logger *zap.Logger) { x.logger = logger } // SetNodeDialTimeout specifies the timeout for connection to be established. func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { x.nodeDialTimeout = timeout } // SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { x.nodeStreamTimeout = timeout } // SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. // // See also Pool.Dial. func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam pool.NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) } // SetGRPCDialOptions sets the gRPC dial options for new gRPC tree client connection. func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } // GetNodes invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) { request := &grpcService.GetNodeByPathRequest{ Body: &grpcService.GetNodeByPathRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, Path: prm.Path, Attributes: prm.Meta, PathAttribute: prm.PathAttribute, LatestOnly: prm.LatestOnly, AllAttributes: prm.AllAttrs, BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return nil, err } var resp *grpcService.GetNodeByPathResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.GetNodeByPath(ctx, request) return handleError("failed to get node by path", inErr) }); err != nil { return nil, err } return resp.GetBody().GetNodes(), nil } // SubTreeReader is designed to read list of subtree nodes FrostFS tree service. // // Must be initialized using Pool.GetSubTree, any other usage is unsafe. type SubTreeReader struct { cli grpcService.TreeService_GetSubTreeClient } // Read reads another list of the subtree nodes. func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) { for i := 0; i < len(buf); i++ { resp, err := x.cli.Recv() if err == io.EOF { return i, io.EOF } else if err != nil { return i, handleError("failed to get sub tree", err) } buf[i] = resp.Body } return len(buf), nil } // ReadAll reads all nodes subtree nodes. func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { var res []*grpcService.GetSubTreeResponse_Body for { resp, err := x.cli.Recv() if err == io.EOF { break } else if err != nil { return nil, handleError("failed to get sub tree", err) } res = append(res, resp.Body) } return res, nil } // Next gets the next node from subtree. func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) { resp, err := x.cli.Recv() if err == io.EOF { return nil, io.EOF } if err != nil { return nil, handleError("failed to get sub tree", err) } return resp.Body, nil } // GetSubTree invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) { request := &grpcService.GetSubTreeRequest{ Body: &grpcService.GetSubTreeRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, RootId: prm.RootID, Depth: prm.Depth, BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return nil, err } var cli grpcService.TreeService_GetSubTreeClient if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { cli, inErr = client.GetSubTree(ctx, request) return handleError("failed to get sub tree client", inErr) }); err != nil { return nil, err } return &SubTreeReader{cli: cli}, nil } // AddNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { request := &grpcService.AddRequest{ Body: &grpcService.AddRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, ParentId: prm.Parent, Meta: metaToKV(prm.Meta), BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return 0, err } var resp *grpcService.AddResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.Add(ctx, request) return handleError("failed to add node", inErr) }); err != nil { return 0, err } return resp.GetBody().GetNodeId(), nil } // AddNodeByPath invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { request := &grpcService.AddByPathRequest{ Body: &grpcService.AddByPathRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, Path: prm.Path, Meta: metaToKV(prm.Meta), PathAttribute: prm.PathAttribute, BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return 0, err } var resp *grpcService.AddByPathResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.AddByPath(ctx, request) return handleError("failed to add node by path", inErr) }); err != nil { return 0, err } body := resp.GetBody() if body == nil { return 0, errors.New("nil body in tree service response") } else if len(body.Nodes) == 0 { return 0, errors.New("empty list of added nodes in tree service response") } // The first node is the leaf that we add, according to tree service docs. return body.Nodes[0], nil } // MoveNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { request := &grpcService.MoveRequest{ Body: &grpcService.MoveRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, NodeId: prm.NodeID, ParentId: prm.ParentID, Meta: metaToKV(prm.Meta), BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return err } return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { if _, err := client.Move(ctx, request); err != nil { return handleError("failed to move node", err) } return nil }) } // RemoveNode invokes eponymous method from TreeServiceClient. // // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { request := &grpcService.RemoveRequest{ Body: &grpcService.RemoveRequest_Body{ ContainerId: prm.CID[:], TreeId: prm.TreeID, NodeId: prm.NodeID, BearerToken: prm.BearerToken, }, } if err := p.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return err } return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { if _, err := client.Remove(ctx, request); err != nil { return handleError("failed to remove node", err) } return nil }) } // Close closes the Pool and releases all the associated resources. func (p *Pool) Close() error { p.cancel() <-p.closedCh var err error for _, group := range p.innerPools { for _, cl := range group.clients { if closeErr := cl.close(); closeErr != nil { p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr)) err = closeErr } } } return err } func handleError(msg string, err error) error { if err == nil { return nil } if strings.Contains(err.Error(), "not found") { return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error()) } else if strings.Contains(err.Error(), "is denied by") { return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error()) } return fmt.Errorf("%s: %w", msg, err) } func metaToKV(meta map[string]string) []*grpcService.KeyValue { result := make([]*grpcService.KeyValue, 0, len(meta)) for key, value := range meta { result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)}) } return result } func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) { if len(nodeParams) == 0 { return nil, errors.New("no FrostFS peers configured") } nodeParamsMap := make(map[int][]pool.NodeParam) for _, param := range nodeParams { nodes := nodeParamsMap[param.Priority()] nodeParamsMap[param.Priority()] = append(nodes, param) } res := make([][]pool.NodeParam, 0, len(nodeParamsMap)) for _, nodes := range nodeParamsMap { res = append(res, nodes) } sort.Slice(res, func(i, j int) bool { return res[i][0].Priority() < res[j][0].Priority() }) return res, nil } func fillDefaultInitParams(params *InitParameters) { if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } if params.healthcheckTimeout <= 0 { params.healthcheckTimeout = defaultHealthcheckTimeout } if params.nodeDialTimeout <= 0 { params.nodeDialTimeout = defaultDialTimeout } if params.nodeStreamTimeout <= 0 { params.nodeStreamTimeout = defaultStreamTimeout } } func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { if p.logger == nil { return } p.logger.Log(level, msg, fields...) } // startRebalance runs loop to monitor tree client healthy status. func (p *Pool) startRebalance(ctx context.Context) { ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) buffers := make([][]bool, len(p.rebalanceParams.nodesGroup)) for i, nodes := range p.rebalanceParams.nodesGroup { buffers[i] = make([]bool, len(nodes)) } for { select { case <-ctx.Done(): close(p.closedCh) return case <-ticker.C: p.updateNodesHealth(ctx, buffers) ticker.Reset(p.rebalanceParams.clientRebalanceInterval) } } } func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) go func(i int, innerPool *innerPool) { defer wg.Done() p.updateInnerNodesHealth(ctx, i, buffers[i]) }(i, inner) } wg.Wait() LOOP: for i, buffer := range buffers { for j, healthy := range buffer { if healthy { p.setStartIndices(i, j) break LOOP } } } } func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) { if i > len(p.innerPools)-1 { return } nodesByPriority := p.innerPools[i] options := p.rebalanceParams var wg sync.WaitGroup for j, cli := range nodesByPriority.clients { wg.Add(1) go func(j int, cli client) { defer wg.Done() tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) defer c() changed, err := cli.redialIfNecessary(tctx) healthy := err == nil if changed { fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)} if err != nil { fields = append(fields, zap.Error(err)) } p.log(zap.DebugLevel, "tree health has changed", fields...) } else if err != nil { p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err)) } buffer[j] = healthy }(j, cli) } wg.Wait() } func (p *Pool) getStartIndices() (int, int) { p.startIndicesMtx.RLock() defer p.startIndicesMtx.RUnlock() return p.startIndices[0], p.startIndices[1] } func (p *Pool) setStartIndices(i, j int) { p.startIndicesMtx.Lock() p.startIndices[0] = i p.startIndices[1] = j p.startIndicesMtx.Unlock() } func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { var ( err error cl grpcService.TreeServiceClient ) startI, startJ := p.getStartIndices() groupsLen := len(p.innerPools) for i := startI; i < startI+groupsLen; i++ { indexI := i % groupsLen clientsLen := len(p.innerPools[i].clients) for j := startJ; j < startJ+clientsLen; j++ { indexJ := j % clientsLen if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { err = fn(cl) } if !shouldTryAgain(err) { if startI != indexI || startJ != indexJ { p.setStartIndices(indexI, indexJ) } return err } p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) } startJ = 0 } return err } func shouldTryAgain(err error) bool { return !(err == nil || errors.Is(err, ErrNodeNotFound) || errors.Is(err, ErrNodeAccessDenied)) }