forked from TrueCloudLab/frostfs-s3-gw
[#74] tree: Simplify retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
0c1e17dca4
commit
b9baebbed7
2 changed files with 63 additions and 70 deletions
|
@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
clients []treeClient
|
clients []treeClient
|
||||||
startIndex atomic.Int32
|
startIndex int32
|
||||||
}
|
}
|
||||||
|
|
||||||
type treeClient struct {
|
type treeClient struct {
|
||||||
|
@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
return client.service.GetNodeByPath
|
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||||
})
|
resp, inErr = client.service.GetNodeByPath(ctx, request)
|
||||||
if err != nil {
|
return handleError("failed to get node by path", inErr)
|
||||||
return nil, handleError("failed to get node by path", err)
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
||||||
|
@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
return client.service.GetSubTree
|
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||||
})
|
cli, inErr = client.service.GetSubTree(ctx, request)
|
||||||
if err != nil {
|
return handleError("failed to get sub tree client", inErr)
|
||||||
return nil, handleError("failed to get sub tree client", err)
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var subtree []tree.NodeResponse
|
var subtree []tree.NodeResponse
|
||||||
|
@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
|
var resp *grpcService.AddResponse
|
||||||
return client.service.Add
|
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||||
})
|
resp, inErr = client.service.Add(ctx, request)
|
||||||
if err != nil {
|
return handleError("failed to add node", inErr)
|
||||||
return 0, handleError("failed to add node", err)
|
}); err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.GetBody().GetNodeId(), nil
|
return resp.GetBody().GetNodeId(), nil
|
||||||
|
@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
|
var resp *grpcService.AddByPathResponse
|
||||||
return client.service.AddByPath
|
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||||
})
|
resp, inErr = client.service.AddByPath(ctx, request)
|
||||||
if err != nil {
|
return handleError("failed to add node by path", inErr)
|
||||||
return 0, handleError("failed to add node by path", err)
|
}); err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
body := resp.GetBody()
|
body := resp.GetBody()
|
||||||
|
@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
|
return c.requestWithRetry(func(client treeClient) error {
|
||||||
return client.service.Move
|
if _, err := client.service.Move(ctx, request); err != nil {
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||||
|
@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
|
return c.requestWithRetry(func(client treeClient) error {
|
||||||
return cl.service.Remove
|
if _, err := client.service.Remove(ctx, request); err != nil {
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
|
func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
|
||||||
|
start := int(atomic.LoadInt32(&c.startIndex))
|
||||||
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
|
|
||||||
start := int(c.startIndex.Load())
|
|
||||||
for i := start; i < start+len(c.clients); i++ {
|
for i := start; i < start+len(c.clients); i++ {
|
||||||
index := i % len(c.clients)
|
index := i % len(c.clients)
|
||||||
res, err = fn(c.clients[index])(ctx, req)
|
err = fn(c.clients[index])
|
||||||
if !shouldTryAgain(err) {
|
if !shouldTryAgain(err) {
|
||||||
c.startIndex.Store(int32(index))
|
atomic.StoreInt32(&c.startIndex, int32(index))
|
||||||
return res, err
|
return err
|
||||||
}
|
}
|
||||||
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
|
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldTryAgain(err error) bool {
|
func shouldTryAgain(err error) bool {
|
||||||
|
@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleError(msg string, err error) error {
|
func handleError(msg string, err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if strings.Contains(err.Error(), "not found") {
|
if strings.Contains(err.Error(), "not found") {
|
||||||
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
||||||
} else if strings.Contains(err.Error(), "is denied by") {
|
} else if strings.Contains(err.Error(), "is denied by") {
|
||||||
|
|
|
@ -1,14 +1,13 @@
|
||||||
package services
|
package services
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
func TestHandleError(t *testing.T) {
|
||||||
|
@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
cl := &ServiceClientGRPC{
|
cl := &ServiceClientGRPC{
|
||||||
log: zaptest.NewLogger(t),
|
log: zaptest.NewLogger(t),
|
||||||
clients: []treeClient{
|
clients: []treeClient{
|
||||||
|
@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(client treeClient) grpcFunc[[]string, string] {
|
makeFn := func(shouldFail []string) func(treeClient) error {
|
||||||
return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
|
return func(client treeClient) error {
|
||||||
for _, item := range shouldFail {
|
for _, item := range shouldFail {
|
||||||
if item == client.address {
|
if item == client.address {
|
||||||
return "", errors.New("not found")
|
return errors.New("not found")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return client.address, nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("first ok", func(t *testing.T) {
|
t.Run("first ok", func(t *testing.T) {
|
||||||
resp, err := requestWithRetry(ctx, []string{}, cl, fn)
|
err := cl.requestWithRetry(makeFn([]string{}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, "node0", resp)
|
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
atomic.StoreInt32(&cl.startIndex, 0)
|
||||||
cl.startIndex.Store(0)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("first failed", func(t *testing.T) {
|
t.Run("first failed", func(t *testing.T) {
|
||||||
resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
|
err := cl.requestWithRetry(makeFn([]string{"node0"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, "node1", resp)
|
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
|
||||||
require.Equal(t, 1, int(cl.startIndex.Load()))
|
atomic.StoreInt32(&cl.startIndex, 0)
|
||||||
cl.startIndex.Store(0)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("all failed", func(t *testing.T) {
|
t.Run("all failed", func(t *testing.T) {
|
||||||
resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
|
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, "", resp)
|
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
atomic.StoreInt32(&cl.startIndex, 0)
|
||||||
cl.startIndex.Store(0)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("round", func(t *testing.T) {
|
t.Run("round", func(t *testing.T) {
|
||||||
resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
|
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, "node2", resp)
|
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
|
||||||
require.Equal(t, 2, int(cl.startIndex.Load()))
|
|
||||||
|
|
||||||
resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
|
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, "node0", resp)
|
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
atomic.StoreInt32(&cl.startIndex, 0)
|
||||||
cl.startIndex.Store(0)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue