pool: Adopt timeouts for all unary operations

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2023-10-10 13:49:48 +03:00
parent 308bdcc2f5
commit ff61f61d54

View file

@ -302,7 +302,10 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(tctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after, // Return an error there to trigger retry and ignore it after,
@ -344,6 +347,7 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
var res []*grpcService.GetSubTreeResponse_Body var res []*grpcService.GetSubTreeResponse_Body
for { for {
// TODO(AV): consider timeout for each recv
resp, err := x.cli.Recv() resp, err := x.cli.Recv()
if err == io.EOF { if err == io.EOF {
break break
@ -399,7 +403,10 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
cli, inErr = client.GetSubTree(tctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -434,7 +441,10 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.Add(tctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -471,7 +481,10 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.AddByPath(tctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -515,7 +528,10 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
if _, err := client.Move(tctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
@ -546,7 +562,10 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
if _, err := client.Remove(tctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
@ -773,9 +792,13 @@ func prioErr(err error) int {
return -1 return -1
case errors.Is(err, ErrNodeAccessDenied): case errors.Is(err, ErrNodeAccessDenied):
return 100 return 100
case errors.Is(err, context.Canceled):
return 150
case errors.Is(err, ErrNodeNotFound) || case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult): errors.Is(err, errNodeEmptyResult):
return 200 return 200
case errors.Is(err, context.DeadlineExceeded):
return 280
case errors.Is(err, ErrUnhealthyEndpoint): case errors.Is(err, ErrUnhealthyEndpoint):
return 300 return 300
default: default: