From 9da77667f380ddf5f549312490a321d415465d85 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 4 Apr 2023 17:47:16 +0300 Subject: [PATCH] [#74] Add round tree retry Signed-off-by: Denis Kirillov --- internal/frostfs/services/tree_client_grpc.go | 27 +++++--- .../frostfs/services/tree_client_grpc_test.go | 65 +++++++++++++++++++ 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/internal/frostfs/services/tree_client_grpc.go b/internal/frostfs/services/tree_client_grpc.go index f40617dc..f80a082d 100644 --- a/internal/frostfs/services/tree_client_grpc.go +++ b/internal/frostfs/services/tree_client_grpc.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" @@ -69,9 +70,10 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta { } type ServiceClientGRPC struct { - key *keys.PrivateKey - log *zap.Logger - clients []treeClient + key *keys.PrivateKey + log *zap.Logger + clients []treeClient + startIndex atomic.Int32 } type treeClient struct { @@ -330,12 +332,15 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error) func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) { - for _, client := range c.clients { - res, err = fn(client)(ctx, req) + start := int(c.startIndex.Load()) + for i := start; i < start+len(c.clients); i++ { + index := i % len(c.clients) + res, err = fn(c.clients[index])(ctx, req) if !shouldTryAgain(err) { + c.startIndex.Store(int32(index)) return res, err } - c.log.Debug("tree request error", zap.String("address", client.address), zap.Error(err)) + c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err)) } return res, err @@ -347,8 +352,14 @@ func shouldTryAgain(err error) bool { } code := status.Code(unwrapErr(err)) - if code == codes.Unavailable || code == codes.Unimplemented || - strings.Contains(err.Error(), "not found") { + if code == codes.Unavailable || code == codes.Unimplemented { + return true + } + + errText := err.Error() + if strings.Contains(errText, "not found") || + strings.Contains(errText, "shard is in read-only mode") || + strings.Contains(errText, "shard is in degraded mode") { return true } diff --git a/internal/frostfs/services/tree_client_grpc_test.go b/internal/frostfs/services/tree_client_grpc_test.go index 724b9093..f89b77c3 100644 --- a/internal/frostfs/services/tree_client_grpc_test.go +++ b/internal/frostfs/services/tree_client_grpc_test.go @@ -1,11 +1,14 @@ package services import ( + "context" "errors" "testing" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" ) func TestHandleError(t *testing.T) { @@ -33,3 +36,65 @@ func TestHandleError(t *testing.T) { }) } } + +func TestRetry(t *testing.T) { + ctx := context.Background() + + cl := &ServiceClientGRPC{ + log: zaptest.NewLogger(t), + clients: []treeClient{ + {address: "node0"}, + {address: "node1"}, + {address: "node2"}, + {address: "node3"}, + }, + } + + fn := func(client treeClient) grpcFunc[[]string, string] { + return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) { + for _, item := range shouldFail { + if item == client.address { + return "", errors.New("not found") + } + } + return client.address, nil + } + } + + t.Run("first ok", func(t *testing.T) { + resp, err := requestWithRetry(ctx, []string{}, cl, fn) + require.NoError(t, err) + require.Equal(t, "node0", resp) + require.Equal(t, 0, int(cl.startIndex.Load())) + cl.startIndex.Store(0) + }) + + t.Run("first failed", func(t *testing.T) { + resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn) + require.NoError(t, err) + require.Equal(t, "node1", resp) + require.Equal(t, 1, int(cl.startIndex.Load())) + cl.startIndex.Store(0) + }) + + t.Run("all failed", func(t *testing.T) { + resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn) + require.Error(t, err) + require.Equal(t, "", resp) + require.Equal(t, 0, int(cl.startIndex.Load())) + cl.startIndex.Store(0) + }) + + t.Run("round", func(t *testing.T) { + resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn) + require.NoError(t, err) + require.Equal(t, "node2", resp) + require.Equal(t, 2, int(cl.startIndex.Load())) + + resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn) + require.NoError(t, err) + require.Equal(t, "node0", resp) + require.Equal(t, 0, int(cl.startIndex.Load())) + cl.startIndex.Store(0) + }) +}