diff --git a/internal/frostfs/services/client/client.go b/internal/frostfs/services/client/client.go new file mode 100644 index 00000000..6aae93d7 --- /dev/null +++ b/internal/frostfs/services/client/client.go @@ -0,0 +1,70 @@ +package client + +import ( + "context" + "fmt" + "sync" + + grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" + "google.golang.org/grpc" +) + +type TreeClient struct { + mu sync.RWMutex + address string + opts []grpc.DialOption + conn *grpc.ClientConn + service grpcService.TreeServiceClient + dialed bool +} + +// NewTreeClient creates new tree client with auto dial. +func NewTreeClient(addr string, opts ...grpc.DialOption) *TreeClient { + return &TreeClient{ + address: addr, + opts: opts, + } +} + +func (c *TreeClient) dial(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.dialed { + return fmt.Errorf("couldn't dial '%s': connection already established", c.address) + } + + conn, err := grpc.Dial(c.address, c.opts...) + if err != nil { + return fmt.Errorf("grpc dial node tree service: %w", err) + } + + serviceClient := grpcService.NewTreeServiceClient(conn) + if _, err = serviceClient.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + return fmt.Errorf("healthcheck tree service: %w", err) + } + + c.conn = conn + c.service = serviceClient + c.dialed = true + + return nil +} + +func (c *TreeClient) TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) { + c.mu.RLock() + dialed := c.dialed + c.mu.RUnlock() + + if !dialed { + if err := c.dial(ctx); err != nil { + return nil, err + } + } + + return c.service, nil +} + +func (c *TreeClient) Address() string { + return c.address +} diff --git a/internal/frostfs/services/tree_client_grpc.go b/internal/frostfs/services/tree_client_grpc.go index 70a0e308..850a43f5 100644 --- a/internal/frostfs/services/tree_client_grpc.go +++ b/internal/frostfs/services/tree_client_grpc.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" + treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client" grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" @@ -70,20 +71,22 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta { type ServiceClientGRPC struct { key *keys.PrivateKey log *zap.Logger - clients []treeClient + clients []*treeClient.TreeClient startIndex int32 } -type treeClient struct { - address string - conn *grpc.ClientConn - service grpcService.TreeServiceClient +func (c *ServiceClientGRPC) getStartIndex() int { + return int(atomic.LoadInt32(&c.startIndex)) +} + +func (c *ServiceClientGRPC) setStartIndex(index int) { + atomic.StoreInt32(&c.startIndex, int32(index)) } func (c *ServiceClientGRPC) Endpoints() []string { res := make([]string, len(c.clients)) for i, client := range c.clients { - res[i] = client.address + res[i] = client.Address() } return res } @@ -94,30 +97,26 @@ func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys log: log, } - for _, addr := range endpoints { - conn, err := grpc.Dial(addr, grpcOpts...) - if err != nil { - log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err)) + firstHealthy := -1 + + res.clients = make([]*treeClient.TreeClient, len(endpoints)) + for i, addr := range endpoints { + res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...) + if _, err := res.clients[i].TreeClient(ctx); err != nil { + log.Warn("dial tree", zap.String("address", addr), zap.Error(err)) continue } - - c := grpcService.NewTreeServiceClient(conn) - if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { - log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err)) - continue + if firstHealthy == -1 { + firstHealthy = i } - - res.clients = append(res.clients, treeClient{ - address: addr, - conn: conn, - service: c, - }) } - if len(res.clients) == 0 { + if firstHealthy == -1 { return nil, errors.New("no healthy tree grpc client") } + res.setStartIndex(firstHealthy) + return res, nil } @@ -148,8 +147,8 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams zap.String("method", "GetNodeByPath")) var resp *grpcService.GetNodeByPathResponse - if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { - resp, inErr = client.service.GetNodeByPath(ctx, request) + if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.GetNodeByPath(ctx, request) return handleError("failed to get node by path", inErr) }); err != nil { return nil, err @@ -187,8 +186,8 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket zap.String("method", "GetSubTree")) var cli grpcService.TreeService_GetSubTreeClient - if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { - cli, inErr = client.service.GetSubTree(ctx, request) + if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { + cli, inErr = client.GetSubTree(ctx, request) return handleError("failed to get sub tree client", inErr) }); err != nil { return nil, err @@ -231,8 +230,8 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf zap.String("method", "Add")) var resp *grpcService.AddResponse - if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { - resp, inErr = client.service.Add(ctx, request) + if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.Add(ctx, request) return handleError("failed to add node", inErr) }); err != nil { return 0, err @@ -266,8 +265,8 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc zap.String("method", "AddByPath")) var resp *grpcService.AddByPathResponse - if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { - resp, inErr = client.service.AddByPath(ctx, request) + if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.AddByPath(ctx, request) return handleError("failed to add node by path", inErr) }); err != nil { return 0, err @@ -308,8 +307,8 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "Move")) - return c.requestWithRetry(log, func(client treeClient) error { - if _, err := client.service.Move(ctx, request); err != nil { + return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error { + if _, err := client.Move(ctx, request); err != nil { return handleError("failed to move node", err) } return nil @@ -337,24 +336,31 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "Remove")) - return c.requestWithRetry(log, func(client treeClient) error { - if _, err := client.service.Remove(ctx, request); err != nil { + return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error { + if _, err := client.Remove(ctx, request); err != nil { return handleError("failed to remove node", err) } return nil }) } -func (c *ServiceClientGRPC) requestWithRetry(log *zap.Logger, fn func(client treeClient) error) (err error) { - start := int(atomic.LoadInt32(&c.startIndex)) +func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error { + var ( + err error + cl grpcService.TreeServiceClient + ) + + start := c.getStartIndex() for i := start; i < start+len(c.clients); i++ { index := i % len(c.clients) - err = fn(c.clients[index]) + if cl, err = c.clients[index].TreeClient(ctx); err == nil { + err = fn(cl) + } if !shouldTryAgain(err) { - atomic.StoreInt32(&c.startIndex, int32(index)) + c.setStartIndex(index) return err } - log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err)) + log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err)) } return err diff --git a/internal/frostfs/services/tree_client_grpc_test.go b/internal/frostfs/services/tree_client_grpc_test.go index b94b2a79..1edd1263 100644 --- a/internal/frostfs/services/tree_client_grpc_test.go +++ b/internal/frostfs/services/tree_client_grpc_test.go @@ -1,11 +1,15 @@ package services import ( + "context" "errors" - "sync/atomic" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "testing" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client" + grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -37,56 +41,61 @@ func TestHandleError(t *testing.T) { } func TestRetry(t *testing.T) { + ctx := context.Background() + log := zaptest.NewLogger(t) + + grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials()) cl := &ServiceClientGRPC{ log: zaptest.NewLogger(t), - clients: []treeClient{ - {address: "node0"}, - {address: "node1"}, - {address: "node2"}, - {address: "node3"}, + clients: []*client.TreeClient{ + client.NewTreeClient("node0", grpcDialOpt), + client.NewTreeClient("node1", grpcDialOpt), + client.NewTreeClient("node2", grpcDialOpt), + client.NewTreeClient("node3", grpcDialOpt), }, } - makeFn := func(shouldFail []string) func(treeClient) error { - return func(client treeClient) error { - for _, item := range shouldFail { - if item == client.address { - return errors.New("not found") - } - } + makeFn := func(shouldFail []string) func(grpcService.TreeServiceClient) error { + return func(client grpcService.TreeServiceClient) error { + + //for _, item := range shouldFail { + //if item == client.address { + // return errors.New("not found") + //} + //} return nil } } t.Run("first ok", func(t *testing.T) { - err := cl.requestWithRetry(makeFn([]string{})) + err := cl.requestWithRetry(ctx, log, makeFn([]string{})) require.NoError(t, err) - require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) - atomic.StoreInt32(&cl.startIndex, 0) + require.Equal(t, 0, cl.getStartIndex()) + cl.setStartIndex(0) }) t.Run("first failed", func(t *testing.T) { - err := cl.requestWithRetry(makeFn([]string{"node0"})) + err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0"})) require.NoError(t, err) - require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex))) - atomic.StoreInt32(&cl.startIndex, 0) + require.Equal(t, 1, cl.getStartIndex()) + cl.setStartIndex(0) }) t.Run("all failed", func(t *testing.T) { - err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"})) + err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0", "node1", "node2", "node3"})) require.Error(t, err) - require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) - atomic.StoreInt32(&cl.startIndex, 0) + require.Equal(t, 0, cl.getStartIndex()) + cl.setStartIndex(0) }) t.Run("round", func(t *testing.T) { - err := cl.requestWithRetry(makeFn([]string{"node0", "node1"})) + err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0", "node1"})) require.NoError(t, err) - require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex))) + require.Equal(t, 2, cl.getStartIndex()) - err = cl.requestWithRetry(makeFn([]string{"node2", "node3"})) + err = cl.requestWithRetry(ctx, log, makeFn([]string{"node2", "node3"})) require.NoError(t, err) - require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) - atomic.StoreInt32(&cl.startIndex, 0) + require.Equal(t, 0, cl.getStartIndex()) + cl.setStartIndex(0) }) }