feature/74-tree_round_robin #77

Merged
realloc merged 1 commit from dkirillov/frostfs-s3-gw:feature/74-tree_round_robin into master 2023-07-26 21:08:00 +00:00
2 changed files with 63 additions and 70 deletions

View file

@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
key *keys.PrivateKey
log *zap.Logger
clients []treeClient
startIndex atomic.Int32
startIndex int32
}
type treeClient struct {
@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
return nil, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
return client.service.GetNodeByPath
})
if err != nil {
return nil, handleError("failed to get node by path", err)
var resp *grpcService.GetNodeByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.GetNodeByPath(ctx, request)
return handleError("failed to get node by path", inErr)
}); err != nil {
return nil, err
}
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
return nil, err
}
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
return client.service.GetSubTree
})
if err != nil {
return nil, handleError("failed to get sub tree client", err)
var cli grpcService.TreeService_GetSubTreeClient
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
cli, inErr = client.service.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
return nil, err
}
var subtree []tree.NodeResponse
@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
return 0, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
return client.service.Add
})
if err != nil {
return 0, handleError("failed to add node", err)
var resp *grpcService.AddResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
return 0, err
}
return resp.GetBody().GetNodeId(), nil
@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
return 0, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
return client.service.AddByPath
})
if err != nil {
return 0, handleError("failed to add node by path", err)
var resp *grpcService.AddByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
return 0, err
}
body := resp.GetBody()
@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
return err
}
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
return client.service.Move
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
})
if err != nil {
return handleError("failed to move node", err)
}
return nil
}
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
return err
}
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
return cl.service.Remove
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
if err != nil {
return handleError("failed to remove node", err)
}
return nil
}
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
start := int(c.startIndex.Load())
func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
start := int(atomic.LoadInt32(&c.startIndex))
for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients)
res, err = fn(c.clients[index])(ctx, req)
err = fn(c.clients[index])
if !shouldTryAgain(err) {
c.startIndex.Store(int32(index))
return res, err
atomic.StoreInt32(&c.startIndex, int32(index))
return err
}
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
}
return res, err
return err
}
func shouldTryAgain(err error) bool {
@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
}
func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "is denied by") {

View file

@ -1,14 +1,13 @@
package services
import (
"context"
"errors"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
)
func TestHandleError(t *testing.T) {
@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
}
func TestRetry(t *testing.T) {
ctx := context.Background()
cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t),
clients: []treeClient{
@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
},
}
fn := func(client treeClient) grpcFunc[[]string, string] {
return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
makeFn := func(shouldFail []string) func(treeClient) error {
return func(client treeClient) error {
for _, item := range shouldFail {
if item == client.address {
return "", errors.New("not found")
return errors.New("not found")
}
}
return client.address, nil
return nil
}
}
t.Run("first ok", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{}))
require.NoError(t, err)
require.Equal(t, "node0", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("first failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0"}))
require.NoError(t, err)
require.Equal(t, "node1", resp)
require.Equal(t, 1, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("all failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
require.Error(t, err)
require.Equal(t, "", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("round", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
require.NoError(t, err)
require.Equal(t, "node2", resp)
require.Equal(t, 2, int(cl.startIndex.Load()))
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
require.NoError(t, err)
require.Equal(t, "node0", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
}