Compare commits
4 commits
master
...
fix/enhanc
Author | SHA1 | Date | |
---|---|---|---|
24862a9332 | |||
ff61f61d54 | |||
308bdcc2f5 | |||
ca57c8f442 |
2 changed files with 95 additions and 19 deletions
|
@ -3,6 +3,7 @@ package tree
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -22,6 +23,10 @@ type treeClient struct {
|
||||||
healthy bool
|
healthy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
||||||
|
)
|
||||||
|
|
||||||
// newTreeClient creates new tree client with auto dial.
|
// newTreeClient creates new tree client with auto dial.
|
||||||
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
|
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
|
||||||
return &treeClient{
|
return &treeClient{
|
||||||
|
@ -102,7 +107,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
if c.conn == nil || !c.healthy {
|
if c.conn == nil || !c.healthy {
|
||||||
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
|
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service, nil
|
return c.service, nil
|
||||||
|
|
|
@ -198,7 +198,11 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
clients := make([]client, len(nodes))
|
clients := make([]client, len(nodes))
|
||||||
for j, node := range nodes {
|
for j, node := range nodes {
|
||||||
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
|
||||||
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
err := clients[j].dial(tctx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -297,8 +301,11 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, inErr = client.GetNodeByPath(tctx, request)
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
// Return an error there to trigger retry and ignore it after,
|
// Return an error there to trigger retry and ignore it after,
|
||||||
|
@ -340,6 +347,7 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
|
||||||
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
|
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
|
||||||
var res []*grpcService.GetSubTreeResponse_Body
|
var res []*grpcService.GetSubTreeResponse_Body
|
||||||
for {
|
for {
|
||||||
|
// TODO(AV): consider timeout for each recv
|
||||||
resp, err := x.cli.Recv()
|
resp, err := x.cli.Recv()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
|
@ -394,8 +402,11 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
cli, inErr = client.GetSubTree(tctx, request)
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -429,8 +440,11 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *grpcService.AddResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.Add(ctx, request)
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, inErr = client.Add(tctx, request)
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -466,8 +480,11 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *grpcService.AddByPathResponse
|
||||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, inErr = client.AddByPath(tctx, request)
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -510,8 +527,11 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if _, err := client.Move(tctx, request); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -541,8 +561,11 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
tctx, cancel := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if _, err := client.Remove(tctx, request); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -728,12 +751,14 @@ func (p *Pool) setStartIndices(i, j int) {
|
||||||
p.startIndicesMtx.Unlock()
|
p.startIndicesMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
|
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
|
||||||
var (
|
var (
|
||||||
err error
|
err, finErr error
|
||||||
cl grpcService.TreeServiceClient
|
cl grpcService.TreeServiceClient
|
||||||
)
|
)
|
||||||
|
|
||||||
|
reqID := GetRequestID(ctx)
|
||||||
|
|
||||||
startI, startJ := p.getStartIndices()
|
startI, startJ := p.getStartIndices()
|
||||||
groupsLen := len(p.innerPools)
|
groupsLen := len(p.innerPools)
|
||||||
for i := startI; i < startI+groupsLen; i++ {
|
for i := startI; i < startI+groupsLen; i++ {
|
||||||
|
@ -750,14 +775,60 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
finErr = finalError(finErr, err)
|
||||||
|
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||||
}
|
}
|
||||||
startJ = 0
|
startJ = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return finErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldTryAgain(err error) bool {
|
func shouldTryAgain(err error) bool {
|
||||||
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func prioErr(err error) int {
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
return -1
|
||||||
|
case errors.Is(err, ErrNodeAccessDenied):
|
||||||
|
return 100
|
||||||
|
case errors.Is(err, context.Canceled):
|
||||||
|
return 150
|
||||||
|
case errors.Is(err, ErrNodeNotFound) ||
|
||||||
|
errors.Is(err, errNodeEmptyResult):
|
||||||
|
return 200
|
||||||
|
case errors.Is(err, context.DeadlineExceeded):
|
||||||
|
return 280
|
||||||
|
case errors.Is(err, ErrUnhealthyEndpoint):
|
||||||
|
return 300
|
||||||
|
default:
|
||||||
|
return 500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func finalError(current, candidate error) error {
|
||||||
|
if current == nil || candidate == nil {
|
||||||
|
return candidate
|
||||||
|
}
|
||||||
|
|
||||||
|
// lower priority error is more desirable
|
||||||
|
if prioErr(candidate) < prioErr(current) {
|
||||||
|
return candidate
|
||||||
|
}
|
||||||
|
|
||||||
|
return current
|
||||||
|
}
|
||||||
|
|
||||||
|
type reqKeyType string
|
||||||
|
|
||||||
|
const reqIDKey = reqKeyType("request_id")
|
||||||
|
|
||||||
|
func SetRequestID(ctx context.Context, reqID string) context.Context {
|
||||||
|
return context.WithValue(ctx, reqIDKey, reqID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetRequestID(ctx context.Context) string {
|
||||||
|
return ctx.Value(reqIDKey).(string)
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue