[#185] Control timeout of tree service operations
Implemented context timeout for all tree service operations except those that return a GRPC stream Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
5361f0eceb
commit
00eb080f50
1 changed files with 30 additions and 5 deletions
|
@ -66,6 +66,7 @@ type InitParameters struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
nodeDialTimeout time.Duration
|
nodeDialTimeout time.Duration
|
||||||
nodeStreamTimeout time.Duration
|
nodeStreamTimeout time.Duration
|
||||||
|
connTimeout time.Duration
|
||||||
healthcheckTimeout time.Duration
|
healthcheckTimeout time.Duration
|
||||||
clientRebalanceInterval time.Duration
|
clientRebalanceInterval time.Duration
|
||||||
nodeParams []pool.NodeParam
|
nodeParams []pool.NodeParam
|
||||||
|
@ -91,6 +92,7 @@ type Pool struct {
|
||||||
methods []*pool.MethodStatus
|
methods []*pool.MethodStatus
|
||||||
|
|
||||||
maxRequestAttempts int
|
maxRequestAttempts int
|
||||||
|
connTimeout time.Duration
|
||||||
|
|
||||||
startIndicesMtx sync.RWMutex
|
startIndicesMtx sync.RWMutex
|
||||||
// startIndices points to the client from which the next request will be executed.
|
// startIndices points to the client from which the next request will be executed.
|
||||||
|
@ -231,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||||
},
|
},
|
||||||
maxRequestAttempts: options.maxRequestAttempts,
|
maxRequestAttempts: options.maxRequestAttempts,
|
||||||
|
connTimeout: options.connTimeout,
|
||||||
methods: methods,
|
methods: methods,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,6 +309,11 @@ func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
|
||||||
x.healthcheckTimeout = timeout
|
x.healthcheckTimeout = timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetConnectionTimeout specifies the timeout for operation in each request retry.
|
||||||
|
func (x *InitParameters) SetConnectionTimeout(timeout time.Duration) {
|
||||||
|
x.connTimeout = timeout
|
||||||
|
}
|
||||||
|
|
||||||
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
|
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
|
||||||
//
|
//
|
||||||
// See also Pool.Dial.
|
// See also Pool.Dial.
|
||||||
|
@ -355,7 +363,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
|
||||||
|
defer cancel()
|
||||||
|
resp, inErr = client.GetNodeByPath(reqCtx, request)
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
// Return an error there to trigger retry and ignore it after,
|
// Return an error there to trigger retry and ignore it after,
|
||||||
|
@ -489,7 +499,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *grpcService.AddResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.Add(ctx, request)
|
reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
|
||||||
|
defer cancel()
|
||||||
|
resp, inErr = client.Add(reqCtx, request)
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
})
|
})
|
||||||
p.methods[methodAddNode].IncRequests(time.Since(start))
|
p.methods[methodAddNode].IncRequests(time.Since(start))
|
||||||
|
@ -524,7 +536,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *grpcService.AddByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
|
||||||
|
defer cancel()
|
||||||
|
resp, inErr = client.AddByPath(reqCtx, request)
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
})
|
})
|
||||||
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
||||||
|
@ -566,7 +580,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := client.Move(reqCtx, request); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -597,7 +613,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := client.Remove(reqCtx, request); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -887,6 +905,13 @@ func finalError(current, candidate error) error {
|
||||||
return current
|
return current
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
|
||||||
|
if timeout != 0 {
|
||||||
|
return context.WithTimeout(ctx, timeout)
|
||||||
|
}
|
||||||
|
return ctx, func() {}
|
||||||
|
}
|
||||||
|
|
||||||
type reqKeyType struct{}
|
type reqKeyType struct{}
|
||||||
|
|
||||||
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
|
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
|
||||||
|
|
Loading…
Reference in a new issue