From b9baebbed74c5949e33d75366096a265472dbec8 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 11 Apr 2023 15:16:14 +0300 Subject: [PATCH] [#74] tree: Simplify retry Signed-off-by: Denis Kirillov --- internal/frostfs/services/tree_client_grpc.go | 87 ++++++++++--------- .../frostfs/services/tree_client_grpc_test.go | 46 ++++------ 2 files changed, 63 insertions(+), 70 deletions(-) diff --git a/internal/frostfs/services/tree_client_grpc.go b/internal/frostfs/services/tree_client_grpc.go index f80a082..11acc05 100644 --- a/internal/frostfs/services/tree_client_grpc.go +++ b/internal/frostfs/services/tree_client_grpc.go @@ -73,7 +73,7 @@ type ServiceClientGRPC struct { key *keys.PrivateKey log *zap.Logger clients []treeClient - startIndex atomic.Int32 + startIndex int32 } type treeClient struct { @@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams return nil, err } - resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] { - return client.service.GetNodeByPath - }) - if err != nil { - return nil, handleError("failed to get node by path", err) + var resp *grpcService.GetNodeByPathResponse + if err := c.requestWithRetry(func(client treeClient) (inErr error) { + resp, inErr = client.service.GetNodeByPath(ctx, request) + return handleError("failed to get node by path", inErr) + }); err != nil { + return nil, err } res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes())) @@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket return nil, err } - cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] { - return client.service.GetSubTree - }) - if err != nil { - return nil, handleError("failed to get sub tree client", err) + var cli grpcService.TreeService_GetSubTreeClient + if err := c.requestWithRetry(func(client treeClient) (inErr error) { + cli, inErr = client.service.GetSubTree(ctx, request) + return handleError("failed to get sub tree client", inErr) + }); err != nil { + return nil, err } var subtree []tree.NodeResponse @@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf return 0, err } - resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] { - return client.service.Add - }) - if err != nil { - return 0, handleError("failed to add node", err) + var resp *grpcService.AddResponse + if err := c.requestWithRetry(func(client treeClient) (inErr error) { + resp, inErr = client.service.Add(ctx, request) + return handleError("failed to add node", inErr) + }); err != nil { + return 0, err } return resp.GetBody().GetNodeId(), nil @@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc return 0, err } - resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] { - return client.service.AddByPath - }) - if err != nil { - return 0, handleError("failed to add node by path", err) + var resp *grpcService.AddByPathResponse + if err := c.requestWithRetry(func(client treeClient) (inErr error) { + resp, inErr = client.service.AddByPath(ctx, request) + return handleError("failed to add node by path", inErr) + }); err != nil { + return 0, err } body := resp.GetBody() @@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn return err } - _, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] { - return client.service.Move + return c.requestWithRetry(func(client treeClient) error { + if _, err := client.service.Move(ctx, request); err != nil { + return handleError("failed to move node", err) + } + return nil }) - if err != nil { - return handleError("failed to move node", err) - } - - return nil } func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { @@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket return err } - _, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] { - return cl.service.Remove + return c.requestWithRetry(func(client treeClient) error { + if _, err := client.service.Remove(ctx, request); err != nil { + return handleError("failed to remove node", err) + } + return nil }) - if err != nil { - return handleError("failed to remove node", err) - } - - return nil } -type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error) - -func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) { - start := int(c.startIndex.Load()) +func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) { + start := int(atomic.LoadInt32(&c.startIndex)) for i := start; i < start+len(c.clients); i++ { index := i % len(c.clients) - res, err = fn(c.clients[index])(ctx, req) + err = fn(c.clients[index]) if !shouldTryAgain(err) { - c.startIndex.Store(int32(index)) - return res, err + atomic.StoreInt32(&c.startIndex, int32(index)) + return err } c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err)) } - return res, err + return err } func shouldTryAgain(err error) bool { @@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte { } func handleError(msg string, err error) error { + if err == nil { + return nil + } if strings.Contains(err.Error(), "not found") { return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error()) } else if strings.Contains(err.Error(), "is denied by") { diff --git a/internal/frostfs/services/tree_client_grpc_test.go b/internal/frostfs/services/tree_client_grpc_test.go index f89b77c..b94b2a7 100644 --- a/internal/frostfs/services/tree_client_grpc_test.go +++ b/internal/frostfs/services/tree_client_grpc_test.go @@ -1,14 +1,13 @@ package services import ( - "context" "errors" + "sync/atomic" "testing" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" - "google.golang.org/grpc" ) func TestHandleError(t *testing.T) { @@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) { } func TestRetry(t *testing.T) { - ctx := context.Background() - cl := &ServiceClientGRPC{ log: zaptest.NewLogger(t), clients: []treeClient{ @@ -50,51 +47,46 @@ func TestRetry(t *testing.T) { }, } - fn := func(client treeClient) grpcFunc[[]string, string] { - return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) { + makeFn := func(shouldFail []string) func(treeClient) error { + return func(client treeClient) error { for _, item := range shouldFail { if item == client.address { - return "", errors.New("not found") + return errors.New("not found") } } - return client.address, nil + return nil } } t.Run("first ok", func(t *testing.T) { - resp, err := requestWithRetry(ctx, []string{}, cl, fn) + err := cl.requestWithRetry(makeFn([]string{})) require.NoError(t, err) - require.Equal(t, "node0", resp) - require.Equal(t, 0, int(cl.startIndex.Load())) - cl.startIndex.Store(0) + require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) + atomic.StoreInt32(&cl.startIndex, 0) }) t.Run("first failed", func(t *testing.T) { - resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn) + err := cl.requestWithRetry(makeFn([]string{"node0"})) require.NoError(t, err) - require.Equal(t, "node1", resp) - require.Equal(t, 1, int(cl.startIndex.Load())) - cl.startIndex.Store(0) + require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex))) + atomic.StoreInt32(&cl.startIndex, 0) }) t.Run("all failed", func(t *testing.T) { - resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn) + err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"})) require.Error(t, err) - require.Equal(t, "", resp) - require.Equal(t, 0, int(cl.startIndex.Load())) - cl.startIndex.Store(0) + require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) + atomic.StoreInt32(&cl.startIndex, 0) }) t.Run("round", func(t *testing.T) { - resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn) + err := cl.requestWithRetry(makeFn([]string{"node0", "node1"})) require.NoError(t, err) - require.Equal(t, "node2", resp) - require.Equal(t, 2, int(cl.startIndex.Load())) + require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex))) - resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn) + err = cl.requestWithRetry(makeFn([]string{"node2", "node3"})) require.NoError(t, err) - require.Equal(t, "node0", resp) - require.Equal(t, 0, int(cl.startIndex.Load())) - cl.startIndex.Store(0) + require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) + atomic.StoreInt32(&cl.startIndex, 0) }) }