diff --git a/pool/tree/pool.go b/pool/tree/pool.go index b04ab2c..15d4c68 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -91,6 +91,7 @@ type Pool struct { methods []*pool.MethodStatus maxRequestAttempts int + streamTimeout time.Duration startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. @@ -231,6 +232,7 @@ func NewPool(options InitParameters) (*Pool, error) { clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, + streamTimeout: options.nodeStreamTimeout, methods: methods, } @@ -355,7 +357,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService var resp *grpcService.GetNodeByPathResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.GetNodeByPath(ctx, request) + reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) + defer cancel() + resp, inErr = client.GetNodeByPath(reqCtx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, @@ -489,7 +493,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { var resp *grpcService.AddResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.Add(ctx, request) + reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) + defer cancel() + resp, inErr = client.Add(reqCtx, request) return handleError("failed to add node", inErr) }) p.methods[methodAddNode].IncRequests(time.Since(start)) @@ -524,7 +530,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint var resp *grpcService.AddByPathResponse err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - resp, inErr = client.AddByPath(ctx, request) + reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) + defer cancel() + resp, inErr = client.AddByPath(reqCtx, request) return handleError("failed to add node by path", inErr) }) p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) @@ -566,7 +574,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Move(ctx, request); err != nil { + reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) + defer cancel() + if _, err := client.Move(reqCtx, request); err != nil { return handleError("failed to move node", err) } return nil @@ -597,7 +607,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { } err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - if _, err := client.Remove(ctx, request); err != nil { + reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) + defer cancel() + if _, err := client.Remove(reqCtx, request); err != nil { return handleError("failed to remove node", err) } return nil