From 00eb080f5083b0a8b0132f356621c08e62f5c7d4 Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Wed, 23 Oct 2024 17:36:08 +0300 Subject: [PATCH] [#185] Control timeout of tree service operations Implemented context timeout for all tree service operations except those that return a gRPC stream Signed-off-by: Nikita Zinkevich --- pool/tree/pool.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/pool/tree/pool.go b/pool/tree/pool.go index 0d7c117..be2cb2d 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -66,6 +66,7 @@ type InitParameters struct { logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration + connTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration nodeParams []pool.NodeParam @@ -91,6 +92,7 @@ type Pool struct { methods []*pool.MethodStatus maxRequestAttempts int + connTimeout time.Duration startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. @@ -231,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) { clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, + connTimeout: options.connTimeout, methods: methods, } @@ -306,6 +309,11 @@ func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } +// SetConnectionTimeout specifies the timeout applied to each attempt (including retries) of a tree service operation. +func (x *InitParameters) SetConnectionTimeout(timeout time.Duration) { + x.connTimeout = timeout +} + // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. 
@@ -355,7 +363,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService var resp *grpcService.GetNodeByPathResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.GetNodeByPath(ctx, request) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + resp, inErr = client.GetNodeByPath(reqCtx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, @@ -489,7 +499,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { var resp *grpcService.AddResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.Add(ctx, request) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + resp, inErr = client.Add(reqCtx, request) return handleError("failed to add node", inErr) }) p.methods[methodAddNode].IncRequests(time.Since(start)) @@ -524,7 +536,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint var resp *grpcService.AddByPathResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.AddByPath(ctx, request) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + resp, inErr = client.AddByPath(reqCtx, request) return handleError("failed to add node by path", inErr) }) p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) @@ -566,7 +580,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Move(ctx, request); err != nil { + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + if _, err := client.Move(reqCtx, request); err != nil { return 
handleError("failed to move node", err) } return nil @@ -597,7 +613,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Remove(ctx, request); err != nil { + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + defer cancel() + if _, err := client.Remove(reqCtx, request); err != nil { return handleError("failed to remove node", err) } return nil @@ -887,6 +905,13 @@ func finalError(current, candidate error) error { return current } +func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout != 0 { + return context.WithTimeout(ctx, timeout) + } + return ctx, func() {} +} + type reqKeyType struct{} // SetRequestID sets request identifier to context so when some operations are logged in tree pool