Compare commits

...
Sign in to create a new pull request.

4 commits

Author SHA1 Message Date
24862a9332 [#XX] pool/tree: Support request id
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-10-10 16:59:27 +03:00
ff61f61d54 pool: Adopt timeouts for all unary operations
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-10 13:49:48 +03:00
308bdcc2f5 pool: Use timeout for context cancelling
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-10 11:39:29 +03:00
ca57c8f442 pool: Use priority of errors in tree pool
When retry happens, use priority map to decide
which error to return. Consider network errors
less desirable than business logic errors.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-10 11:25:57 +03:00
2 changed files with 95 additions and 19 deletions

View file

@ -3,6 +3,7 @@ package tree
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"sync" "sync"
@ -22,6 +23,10 @@ type treeClient struct {
healthy bool healthy bool
} }
// ErrUnhealthyEndpoint is the sentinel error reported for an endpoint that
// either has no connection or is not marked healthy. Match it with errors.Is.
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
// newTreeClient creates new tree client with auto dial. // newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{ return &treeClient{
@ -102,7 +107,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address) return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
} }
return c.service, nil return c.service, nil

View file

@ -198,7 +198,11 @@ func (p *Pool) Dial(ctx context.Context) error {
clients := make([]client, len(nodes)) clients := make([]client, len(nodes))
for j, node := range nodes { for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions...) clients[j] = newTreeClient(node.Address(), p.dialOptions...)
if err := clients[j].dial(ctx); err != nil {
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
err := clients[j].dial(tctx)
cancel()
if err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err)) p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue continue
} }
@ -297,8 +301,11 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
} }
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(tctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after, // Return an error there to trigger retry and ignore it after,
@ -340,6 +347,7 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
var res []*grpcService.GetSubTreeResponse_Body var res []*grpcService.GetSubTreeResponse_Body
for { for {
// TODO(AV): consider timeout for each recv
resp, err := x.cli.Recv() resp, err := x.cli.Recv()
if err == io.EOF { if err == io.EOF {
break break
@ -394,8 +402,11 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
} }
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
cli, inErr = client.GetSubTree(tctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -429,8 +440,11 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
} }
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.Add(tctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -466,8 +480,11 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
} }
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
resp, inErr = client.AddByPath(tctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -510,8 +527,11 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
if _, err := client.Move(tctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
@ -541,8 +561,11 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer cancel()
if _, err := client.Remove(tctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
@ -728,12 +751,14 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock() p.startIndicesMtx.Unlock()
} }
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err error err, finErr error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices() startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools) groupsLen := len(p.innerPools)
for i := startI; i < startI+groupsLen; i++ { for i := startI; i < startI+groupsLen; i++ {
@ -750,14 +775,60 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
} }
return err return err
} }
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
return err return finErr
} }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
} }
// prioErr maps err to a numeric rank used when choosing between several
// errors: the smaller the rank, the more preferable the error is to report.
//
// Checks are performed in a fixed order, so an error chain matching several
// sentinels gets the rank of the first match:
//
//	nil                                   -> -1
//	ErrNodeAccessDenied                   -> 100
//	context.Canceled                      -> 150
//	ErrNodeNotFound / errNodeEmptyResult  -> 200
//	context.DeadlineExceeded              -> 280
//	ErrUnhealthyEndpoint                  -> 300
//	anything else                         -> 500
func prioErr(err error) int {
	if err == nil {
		return -1
	}
	if errors.Is(err, ErrNodeAccessDenied) {
		return 100
	}
	if errors.Is(err, context.Canceled) {
		return 150
	}
	if errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) {
		return 200
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return 280
	}
	if errors.Is(err, ErrUnhealthyEndpoint) {
		return 300
	}
	// Unclassified errors get the highest (least preferable) rank.
	return 500
}
// finalError picks which of two errors should be kept as the overall result.
//
// If either argument is nil, candidate is returned as is. Otherwise the
// candidate replaces current only when its prioErr rank is strictly lower;
// on equal ranks the earlier (current) error is kept.
func finalError(current, candidate error) error {
	switch {
	case current == nil, candidate == nil:
		return candidate
	case prioErr(candidate) < prioErr(current):
		// A lower rank means a more desirable error to report.
		return candidate
	default:
		return current
	}
}
// reqKeyType is a dedicated type for context keys of this package, so the
// request ID key cannot collide with plain string keys set by other code.
type reqKeyType string

// reqIDKey is the context key under which the request ID is stored.
const reqIDKey = reqKeyType("request_id")

// SetRequestID returns a copy of ctx that carries reqID as the request ID.
// The value can be read back with GetRequestID.
func SetRequestID(ctx context.Context, reqID string) context.Context {
	return context.WithValue(ctx, reqIDKey, reqID)
}

// GetRequestID returns the request ID stored in ctx by SetRequestID.
// It returns an empty string when ctx carries no request ID.
func GetRequestID(ctx context.Context) string {
	// Use the comma-ok form: ctx.Value returns nil for a missing key, and an
	// unchecked type assertion on nil would panic for every caller that did
	// not set a request ID beforehand.
	reqID, _ := ctx.Value(reqIDKey).(string)
	return reqID
}