From b60afd88c46f1b942e2623c9050d4f8ebea00e89 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 3 Apr 2023 17:03:10 +0300 Subject: [PATCH] [#74] Support multiple tree endpoints Signed-off-by: Denis Kirillov --- cmd/s3-gw/app.go | 6 +- internal/frostfs/services/tree_client_grpc.go | 118 +++++++++++++++--- 2 files changed, 103 insertions(+), 21 deletions(-) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 7ea8151..f4faff2 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -113,14 +113,14 @@ func (a *App) init(ctx context.Context) { func (a *App) initLayer(ctx context.Context) { a.initResolver() - treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint) + treeServiceEndpoint := a.cfg.GetStringSlice(cfgTreeServiceEndpoint) grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials()) - treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt) + treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, a.log, grpcDialOpt) if err != nil { a.log.Fatal("failed to create tree service", zap.Error(err)) } treeService := tree.NewTree(treeGRPCClient) - a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint)) + a.log.Info("init tree service", zap.Strings("endpoints", treeGRPCClient.Endpoints())) // prepare random key for anonymous requests randomKey, err := keys.NewPrivateKey() diff --git a/internal/frostfs/services/tree_client_grpc.go b/internal/frostfs/services/tree_client_grpc.go index ff582bf..f40617d 100644 --- a/internal/frostfs/services/tree_client_grpc.go +++ b/internal/frostfs/services/tree_client_grpc.go @@ -14,7 +14,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type GetNodeByPathResponseInfoWrapper struct { @@ -67,26 +70,55 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta { type ServiceClientGRPC struct { key *keys.PrivateKey + log *zap.Logger + clients []treeClient +} + +type treeClient struct { + address string conn *grpc.ClientConn service grpcService.TreeServiceClient } -func NewTreeServiceClientGRPC(ctx context.Context, addr string, key *keys.PrivateKey, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) { - conn, err := grpc.Dial(addr, grpcOpts...) - if err != nil { - return nil, fmt.Errorf("did not connect: %v", err) +func (c *ServiceClientGRPC) Endpoints() []string { + res := make([]string, len(c.clients)) + for i, client := range c.clients { + res[i] = client.address + } + return res +} + +func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) { + res := &ServiceClientGRPC{ + key: key, + log: log, } - c := grpcService.NewTreeServiceClient(conn) - if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { - return nil, fmt.Errorf("healthcheck: %w", err) + for _, addr := range endpoints { + conn, err := grpc.Dial(addr, grpcOpts...) + if err != nil { + log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err)) + continue + } + + c := grpcService.NewTreeServiceClient(conn) + if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err)) + continue + } + + res.clients = append(res.clients, treeClient{ + address: addr, + conn: conn, + service: c, + }) } - return &ServiceClientGRPC{ - key: key, - conn: conn, - service: c, - }, nil + if len(res.clients) == 0 { + return nil, errors.New("no healthy tree grpc client") + } + + return res, nil } func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) { @@ -112,7 +144,9 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams return nil, err } - resp, err := c.service.GetNodeByPath(ctx, request) + resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] { + return client.service.GetNodeByPath + }) if err != nil { return nil, handleError("failed to get node by path", err) } @@ -145,7 +179,9 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket return nil, err } - cli, err := c.service.GetSubTree(ctx, request) + cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] { + return client.service.GetSubTree + }) if err != nil { return nil, handleError("failed to get sub tree client", err) } @@ -183,7 +219,9 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf return 0, err } - resp, err := c.service.Add(ctx, request) + resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] { + return client.service.Add + }) if err != nil { return 0, handleError("failed to add node", err) } @@ -212,7 +250,9 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc return 0, err } - resp, err := c.service.AddByPath(ctx, request) + resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] { + return client.service.AddByPath + }) if err != nil { return 0, handleError("failed to add node by path", err) } @@ -249,7 +289,10 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn return err } - if _, err := c.service.Move(ctx, request); err != nil { + _, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] { + return client.service.Move + }) + if err != nil { return handleError("failed to move node", err) } @@ -274,13 +317,52 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket return err } - if _, err := c.service.Remove(ctx, request); err != nil { + _, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] { + return cl.service.Remove + }) + if err != nil { return handleError("failed to remove node", err) } return nil } +type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error) + +func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) { + for _, client := range c.clients { + res, err = fn(client)(ctx, req) + if !shouldTryAgain(err) { + return res, err + } + c.log.Debug("tree request error", zap.String("address", client.address), zap.Error(err)) + } + + return res, err +} + +func shouldTryAgain(err error) bool { + if err == nil { + return false + } + + code := status.Code(unwrapErr(err)) + if code == codes.Unavailable || code == codes.Unimplemented || + strings.Contains(err.Error(), "not found") { + return true + } + + return false +} + +func unwrapErr(err error) error { + for e := errors.Unwrap(err); e != nil; e = errors.Unwrap(err) { + err = e + } + + return err +} + func metaToKV(meta map[string]string) []*grpcService.KeyValue { result := make([]*grpcService.KeyValue, 0, len(meta))