[#191] pool/tree: Support limit request attempts
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
8999d2f080
commit
ab75edd709
2 changed files with 49 additions and 3 deletions
|
@ -60,6 +60,7 @@ type InitParameters struct {
|
|||
clientRebalanceInterval time.Duration
|
||||
nodeParams []pool.NodeParam
|
||||
dialOptions []grpc.DialOption
|
||||
maxRequestAttempts int
|
||||
}
|
||||
|
||||
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
||||
|
@ -78,6 +79,8 @@ type Pool struct {
|
|||
dialOptions []grpc.DialOption
|
||||
logger *zap.Logger
|
||||
|
||||
maxRequestAttempts int
|
||||
|
||||
startIndicesMtx sync.RWMutex
|
||||
// startIndices points to the client from which the next request will be executed.
|
||||
// Since clients are stored in innerPool field we have to use two indices.
|
||||
|
@ -177,6 +180,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
nodeRequestTimeout: options.healthcheckTimeout,
|
||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||
},
|
||||
maxRequestAttempts: options.maxRequestAttempts,
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
@ -268,6 +272,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
|||
x.dialOptions = opts
|
||||
}
|
||||
|
||||
// SetMaxRequestAttempts sets the max attempt to make successful request.
|
||||
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
|
||||
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
||||
x.maxRequestAttempts = maxAttempts
|
||||
}
|
||||
|
||||
// GetNodes invokes eponymous method from TreeServiceClient.
|
||||
//
|
||||
// Can return predefined errors:
|
||||
|
@ -628,6 +638,10 @@ func fillDefaultInitParams(params *InitParameters) {
|
|||
if params.nodeStreamTimeout <= 0 {
|
||||
params.nodeStreamTimeout = defaultStreamTimeout
|
||||
}
|
||||
|
||||
if params.maxRequestAttempts <= 0 {
|
||||
params.maxRequestAttempts = len(params.nodeParams)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
|
@ -738,11 +752,23 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.
|
|||
|
||||
startI, startJ := p.getStartIndices()
|
||||
groupsLen := len(p.innerPools)
|
||||
attempts := p.maxRequestAttempts
|
||||
|
||||
LOOP:
|
||||
for i := startI; i < startI+groupsLen; i++ {
|
||||
indexI := i % groupsLen
|
||||
clientsLen := len(p.innerPools[indexI].clients)
|
||||
for j := startJ; j < startJ+clientsLen; j++ {
|
||||
indexJ := j % clientsLen
|
||||
|
||||
if attempts == 0 {
|
||||
if startI != indexI || startJ != indexJ {
|
||||
p.setStartIndices(indexI, indexJ)
|
||||
}
|
||||
break LOOP
|
||||
}
|
||||
attempts--
|
||||
|
||||
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
||||
err = fn(cl)
|
||||
}
|
||||
|
@ -752,8 +778,10 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
finErr = finalError(finErr, err)
|
||||
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
||||
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||
}
|
||||
startJ = 0
|
||||
}
|
||||
|
|
|
@ -83,9 +83,15 @@ func TestRetry(t *testing.T) {
|
|||
{"node10", "node11", "node12", "node13"},
|
||||
}
|
||||
|
||||
var lenNodes int
|
||||
for i := range nodes {
|
||||
lenNodes += len(nodes[i])
|
||||
}
|
||||
|
||||
p := &Pool{
|
||||
logger: zaptest.NewLogger(t),
|
||||
innerPools: makeInnerPool(nodes),
|
||||
maxRequestAttempts: lenNodes,
|
||||
}
|
||||
|
||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
||||
|
@ -194,6 +200,18 @@ func TestRetry(t *testing.T) {
|
|||
require.Equal(t, 1, index)
|
||||
checkIndicesAndReset(t, p, 0, 0)
|
||||
})
|
||||
|
||||
t.Run("limit attempts", func(t *testing.T) {
|
||||
oldVal := p.maxRequestAttempts
|
||||
p.maxRequestAttempts = 2
|
||||
setErrors(p, nodes[0]...)
|
||||
setErrors(p, nodes[1]...)
|
||||
err := p.requestWithRetry(ctx, makeFn)
|
||||
require.Error(t, err)
|
||||
checkIndicesAndReset(t, p, 0, 2)
|
||||
p.maxRequestAttempts = oldVal
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestRebalance(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue