feature/74-tree_round_robin #77
2 changed files with 63 additions and 70 deletions
|
@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
|
|||
key *keys.PrivateKey
|
||||
log *zap.Logger
|
||||
clients []treeClient
|
||||
startIndex atomic.Int32
|
||||
startIndex int32
|
||||
}
|
||||
|
||||
type treeClient struct {
|
||||
|
@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
|
|||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
|
||||
return client.service.GetNodeByPath
|
||||
})
|
||||
if err != nil {
|
||||
return nil, handleError("failed to get node by path", err)
|
||||
var resp *grpcService.GetNodeByPathResponse
|
||||
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||
resp, inErr = client.service.GetNodeByPath(ctx, request)
|
||||
return handleError("failed to get node by path", inErr)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
||||
|
@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
|
|||
return nil, err
|
||||
}
|
||||
|
||||
cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
|
||||
return client.service.GetSubTree
|
||||
})
|
||||
if err != nil {
|
||||
return nil, handleError("failed to get sub tree client", err)
|
||||
var cli grpcService.TreeService_GetSubTreeClient
|
||||
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||
cli, inErr = client.service.GetSubTree(ctx, request)
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var subtree []tree.NodeResponse
|
||||
|
@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
|
|||
return 0, err
|
||||
}
|
||||
|
||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
|
||||
return client.service.Add
|
||||
})
|
||||
if err != nil {
|
||||
return 0, handleError("failed to add node", err)
|
||||
var resp *grpcService.AddResponse
|
||||
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||
resp, inErr = client.service.Add(ctx, request)
|
||||
return handleError("failed to add node", inErr)
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return resp.GetBody().GetNodeId(), nil
|
||||
|
@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
|
|||
return 0, err
|
||||
}
|
||||
|
||||
resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
|
||||
return client.service.AddByPath
|
||||
})
|
||||
if err != nil {
|
||||
return 0, handleError("failed to add node by path", err)
|
||||
var resp *grpcService.AddByPathResponse
|
||||
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
|
||||
resp, inErr = client.service.AddByPath(ctx, request)
|
||||
return handleError("failed to add node by path", inErr)
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
body := resp.GetBody()
|
||||
|
@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
|
|||
return err
|
||||
}
|
||||
|
||||
_, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
|
||||
return client.service.Move
|
||||
return c.requestWithRetry(func(client treeClient) error {
|
||||
if _, err := client.service.Move(ctx, request); err != nil {
|
||||
return handleError("failed to move node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return handleError("failed to move node", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||
|
@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
|
|||
return err
|
||||
}
|
||||
|
||||
_, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
|
||||
return cl.service.Remove
|
||||
return c.requestWithRetry(func(client treeClient) error {
|
||||
if _, err := client.service.Remove(ctx, request); err != nil {
|
||||
return handleError("failed to remove node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return handleError("failed to remove node", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
|
||||
|
||||
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
|
||||
start := int(c.startIndex.Load())
|
||||
func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
|
||||
start := int(atomic.LoadInt32(&c.startIndex))
|
||||
|
||||
for i := start; i < start+len(c.clients); i++ {
|
||||
index := i % len(c.clients)
|
||||
res, err = fn(c.clients[index])(ctx, req)
|
||||
err = fn(c.clients[index])
|
||||
if !shouldTryAgain(err) {
|
||||
c.startIndex.Store(int32(index))
|
||||
return res, err
|
||||
atomic.StoreInt32(&c.startIndex, int32(index))
|
||||
return err
|
||||
}
|
||||
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
|
||||
}
|
||||
|
||||
return res, err
|
||||
return err
|
||||
}
|
||||
|
||||
func shouldTryAgain(err error) bool {
|
||||
|
@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
|||
}
|
||||
|
||||
func handleError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
||||
} else if strings.Contains(err.Error(), "is denied by") {
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func TestHandleError(t *testing.T) {
|
||||
|
@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRetry(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
cl := &ServiceClientGRPC{
|
||||
log: zaptest.NewLogger(t),
|
||||
clients: []treeClient{
|
||||
|
@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
fn := func(client treeClient) grpcFunc[[]string, string] {
|
||||
return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
|
||||
makeFn := func(shouldFail []string) func(treeClient) error {
|
||||
return func(client treeClient) error {
|
||||
for _, item := range shouldFail {
|
||||
if item == client.address {
|
||||
return "", errors.New("not found")
|
||||
return errors.New("not found")
|
||||
}
|
||||
}
|
||||
return client.address, nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("first ok", func(t *testing.T) {
|
||||
resp, err := requestWithRetry(ctx, []string{}, cl, fn)
|
||||
err := cl.requestWithRetry(makeFn([]string{}))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "node0", resp)
|
||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
||||
cl.startIndex.Store(0)
|
||||
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||
atomic.StoreInt32(&cl.startIndex, 0)
|
||||
})
|
||||
|
||||
t.Run("first failed", func(t *testing.T) {
|
||||
resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
|
||||
err := cl.requestWithRetry(makeFn([]string{"node0"}))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "node1", resp)
|
||||
require.Equal(t, 1, int(cl.startIndex.Load()))
|
||||
cl.startIndex.Store(0)
|
||||
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
|
||||
atomic.StoreInt32(&cl.startIndex, 0)
|
||||
})
|
||||
|
||||
t.Run("all failed", func(t *testing.T) {
|
||||
resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
|
||||
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "", resp)
|
||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
||||
cl.startIndex.Store(0)
|
||||
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||
atomic.StoreInt32(&cl.startIndex, 0)
|
||||
})
|
||||
|
||||
t.Run("round", func(t *testing.T) {
|
||||
resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
|
||||
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "node2", resp)
|
||||
require.Equal(t, 2, int(cl.startIndex.Load()))
|
||||
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
|
||||
|
||||
resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
|
||||
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "node0", resp)
|
||||
require.Equal(t, 0, int(cl.startIndex.Load()))
|
||||
cl.startIndex.Store(0)
|
||||
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
|
||||
atomic.StoreInt32(&cl.startIndex, 0)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
Current implementation uses any available client in the
c.clients
that comes up first thus we use the clients in the beginning more often that the ones in the end of it. Also, we iterate through some part of the list on each request going throughrequestWithRetry
. I don't insist but may be we should change this?All implementations of round-robin approaches that I know suppose a cycling system: we go through a set of resources and distribute them one by one from start of a "list" to the end, and then we start over again. This way we achieve somewhat equal utilisation and don't have to look through a "list" of resources every time we need one.
Example of a round-robin approach: https://en.wikipedia.org/wiki/Round-robin_scheduling
Good point. I'll change behavior to start iterate client at the index where we stop previously.
As I understand we cannot drop iterating clients at all because we want to retry failed request
/cc @alexvanin @a.bogatyrev