diff --git a/pool/tree/pool.go b/pool/tree/pool.go index fc360d2..eb88a89 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -297,7 +297,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService } var resp *grpcService.GetNodeByPathResponse - if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.GetNodeByPath(ctx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. @@ -394,7 +394,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe } var cli grpcService.TreeService_GetSubTreeClient - if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { cli, inErr = client.GetSubTree(ctx, request) return handleError("failed to get sub tree client", inErr) }); err != nil { @@ -429,7 +429,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { } var resp *grpcService.AddResponse - if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.Add(ctx, request) return handleError("failed to add node", inErr) }); err != nil { @@ -466,7 +466,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint } var resp *grpcService.AddByPathResponse - if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { + if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.AddByPath(ctx, request) return handleError("failed to add node by path", inErr) }); err != nil { @@ -510,7 +510,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { return err } - return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if _, err := client.Move(ctx, request); err != nil { return handleError("failed to move node", err) } @@ -541,7 +541,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { return err } - return p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if _, err := client.Remove(ctx, request); err != nil { return handleError("failed to remove node", err) } @@ -728,12 +728,14 @@ func (p *Pool) setStartIndices(i, j int) { p.startIndicesMtx.Unlock() } -func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { +func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error { var ( err, finErr error cl grpcService.TreeServiceClient ) + reqID := GetRequestID(ctx) + startI, startJ := p.getStartIndices() groupsLen := len(p.innerPools) for i := startI; i < startI+groupsLen; i++ { @@ -751,7 +753,7 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er return err } finErr = finalError(finErr, err) - p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) + p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) } startJ = 0 } @@ -791,3 +793,17 @@ func finalError(current, candidate error) error { return current } + +type reqKeyType struct{} + +// SetRequestID sets request identifier to context so when some operations are logged in tree pool +// this identifier also be logged. +func SetRequestID(ctx context.Context, reqID string) context.Context { + return context.WithValue(ctx, reqKeyType{}, reqID) +} + +// GetRequestID fetch tree pool request identifier from context. +func GetRequestID(ctx context.Context) string { + reqID, _ := ctx.Value(reqKeyType{}).(string) + return reqID +} diff --git a/pool/tree/pool_test.go b/pool/tree/pool_test.go index 2c616a7..19c0635 100644 --- a/pool/tree/pool_test.go +++ b/pool/tree/pool_test.go @@ -77,6 +77,7 @@ func TestHandleError(t *testing.T) { } func TestRetry(t *testing.T) { + ctx := context.Background() nodes := [][]string{ {"node00", "node01", "node02", "node03"}, {"node10", "node11", "node12", "node13"}, @@ -92,14 +93,14 @@ func TestRetry(t *testing.T) { } t.Run("first ok", func(t *testing.T) { - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("first failed", func(t *testing.T) { setErrors(p, "node00") - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 1) }) @@ -107,7 +108,7 @@ func TestRetry(t *testing.T) { t.Run("all failed", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1]...) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.Error(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -115,13 +116,13 @@ func TestRetry(t *testing.T) { t.Run("round", func(t *testing.T) { setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[1]...) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndices(t, p, 0, 2) resetClientsErrors(p) setErrors(p, nodes[0][2], nodes[0][3]) - err = p.requestWithRetry(makeFn) + err = p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -129,14 +130,14 @@ func TestRetry(t *testing.T) { t.Run("group switch", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1][0]) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 1) }) t.Run("group round", func(t *testing.T) { setErrors(p, nodes[0][1:]...) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -144,7 +145,7 @@ func TestRetry(t *testing.T) { t.Run("group round switch", func(t *testing.T) { setErrors(p, nodes[0]...) p.setStartIndices(0, 1) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 0) }) @@ -152,14 +153,14 @@ func TestRetry(t *testing.T) { t.Run("no panic group switch", func(t *testing.T) { setErrors(p, nodes[1]...) p.setStartIndices(1, 0) - err := p.requestWithRetry(makeFn) + err := p.requestWithRetry(ctx, makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("error empty result", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if index < errNodes { index++ return errNodeEmptyResult @@ -172,7 +173,7 @@ func TestRetry(t *testing.T) { t.Run("error not found", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if index < errNodes { index++ return ErrNodeNotFound @@ -185,7 +186,7 @@ func TestRetry(t *testing.T) { t.Run("error access denied", func(t *testing.T) { var index int - err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { index++ return ErrNodeAccessDenied })