feature/74-tree_round_robin #77

Merged
realloc merged 1 commit from dkirillov/frostfs-s3-gw:feature/74-tree_round_robin into master 2023-07-26 21:08:00 +00:00
2 changed files with 63 additions and 70 deletions

View file

@@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
 	key        *keys.PrivateKey
 	log        *zap.Logger
 	clients    []treeClient
-	startIndex atomic.Int32
+	startIndex int32
 }
 
 type treeClient struct {
@@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
 		return nil, err
 	}
 
-	resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
-		return client.service.GetNodeByPath
-	})
-	if err != nil {
-		return nil, handleError("failed to get node by path", err)
+	var resp *grpcService.GetNodeByPathResponse
+	if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+		resp, inErr = client.service.GetNodeByPath(ctx, request)
+		return handleError("failed to get node by path", inErr)
+	}); err != nil {
+		return nil, err
 	}
 
 	res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
 		return nil, err
 	}
 
-	cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
-		return client.service.GetSubTree
-	})
-	if err != nil {
-		return nil, handleError("failed to get sub tree client", err)
+	var cli grpcService.TreeService_GetSubTreeClient
+	if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+		cli, inErr = client.service.GetSubTree(ctx, request)
+		return handleError("failed to get sub tree client", inErr)
+	}); err != nil {
+		return nil, err
 	}
 
 	var subtree []tree.NodeResponse
@@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
 		return 0, err
 	}
 
-	resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
-		return client.service.Add
-	})
-	if err != nil {
-		return 0, handleError("failed to add node", err)
+	var resp *grpcService.AddResponse
+	if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+		resp, inErr = client.service.Add(ctx, request)
+		return handleError("failed to add node", inErr)
+	}); err != nil {
+		return 0, err
 	}
 
 	return resp.GetBody().GetNodeId(), nil
@@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
 		return 0, err
 	}
 
-	resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
-		return client.service.AddByPath
-	})
-	if err != nil {
-		return 0, handleError("failed to add node by path", err)
+	var resp *grpcService.AddByPathResponse
+	if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+		resp, inErr = client.service.AddByPath(ctx, request)
+		return handleError("failed to add node by path", inErr)
+	}); err != nil {
+		return 0, err
 	}
 
 	body := resp.GetBody()
@@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
 		return err
 	}
 
-	_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
-		return client.service.Move
-	})
-	if err != nil {
-		return handleError("failed to move node", err)
-	}
-
-	return nil
+	return c.requestWithRetry(func(client treeClient) error {
+		if _, err := client.service.Move(ctx, request); err != nil {
+			return handleError("failed to move node", err)
+		}
+		return nil
+	})
 }
 
 func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
 		return err
 	}
 
-	_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
-		return cl.service.Remove
-	})
-	if err != nil {
-		return handleError("failed to remove node", err)
-	}
-
-	return nil
+	return c.requestWithRetry(func(client treeClient) error {
+		if _, err := client.service.Remove(ctx, request); err != nil {
+			return handleError("failed to remove node", err)
+		}
+		return nil
+	})
 }
 
-type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
+func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
+	start := int(atomic.LoadInt32(&c.startIndex))

The current implementation uses whichever available client in `c.clients` comes up first, so we use the clients at the beginning of the list more often than the ones at the end. Also, we iterate through some part of the list on each request that goes through `requestWithRetry`. I don't insist, but maybe we should change this?

All round-robin implementations I know of assume a cycling scheme: we go through a set of resources and hand them out one by one, from the start of the "list" to the end, and then start over again. This way we get roughly equal utilisation and don't have to scan the "list" of resources every time we need one.

Example of a round-robin approach: https://en.wikipedia.org/wiki/Round-robin_scheduling
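
For context, a minimal sketch of the cycling scheme described above (the `roundRobin` type and its field names are illustrative, not taken from this PR): each call hands out the next client in order and wraps around, so no scan of the list is needed to pick one.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through a fixed list of addresses, handing them out
// one by one and wrapping around, so every address gets roughly equal use.
type roundRobin struct {
	addrs []string
	next  uint32 // monotonically increasing pick counter
}

// pick returns the next address in cyclic order; safe for concurrent use.
func (r *roundRobin) pick() string {
	// AddUint32 returns the incremented value, so subtract 1 to get our slot.
	n := atomic.AddUint32(&r.next, 1) - 1
	return r.addrs[int(n%uint32(len(r.addrs)))]
}

func main() {
	rr := &roundRobin{addrs: []string{"node0", "node1", "node2"}}
	for i := 0; i < 5; i++ {
		fmt.Println(rr.pick()) // node0, node1, node2, node0, node1
	}
}
```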


Good point. I'll change the behavior to start iterating clients at the index where we stopped previously.

As I understand it, we cannot drop iterating over the clients entirely, because we want to retry a failed request on the other clients.

/cc @alexvanin @a.bogatyrev
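
A standalone sketch of that compromise, with hypothetical names (`retryFromLast`, `endpoints`, `call`) not taken from the PR: remember the index of the last client that answered and start the next walk there, while still iterating over the rest of the list so a failed request can be retried on another client.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// retryFromLast tries call against each endpoint, starting from the index
// stored in *start and wrapping around the slice. On success it remembers
// the index so the next request begins with the client that last worked.
func retryFromLast(endpoints []string, start *int32, call func(addr string) error) error {
	var err error
	s := int(atomic.LoadInt32(start))
	for i := s; i < s+len(endpoints); i++ {
		idx := i % len(endpoints)
		if err = call(endpoints[idx]); err == nil {
			atomic.StoreInt32(start, int32(idx))
			return nil
		}
	}
	return err // every endpoint failed; return the last error
}

func main() {
	var start int32
	nodes := []string{"node0", "node1", "node2"}
	err := retryFromLast(nodes, &start, func(addr string) error {
		if addr == "node0" {
			return errors.New("unavailable") // force a retry on the next node
		}
		return nil
	})
	fmt.Println(err, start) // <nil> 1
}
```

The merged `requestWithRetry` below differs slightly: it stores the index whenever `shouldTryAgain(err)` is false, i.e. on success or on a non-retryable error.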

-func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
-	start := int(c.startIndex.Load())
 
 	for i := start; i < start+len(c.clients); i++ {
 		index := i % len(c.clients)
-		res, err = fn(c.clients[index])(ctx, req)
+		err = fn(c.clients[index])
 		if !shouldTryAgain(err) {
-			c.startIndex.Store(int32(index))
-			return res, err
+			atomic.StoreInt32(&c.startIndex, int32(index))
+			return err
 		}
 
 		c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
 	}
 
-	return res, err
+	return err
 }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
@@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
 }
 
 func handleError(msg string, err error) error {
+	if err == nil {
+		return nil
+	}
 	if strings.Contains(err.Error(), "not found") {
 		return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
 	} else if strings.Contains(err.Error(), "is denied by") {

View file

@@ -1,14 +1,13 @@
 package services
 
 import (
-	"context"
 	"errors"
+	"sync/atomic"
 	"testing"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"
-	"google.golang.org/grpc"
 )
 
 func TestHandleError(t *testing.T) {
@@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
 }
 
 func TestRetry(t *testing.T) {
-	ctx := context.Background()
-
 	cl := &ServiceClientGRPC{
 		log:     zaptest.NewLogger(t),
 		clients: []treeClient{
@@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
 		},
 	}
 
-	fn := func(client treeClient) grpcFunc[[]string, string] {
-		return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
+	makeFn := func(shouldFail []string) func(treeClient) error {
+		return func(client treeClient) error {
 			for _, item := range shouldFail {
 				if item == client.address {
-					return "", errors.New("not found")
+					return errors.New("not found")
 				}
 			}
-			return client.address, nil
+			return nil
 		}
 	}
 
 	t.Run("first ok", func(t *testing.T) {
-		resp, err := requestWithRetry(ctx, []string{}, cl, fn)
+		err := cl.requestWithRetry(makeFn([]string{}))
 		require.NoError(t, err)
-		require.Equal(t, "node0", resp)
-		require.Equal(t, 0, int(cl.startIndex.Load()))
-		cl.startIndex.Store(0)
+		require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+		atomic.StoreInt32(&cl.startIndex, 0)
 	})
 
 	t.Run("first failed", func(t *testing.T) {
-		resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
+		err := cl.requestWithRetry(makeFn([]string{"node0"}))
 		require.NoError(t, err)
-		require.Equal(t, "node1", resp)
-		require.Equal(t, 1, int(cl.startIndex.Load()))
-		cl.startIndex.Store(0)
+		require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
+		atomic.StoreInt32(&cl.startIndex, 0)
 	})
 
 	t.Run("all failed", func(t *testing.T) {
-		resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
+		err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
 		require.Error(t, err)
-		require.Equal(t, "", resp)
-		require.Equal(t, 0, int(cl.startIndex.Load()))
-		cl.startIndex.Store(0)
+		require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+		atomic.StoreInt32(&cl.startIndex, 0)
	})
 
 	t.Run("round", func(t *testing.T) {
-		resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
+		err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
 		require.NoError(t, err)
-		require.Equal(t, "node2", resp)
-		require.Equal(t, 2, int(cl.startIndex.Load()))
+		require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
 
-		resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
+		err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
 		require.NoError(t, err)
-		require.Equal(t, "node0", resp)
-		require.Equal(t, 0, int(cl.startIndex.Load()))
-		cl.startIndex.Store(0)
+		require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+		atomic.StoreInt32(&cl.startIndex, 0)
 	})
 }