[#191] limit_tree_pool_retries #191

Merged
alexvanin merged 2 commits from poc/limit_tree_pool_retries into master 2024-09-04 19:51:15 +00:00
2 changed files with 85 additions and 22 deletions

View file

@ -60,6 +60,7 @@ type InitParameters struct {
clientRebalanceInterval time.Duration clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam nodeParams []pool.NodeParam
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
maxRequestAttempts int
} }
// Pool represents virtual connection to the FrostFS tree services network to communicate // Pool represents virtual connection to the FrostFS tree services network to communicate
@ -78,6 +79,8 @@ type Pool struct {
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
logger *zap.Logger logger *zap.Logger
maxRequestAttempts int
startIndicesMtx sync.RWMutex startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed. // startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices. // Since clients are stored in innerPool field we have to use two indices.
@ -177,6 +180,7 @@ func NewPool(options InitParameters) (*Pool, error) {
nodeRequestTimeout: options.healthcheckTimeout, nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval, clientRebalanceInterval: options.clientRebalanceInterval,
}, },
maxRequestAttempts: options.maxRequestAttempts,
} }
return p, nil return p, nil
@ -268,6 +272,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts x.dialOptions = opts
} }
// SetMaxRequestAttempts sets the max attempt to make successful request.
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// GetNodes invokes eponymous method from TreeServiceClient. // GetNodes invokes eponymous method from TreeServiceClient.
// //
// Can return predefined errors: // Can return predefined errors:
@ -297,7 +307,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
} }
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
@ -394,7 +404,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
} }
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
@ -429,7 +439,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
} }
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
@ -466,7 +476,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
} }
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
@ -510,7 +520,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
@ -541,7 +551,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
@ -628,6 +638,10 @@ func fillDefaultInitParams(params *InitParameters) {
if params.nodeStreamTimeout <= 0 { if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout params.nodeStreamTimeout = defaultStreamTimeout
} }
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
} }
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@ -728,19 +742,33 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock() p.startIndicesMtx.Unlock()
} }
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err, finErr error err, finErr error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices() startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools) groupsLen := len(p.innerPools)
attempts := p.maxRequestAttempts
LOOP:
for i := startI; i < startI+groupsLen; i++ { for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients) clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ { for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen indexJ := j % clientsLen
if attempts == 0 {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
break LOOP
}
attempts--
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl) err = fn(cl)
} }
@ -750,8 +778,10 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
} }
return err return err
} }
finErr = finalError(finErr, err) finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
@ -791,3 +821,17 @@ func finalError(current, candidate error) error {
return current return current
} }
type reqKeyType struct{}
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
fyrchik marked this conversation as resolved Outdated

For single-value context keys we can use typed empty struct type reqKeyType struct{}
I belive it is faster, because we don't need to compare string values
Go compiler uses this appoach a lot, e.g. 9d836d41d0/src/net/lookup.go (L312)

For single-value context keys we can use typed empty struct `type reqKeyType struct{}` I belive it is faster, because we don't need to compare string values Go compiler uses this appoach a lot, e.g. https://github.com/golang/go/blob/9d836d41d0d9df3acabf7f9607d3b09188a9bfc6/src/net/lookup.go#L312

How about WithRequestID (similar to WithValue)?

How about `WithRequestID` (similar to `WithValue`)?

And if it is public we need some comments about what it affects.

And if it is public we need some comments about what it affects.

How about WithRequestID (similar to WithValue)?

I would prefer to have paired names GetRequestID/SetRequestID

> How about `WithRequestID` (similar to `WithValue`)? I would prefer to have paired names `GetRequestID`/`SetRequestID`
// this identifier also be logged.
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqKeyType{}, reqID)
}
// GetRequestID fetch tree pool request identifier from context.
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqKeyType{}).(string)
return reqID
}

View file

@ -77,14 +77,21 @@ func TestHandleError(t *testing.T) {
} }
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
ctx := context.Background()
nodes := [][]string{ nodes := [][]string{
{"node00", "node01", "node02", "node03"}, {"node00", "node01", "node02", "node03"},
{"node10", "node11", "node12", "node13"}, {"node10", "node11", "node12", "node13"},
} }
var lenNodes int
for i := range nodes {
lenNodes += len(nodes[i])
}
p := &Pool{ p := &Pool{
logger: zaptest.NewLogger(t), logger: zaptest.NewLogger(t),
innerPools: makeInnerPool(nodes), innerPools: makeInnerPool(nodes),
maxRequestAttempts: lenNodes,
} }
makeFn := func(client grpcService.TreeServiceClient) error { makeFn := func(client grpcService.TreeServiceClient) error {
@ -92,14 +99,14 @@ func TestRetry(t *testing.T) {
} }
t.Run("first ok", func(t *testing.T) { t.Run("first ok", func(t *testing.T) {
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("first failed", func(t *testing.T) { t.Run("first failed", func(t *testing.T) {
setErrors(p, "node00") setErrors(p, "node00")
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 1) checkIndicesAndReset(t, p, 0, 1)
}) })
@ -107,7 +114,7 @@ func TestRetry(t *testing.T) {
t.Run("all failed", func(t *testing.T) { t.Run("all failed", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err) require.Error(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -115,13 +122,13 @@ func TestRetry(t *testing.T) {
t.Run("round", func(t *testing.T) { t.Run("round", func(t *testing.T) {
setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[0][0], nodes[0][1])
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndices(t, p, 0, 2) checkIndices(t, p, 0, 2)
resetClientsErrors(p) resetClientsErrors(p)
setErrors(p, nodes[0][2], nodes[0][3]) setErrors(p, nodes[0][2], nodes[0][3])
err = p.requestWithRetry(makeFn) err = p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -129,14 +136,14 @@ func TestRetry(t *testing.T) {
t.Run("group switch", func(t *testing.T) { t.Run("group switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1][0]) setErrors(p, nodes[1][0])
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 1) checkIndicesAndReset(t, p, 1, 1)
}) })
t.Run("group round", func(t *testing.T) { t.Run("group round", func(t *testing.T) {
setErrors(p, nodes[0][1:]...) setErrors(p, nodes[0][1:]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -144,7 +151,7 @@ func TestRetry(t *testing.T) {
t.Run("group round switch", func(t *testing.T) { t.Run("group round switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
p.setStartIndices(0, 1) p.setStartIndices(0, 1)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 0) checkIndicesAndReset(t, p, 1, 0)
}) })
@ -152,14 +159,14 @@ func TestRetry(t *testing.T) {
t.Run("no panic group switch", func(t *testing.T) { t.Run("no panic group switch", func(t *testing.T) {
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
p.setStartIndices(1, 0) p.setStartIndices(1, 0)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) { t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return errNodeEmptyResult return errNodeEmptyResult
@ -172,7 +179,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) { t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return ErrNodeNotFound return ErrNodeNotFound
@ -185,7 +192,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) { t.Run("error access denied", func(t *testing.T) {
var index int var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
index++ index++
return ErrNodeAccessDenied return ErrNodeAccessDenied
}) })
@ -193,6 +200,18 @@ func TestRetry(t *testing.T) {
require.Equal(t, 1, index) require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("limit attempts", func(t *testing.T) {
oldVal := p.maxRequestAttempts
p.maxRequestAttempts = 2
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 2)
p.maxRequestAttempts = oldVal
})
} }
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {