diff --git a/pool/tree/pool.go b/pool/tree/pool.go index 0d7c117..0548c61 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -66,6 +66,7 @@ type InitParameters struct { logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration + connTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration nodeParams []pool.NodeParam @@ -91,6 +92,7 @@ type Pool struct { methods []*pool.MethodStatus maxRequestAttempts int + connTimeout time.Duration startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. @@ -231,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) { clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, + connTimeout: options.connTimeout, methods: methods, } @@ -306,6 +309,11 @@ func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } +// SetConnectionTimeout specifies the timeout for operation in each request retry +func (x *InitParameters) SetConnectionTimeout(timeout time.Duration) { + x.connTimeout = timeout +} + // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. @@ -489,7 +497,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { var resp *grpcService.AddResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.Add(ctx, request) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + resp, inErr = client.Add(reqCtx, request) return handleError("failed to add node", inErr) }) p.methods[methodAddNode].IncRequests(time.Since(start)) @@ -524,7 +534,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint var resp *grpcService.AddByPathResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.AddByPath(ctx, request) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + resp, inErr = client.AddByPath(reqCtx, request) return handleError("failed to add node by path", inErr) }) p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) @@ -566,7 +578,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Move(ctx, request); err != nil { + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + if _, err := client.Move(reqCtx, request); err != nil { return handleError("failed to move node", err) } return nil @@ -597,7 +611,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Remove(ctx, request); err != nil { + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + if _, err := client.Remove(reqCtx, request); err != nil { return handleError("failed to remove node", err) } return nil @@ -887,6 +903,13 @@ func finalError(current, candidate error) error { return current } +func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout != 0 { + return context.WithTimeout(ctx, timeout) + } + return ctx, func() {} +} + type reqKeyType struct{} // SetRequestID sets request identifier to context so when some operations are logged in tree pool