feature/74-tree_round_robin #77

Merged
realloc merged 1 commit from dkirillov/frostfs-s3-gw:feature/74-tree_round_robin into master 2023-07-26 21:08:00 +00:00
2 changed files with 63 additions and 70 deletions
Showing only changes of commit 69d8779daf - Show all commits

View file

@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
key *keys.PrivateKey key *keys.PrivateKey
log *zap.Logger log *zap.Logger
clients []treeClient clients []treeClient
startIndex atomic.Int32 startIndex int32
} }
type treeClient struct { type treeClient struct {
@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
return nil, err return nil, err
} }
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] { var resp *grpcService.GetNodeByPathResponse
return client.service.GetNodeByPath if err := c.requestWithRetry(func(client treeClient) (inErr error) {
}) resp, inErr = client.service.GetNodeByPath(ctx, request)
if err != nil { return handleError("failed to get node by path", inErr)
return nil, handleError("failed to get node by path", err) }); err != nil {
return nil, err
} }
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes())) res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
return nil, err return nil, err
} }
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] { var cli grpcService.TreeService_GetSubTreeClient
return client.service.GetSubTree if err := c.requestWithRetry(func(client treeClient) (inErr error) {
}) cli, inErr = client.service.GetSubTree(ctx, request)
if err != nil { return handleError("failed to get sub tree client", inErr)
return nil, handleError("failed to get sub tree client", err) }); err != nil {
return nil, err
} }
var subtree []tree.NodeResponse var subtree []tree.NodeResponse
@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
return 0, err return 0, err
} }
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] { var resp *grpcService.AddResponse
return client.service.Add if err := c.requestWithRetry(func(client treeClient) (inErr error) {
}) resp, inErr = client.service.Add(ctx, request)
if err != nil { return handleError("failed to add node", inErr)
return 0, handleError("failed to add node", err) }); err != nil {
return 0, err
} }
return resp.GetBody().GetNodeId(), nil return resp.GetBody().GetNodeId(), nil
@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
return 0, err return 0, err
} }
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] { var resp *grpcService.AddByPathResponse
return client.service.AddByPath if err := c.requestWithRetry(func(client treeClient) (inErr error) {
}) resp, inErr = client.service.AddByPath(ctx, request)
if err != nil { return handleError("failed to add node by path", inErr)
return 0, handleError("failed to add node by path", err) }); err != nil {
return 0, err
} }
body := resp.GetBody() body := resp.GetBody()
@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
return err return err
} }
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] { return c.requestWithRetry(func(client treeClient) error {
return client.service.Move if _, err := client.service.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
}) })
if err != nil {
return handleError("failed to move node", err)
}
return nil
} }
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
return err return err
} }
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] { return c.requestWithRetry(func(client treeClient) error {
return cl.service.Remove if _, err := client.service.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil
}) })
if err != nil {
return handleError("failed to remove node", err)
}
return nil
} }
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error) func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
start := int(atomic.LoadInt32(&c.startIndex))
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
start := int(c.startIndex.Load())
for i := start; i < start+len(c.clients); i++ { for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients) index := i % len(c.clients)
res, err = fn(c.clients[index])(ctx, req) err = fn(c.clients[index])
if !shouldTryAgain(err) { if !shouldTryAgain(err) {
c.startIndex.Store(int32(index)) atomic.StoreInt32(&c.startIndex, int32(index))
return res, err return err
} }
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err)) c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
} }
return res, err return err
} }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
} }
func handleError(msg string, err error) error { func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") { if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error()) return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "is denied by") { } else if strings.Contains(err.Error(), "is denied by") {

View file

@ -1,14 +1,13 @@
package services package services
import ( import (
"context"
"errors" "errors"
"sync/atomic"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"google.golang.org/grpc"
) )
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {
@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
} }
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
ctx := context.Background()
cl := &ServiceClientGRPC{ cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t), log: zaptest.NewLogger(t),
clients: []treeClient{ clients: []treeClient{
@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
}, },
} }
fn := func(client treeClient) grpcFunc[[]string, string] { makeFn := func(shouldFail []string) func(treeClient) error {
return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) { return func(client treeClient) error {
for _, item := range shouldFail { for _, item := range shouldFail {
if item == client.address { if item == client.address {
return "", errors.New("not found") return errors.New("not found")
} }
} }
return client.address, nil return nil
} }
} }
t.Run("first ok", func(t *testing.T) { t.Run("first ok", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{}, cl, fn) err := cl.requestWithRetry(makeFn([]string{}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "node0", resp) require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
require.Equal(t, 0, int(cl.startIndex.Load())) atomic.StoreInt32(&cl.startIndex, 0)
cl.startIndex.Store(0)
}) })
t.Run("first failed", func(t *testing.T) { t.Run("first failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn) err := cl.requestWithRetry(makeFn([]string{"node0"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "node1", resp) require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
require.Equal(t, 1, int(cl.startIndex.Load())) atomic.StoreInt32(&cl.startIndex, 0)
cl.startIndex.Store(0)
}) })
t.Run("all failed", func(t *testing.T) { t.Run("all failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn) err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
require.Error(t, err) require.Error(t, err)
require.Equal(t, "", resp) require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
require.Equal(t, 0, int(cl.startIndex.Load())) atomic.StoreInt32(&cl.startIndex, 0)
cl.startIndex.Store(0)
}) })
t.Run("round", func(t *testing.T) { t.Run("round", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn) err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "node2", resp) require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
require.Equal(t, 2, int(cl.startIndex.Load()))
resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn) err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "node0", resp) require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
require.Equal(t, 0, int(cl.startIndex.Load())) atomic.StoreInt32(&cl.startIndex, 0)
cl.startIndex.Store(0)
}) })
} }