[#185] tree/pool: Control timeout of tree service operations

Implemented context timeout for all tree service operations except those that return a GRPC stream

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-10-23 17:36:08 +03:00 committed by Alexey Vanin
parent 56c4aaaaca
commit 43d5c8dbac

View file

@ -91,6 +91,7 @@ type Pool struct {
methods []*pool.MethodStatus
maxRequestAttempts int
streamTimeout time.Duration
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
@ -231,6 +232,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
streamTimeout: options.nodeStreamTimeout,
methods: methods,
}
@ -355,7 +357,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(reqCtx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
@ -489,7 +493,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.Add(reqCtx, request)
return handleError("failed to add node", inErr)
})
p.methods[methodAddNode].IncRequests(time.Since(start))
@ -524,7 +530,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -566,7 +574,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
@ -597,7 +607,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil