[#74] Support multiple tree endpoints
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
2ab6f004f1
commit
ca8791a5fd
2 changed files with 103 additions and 21 deletions
|
@ -113,14 +113,14 @@ func (a *App) init(ctx context.Context) {
|
||||||
func (a *App) initLayer(ctx context.Context) {
|
func (a *App) initLayer(ctx context.Context) {
|
||||||
a.initResolver()
|
a.initResolver()
|
||||||
|
|
||||||
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
|
treeServiceEndpoint := a.cfg.GetStringSlice(cfgTreeServiceEndpoint)
|
||||||
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
|
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||||
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt)
|
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, a.log, grpcDialOpt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal("failed to create tree service", zap.Error(err))
|
a.log.Fatal("failed to create tree service", zap.Error(err))
|
||||||
}
|
}
|
||||||
treeService := tree.NewTree(treeGRPCClient)
|
treeService := tree.NewTree(treeGRPCClient)
|
||||||
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
|
a.log.Info("init tree service", zap.Strings("endpoints", treeGRPCClient.Endpoints()))
|
||||||
|
|
||||||
// prepare random key for anonymous requests
|
// prepare random key for anonymous requests
|
||||||
randomKey, err := keys.NewPrivateKey()
|
randomKey, err := keys.NewPrivateKey()
|
||||||
|
|
|
@ -14,7 +14,10 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GetNodeByPathResponseInfoWrapper struct {
|
type GetNodeByPathResponseInfoWrapper struct {
|
||||||
|
@ -67,26 +70,55 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
||||||
|
|
||||||
type ServiceClientGRPC struct {
|
type ServiceClientGRPC struct {
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
|
log *zap.Logger
|
||||||
|
clients []treeClient
|
||||||
|
}
|
||||||
|
|
||||||
|
type treeClient struct {
|
||||||
|
address string
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
service grpcService.TreeServiceClient
|
service grpcService.TreeServiceClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTreeServiceClientGRPC(ctx context.Context, addr string, key *keys.PrivateKey, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
func (c *ServiceClientGRPC) Endpoints() []string {
|
||||||
conn, err := grpc.Dial(addr, grpcOpts...)
|
res := make([]string, len(c.clients))
|
||||||
if err != nil {
|
for i, client := range c.clients {
|
||||||
return nil, fmt.Errorf("did not connect: %v", err)
|
res[i] = client.address
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
||||||
|
res := &ServiceClientGRPC{
|
||||||
|
key: key,
|
||||||
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
||||||
c := grpcService.NewTreeServiceClient(conn)
|
for _, addr := range endpoints {
|
||||||
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
conn, err := grpc.Dial(addr, grpcOpts...)
|
||||||
return nil, fmt.Errorf("healthcheck: %w", err)
|
if err != nil {
|
||||||
|
log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c := grpcService.NewTreeServiceClient(conn)
|
||||||
|
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||||
|
log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res.clients = append(res.clients, treeClient{
|
||||||
|
address: addr,
|
||||||
|
conn: conn,
|
||||||
|
service: c,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ServiceClientGRPC{
|
if len(res.clients) == 0 {
|
||||||
key: key,
|
return nil, errors.New("no healthy tree grpc client")
|
||||||
conn: conn,
|
}
|
||||||
service: c,
|
|
||||||
}, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
||||||
|
@ -112,7 +144,9 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.GetNodeByPath(ctx, request)
|
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
|
||||||
|
return client.service.GetNodeByPath
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, handleError("failed to get node by path", err)
|
return nil, handleError("failed to get node by path", err)
|
||||||
}
|
}
|
||||||
|
@ -145,7 +179,9 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := c.service.GetSubTree(ctx, request)
|
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
|
||||||
|
return client.service.GetSubTree
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, handleError("failed to get sub tree client", err)
|
return nil, handleError("failed to get sub tree client", err)
|
||||||
}
|
}
|
||||||
|
@ -183,7 +219,9 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.Add(ctx, request)
|
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
|
||||||
|
return client.service.Add
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, handleError("failed to add node", err)
|
return 0, handleError("failed to add node", err)
|
||||||
}
|
}
|
||||||
|
@ -212,7 +250,9 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.AddByPath(ctx, request)
|
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
|
||||||
|
return client.service.AddByPath
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, handleError("failed to add node by path", err)
|
return 0, handleError("failed to add node by path", err)
|
||||||
}
|
}
|
||||||
|
@ -249,7 +289,10 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.service.Move(ctx, request); err != nil {
|
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
|
||||||
|
return client.service.Move
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,13 +317,52 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.service.Remove(ctx, request); err != nil {
|
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
|
||||||
|
return cl.service.Remove
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
|
||||||
|
|
||||||
|
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
|
||||||
|
for _, client := range c.clients {
|
||||||
|
res, err = fn(client)(ctx, req)
|
||||||
|
if !shouldTryAgain(err) {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
c.log.Debug("tree request error", zap.String("address", client.address), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldTryAgain(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
code := status.Code(unwrapErr(err))
|
||||||
|
if code == codes.Unavailable || code == codes.Unimplemented ||
|
||||||
|
strings.Contains(err.Error(), "not found") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func unwrapErr(err error) error {
|
||||||
|
for e := errors.Unwrap(err); e != nil; e = errors.Unwrap(err) {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
||||||
result := make([]*grpcService.KeyValue, 0, len(meta))
|
result := make([]*grpcService.KeyValue, 0, len(meta))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue