[#172] pool: Use priority of errors in tree pool

When retry happens, use priority map to decide
which error to return. Consider network errors
less desirable than business logic errors.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2023-10-10 11:25:57 +03:00
parent eb5288f4a5
commit fc4551b843
2 changed files with 40 additions and 4 deletions

View file

@ -3,6 +3,7 @@ package tree
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"sync" "sync"
@ -22,6 +23,11 @@ type treeClient struct {
healthy bool healthy bool
} }
var (
// ErrUnhealthyEndpoint is returned when client in the pool considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates new tree client with auto dial. // newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{ return &treeClient{
@ -102,7 +108,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address) return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
} }
return c.service, nil return c.service, nil

View file

@ -730,7 +730,7 @@ func (p *Pool) setStartIndices(i, j int) {
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err error err, finErr error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
@ -750,14 +750,44 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
} }
return err return err
} }
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
return err return finErr
} }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
} }
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
}