[#191] limit_tree_pool_retries #191

Merged
alexvanin merged 2 commits from poc/limit_tree_pool_retries into master 2024-09-04 19:51:15 +00:00
2 changed files with 85 additions and 22 deletions

View file

@ -60,6 +60,7 @@ type InitParameters struct {
clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam
dialOptions []grpc.DialOption
maxRequestAttempts int
}
// Pool represents virtual connection to the FrostFS tree services network to communicate
@ -78,6 +79,8 @@ type Pool struct {
dialOptions []grpc.DialOption
logger *zap.Logger
maxRequestAttempts int
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices.
@ -177,6 +180,7 @@ func NewPool(options InitParameters) (*Pool, error) {
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
}
return p, nil
@ -268,6 +272,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// SetMaxRequestAttempts sets the max attempt to make successful request.
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
@ -297,7 +307,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
}
var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
@ -394,7 +404,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
}
var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
@ -429,7 +439,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
}
var resp *grpcService.AddResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
@ -466,7 +476,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
}
var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
@ -510,7 +520,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err
}
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
@ -541,7 +551,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err
}
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
@ -628,6 +638,10 @@ func fillDefaultInitParams(params *InitParameters) {
if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout
}
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@ -728,19 +742,33 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock()
}
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
)
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools)
attempts := p.maxRequestAttempts
LOOP:
for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen
if attempts == 0 {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
break LOOP
}
attempts--
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl)
}
@ -750,8 +778,10 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
startJ = 0
}
@ -791,3 +821,17 @@ func finalError(current, candidate error) error {
return current
}
type reqKeyType struct{}
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
fyrchik marked this conversation as resolved Outdated

For single-value context keys we can use typed empty struct type reqKeyType struct{}
I belive it is faster, because we don't need to compare string values
Go compiler uses this appoach a lot, e.g. 9d836d41d0/src/net/lookup.go (L312)

For single-value context keys we can use typed empty struct `type reqKeyType struct{}` I belive it is faster, because we don't need to compare string values Go compiler uses this appoach a lot, e.g. https://github.com/golang/go/blob/9d836d41d0d9df3acabf7f9607d3b09188a9bfc6/src/net/lookup.go#L312

How about WithRequestID (similar to WithValue)?

How about `WithRequestID` (similar to `WithValue`)?

And if it is public we need some comments about what it affects.

And if it is public we need some comments about what it affects.

How about WithRequestID (similar to WithValue)?

I would prefer to have paired names GetRequestID/SetRequestID

> How about `WithRequestID` (similar to `WithValue`)? I would prefer to have paired names `GetRequestID`/`SetRequestID`
// this identifier also be logged.
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqKeyType{}, reqID)
}
// GetRequestID fetch tree pool request identifier from context.
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqKeyType{}).(string)
return reqID
}

View file

@ -77,14 +77,21 @@ func TestHandleError(t *testing.T) {
}
func TestRetry(t *testing.T) {
ctx := context.Background()
nodes := [][]string{
{"node00", "node01", "node02", "node03"},
{"node10", "node11", "node12", "node13"},
}
var lenNodes int
for i := range nodes {
lenNodes += len(nodes[i])
}
p := &Pool{
logger: zaptest.NewLogger(t),
innerPools: makeInnerPool(nodes),
maxRequestAttempts: lenNodes,
}
makeFn := func(client grpcService.TreeServiceClient) error {
@ -92,14 +99,14 @@ func TestRetry(t *testing.T) {
}
t.Run("first ok", func(t *testing.T) {
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("first failed", func(t *testing.T) {
setErrors(p, "node00")
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 1)
})
@ -107,7 +114,7 @@ func TestRetry(t *testing.T) {
t.Run("all failed", func(t *testing.T) {
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@ -115,13 +122,13 @@ func TestRetry(t *testing.T) {
t.Run("round", func(t *testing.T) {
setErrors(p, nodes[0][0], nodes[0][1])
setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndices(t, p, 0, 2)
resetClientsErrors(p)
setErrors(p, nodes[0][2], nodes[0][3])
err = p.requestWithRetry(makeFn)
err = p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@ -129,14 +136,14 @@ func TestRetry(t *testing.T) {
t.Run("group switch", func(t *testing.T) {
setErrors(p, nodes[0]...)
setErrors(p, nodes[1][0])
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 1)
})
t.Run("group round", func(t *testing.T) {
setErrors(p, nodes[0][1:]...)
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@ -144,7 +151,7 @@ func TestRetry(t *testing.T) {
t.Run("group round switch", func(t *testing.T) {
setErrors(p, nodes[0]...)
p.setStartIndices(0, 1)
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 0)
})
@ -152,14 +159,14 @@ func TestRetry(t *testing.T) {
t.Run("no panic group switch", func(t *testing.T) {
setErrors(p, nodes[1]...)
p.setStartIndices(1, 0)
err := p.requestWithRetry(makeFn)
err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
@ -172,7 +179,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
@ -185,7 +192,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
@ -193,6 +200,18 @@ func TestRetry(t *testing.T) {
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("limit attempts", func(t *testing.T) {
oldVal := p.maxRequestAttempts
p.maxRequestAttempts = 2
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 2)
p.maxRequestAttempts = oldVal
})
}
func TestRebalance(t *testing.T) {