[#107] pool/tree: Support grpc schemas
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
14ed3e177d
commit
388d1ca1de
2 changed files with 33 additions and 10 deletions
|
@ -2,11 +2,15 @@ package tree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
|
|
||||||
type treeClient struct {
|
type treeClient struct {
|
||||||
|
@ -35,12 +39,10 @@ func (c *treeClient) dial(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
c.conn, err = grpc.DialContext(ctx, c.address, c.opts...)
|
if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return fmt.Errorf("grpc dial node tree service: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.service = grpcService.NewTreeServiceClient(c.conn)
|
|
||||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||||
return fmt.Errorf("healthcheck tree service: %w", err)
|
return fmt.Errorf("healthcheck tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -55,12 +57,9 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
c.conn, err = grpc.DialContext(ctx, c.address, c.opts...)
|
if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
|
||||||
if err != nil {
|
return false, err
|
||||||
return false, fmt.Errorf("grpc dial node tree service: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.service = grpcService.NewTreeServiceClient(c.conn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wasHealthy := c.healthy
|
wasHealthy := c.healthy
|
||||||
|
@ -74,6 +73,30 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
||||||
return !wasHealthy, nil
|
return !wasHealthy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dialClient(ctx context.Context, addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) {
|
||||||
|
host, tlsEnable, err := apiClient.ParseURI(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("parse address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
creds := insecure.NewCredentials()
|
||||||
|
if tlsEnable {
|
||||||
|
creds = credentials.NewTLS(&tls.Config{})
|
||||||
|
}
|
||||||
|
|
||||||
|
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
|
||||||
|
|
||||||
|
// the order is matter, we want client to be able to overwrite options.
|
||||||
|
opts := append(options, clientOptions...)
|
||||||
|
|
||||||
|
conn, err := grpc.DialContext(ctx, host, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("grpc dial node tree service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn, grpcService.NewTreeServiceClient(conn), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
|
|
|
@ -196,7 +196,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
for j, node := range nodes {
|
for j, node := range nodes {
|
||||||
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
if err := clients[j].dial(ctx); err != nil {
|
||||||
p.log(zap.WarnLevel, "failed to build client", zap.String("address", node.Address()), zap.Error(err))
|
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue