feature/74-tree_round_robin #77
2 changed files with 63 additions and 70 deletions
@@ -73,7 +73,7 @@ type ServiceClientGRPC struct {
     key        *keys.PrivateKey
     log        *zap.Logger
     clients    []treeClient
-    startIndex atomic.Int32
+    startIndex int32
 }

 type treeClient struct {
@@ -146,11 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
         return nil, err
     }

-    resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
-        return client.service.GetNodeByPath
-    })
-    if err != nil {
-        return nil, handleError("failed to get node by path", err)
+    var resp *grpcService.GetNodeByPathResponse
+    if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+        resp, inErr = client.service.GetNodeByPath(ctx, request)
+        return handleError("failed to get node by path", inErr)
+    }); err != nil {
+        return nil, err
     }

     res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@@ -181,11 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
         return nil, err
     }

-    cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
-        return client.service.GetSubTree
-    })
-    if err != nil {
-        return nil, handleError("failed to get sub tree client", err)
+    var cli grpcService.TreeService_GetSubTreeClient
+    if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+        cli, inErr = client.service.GetSubTree(ctx, request)
+        return handleError("failed to get sub tree client", inErr)
+    }); err != nil {
+        return nil, err
     }

     var subtree []tree.NodeResponse
@@ -221,11 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
         return 0, err
     }

-    resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
-        return client.service.Add
-    })
-    if err != nil {
-        return 0, handleError("failed to add node", err)
+    var resp *grpcService.AddResponse
+    if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+        resp, inErr = client.service.Add(ctx, request)
+        return handleError("failed to add node", inErr)
+    }); err != nil {
+        return 0, err
     }

     return resp.GetBody().GetNodeId(), nil
@@ -252,11 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
         return 0, err
     }

-    resp, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
-        return client.service.AddByPath
-    })
-    if err != nil {
-        return 0, handleError("failed to add node by path", err)
+    var resp *grpcService.AddByPathResponse
+    if err := c.requestWithRetry(func(client treeClient) (inErr error) {
+        resp, inErr = client.service.AddByPath(ctx, request)
+        return handleError("failed to add node by path", inErr)
+    }); err != nil {
+        return 0, err
     }

     body := resp.GetBody()
@@ -291,14 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
         return err
     }

-    _, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
-        return client.service.Move
+    return c.requestWithRetry(func(client treeClient) error {
+        if _, err := client.service.Move(ctx, request); err != nil {
+            return handleError("failed to move node", err)
+        }
+        return nil
     })
-    if err != nil {
-        return handleError("failed to move node", err)
-    }
-
-    return nil
 }

 func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@@ -319,31 +321,27 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
         return err
     }

-    _, err := requestWithRetry(ctx, request, c, func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
-        return cl.service.Remove
+    return c.requestWithRetry(func(client treeClient) error {
+        if _, err := client.service.Remove(ctx, request); err != nil {
+            return handleError("failed to remove node", err)
+        }
+        return nil
     })
-    if err != nil {
-        return handleError("failed to remove node", err)
-    }
-
-    return nil
 }

-type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
-
-func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
-    start := int(c.startIndex.Load())
+func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
+    start := int(atomic.LoadInt32(&c.startIndex))
     for i := start; i < start+len(c.clients); i++ {
         index := i % len(c.clients)
-        res, err = fn(c.clients[index])(ctx, req)
+        err = fn(c.clients[index])
         if !shouldTryAgain(err) {
-            c.startIndex.Store(int32(index))
-            return res, err
+            atomic.StoreInt32(&c.startIndex, int32(index))
+            return err
         }
         c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
     }

-    return res, err
+    return err
 }

 func shouldTryAgain(err error) bool {
@@ -396,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
 }

 func handleError(msg string, err error) error {
+    if err == nil {
+        return nil
+    }
     if strings.Contains(err.Error(), "not found") {
         return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
     } else if strings.Contains(err.Error(), "is denied by") {
@@ -1,14 +1,13 @@
 package services

 import (
-    "context"
     "errors"
+    "sync/atomic"
     "testing"

     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
     "github.com/stretchr/testify/require"
     "go.uber.org/zap/zaptest"
-    "google.golang.org/grpc"
 )

 func TestHandleError(t *testing.T) {
@@ -38,8 +37,6 @@ func TestHandleError(t *testing.T) {
 }

 func TestRetry(t *testing.T) {
-    ctx := context.Background()
-
     cl := &ServiceClientGRPC{
         log:     zaptest.NewLogger(t),
         clients: []treeClient{
@@ -50,51 +47,46 @@ func TestRetry(t *testing.T) {
         },
     }

-    fn := func(client treeClient) grpcFunc[[]string, string] {
-        return func(ctx context.Context, shouldFail []string, opts ...grpc.CallOption) (string, error) {
+    makeFn := func(shouldFail []string) func(treeClient) error {
+        return func(client treeClient) error {
             for _, item := range shouldFail {
                 if item == client.address {
-                    return "", errors.New("not found")
+                    return errors.New("not found")
                 }
             }
-            return client.address, nil
+            return nil
         }
     }

     t.Run("first ok", func(t *testing.T) {
-        resp, err := requestWithRetry(ctx, []string{}, cl, fn)
+        err := cl.requestWithRetry(makeFn([]string{}))
         require.NoError(t, err)
-        require.Equal(t, "node0", resp)
-        require.Equal(t, 0, int(cl.startIndex.Load()))
-        cl.startIndex.Store(0)
+        require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+        atomic.StoreInt32(&cl.startIndex, 0)
     })

     t.Run("first failed", func(t *testing.T) {
-        resp, err := requestWithRetry(ctx, []string{"node0"}, cl, fn)
+        err := cl.requestWithRetry(makeFn([]string{"node0"}))
         require.NoError(t, err)
-        require.Equal(t, "node1", resp)
-        require.Equal(t, 1, int(cl.startIndex.Load()))
-        cl.startIndex.Store(0)
+        require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
+        atomic.StoreInt32(&cl.startIndex, 0)
     })

     t.Run("all failed", func(t *testing.T) {
-        resp, err := requestWithRetry(ctx, []string{"node0", "node1", "node2", "node3"}, cl, fn)
+        err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
         require.Error(t, err)
-        require.Equal(t, "", resp)
-        require.Equal(t, 0, int(cl.startIndex.Load()))
-        cl.startIndex.Store(0)
+        require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+        atomic.StoreInt32(&cl.startIndex, 0)
     })

     t.Run("round", func(t *testing.T) {
-        resp, err := requestWithRetry(ctx, []string{"node0", "node1"}, cl, fn)
+        err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
         require.NoError(t, err)
-        require.Equal(t, "node2", resp)
-        require.Equal(t, 2, int(cl.startIndex.Load()))
+        require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))

-        resp, err = requestWithRetry(ctx, []string{"node2", "node3"}, cl, fn)
+        err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
         require.NoError(t, err)
-        require.Equal(t, "node0", resp)
-        require.Equal(t, 0, int(cl.startIndex.Load()))
-        cl.startIndex.Store(0)
+        require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
+        atomic.StoreInt32(&cl.startIndex, 0)
     })
 }
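For reference, a condensed, self-contained sketch of the retry helper this diff introduces. The `pool` and `client` types, the `down` node, and the plain `err == nil` check are illustrative simplifications (the gateway code works on `treeClient` values and consults `shouldTryAgain(err)`); the round-robin bookkeeping is the same: iteration starts at the stored index, and the index of the client that answered is stored back.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// client stands in for the gateway's treeClient; only the address matters here.
type client struct {
	address string
}

type pool struct {
	startIndex int32
	clients    []client
}

// requestWithRetry mirrors the shape of the helper in this diff: start from the
// stored index, try each client at most once, and remember where we stopped.
func (p *pool) requestWithRetry(fn func(client) error) (err error) {
	start := int(atomic.LoadInt32(&p.startIndex))
	for i := start; i < start+len(p.clients); i++ {
		index := i % len(p.clients)
		err = fn(p.clients[index])
		if err == nil { // the real code consults shouldTryAgain(err) here
			atomic.StoreInt32(&p.startIndex, int32(index))
			return nil
		}
	}
	return err
}

func main() {
	p := &pool{clients: []client{{"node0"}, {"node1"}, {"node2"}}}

	// node0 is "down": the first request falls over to node1 and the index sticks there.
	down := "node0"
	call := func(c client) error {
		if c.address == down {
			return errors.New("unavailable")
		}
		fmt.Println("served by", c.address)
		return nil
	}

	_ = p.requestWithRetry(call) // served by node1
	_ = p.requestWithRetry(call) // served by node1 again: iteration starts at the stored index
	fmt.Println("startIndex:", atomic.LoadInt32(&p.startIndex))
}
```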
The current implementation uses whichever client in `c.clients` happens to come up first, so the clients at the beginning of the list are used more often than the ones at the end. We also iterate over part of the list on every request that goes through `requestWithRetry`. I don't insist, but maybe we should change this?

All round-robin implementations I know of assume a cycling scheme: we go through the set of resources and hand them out one by one from the start of the "list" to the end, then start over. That way we get roughly equal utilisation and don't have to scan the "list" of resources every time we need one.

Example of a round-robin approach: https://en.wikipedia.org/wiki/Round-robin_scheduling
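For comparison, a minimal sketch of the cycling scheme described above, assuming a standalone selector rather than the gateway's client pool: every pick advances a cursor, so load spreads evenly and no scanning of the list is needed.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin hands out nodes in a fixed cycle, independent of failures.
type roundRobin struct {
	next  uint32
	nodes []string
}

// pick returns the next node and advances the cursor.
func (r *roundRobin) pick() string {
	n := atomic.AddUint32(&r.next, 1) - 1
	return r.nodes[int(n)%len(r.nodes)]
}

func main() {
	rr := &roundRobin{nodes: []string{"node0", "node1", "node2"}}
	for i := 0; i < 5; i++ {
		fmt.Println(rr.pick()) // node0 node1 node2 node0 node1
	}
}
```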
Good point. I'll change the behavior so that client iteration starts at the index where we stopped previously.

As I understand it, we can't drop iterating over the clients entirely, because we still want to retry a failed request on another node.
/cc @alexvanin @a.bogatyrev