[#191] pool/tree: Support request id

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-10-10 16:59:27 +03:00
parent 6fbe1595cb
commit 8999d2f080
2 changed files with 37 additions and 20 deletions

View file

@ -297,7 +297,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
} }
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
@ -394,7 +394,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
} }
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
@ -429,7 +429,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
} }
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
@ -466,7 +466,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
} }
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
@ -510,7 +510,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
@ -541,7 +541,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
@ -728,12 +728,14 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock() p.startIndicesMtx.Unlock()
} }
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err, finErr error err, finErr error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices() startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools) groupsLen := len(p.innerPools)
for i := startI; i < startI+groupsLen; i++ { for i := startI; i < startI+groupsLen; i++ {
@ -751,7 +753,7 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
return err return err
} }
finErr = finalError(finErr, err) finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
@ -791,3 +793,17 @@ func finalError(current, candidate error) error {
return current return current
} }
type reqKeyType struct{}
// SetRequestID sets request identifier to context so when some operations are logged in tree pool
// this identifier also be logged.
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqKeyType{}, reqID)
}
// GetRequestID fetch tree pool request identifier from context.
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqKeyType{}).(string)
return reqID
}

View file

@ -77,6 +77,7 @@ func TestHandleError(t *testing.T) {
} }
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
ctx := context.Background()
nodes := [][]string{ nodes := [][]string{
{"node00", "node01", "node02", "node03"}, {"node00", "node01", "node02", "node03"},
{"node10", "node11", "node12", "node13"}, {"node10", "node11", "node12", "node13"},
@ -92,14 +93,14 @@ func TestRetry(t *testing.T) {
} }
t.Run("first ok", func(t *testing.T) { t.Run("first ok", func(t *testing.T) {
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("first failed", func(t *testing.T) { t.Run("first failed", func(t *testing.T) {
setErrors(p, "node00") setErrors(p, "node00")
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 1) checkIndicesAndReset(t, p, 0, 1)
}) })
@ -107,7 +108,7 @@ func TestRetry(t *testing.T) {
t.Run("all failed", func(t *testing.T) { t.Run("all failed", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err) require.Error(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -115,13 +116,13 @@ func TestRetry(t *testing.T) {
t.Run("round", func(t *testing.T) { t.Run("round", func(t *testing.T) {
setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[0][0], nodes[0][1])
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndices(t, p, 0, 2) checkIndices(t, p, 0, 2)
resetClientsErrors(p) resetClientsErrors(p)
setErrors(p, nodes[0][2], nodes[0][3]) setErrors(p, nodes[0][2], nodes[0][3])
err = p.requestWithRetry(makeFn) err = p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -129,14 +130,14 @@ func TestRetry(t *testing.T) {
t.Run("group switch", func(t *testing.T) { t.Run("group switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1][0]) setErrors(p, nodes[1][0])
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 1) checkIndicesAndReset(t, p, 1, 1)
}) })
t.Run("group round", func(t *testing.T) { t.Run("group round", func(t *testing.T) {
setErrors(p, nodes[0][1:]...) setErrors(p, nodes[0][1:]...)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -144,7 +145,7 @@ func TestRetry(t *testing.T) {
t.Run("group round switch", func(t *testing.T) { t.Run("group round switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
p.setStartIndices(0, 1) p.setStartIndices(0, 1)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 0) checkIndicesAndReset(t, p, 1, 0)
}) })
@ -152,14 +153,14 @@ func TestRetry(t *testing.T) {
t.Run("no panic group switch", func(t *testing.T) { t.Run("no panic group switch", func(t *testing.T) {
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
p.setStartIndices(1, 0) p.setStartIndices(1, 0)
err := p.requestWithRetry(makeFn) err := p.requestWithRetry(ctx, makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) { t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return errNodeEmptyResult return errNodeEmptyResult
@ -172,7 +173,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) { t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return ErrNodeNotFound return ErrNodeNotFound
@ -185,7 +186,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) { t.Run("error access denied", func(t *testing.T) {
var index int var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
index++ index++
return ErrNodeAccessDenied return ErrNodeAccessDenied
}) })