[#73] pool/tree: Add tree pool

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>

parent 0d3dacb515
commit 51e022ab8c

5 changed files with 884 additions and 10 deletions
go.mod  (13 changed lines: +3 -10)

@@ -5,6 +5,7 @@ go 1.19
 require (
 	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230531114046-62edd68f47ac
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
+	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/tzhash v1.8.0
 	github.com/antlr4-go/antlr/v4 v4.13.0
@@ -15,11 +16,12 @@ require (
 	github.com/stretchr/testify v1.8.3
 	go.uber.org/zap v1.24.0
 	google.golang.org/grpc v1.55.0
+	google.golang.org/protobuf v1.30.0
 )
 
 require (
-	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
 	git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
+	github.com/benbjohnson/clock v1.1.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
@@ -30,14 +32,6 @@ require (
 	github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/twmb/murmur3 v1.1.8 // indirect
-	go.opentelemetry.io/otel v1.15.1 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.15.1 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.15.1 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.15.1 // indirect
-	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 // indirect
-	go.opentelemetry.io/otel/sdk v1.15.1 // indirect
-	go.opentelemetry.io/otel/trace v1.15.1 // indirect
-	go.opentelemetry.io/proto/otlp v0.19.0 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/goleak v1.2.1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
@@ -48,6 +42,5 @@ require (
 	golang.org/x/sys v0.8.0 // indirect
 	golang.org/x/text v0.9.0 // indirect
 	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
-	google.golang.org/protobuf v1.30.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
go.sum  (binary diff not shown)
pool/tree/client.go  (new file, +111)

@@ -0,0 +1,111 @@
package tree

import (
	"context"
	"fmt"
	"sync"

	grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
	"google.golang.org/grpc"
)

type treeClient struct {
	mu      sync.RWMutex
	address string
	opts    []grpc.DialOption
	conn    *grpc.ClientConn
	service grpcService.TreeServiceClient
	healthy bool
}

// newTreeClient creates a new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
	return &treeClient{
		address: addr,
		opts:    opts,
	}
}

func (c *treeClient) dial(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.conn != nil {
		return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
	}

	var err error
	c.conn, err = grpc.DialContext(ctx, c.address, c.opts...)
	if err != nil {
		return fmt.Errorf("grpc dial node tree service: %w", err)
	}

	c.service = grpcService.NewTreeServiceClient(c.conn)
	if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
		return fmt.Errorf("healthcheck tree service: %w", err)
	}

	c.healthy = true

	return nil
}

func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bool, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.conn == nil {
		c.conn, err = grpc.DialContext(ctx, c.address, c.opts...)
		if err != nil {
			return false, fmt.Errorf("grpc dial node tree service: %w", err)
		}

		c.service = grpcService.NewTreeServiceClient(c.conn)
	}

	wasHealthy := c.healthy
	if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
		c.healthy = false
		return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
	}
	// Mark the client healthy again after a successful healthcheck;
	// otherwise serviceClient would keep rejecting it forever.
	c.healthy = true

	return !wasHealthy, nil
}

func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.conn == nil || !c.healthy {
		return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
	}

	return c.service, nil
}

func (c *treeClient) endpoint() string {
	return c.address
}

func (c *treeClient) isHealthy() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.healthy
}

func (c *treeClient) setHealthy(val bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.healthy = val
}

func (c *treeClient) close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.conn == nil {
		return nil
	}

	return c.conn.Close()
}
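Since treeClient is unexported, its intended life cycle is easiest to show from inside the package. Below is a hypothetical test-style sketch (not part of this commit; the endpoint 127.0.0.1:8080 and the test itself are made up) that exercises dial, repeated dial, and redialIfNecessary in the order the pool uses them:

package tree

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// TestTreeClientLifecycle is a hypothetical sketch assuming a tree
// service listens on 127.0.0.1:8080.
func TestTreeClientLifecycle(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cli := newTreeClient("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

	// dial opens the connection and runs the first healthcheck.
	if err := cli.dial(ctx); err != nil {
		t.Skipf("no tree service available: %v", err)
	}
	defer cli.close()

	// A second dial must fail: the connection is already established.
	if err := cli.dial(ctx); err == nil {
		t.Fatal("expected error on repeated dial")
	}

	// redialIfNecessary reports whether the health status flipped.
	if changed, err := cli.redialIfNecessary(ctx); err == nil && changed {
		t.Log("client became healthy again")
	}
}

Note how dial refuses to run twice on the same client: the pool creates one treeClient per configured endpoint and only ever re-checks it via redialIfNecessary.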
pool/tree/pool.go  (new file, +745)

@@ -0,0 +1,745 @@
package tree

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	defaultRebalanceInterval  = 15 * time.Second
	defaultHealthcheckTimeout = 4 * time.Second
	defaultDialTimeout        = 5 * time.Second
	defaultStreamTimeout      = 10 * time.Second
)

var (
	// ErrNodeNotFound is returned from the Tree service in case of a not found error.
	ErrNodeNotFound = errors.New("not found")

	// ErrNodeAccessDenied is returned from the Tree service in case of an access denied error.
	ErrNodeAccessDenied = errors.New("access denied")
)

// client represents a virtual connection to a single FrostFS tree service from which Pool is formed.
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
	serviceClient() (grpcService.TreeServiceClient, error)
	endpoint() string
	isHealthy() bool
	setHealthy(bool)
	dial(ctx context.Context) error
	redialIfNecessary(context.Context) (bool, error)
	close() error
}

// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
	key                     *keys.PrivateKey
	logger                  *zap.Logger
	nodeDialTimeout         time.Duration
	nodeStreamTimeout       time.Duration
	healthcheckTimeout      time.Duration
	clientRebalanceInterval time.Duration
	nodeParams              []pool.NodeParam
}

// Pool represents a virtual connection to the FrostFS tree services network to communicate
// with multiple FrostFS tree services without thinking about switching between servers
// due to their unavailability.
//
// Pool can be created and initialized using the NewPool function.
// Before executing FrostFS tree operations using the Pool, a connection to the
// servers MUST BE correctly established (see the Dial method).
type Pool struct {
	innerPools      []*innerPool
	key             *keys.PrivateKey
	cancel          context.CancelFunc
	closedCh        chan struct{}
	rebalanceParams rebalanceParameters
	logger          *zap.Logger

	startIndicesMtx sync.RWMutex
	// startIndices points to the client from which the next request will be executed.
	// Since clients are stored in the innerPools field, we have to use two indices.
	// These indices are changed during:
	// * the rebalance procedure (see Pool.startRebalance)
	// * a retry after a request failure (see Pool.requestWithRetry)
	startIndices [2]int
}

type innerPool struct {
	clients []client
}

type rebalanceParameters struct {
	nodesGroup              [][]pool.NodeParam
	nodeRequestTimeout      time.Duration
	clientRebalanceInterval time.Duration
}

// GetNodesParams groups parameters of the Pool.GetNodes operation.
type GetNodesParams struct {
	CID           cid.ID
	TreeID        string
	Path          []string
	Meta          []string
	PathAttribute string
	LatestOnly    bool
	AllAttrs      bool
	BearerToken   []byte
}

// GetSubTreeParams groups parameters of the Pool.GetSubTree operation.
type GetSubTreeParams struct {
	CID         cid.ID
	TreeID      string
	RootID      uint64
	Depth       uint32
	BearerToken []byte
}

// AddNodeParams groups parameters of the Pool.AddNode operation.
type AddNodeParams struct {
	CID         cid.ID
	TreeID      string
	Parent      uint64
	Meta        map[string]string
	BearerToken []byte
}

// AddNodeByPathParams groups parameters of the Pool.AddNodeByPath operation.
type AddNodeByPathParams struct {
	CID           cid.ID
	TreeID        string
	Path          []string
	Meta          map[string]string
	PathAttribute string
	BearerToken   []byte
}

// MoveNodeParams groups parameters of the Pool.MoveNode operation.
type MoveNodeParams struct {
	CID         cid.ID
	TreeID      string
	NodeID      uint64
	ParentID    uint64
	Meta        map[string]string
	BearerToken []byte
}

// RemoveNodeParams groups parameters of the Pool.RemoveNode operation.
type RemoveNodeParams struct {
	CID         cid.ID
	TreeID      string
	NodeID      uint64
	BearerToken []byte
}

// NewPool creates a connection pool using the given parameters.
func NewPool(options InitParameters) (*Pool, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missing required parameter 'Key'")
	}

	nodesParams, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	fillDefaultInitParams(&options)

	p := &Pool{
		key:    options.key,
		logger: options.logger,
		rebalanceParams: rebalanceParameters{
			nodesGroup:              nodesParams,
			nodeRequestTimeout:      options.healthcheckTimeout,
			clientRebalanceInterval: options.clientRebalanceInterval,
		},
	}

	return p, nil
}

// Dial establishes a connection to the tree servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing the failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
	inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
	var atLeastOneHealthy bool

	for i, nodes := range p.rebalanceParams.nodesGroup {
		clients := make([]client, len(nodes))
		for j, node := range nodes {
			clients[j] = newTreeClient(node.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
			if err := clients[j].dial(ctx); err != nil {
				p.log(zap.WarnLevel, "failed to build client", zap.String("address", node.Address()), zap.Error(err))
				continue
			}

			atLeastOneHealthy = true
		}

		inner[i] = &innerPool{
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	ctx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	p.closedCh = make(chan struct{})
	p.innerPools = inner

	go p.startRebalance(ctx)
	return nil
}

// SetKey specifies the default key to be used for protocol communication.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
	x.key = key
}

// SetLogger specifies the logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
	x.logger = logger
}

// SetNodeDialTimeout specifies the timeout for a connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
	x.nodeDialTimeout = timeout
}

// SetNodeStreamTimeout specifies the timeout for individual operations in a streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
	x.nodeStreamTimeout = timeout
}

// SetHealthcheckTimeout specifies the timeout for a request to a node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
	x.healthcheckTimeout = timeout
}

// SetClientRebalanceInterval specifies the interval for updating nodes' health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
	x.clientRebalanceInterval = interval
}

// AddNode appends information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
	x.nodeParams = append(x.nodeParams, nodeParam)
}

// GetNodes invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
	request := &grpcService.GetNodeByPathRequest{
		Body: &grpcService.GetNodeByPathRequest_Body{
			ContainerId:   prm.CID[:],
			TreeId:        prm.TreeID,
			Path:          prm.Path,
			Attributes:    prm.Meta,
			PathAttribute: prm.PathAttribute,
			LatestOnly:    prm.LatestOnly,
			AllAttributes: prm.AllAttrs,
			BearerToken:   prm.BearerToken,
		},
	}

	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return nil, err
	}

	var resp *grpcService.GetNodeByPathResponse
	if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
		resp, inErr = client.GetNodeByPath(ctx, request)
		return handleError("failed to get node by path", inErr)
	}); err != nil {
		return nil, err
	}

	return resp.GetBody().GetNodes(), nil
}

// SubTreeReader is designed to read a list of subtree nodes from the FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree; any other usage is unsafe.
type SubTreeReader struct {
	cli grpcService.TreeService_GetSubTreeClient
}

// Read reads the next portion of subtree nodes into buf.
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
	for i := 0; i < len(buf); i++ {
		resp, err := x.cli.Recv()
		if err == io.EOF {
			return i, io.EOF
		} else if err != nil {
			return i, handleError("failed to get sub tree", err)
		}
		buf[i] = resp.Body
	}

	return len(buf), nil
}

// ReadAll reads all subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
	var res []*grpcService.GetSubTreeResponse_Body
	for {
		resp, err := x.cli.Recv()
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, handleError("failed to get sub tree", err)
		}
		res = append(res, resp.Body)
	}

	return res, nil
}

// Next gets the next node from the subtree.
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
	resp, err := x.cli.Recv()
	if err == io.EOF {
		return nil, io.EOF
	}
	if err != nil {
		return nil, handleError("failed to get sub tree", err)
	}

	return resp.Body, nil
}

// GetSubTree invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
	request := &grpcService.GetSubTreeRequest{
		Body: &grpcService.GetSubTreeRequest_Body{
			ContainerId: prm.CID[:],
			TreeId:      prm.TreeID,
			RootId:      prm.RootID,
			Depth:       prm.Depth,
			BearerToken: prm.BearerToken,
		},
	}

	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return nil, err
	}

	var cli grpcService.TreeService_GetSubTreeClient
	if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
		cli, inErr = client.GetSubTree(ctx, request)
		return handleError("failed to get sub tree client", inErr)
	}); err != nil {
		return nil, err
	}

	return &SubTreeReader{cli: cli}, nil
}

// AddNode invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
	request := &grpcService.AddRequest{
		Body: &grpcService.AddRequest_Body{
			ContainerId: prm.CID[:],
			TreeId:      prm.TreeID,
			ParentId:    prm.Parent,
			Meta:        metaToKV(prm.Meta),
			BearerToken: prm.BearerToken,
		},
	}
	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return 0, err
	}

	var resp *grpcService.AddResponse
	if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
		resp, inErr = client.Add(ctx, request)
		return handleError("failed to add node", inErr)
	}); err != nil {
		return 0, err
	}

	return resp.GetBody().GetNodeId(), nil
}

// AddNodeByPath invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
	request := &grpcService.AddByPathRequest{
		Body: &grpcService.AddByPathRequest_Body{
			ContainerId:   prm.CID[:],
			TreeId:        prm.TreeID,
			Path:          prm.Path,
			Meta:          metaToKV(prm.Meta),
			PathAttribute: prm.PathAttribute,
			BearerToken:   prm.BearerToken,
		},
	}

	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return 0, err
	}

	var resp *grpcService.AddByPathResponse
	if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
		resp, inErr = client.AddByPath(ctx, request)
		return handleError("failed to add node by path", inErr)
	}); err != nil {
		return 0, err
	}

	body := resp.GetBody()
	if body == nil {
		return 0, errors.New("nil body in tree service response")
	} else if len(body.Nodes) == 0 {
		return 0, errors.New("empty list of added nodes in tree service response")
	}

	// The first node is the leaf that we add, according to tree service docs.
	return body.Nodes[0], nil
}

// MoveNode invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
	request := &grpcService.MoveRequest{
		Body: &grpcService.MoveRequest_Body{
			ContainerId: prm.CID[:],
			TreeId:      prm.TreeID,
			NodeId:      prm.NodeID,
			ParentId:    prm.ParentID,
			Meta:        metaToKV(prm.Meta),
			BearerToken: prm.BearerToken,
		},
	}

	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return err
	}

	return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
		if _, err := client.Move(ctx, request); err != nil {
			return handleError("failed to move node", err)
		}
		return nil
	})
}

// RemoveNode invokes the eponymous method from TreeServiceClient.
//
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
	request := &grpcService.RemoveRequest{
		Body: &grpcService.RemoveRequest_Body{
			ContainerId: prm.CID[:],
			TreeId:      prm.TreeID,
			NodeId:      prm.NodeID,
			BearerToken: prm.BearerToken,
		},
	}
	if err := p.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return err
	}

	return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
		if _, err := client.Remove(ctx, request); err != nil {
			return handleError("failed to remove node", err)
		}
		return nil
	})
}

// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() error {
	p.cancel()
	<-p.closedCh

	var err error
	for _, group := range p.innerPools {
		for _, cl := range group.clients {
			if closeErr := cl.close(); closeErr != nil {
				p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
				err = closeErr
			}
		}
	}

	return err
}

func handleError(msg string, err error) error {
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "not found") {
		return fmt.Errorf("%w: %s", ErrNodeNotFound, err.Error())
	} else if strings.Contains(err.Error(), "is denied by") {
		return fmt.Errorf("%w: %s", ErrNodeAccessDenied, err.Error())
	}
	return fmt.Errorf("%s: %w", msg, err)
}

func metaToKV(meta map[string]string) []*grpcService.KeyValue {
	result := make([]*grpcService.KeyValue, 0, len(meta))

	for key, value := range meta {
		result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
	}

	return result
}

func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodeParamsMap := make(map[int][]pool.NodeParam)
	for _, param := range nodeParams {
		nodes := nodeParamsMap[param.Priority()]
		nodeParamsMap[param.Priority()] = append(nodes, param)
	}

	res := make([][]pool.NodeParam, 0, len(nodeParamsMap))
	for _, nodes := range nodeParamsMap {
		res = append(res, nodes)
	}

	sort.Slice(res, func(i, j int) bool {
		return res[i][0].Priority() < res[j][0].Priority()
	})

	return res, nil
}

func fillDefaultInitParams(params *InitParameters) {
	if params.clientRebalanceInterval <= 0 {
		params.clientRebalanceInterval = defaultRebalanceInterval
	}

	if params.healthcheckTimeout <= 0 {
		params.healthcheckTimeout = defaultHealthcheckTimeout
	}

	if params.nodeDialTimeout <= 0 {
		params.nodeDialTimeout = defaultDialTimeout
	}

	if params.nodeStreamTimeout <= 0 {
		params.nodeStreamTimeout = defaultStreamTimeout
	}
}

func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if p.logger == nil {
		return
	}

	p.logger.Log(level, msg, fields...)
}

// startRebalance runs a loop that monitors tree clients' health status.
func (p *Pool) startRebalance(ctx context.Context) {
	ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
	buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
	for i, nodes := range p.rebalanceParams.nodesGroup {
		buffers[i] = make([]bool, len(nodes))
	}

	for {
		select {
		case <-ctx.Done():
			close(p.closedCh)
			return
		case <-ticker.C:
			p.updateNodesHealth(ctx, buffers)
			ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
		}
	}
}

func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
	wg := sync.WaitGroup{}
	for i, inner := range p.innerPools {
		wg.Add(1)

		go func(i int, innerPool *innerPool) {
			defer wg.Done()
			p.updateInnerNodesHealth(ctx, i, buffers[i])
		}(i, inner)
	}
	wg.Wait()

LOOP:
	for i, buffer := range buffers {
		for j, healthy := range buffer {
			if healthy {
				p.setStartIndices(i, j)
				break LOOP
			}
		}
	}
}

func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
	if i > len(p.innerPools)-1 {
		return
	}
	nodesByPriority := p.innerPools[i]
	options := p.rebalanceParams

	var wg sync.WaitGroup
	for j, cli := range nodesByPriority.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			changed, err := cli.redialIfNecessary(tctx)
			healthy := err == nil
			if changed {
				fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
				if err != nil {
					fields = append(fields, zap.Error(err))
				}
				p.log(zap.DebugLevel, "tree health has changed", fields...)
			} else if err != nil {
				p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
			}
			buffer[j] = healthy
		}(j, cli)
	}
	wg.Wait()
}

func (p *Pool) getStartIndices() (int, int) {
	p.startIndicesMtx.RLock()
	defer p.startIndicesMtx.RUnlock()

	return p.startIndices[0], p.startIndices[1]
}

func (p *Pool) setStartIndices(i, j int) {
	p.startIndicesMtx.Lock()
	p.startIndices[0] = i
	p.startIndices[1] = j
	p.startIndicesMtx.Unlock()
}

func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
	var (
		err error
		cl  grpcService.TreeServiceClient
	)

	startI, startJ := p.getStartIndices()
	groupsLen := len(p.innerPools)
	for i := startI; i < startI+groupsLen; i++ {
		indexI := i % groupsLen
		// Use the wrapped index here: i itself can run past the last group.
		clientsLen := len(p.innerPools[indexI].clients)
		for j := startJ; j < startJ+clientsLen; j++ {
			indexJ := j % clientsLen
			if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
				err = fn(cl)
			}
			if !shouldTryAgain(err) {
				if startI != indexI || startJ != indexJ {
					p.setStartIndices(indexI, indexJ)
				}
				return err
			}
			p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
		}
		startJ = 0
	}

	return err
}

func shouldTryAgain(err error) bool {
	return !(err == nil ||
		errors.Is(err, ErrNodeNotFound) ||
		errors.Is(err, ErrNodeAccessDenied))
}
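For reference, a consumer of the new package would wire everything together roughly as follows. This is a minimal sketch, not code from this commit: the endpoint, priority, and tree name are placeholders, and it assumes pool.NewNodeParam from the existing pool package as the way to build a NodeParam:

package main

import (
	"context"
	"fmt"
	"log"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func main() {
	key, err := keys.NewPrivateKey()
	if err != nil {
		log.Fatal(err)
	}

	var prm tree.InitParameters
	prm.SetKey(key)
	// Priority 1 group is tried first; 127.0.0.1:8080 is a placeholder.
	prm.AddNode(pool.NewNodeParam(1, "127.0.0.1:8080", 1))

	p, err := tree.NewPool(prm)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	if err = p.Dial(ctx); err != nil { // also starts the rebalance loop
		log.Fatal(err)
	}
	defer p.Close()

	var cnrID cid.ID // a real container ID would be decoded here

	nodes, err := p.GetNodes(ctx, tree.GetNodesParams{
		CID:      cnrID,
		TreeID:   "system",
		Path:     []string{"bucket", "object"},
		AllAttrs: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("found", len(nodes), "nodes")
}

Dial both validates that at least one endpoint is reachable and starts the background rebalance loop, so Close must be called to stop that goroutine and release the gRPC connections.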
pool/tree/pool_signature.go  (new file, +25)

@@ -0,0 +1,25 @@
package tree

import (
	crypto "git.frostfs.info/TrueCloudLab/frostfs-crypto"
	"google.golang.org/protobuf/proto"
)

func (p *Pool) signData(buf []byte, f func(key, sign []byte)) error {
	sign, err := crypto.Sign(&p.key.PrivateKey, buf)
	if err != nil {
		return err
	}

	f(p.key.PublicKey().Bytes(), sign)
	return nil
}

func (p *Pool) signRequest(requestBody proto.Message, f func(key, sign []byte)) error {
	buf, err := proto.Marshal(requestBody)
	if err != nil {
		return err
	}

	return p.signData(buf, f)
}
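The receiving side is expected to do the inverse of signRequest: re-marshal the body and check the signature against the attached public key. A hedged sketch of that counterpart follows; it is not part of this commit and assumes frostfs-crypto exposes UnmarshalPublicKey and Verify (as its neofs-crypto ancestor does):

package tree

import (
	"fmt"

	crypto "git.frostfs.info/TrueCloudLab/frostfs-crypto"
	"google.golang.org/protobuf/proto"
)

// verifyRequest is a hypothetical counterpart to signRequest, sketching
// what a server-side check could look like.
func verifyRequest(requestBody proto.Message, key, sign []byte) error {
	buf, err := proto.Marshal(requestBody)
	if err != nil {
		return err
	}

	pub := crypto.UnmarshalPublicKey(key)
	if pub == nil {
		return fmt.Errorf("invalid public key")
	}

	// Verify returns nil only if sign matches buf under pub.
	return crypto.Verify(pub, buf, sign)
}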