feature/74-tree_round_robin #77

Merged
realloc merged 1 commit from dkirillov/frostfs-s3-gw:feature/74-tree_round_robin into master 2023-07-26 21:08:00 +00:00
2 changed files with 63 additions and 70 deletions

View file

@@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
key *keys.PrivateKey
log *zap.Logger
clients []treeClient
startIndex atomic.Int32
startIndex int32
}
type treeClient struct {
@@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
return nil, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
return client.service.GetNodeByPath
})
if err != nil {
return nil, handleError("failed to get node by path", err)
var resp *grpcService.GetNodeByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.GetNodeByPath(ctx, request)
return handleError("failed to get node by path", inErr)
}); err != nil {
return nil, err
}
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
return nil, err
}
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
return client.service.GetSubTree
})
if err != nil {
return nil, handleError("failed to get sub tree client", err)
var cli grpcService.TreeService_GetSubTreeClient
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
cli, inErr = client.service.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
return nil, err
}
var subtree []tree.NodeResponse
@@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
return 0, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
return client.service.Add
})
if err != nil {
return 0, handleError("failed to add node", err)
var resp *grpcService.AddResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
return 0, err
}
return resp.GetBody().GetNodeId(), nil
@@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
return 0, err
}
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
return client.service.AddByPath
})
if err != nil {
return 0, handleError("failed to add node by path", err)
var resp *grpcService.AddByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
return 0, err
}
body := resp.GetBody()
@@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
return err
}
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
return client.service.Move
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
})
if err != nil {
return handleError("failed to move node", err)
}
return nil
}
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
return err
}
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
return cl.service.Remove
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
if err != nil {
return handleError("failed to remove node", err)
}
return nil
}
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
start := int(c.startIndex.Load())
func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
start := int(atomic.LoadInt32(&c.startIndex))

The current implementation uses whichever available client in `c.clients` comes up first, so we use the clients at the beginning of the list more often than the ones at the end. Also, we iterate through some part of the list on each request that goes through `requestWithRetry`. I don't insist, but maybe we should change this?

All round-robin implementations I know of assume a cycling scheme: we go through a set of resources and hand them out one by one from the start of a "list" to the end, and then start over again. This way we achieve roughly equal utilisation and don't have to look through the "list" of resources every time we need one.

Example of a round-robin approach: https://en.wikipedia.org/wiki/Round-robin_scheduling

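Roughly the kind of selector I have in mind, as a minimal sketch (illustrative only; the `roundRobin` type and `pick` name are made up, not something from this PR):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin hands out clients in a fixed cyclic order, so each one is
// used equally often and no scan of the whole list is needed per request.
type roundRobin struct {
	next    uint32   // monotonically increasing pick counter
	clients []string // stand-ins for treeClient
}

func (r *roundRobin) pick() string {
	n := atomic.AddUint32(&r.next, 1)
	return r.clients[int(n-1)%len(r.clients)]
}

func main() {
	rr := &roundRobin{clients: []string{"node0", "node1", "node2"}}
	for i := 0; i < 5; i++ {
		fmt.Println(rr.pick()) // node0, node1, node2, node0, node1
	}
}
```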

Good point. I'll change the behavior to start iterating over clients at the index where we stopped previously.

As I understand it, we cannot drop iterating over the clients entirely, because we want to retry failed requests.

/cc @alexvanin @a.bogatyrev

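Roughly what I mean, as a self-contained sketch (the `pool` type and `do` method are made up for illustration, and unlike the real code any error triggers a retry here; the actual change is in the diff below):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// pool remembers the index of the last client that answered, so the next
// request starts from there instead of always from the head of the list.
type pool struct {
	startIndex int32
	clients    []string
}

// do tries every client at most once, starting at startIndex, and stores
// the index of the client that finally served the request.
func (p *pool) do(call func(client string) error) error {
	start := int(atomic.LoadInt32(&p.startIndex))
	var err error
	for i := start; i < start+len(p.clients); i++ {
		index := i % len(p.clients)
		if err = call(p.clients[index]); err == nil {
			atomic.StoreInt32(&p.startIndex, int32(index))
			return nil
		}
	}
	return err
}

func main() {
	p := &pool{clients: []string{"node0", "node1", "node2"}}
	// node0 is down, so the call succeeds on node1 and the next
	// request will start from node1.
	err := p.do(func(c string) error {
		if c == "node0" {
			return errors.New("unavailable")
		}
		fmt.Println("served by", c)
		return nil
	})
	fmt.Println("err:", err, "next start index:", atomic.LoadInt32(&p.startIndex))
}
```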
for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients)
res, err = fn(c.clients[index])(ctx, req)
err = fn(c.clients[index])
if !shouldTryAgain(err) {
c.startIndex.Store(int32(index))
return res, err
atomic.StoreInt32(&c.startIndex, int32(index))
return err
}
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
}
return res, err
return err
}
func shouldTryAgain(err error) bool {
@@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
}
func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "is denied by") {

View file

@@ -1,14 +1,13 @@
package services
import (
"context"
"errors"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
)
func TestHandleError(t *testing.T) {
@@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
}
func TestRetry(t *testing.T) {
ctx := context.Background()
cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t),
clients: []treeClient{
@@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
},
}
fn := func(client treeClient) grpcFunc[[]string, string] {
return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
makeFn := func(shouldFail []string) func(treeClient) error {
return func(client treeClient) error {
for _, item := range shouldFail {
if item == client.address {
return "", errors.New("not found")
return errors.New("not found")
}
}
return client.address, nil
return nil
}
}
t.Run("first ok", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{}))
require.NoError(t, err)
require.Equal(t, "node0", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("first failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0"}))
require.NoError(t, err)
require.Equal(t, "node1", resp)
require.Equal(t, 1, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("all failed", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
require.Error(t, err)
require.Equal(t, "", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("round", func(t *testing.T) {
resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
require.NoError(t, err)
require.Equal(t, "node2", resp)
require.Equal(t, 2, int(cl.startIndex.Load()))
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
require.NoError(t, err)
require.Equal(t, "node0", resp)
require.Equal(t, 0, int(cl.startIndex.Load()))
cl.startIndex.Store(0)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
}