diff --git a/pool/tree/pool.go b/pool/tree/pool.go index eb88a89..bffe6e9 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -60,6 +60,7 @@ type InitParameters struct { clientRebalanceInterval time.Duration nodeParams []pool.NodeParam dialOptions []grpc.DialOption + maxRequestAttempts int } // Pool represents virtual connection to the FrostFS tree services network to communicate @@ -78,6 +79,8 @@ type Pool struct { dialOptions []grpc.DialOption logger *zap.Logger + maxRequestAttempts int + startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. // Since clients are stored in innerPool field we have to use two indices. @@ -177,6 +180,7 @@ func NewPool(options InitParameters) (*Pool, error) { nodeRequestTimeout: options.healthcheckTimeout, clientRebalanceInterval: options.clientRebalanceInterval, }, + maxRequestAttempts: options.maxRequestAttempts, } return p, nil @@ -268,6 +272,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } +// SetMaxRequestAttempts sets the max attempt to make successful request. +// Default value is 0 that means the number of attempts equals to number of nodes in pool. +func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) { + x.maxRequestAttempts = maxAttempts +} + // GetNodes invokes eponymous method from TreeServiceClient. // // Can return predefined errors: @@ -628,6 +638,10 @@ func fillDefaultInitParams(params *InitParameters) { if params.nodeStreamTimeout <= 0 { params.nodeStreamTimeout = defaultStreamTimeout } + + if params.maxRequestAttempts <= 0 { + params.maxRequestAttempts = len(params.nodeParams) + } } func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { @@ -738,11 +752,23 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService. startI, startJ := p.getStartIndices() groupsLen := len(p.innerPools) + attempts := p.maxRequestAttempts + +LOOP: for i := startI; i < startI+groupsLen; i++ { indexI := i % groupsLen clientsLen := len(p.innerPools[indexI].clients) for j := startJ; j < startJ+clientsLen; j++ { indexJ := j % clientsLen + + if attempts == 0 { + if startI != indexI || startJ != indexJ { + p.setStartIndices(indexI, indexJ) + } + break LOOP + } + attempts-- + if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { err = fn(cl) } @@ -752,8 +778,10 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService. } return err } + finErr = finalError(finErr, err) - p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) + p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts), + zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) } startJ = 0 } diff --git a/pool/tree/pool_test.go b/pool/tree/pool_test.go index 19c0635..b0d35f5 100644 --- a/pool/tree/pool_test.go +++ b/pool/tree/pool_test.go @@ -83,9 +83,15 @@ func TestRetry(t *testing.T) { {"node10", "node11", "node12", "node13"}, } + var lenNodes int + for i := range nodes { + lenNodes += len(nodes[i]) + } + p := &Pool{ - logger: zaptest.NewLogger(t), - innerPools: makeInnerPool(nodes), + logger: zaptest.NewLogger(t), + innerPools: makeInnerPool(nodes), + maxRequestAttempts: lenNodes, } makeFn := func(client grpcService.TreeServiceClient) error { @@ -194,6 +200,18 @@ func TestRetry(t *testing.T) { require.Equal(t, 1, index) checkIndicesAndReset(t, p, 0, 0) }) + + t.Run("limit attempts", func(t *testing.T) { + oldVal := p.maxRequestAttempts + p.maxRequestAttempts = 2 + setErrors(p, nodes[0]...) + setErrors(p, nodes[1]...) + err := p.requestWithRetry(ctx, makeFn) + require.Error(t, err) + checkIndicesAndReset(t, p, 0, 2) + p.maxRequestAttempts = oldVal + }) + } func TestRebalance(t *testing.T) {