[#185] Control timeout of tree service operations #290

Merged
alexvanin merged 1 commit from nzinkevich/frostfs-sdk-go:tree_request_timeout into master 2024-11-06 08:12:37 +00:00

View file

@ -91,6 +91,7 @@ type Pool struct {
methods []*pool.MethodStatus
maxRequestAttempts int
streamTimeout time.Duration
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
@ -231,6 +232,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
streamTimeout: options.nodeStreamTimeout,
methods: methods,
}
@ -355,7 +357,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(reqCtx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
@ -489,7 +493,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.Add(reqCtx, request)
return handleError("failed to add node", inErr)
})
dkirillov marked this conversation as resolved Outdated

Why don't invoke this function inside p.requestWithRetry?

Why don't invoke this function inside `p.requestWithRetry`?

GetSubTree should be ignored, because it returns a stream to the client. If I create a timeout context for this operation, it will not only cancel the method call, but it will also close the stream, which is using the same context

GetSubTree should be ignored, because it returns a stream to the client. If I create a timeout context for this operation, it will not only cancel the method call, but it will also close the stream, which is using the same context

I meant:

diff --git a/pool/tree/pool.go b/pool/tree/pool.go
index be2cb2d..3d89146 100644
--- a/pool/tree/pool.go
+++ b/pool/tree/pool.go
@@ -362,10 +362,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
 	}
 
 	var resp *grpcService.GetNodeByPathResponse
-	err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
-		reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
-		defer cancel()
-		resp, inErr = client.GetNodeByPath(reqCtx, request)
+	err := p.requestWithRetry(ctx, func(ctx context.Context, client grpcService.TreeServiceClient) (inErr error) {
+		resp, inErr = client.GetNodeByPath(ctx, request)
 		// Pool wants to do retry 'GetNodeByPath' request if result is empty.
 		// Empty result is expected due to delayed tree service sync.
 		// Return an error there to trigger retry and ignore it after,
@@ -820,7 +818,7 @@ func (p *Pool) setStartIndices(i, j int) {
 	p.startIndicesMtx.Unlock()
 }
 
-func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
+func (p *Pool) requestWithRetry(ctx context.Context, fn func(context.Context, grpcService.TreeServiceClient) error) error {
 	var (
 		err, finErr error
 		cl          grpcService.TreeServiceClient
@@ -848,7 +846,9 @@ LOOP:
 			attempts--
 
 			if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
-				err = fn(cl)
+				reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
+				err = fn(reqCtx, cl)
+				cancel()
 			}
 			if !shouldTryAgain(err) {
 				if startI != indexI || startJ != indexJ {

I meant: ```diff diff --git a/pool/tree/pool.go b/pool/tree/pool.go index be2cb2d..3d89146 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -362,10 +362,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService } var resp *grpcService.GetNodeByPathResponse - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) - defer cancel() - resp, inErr = client.GetNodeByPath(reqCtx, request) + err := p.requestWithRetry(ctx, func(ctx context.Context, client grpcService.TreeServiceClient) (inErr error) { + resp, inErr = client.GetNodeByPath(ctx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, @@ -820,7 +818,7 @@ func (p *Pool) setStartIndices(i, j int) { p.startIndicesMtx.Unlock() } -func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error { +func (p *Pool) requestWithRetry(ctx context.Context, fn func(context.Context, grpcService.TreeServiceClient) error) error { var ( err, finErr error cl grpcService.TreeServiceClient @@ -848,7 +846,9 @@ LOOP: attempts-- if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { - err = fn(cl) + reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout) + err = fn(reqCtx, cl) + cancel() } if !shouldTryAgain(err) { if startI != indexI || startJ != indexJ { ```

In that case all treeService methods which use requestWithRetry will have timeout, but getSubTree shouldn't have it.

In that case all treeService methods which use requestWithRetry will have timeout, but getSubTree shouldn't have it.

Oh, I see. Ok

Oh, I see. Ok
p.methods[methodAddNode].IncRequests(time.Since(start))
@ -524,7 +530,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -566,7 +574,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
@ -597,7 +607,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil