[#185] Control timeout of tree service operations

Implemented context timeout for all tree service operations except those that return a GRPC stream

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-10-23 17:36:08 +03:00
parent 5361f0eceb
commit 16a4c016fe
Signed by: nzinkevich
GPG key ID: 748EA1D0B2E6420A

View file

@ -66,6 +66,7 @@ type InitParameters struct {
logger *zap.Logger logger *zap.Logger
nodeDialTimeout time.Duration nodeDialTimeout time.Duration
nodeStreamTimeout time.Duration nodeStreamTimeout time.Duration
connTimeout time.Duration
healthcheckTimeout time.Duration healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam nodeParams []pool.NodeParam
@ -91,6 +92,7 @@ type Pool struct {
methods []*pool.MethodStatus methods []*pool.MethodStatus
maxRequestAttempts int maxRequestAttempts int
connTimeout time.Duration
startIndicesMtx sync.RWMutex startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed. // startIndices points to the client from which the next request will be executed.
@ -231,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval, clientRebalanceInterval: options.clientRebalanceInterval,
}, },
maxRequestAttempts: options.maxRequestAttempts, maxRequestAttempts: options.maxRequestAttempts,
connTimeout: options.connTimeout,
methods: methods, methods: methods,
} }
@ -306,6 +309,11 @@ func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout x.healthcheckTimeout = timeout
} }
// SetConnectionTimeout specifies the timeout for operation in each request retry.
func (x *InitParameters) SetConnectionTimeout(timeout time.Duration) {
x.connTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status. // SetClientRebalanceInterval specifies the interval for updating nodes health status.
// //
// See also Pool.Dial. // See also Pool.Dial.
@ -489,7 +497,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
defer cancel()
resp, inErr = client.Add(reqCtx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}) })
p.methods[methodAddNode].IncRequests(time.Since(start)) p.methods[methodAddNode].IncRequests(time.Since(start))
@ -524,7 +534,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}) })
p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -566,7 +578,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
} }
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
@ -597,7 +611,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
} }
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
@ -887,6 +903,13 @@ func finalError(current, candidate error) error {
return current return current
} }
func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout != 0 {
return context.WithTimeout(ctx, timeout)
}
return ctx, func() {}
}
type reqKeyType struct{} type reqKeyType struct{}
// SetRequestID sets request identifier to context so when some operations are logged in tree pool // SetRequestID sets request identifier to context so when some operations are logged in tree pool