[#191] limit_tree_pool_retries #191
2 changed files with 85 additions and 22 deletions
|
@ -60,6 +60,7 @@ type InitParameters struct {
|
||||||
clientRebalanceInterval time.Duration
|
clientRebalanceInterval time.Duration
|
||||||
nodeParams []pool.NodeParam
|
nodeParams []pool.NodeParam
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
|
maxRequestAttempts int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
||||||
|
@ -78,6 +79,8 @@ type Pool struct {
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
|
||||||
|
maxRequestAttempts int
|
||||||
|
|
||||||
startIndicesMtx sync.RWMutex
|
startIndicesMtx sync.RWMutex
|
||||||
// startIndices points to the client from which the next request will be executed.
|
// startIndices points to the client from which the next request will be executed.
|
||||||
// Since clients are stored in innerPool field we have to use two indices.
|
// Since clients are stored in innerPool field we have to use two indices.
|
||||||
|
@ -177,6 +180,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
nodeRequestTimeout: options.healthcheckTimeout,
|
nodeRequestTimeout: options.healthcheckTimeout,
|
||||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||||
},
|
},
|
||||||
|
maxRequestAttempts: options.maxRequestAttempts,
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
|
@ -268,6 +272,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||||
x.dialOptions = opts
|
x.dialOptions = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetMaxRequestAttempts sets the maximum number of attempts to make a successful request.
|
||||||
|
// The default value is 0, which means the number of attempts equals the number of nodes in the pool.
|
||||||
|
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
||||||
|
x.maxRequestAttempts = maxAttempts
|
||||||
|
}
|
||||||
|
|
||||||
// GetNodes invokes eponymous method from TreeServiceClient.
|
// GetNodes invokes eponymous method from TreeServiceClient.
|
||||||
//
|
//
|
||||||
// Can return predefined errors:
|
// Can return predefined errors:
|
||||||
|
@ -297,7 +307,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
|
@ -394,7 +404,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
cli, inErr = client.GetSubTree(ctx, request)
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -429,7 +439,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *grpcService.AddResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.Add(ctx, request)
|
resp, inErr = client.Add(ctx, request)
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -466,7 +476,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *grpcService.AddByPathResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
resp, inErr = client.AddByPath(ctx, request)
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -510,7 +520,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
if _, err := client.Move(ctx, request); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
|
@ -541,7 +551,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
if _, err := client.Remove(ctx, request); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
|
@ -628,6 +638,10 @@ func fillDefaultInitParams(params *InitParameters) {
|
||||||
if params.nodeStreamTimeout <= 0 {
|
if params.nodeStreamTimeout <= 0 {
|
||||||
params.nodeStreamTimeout = defaultStreamTimeout
|
params.nodeStreamTimeout = defaultStreamTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if params.maxRequestAttempts <= 0 {
|
||||||
|
params.maxRequestAttempts = len(params.nodeParams)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||||
|
@ -728,19 +742,33 @@ func (p *Pool) setStartIndices(i, j int) {
|
||||||
p.startIndicesMtx.Unlock()
|
p.startIndicesMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
|
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
|
||||||
var (
|
var (
|
||||||
err, finErr error
|
err, finErr error
|
||||||
cl grpcService.TreeServiceClient
|
cl grpcService.TreeServiceClient
|
||||||
)
|
)
|
||||||
|
|
||||||
|
reqID := GetRequestID(ctx)
|
||||||
|
|
||||||
startI, startJ := p.getStartIndices()
|
startI, startJ := p.getStartIndices()
|
||||||
groupsLen := len(p.innerPools)
|
groupsLen := len(p.innerPools)
|
||||||
|
attempts := p.maxRequestAttempts
|
||||||
|
|
||||||
|
LOOP:
|
||||||
for i := startI; i < startI+groupsLen; i++ {
|
for i := startI; i < startI+groupsLen; i++ {
|
||||||
indexI := i % groupsLen
|
indexI := i % groupsLen
|
||||||
clientsLen := len(p.innerPools[indexI].clients)
|
clientsLen := len(p.innerPools[indexI].clients)
|
||||||
for j := startJ; j < startJ+clientsLen; j++ {
|
for j := startJ; j < startJ+clientsLen; j++ {
|
||||||
indexJ := j % clientsLen
|
indexJ := j % clientsLen
|
||||||
|
|
||||||
|
if attempts == 0 {
|
||||||
|
if startI != indexI || startJ != indexJ {
|
||||||
|
p.setStartIndices(indexI, indexJ)
|
||||||
|
}
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
|
attempts--
|
||||||
|
|
||||||
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
||||||
err = fn(cl)
|
err = fn(cl)
|
||||||
}
|
}
|
||||||
|
@ -750,8 +778,10 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
finErr = finalError(finErr, err)
|
finErr = finalError(finErr, err)
|
||||||
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
||||||
|
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||||
}
|
}
|
||||||
startJ = 0
|
startJ = 0
|
||||||
}
|
}
|
||||||
|
@ -791,3 +821,17 @@ func finalError(current, candidate error) error {
|
||||||
|
|
||||||
return current
|
return current
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type reqKeyType struct{}
|
||||||
|
|
||||||
|
// SetRequestID sets the request identifier in the context so that when some operations are logged in the tree pool
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
How about `WithRequestID` (similar to `WithValue`)?
fyrchik
commented
And if it is public we need some comments about what it affects.
dkirillov
commented
> How about `WithRequestID` (similar to `WithValue`)?
I would prefer to have paired names `GetRequestID`/`SetRequestID`
|
|||||||
|
// this identifier is also logged.
|
||||||
|
func SetRequestID(ctx context.Context, reqID string) context.Context {
|
||||||
|
return context.WithValue(ctx, reqKeyType{}, reqID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRequestID fetches the tree pool request identifier from the context.
|
||||||
|
func GetRequestID(ctx context.Context) string {
|
||||||
|
reqID, _ := ctx.Value(reqKeyType{}).(string)
|
||||||
|
return reqID
|
||||||
|
}
|
||||||
|
|
|
@ -77,14 +77,21 @@ func TestHandleError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
nodes := [][]string{
|
nodes := [][]string{
|
||||||
{"node00", "node01", "node02", "node03"},
|
{"node00", "node01", "node02", "node03"},
|
||||||
{"node10", "node11", "node12", "node13"},
|
{"node10", "node11", "node12", "node13"},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var lenNodes int
|
||||||
|
for i := range nodes {
|
||||||
|
lenNodes += len(nodes[i])
|
||||||
|
}
|
||||||
|
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
logger: zaptest.NewLogger(t),
|
logger: zaptest.NewLogger(t),
|
||||||
innerPools: makeInnerPool(nodes),
|
innerPools: makeInnerPool(nodes),
|
||||||
|
maxRequestAttempts: lenNodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
makeFn := func(client grpcService.TreeServiceClient) error {
|
||||||
|
@ -92,14 +99,14 @@ func TestRetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("first ok", func(t *testing.T) {
|
t.Run("first ok", func(t *testing.T) {
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("first failed", func(t *testing.T) {
|
t.Run("first failed", func(t *testing.T) {
|
||||||
setErrors(p, "node00")
|
setErrors(p, "node00")
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 1)
|
checkIndicesAndReset(t, p, 0, 1)
|
||||||
})
|
})
|
||||||
|
@ -107,7 +114,7 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("all failed", func(t *testing.T) {
|
t.Run("all failed", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -115,13 +122,13 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("round", func(t *testing.T) {
|
t.Run("round", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0][0], nodes[0][1])
|
setErrors(p, nodes[0][0], nodes[0][1])
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndices(t, p, 0, 2)
|
checkIndices(t, p, 0, 2)
|
||||||
resetClientsErrors(p)
|
resetClientsErrors(p)
|
||||||
|
|
||||||
setErrors(p, nodes[0][2], nodes[0][3])
|
setErrors(p, nodes[0][2], nodes[0][3])
|
||||||
err = p.requestWithRetry(makeFn)
|
err = p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -129,14 +136,14 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("group switch", func(t *testing.T) {
|
t.Run("group switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
setErrors(p, nodes[1][0])
|
setErrors(p, nodes[1][0])
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 1, 1)
|
checkIndicesAndReset(t, p, 1, 1)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("group round", func(t *testing.T) {
|
t.Run("group round", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0][1:]...)
|
setErrors(p, nodes[0][1:]...)
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -144,7 +151,7 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("group round switch", func(t *testing.T) {
|
t.Run("group round switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
p.setStartIndices(0, 1)
|
p.setStartIndices(0, 1)
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 1, 0)
|
checkIndicesAndReset(t, p, 1, 0)
|
||||||
})
|
})
|
||||||
|
@ -152,14 +159,14 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("no panic group switch", func(t *testing.T) {
|
t.Run("no panic group switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
p.setStartIndices(1, 0)
|
p.setStartIndices(1, 0)
|
||||||
err := p.requestWithRetry(makeFn)
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error empty result", func(t *testing.T) {
|
t.Run("error empty result", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return errNodeEmptyResult
|
return errNodeEmptyResult
|
||||||
|
@ -172,7 +179,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error not found", func(t *testing.T) {
|
t.Run("error not found", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return ErrNodeNotFound
|
return ErrNodeNotFound
|
||||||
|
@ -185,7 +192,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error access denied", func(t *testing.T) {
|
t.Run("error access denied", func(t *testing.T) {
|
||||||
var index int
|
var index int
|
||||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
index++
|
index++
|
||||||
return ErrNodeAccessDenied
|
return ErrNodeAccessDenied
|
||||||
})
|
})
|
||||||
|
@ -193,6 +200,18 @@ func TestRetry(t *testing.T) {
|
||||||
require.Equal(t, 1, index)
|
require.Equal(t, 1, index)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("limit attempts", func(t *testing.T) {
|
||||||
|
oldVal := p.maxRequestAttempts
|
||||||
|
p.maxRequestAttempts = 2
|
||||||
|
setErrors(p, nodes[0]...)
|
||||||
|
setErrors(p, nodes[1]...)
|
||||||
|
err := p.requestWithRetry(ctx, makeFn)
|
||||||
|
require.Error(t, err)
|
||||||
|
checkIndicesAndReset(t, p, 0, 2)
|
||||||
|
p.maxRequestAttempts = oldVal
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRebalance(t *testing.T) {
|
func TestRebalance(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue
For single-value context keys we can use a typed empty struct:
type reqKeyType struct{}
I believe it is faster, because we don't need to compare string values.
Go compiler uses this approach a lot, e.g.
9d836d41d0/src/net/lookup.go (L312)