From 51e022ab8c255ebf8f3e6582450d1cfa20ca2e65 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 6 Jun 2023 09:29:23 +0300 Subject: [PATCH] [#73] pool/tree: Add tree pool Signed-off-by: Denis Kirillov --- go.mod | 13 +- go.sum | 1 - pool/tree/client.go | 111 ++++++ pool/tree/pool.go | 745 ++++++++++++++++++++++++++++++++++++ pool/tree/pool_signature.go | 25 ++ 5 files changed, 884 insertions(+), 11 deletions(-) create mode 100644 pool/tree/client.go create mode 100644 pool/tree/pool.go create mode 100644 pool/tree/pool_signature.go diff --git a/go.mod b/go.mod index d3d7721..89f6fe3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230531114046-62edd68f47ac git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb + git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/antlr4-go/antlr/v4 v4.13.0 @@ -15,11 +16,12 @@ require ( github.com/stretchr/testify v1.8.3 go.uber.org/zap v1.24.0 google.golang.org/grpc v1.55.0 + google.golang.org/protobuf v1.30.0 ) require ( - git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect + github.com/benbjohnson/clock v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -30,14 +32,6 @@ require ( github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/twmb/murmur3 v1.1.8 // indirect - go.opentelemetry.io/otel v1.15.1 // indirect - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.15.1 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.15.1 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.15.1 // indirect - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 // indirect - go.opentelemetry.io/otel/sdk v1.15.1 // indirect - go.opentelemetry.io/otel/trace v1.15.1 // indirect - go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/goleak v1.2.1 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -48,6 +42,5 @@ require ( golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 1d4eb8e..a409e2f 100644 --- a/go.sum +++ b/go.sum @@ -335,7 +335,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pool/tree/client.go b/pool/tree/client.go new file mode 100644 index 0000000..cbf8d53 --- /dev/null +++ b/pool/tree/client.go @@ -0,0 +1,111 @@ +package tree + +import ( + "context" + "fmt" + "sync" + + grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" + "google.golang.org/grpc" +) + +type treeClient struct { + mu sync.RWMutex + address string + opts []grpc.DialOption + conn *grpc.ClientConn + service grpcService.TreeServiceClient + healthy bool +} + +// newTreeClient creates new tree client with auto dial. +func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { + return &treeClient{ + address: addr, + opts: opts, + } +} + +func (c *treeClient) dial(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + return fmt.Errorf("couldn't dial '%s': connection already established", c.address) + } + + var err error + c.conn, err = grpc.DialContext(ctx, c.address, c.opts...) + if err != nil { + return fmt.Errorf("grpc dial node tree service: %w", err) + } + + c.service = grpcService.NewTreeServiceClient(c.conn) + if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + return fmt.Errorf("healthcheck tree service: %w", err) + } + + c.healthy = true + + return nil +} + +func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bool, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + c.conn, err = grpc.DialContext(ctx, c.address, c.opts...) + if err != nil { + return false, fmt.Errorf("grpc dial node tree service: %w", err) + } + + c.service = grpcService.NewTreeServiceClient(c.conn) + } + + wasHealthy := c.healthy + if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + c.healthy = false + return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err) + } + + return !wasHealthy, nil +} + +func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.conn == nil || !c.healthy { + return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address) + } + + return c.service, nil +} + +func (c *treeClient) endpoint() string { + return c.address +} + +func (c *treeClient) isHealthy() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.healthy +} + +func (c *treeClient) setHealthy(val bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.healthy = val +} + +func (c *treeClient) close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return nil + } + + return c.conn.Close() +} diff --git a/pool/tree/pool.go b/pool/tree/pool.go new file mode 100644 index 0000000..3c6cdff --- /dev/null +++ b/pool/tree/pool.go @@ -0,0 +1,745 @@ +package tree + +import ( + "context" + "errors" + "fmt" + "io" + "sort" + "strings" + "sync" + "time" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + defaultRebalanceInterval = 15 * time.Second + defaultHealthcheckTimeout = 4 * time.Second + defaultDialTimeout = 5 * time.Second + defaultStreamTimeout = 10 * time.Second +) + +var ( + // ErrNodeNotFound is returned from Tree service in case of not found error. + ErrNodeNotFound = errors.New("not found") + + // ErrNodeAccessDenied is returned from Tree service in case of access denied error. + ErrNodeAccessDenied = errors.New("access denied") +) + +// client represents virtual connection to the single FrostFS tree service from which Pool is formed. +// This interface is expected to have exactly one production implementation - treeClient. +// Others are expected to be for test purposes only. +type client interface { + serviceClient() (grpcService.TreeServiceClient, error) + endpoint() string + isHealthy() bool + setHealthy(bool) + dial(ctx context.Context) error + redialIfNecessary(context.Context) (bool, error) + close() error +} + +// InitParameters contains values used to initialize connection Pool. +type InitParameters struct { + key *keys.PrivateKey + logger *zap.Logger + nodeDialTimeout time.Duration + nodeStreamTimeout time.Duration + healthcheckTimeout time.Duration + clientRebalanceInterval time.Duration + nodeParams []pool.NodeParam +} + +// Pool represents virtual connection to the FrostFS tree services network to communicate +// with multiple FrostFS tree services without thinking about switching between servers +// due to their unavailability. +// +// Pool can be created and initialized using NewPool function. +// Before executing the FrostFS tree operations using the Pool, connection to the +// servers MUST BE correctly established (see Dial method). +type Pool struct { + innerPools []*innerPool + key *keys.PrivateKey + cancel context.CancelFunc + closedCh chan struct{} + rebalanceParams rebalanceParameters + logger *zap.Logger + + startIndicesMtx sync.RWMutex + // startIndices points to the client from which the next request will be executed. + // Since clients are stored in innerPool field we have to use two indices. + // These indices being changed during: + // * rebalance procedure (see Pool.startRebalance) + // * retry in case of request failure (see Pool.requestWithRetry) + startIndices [2]int +} + +type innerPool struct { + clients []client +} + +type rebalanceParameters struct { + nodesGroup [][]pool.NodeParam + nodeRequestTimeout time.Duration + clientRebalanceInterval time.Duration +} + +// GetNodesParams groups parameters of Pool.GetNodes operation. +type GetNodesParams struct { + CID cid.ID + TreeID string + Path []string + Meta []string + PathAttribute string + LatestOnly bool + AllAttrs bool + BearerToken []byte +} + +// GetSubTreeParams groups parameters of Pool.GetSubTree operation. +type GetSubTreeParams struct { + CID cid.ID + TreeID string + RootID uint64 + Depth uint32 + BearerToken []byte +} + +// AddNodeParams groups parameters of Pool.AddNode operation. +type AddNodeParams struct { + CID cid.ID + TreeID string + Parent uint64 + Meta map[string]string + BearerToken []byte +} + +// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation. +type AddNodeByPathParams struct { + CID cid.ID + TreeID string + Path []string + Meta map[string]string + PathAttribute string + BearerToken []byte +} + +// MoveNodeParams groups parameters of Pool.MoveNode operation. +type MoveNodeParams struct { + CID cid.ID + TreeID string + NodeID uint64 + ParentID uint64 + Meta map[string]string + BearerToken []byte +} + +// RemoveNodeParams groups parameters of Pool.RemoveNode operation. +type RemoveNodeParams struct { + CID cid.ID + TreeID string + NodeID uint64 + BearerToken []byte +} + +// NewPool creates connection pool using parameters. +func NewPool(options InitParameters) (*Pool, error) { + if options.key == nil { + return nil, fmt.Errorf("missed required parameter 'Key'") + } + + nodesParams, err := adjustNodeParams(options.nodeParams) + if err != nil { + return nil, err + } + + fillDefaultInitParams(&options) + + p := &Pool{ + key: options.key, + logger: options.logger, + rebalanceParams: rebalanceParameters{ + nodesGroup: nodesParams, + nodeRequestTimeout: options.healthcheckTimeout, + clientRebalanceInterval: options.clientRebalanceInterval, + }, + } + + return p, nil +} + +// Dial establishes a connection to the tree servers from the FrostFS network. +// It also starts a routine that checks the health of the nodes and +// updates the weights of the nodes for balancing. +// Returns an error describing failure reason. +// +// If failed, the Pool SHOULD NOT be used. +// +// See also InitParameters.SetClientRebalanceInterval. +func (p *Pool) Dial(ctx context.Context) error { + inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup)) + var atLeastOneHealthy bool + + for i, nodes := range p.rebalanceParams.nodesGroup { + clients := make([]client, len(nodes)) + for j, node := range nodes { + clients[j] = newTreeClient(node.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err := clients[j].dial(ctx); err != nil { + p.log(zap.WarnLevel, "failed to build client", zap.String("address", node.Address()), zap.Error(err)) + continue + } + + atLeastOneHealthy = true + } + + inner[i] = &innerPool{ + clients: clients, + } + } + + if !atLeastOneHealthy { + return fmt.Errorf("at least one node must be healthy") + } + + ctx, cancel := context.WithCancel(ctx) + p.cancel = cancel + p.closedCh = make(chan struct{}) + p.innerPools = inner + + go p.startRebalance(ctx) + return nil +} + +// SetKey specifies default key to be used for the protocol communication by default. +func (x *InitParameters) SetKey(key *keys.PrivateKey) { + x.key = key +} + +// SetLogger specifies logger. +func (x *InitParameters) SetLogger(logger *zap.Logger) { + x.logger = logger +} + +// SetNodeDialTimeout specifies the timeout for connection to be established. +func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { + x.nodeDialTimeout = timeout +} + +// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. +func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { + x.nodeStreamTimeout = timeout +} + +// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. +// +// See also Pool.Dial. +func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { + x.healthcheckTimeout = timeout +} + +// SetClientRebalanceInterval specifies the interval for updating nodes health status. +// +// See also Pool.Dial. +func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { + x.clientRebalanceInterval = interval +} + +// AddNode append information about the node to which you want to connect. +func (x *InitParameters) AddNode(nodeParam pool.NodeParam) { + x.nodeParams = append(x.nodeParams, nodeParam) +} + +// GetNodes invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) { + request := &grpcService.GetNodeByPathRequest{ + Body: &grpcService.GetNodeByPathRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + Path: prm.Path, + Attributes: prm.Meta, + PathAttribute: prm.PathAttribute, + LatestOnly: prm.LatestOnly, + AllAttributes: prm.AllAttrs, + BearerToken: prm.BearerToken, + }, + } + + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return nil, err + } + + var resp *grpcService.GetNodeByPathResponse + if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.GetNodeByPath(ctx, request) + return handleError("failed to get node by path", inErr) + }); err != nil { + return nil, err + } + + return resp.GetBody().GetNodes(), nil +} + +// SubTreeReader is designed to read list of subtree nodes FrostFS tree service. +// +// Must be initialized using Pool.GetSubTree, any other usage is unsafe. +type SubTreeReader struct { + cli grpcService.TreeService_GetSubTreeClient +} + +// Read reads another list of the subtree nodes. +func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) { + for i := 0; i < len(buf); i++ { + resp, err := x.cli.Recv() + if err == io.EOF { + return i, io.EOF + } else if err != nil { + return i, handleError("failed to get sub tree", err) + } + buf[i] = resp.Body + } + + return len(buf), nil +} + +// ReadAll reads all nodes subtree nodes. +func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { + var res []*grpcService.GetSubTreeResponse_Body + for { + resp, err := x.cli.Recv() + if err == io.EOF { + break + } else if err != nil { + return nil, handleError("failed to get sub tree", err) + } + res = append(res, resp.Body) + } + + return res, nil +} + +// Next gets the next node from subtree. +func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) { + resp, err := x.cli.Recv() + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, handleError("failed to get sub tree", err) + } + + return resp.Body, nil +} + +// GetSubTree invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) { + request := &grpcService.GetSubTreeRequest{ + Body: &grpcService.GetSubTreeRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + RootId: prm.RootID, + Depth: prm.Depth, + BearerToken: prm.BearerToken, + }, + } + + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return nil, err + } + + var cli grpcService.TreeService_GetSubTreeClient + if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + cli, inErr = client.GetSubTree(ctx, request) + return handleError("failed to get sub tree client", inErr) + }); err != nil { + return nil, err + } + + return &SubTreeReader{cli: cli}, nil +} + +// AddNode invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { + request := &grpcService.AddRequest{ + Body: &grpcService.AddRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + ParentId: prm.Parent, + Meta: metaToKV(prm.Meta), + BearerToken: prm.BearerToken, + }, + } + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return 0, err + } + + var resp *grpcService.AddResponse + if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.Add(ctx, request) + return handleError("failed to add node", inErr) + }); err != nil { + return 0, err + } + + return resp.GetBody().GetNodeId(), nil +} + +// AddNodeByPath invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { + request := &grpcService.AddByPathRequest{ + Body: &grpcService.AddByPathRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + Path: prm.Path, + Meta: metaToKV(prm.Meta), + PathAttribute: prm.PathAttribute, + BearerToken: prm.BearerToken, + }, + } + + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return 0, err + } + + var resp *grpcService.AddByPathResponse + if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.AddByPath(ctx, request) + return handleError("failed to add node by path", inErr) + }); err != nil { + return 0, err + } + + body := resp.GetBody() + if body == nil { + return 0, errors.New("nil body in tree service response") + } else if len(body.Nodes) == 0 { + return 0, errors.New("empty list of added nodes in tree service response") + } + + // The first node is the leaf that we add, according to tree service docs. + return body.Nodes[0], nil +} + +// MoveNode invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { + request := &grpcService.MoveRequest{ + Body: &grpcService.MoveRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + NodeId: prm.NodeID, + ParentId: prm.ParentID, + Meta: metaToKV(prm.Meta), + BearerToken: prm.BearerToken, + }, + } + + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return err + } + + return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + if _, err := client.Move(ctx, request); err != nil { + return handleError("failed to move node", err) + } + return nil + }) +} + +// RemoveNode invokes eponymous method from TreeServiceClient. +// +// Can return predefined errors: +// * ErrNodeNotFound +// * ErrNodeAccessDenied. +func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { + request := &grpcService.RemoveRequest{ + Body: &grpcService.RemoveRequest_Body{ + ContainerId: prm.CID[:], + TreeId: prm.TreeID, + NodeId: prm.NodeID, + BearerToken: prm.BearerToken, + }, + } + if err := p.signRequest(request.Body, func(key, sign []byte) { + request.Signature = &grpcService.Signature{ + Key: key, + Sign: sign, + } + }); err != nil { + return err + } + + return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + if _, err := client.Remove(ctx, request); err != nil { + return handleError("failed to remove node", err) + } + return nil + }) +} + +// Close closes the Pool and releases all the associated resources. +func (p *Pool) Close() error { + p.cancel() + <-p.closedCh + + var err error + for _, group := range p.innerPools { + for _, cl := range group.clients { + if closeErr := cl.close(); closeErr != nil { + p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr)) + err = closeErr + } + } + } + + return err +} + +func handleError(msg string, err error) error { + if err == nil { + return nil + } + if strings.Contains(err.Error(), "not found") { + return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error()) + } else if strings.Contains(err.Error(), "is denied by") { + return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error()) + } + return fmt.Errorf("%s: %w", msg, err) +} + +func metaToKV(meta map[string]string) []*grpcService.KeyValue { + result := make([]*grpcService.KeyValue, 0, len(meta)) + + for key, value := range meta { + result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)}) + } + + return result +} + +func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) { + if len(nodeParams) == 0 { + return nil, errors.New("no FrostFS peers configured") + } + + nodeParamsMap := make(map[int][]pool.NodeParam) + for _, param := range nodeParams { + nodes := nodeParamsMap[param.Priority()] + nodeParamsMap[param.Priority()] = append(nodes, param) + } + + res := make([][]pool.NodeParam, 0, len(nodeParamsMap)) + for _, nodes := range nodeParamsMap { + res = append(res, nodes) + } + + sort.Slice(res, func(i, j int) bool { + return res[i][0].Priority() < res[j][0].Priority() + }) + + return res, nil +} + +func fillDefaultInitParams(params *InitParameters) { + if params.clientRebalanceInterval <= 0 { + params.clientRebalanceInterval = defaultRebalanceInterval + } + + if params.healthcheckTimeout <= 0 { + params.healthcheckTimeout = defaultHealthcheckTimeout + } + + if params.nodeDialTimeout <= 0 { + params.nodeDialTimeout = defaultDialTimeout + } + + if params.nodeStreamTimeout <= 0 { + params.nodeStreamTimeout = defaultStreamTimeout + } +} + +func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { + if p.logger == nil { + return + } + + p.logger.Log(level, msg, fields...) +} + +// startRebalance runs loop to monitor tree client healthy status. +func (p *Pool) startRebalance(ctx context.Context) { + ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) + buffers := make([][]bool, len(p.rebalanceParams.nodesGroup)) + for i, nodes := range p.rebalanceParams.nodesGroup { + buffers[i] = make([]bool, len(nodes)) + } + + for { + select { + case <-ctx.Done(): + close(p.closedCh) + return + case <-ticker.C: + p.updateNodesHealth(ctx, buffers) + ticker.Reset(p.rebalanceParams.clientRebalanceInterval) + } + } +} + +func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) { + wg := sync.WaitGroup{} + for i, inner := range p.innerPools { + wg.Add(1) + + go func(i int, innerPool *innerPool) { + defer wg.Done() + p.updateInnerNodesHealth(ctx, i, buffers[i]) + }(i, inner) + } + wg.Wait() + +LOOP: + for i, buffer := range buffers { + for j, healthy := range buffer { + if healthy { + p.setStartIndices(i, j) + break LOOP + } + } + } +} + +func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) { + if i > len(p.innerPools)-1 { + return + } + nodesByPriority := p.innerPools[i] + options := p.rebalanceParams + + var wg sync.WaitGroup + for j, cli := range nodesByPriority.clients { + wg.Add(1) + go func(j int, cli client) { + defer wg.Done() + + tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) + defer c() + + changed, err := cli.redialIfNecessary(tctx) + healthy := err == nil + if changed { + fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)} + if err != nil { + fields = append(fields, zap.Error(err)) + } + p.log(zap.DebugLevel, "tree health has changed", fields...) + } else if err != nil { + p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err)) + } + buffer[j] = healthy + }(j, cli) + } + wg.Wait() +} + +func (p *Pool) getStartIndices() (int, int) { + p.startIndicesMtx.RLock() + defer p.startIndicesMtx.RUnlock() + + return p.startIndices[0], p.startIndices[1] +} + +func (p *Pool) setStartIndices(i, j int) { + p.startIndicesMtx.Lock() + p.startIndices[0] = i + p.startIndices[1] = j + p.startIndicesMtx.Unlock() +} + +func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { + var ( + err error + cl grpcService.TreeServiceClient + ) + + startI, startJ := p.getStartIndices() + groupsLen := len(p.innerPools) + for i := startI; i < startI+groupsLen; i++ { + indexI := i % groupsLen + clientsLen := len(p.innerPools[i].clients) + for j := startJ; j < startJ+clientsLen; j++ { + indexJ := j % clientsLen + if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { + err = fn(cl) + } + if !shouldTryAgain(err) { + if startI != indexI || startJ != indexJ { + p.setStartIndices(indexI, indexJ) + } + return err + } + p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) + } + startJ = 0 + } + + return err +} + +func shouldTryAgain(err error) bool { + return !(err == nil || + errors.Is(err, ErrNodeNotFound) || + errors.Is(err, ErrNodeAccessDenied)) +} diff --git a/pool/tree/pool_signature.go b/pool/tree/pool_signature.go new file mode 100644 index 0000000..0b3a2f6 --- /dev/null +++ b/pool/tree/pool_signature.go @@ -0,0 +1,25 @@ +package tree + +import ( + crypto "git.frostfs.info/TrueCloudLab/frostfs-crypto" + "google.golang.org/protobuf/proto" +) + +func (p *Pool) signData(buf []byte, f func(key, sign []byte)) error { + sign, err := crypto.Sign(&p.key.PrivateKey, buf) + if err != nil { + return err + } + + f(p.key.PublicKey().Bytes(), sign) + return nil +} + +func (p *Pool) signRequest(requestBody proto.Message, f func(key, sign []byte)) error { + buf, err := proto.Marshal(requestBody) + if err != nil { + return err + } + + return p.signData(buf, f) +}