From 7790684726e4ee5f19d4b68487f138d8e7abff6c Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Fri, 28 Feb 2020 20:05:25 +0300 Subject: [PATCH] refactoring connectionpool: add healthcheck, move getConnection and first call reBalance outside constructor --- pool.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/pool.go b/pool.go index 979fd93..7baa273 100644 --- a/pool.go +++ b/pool.go @@ -46,15 +46,18 @@ type ( nodes []*node keys []uint32 conns map[uint32][]*node + + unhealthy *atomic.Error } ) var ( + errBootstrapping = errors.New("bootstrapping") errEmptyConnection = errors.New("empty connection") errNoHealthyConnections = errors.New("no active connections") ) -func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error) { +func newPool(l *zap.Logger, v *viper.Viper) *Pool { p := &Pool{ log: l, Mutex: new(sync.Mutex), @@ -74,6 +77,8 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error) Timeout: defaultKeepaliveTimeout, PermitWithoutStream: true, }, + + unhealthy: atomic.NewError(errBootstrapping), } buf := make([]byte, 8) @@ -126,11 +131,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error) zap.Uint32("weight", p.nodes[i].weight)) } - p.reBalance(ctx) - - _, err := p.getConnection(ctx) - - return p, err + return p } func (p *Pool) close() { @@ -150,7 +151,12 @@ func (p *Pool) close() { func (p *Pool) reBalance(ctx context.Context) { p.Lock() - defer p.Unlock() + defer func() { + p.Unlock() + + _, err := p.getConnection(ctx) + p.unhealthy.Store(err) + }() keys := make(map[uint32]struct{}) @@ -166,8 +172,9 @@ func (p *Pool) reBalance(ctx context.Context) { weight = p.nodes[i].weight ) - if ctx.Err() != nil { - p.log.Warn("something went wrong", zap.Error(ctx.Err())) + if err = ctx.Err(); err != nil { + p.log.Warn("something went wrong", zap.Error(err)) + p.unhealthy.Store(err) return }