From ff61f61d54b39dd29c754292a7075716f57ceff3 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 10 Oct 2023 13:49:48 +0300 Subject: [PATCH] pool: Adopt timeouts for all unary operations Signed-off-by: Alex Vanin --- pool/tree/pool.go | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/pool/tree/pool.go b/pool/tree/pool.go index 55407aa4..f6f0791c 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -302,7 +302,10 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService var resp *grpcService.GetNodeByPathResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.GetNodeByPath(ctx, request) + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + resp, inErr = client.GetNodeByPath(tctx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, @@ -344,6 +347,7 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { var res []*grpcService.GetSubTreeResponse_Body for { + // TODO(AV): consider timeout for each recv resp, err := x.cli.Recv() if err == io.EOF { break @@ -399,7 +403,10 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe var cli grpcService.TreeService_GetSubTreeClient if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { - cli, inErr = client.GetSubTree(ctx, request) + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + cli, inErr = client.GetSubTree(tctx, request) return handleError("failed to get sub tree client", inErr) }); err != nil { return nil, err @@ -434,7 +441,10 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { var resp *grpcService.AddResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.Add(ctx, request) + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + resp, inErr = client.Add(tctx, request) return handleError("failed to add node", inErr) }); err != nil { return 0, err @@ -471,7 +481,10 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint var resp *grpcService.AddByPathResponse if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.AddByPath(ctx, request) + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + resp, inErr = client.AddByPath(tctx, request) return handleError("failed to add node by path", inErr) }); err != nil { return 0, err @@ -515,7 +528,10 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { } return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { - if _, err := client.Move(ctx, request); err != nil { + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + if _, err := client.Move(tctx, request); err != nil { return handleError("failed to move node", err) } return nil @@ -546,7 +562,10 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { } return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { - if _, err := client.Remove(ctx, request); err != nil { + tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) + defer cancel() + + if _, err := client.Remove(tctx, request); err != nil { return handleError("failed to remove node", err) } return nil @@ -773,9 +792,13 @@ func prioErr(err error) int { return -1 case errors.Is(err, ErrNodeAccessDenied): return 100 + case errors.Is(err, context.Canceled): + return 150 case errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult): return 200 + case errors.Is(err, context.DeadlineExceeded): + return 280 case errors.Is(err, ErrUnhealthyEndpoint): return 300 default: