From 01e152774f0d3189df8c28661a1960508a6edee9 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Fri, 14 Feb 2020 13:05:51 +0300 Subject: [PATCH] refactor reBalance and isAlive --- pool.go | 81 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/pool.go b/pool.go index e0c3eaa..d361881 100644 --- a/pool.go +++ b/pool.go @@ -30,8 +30,8 @@ type ( Pool struct { log *zap.Logger - ctl time.Duration - opts keepalive.ClientParameters + connectTimeout time.Duration + opts keepalive.ClientParameters cur *grpc.ClientConn @@ -42,7 +42,10 @@ type ( } ) -var errNoHealthyConnections = errors.New("no active connections") +var ( + errEmptyConnection = errors.New("empty connection") + errNoHealthyConnections = errors.New("no active connections") +) func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { p := &Pool{ @@ -53,7 +56,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { conns: make(map[uint32][]*grpc.ClientConn), // fill with defaults: - ctl: time.Second * 15, + connectTimeout: time.Second * 15, opts: keepalive.ClientParameters{ Time: time.Second * 10, Timeout: time.Minute * 5, @@ -70,7 +73,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { l.Info("used random seed", zap.Uint64("seed", seed)) if val := v.GetDuration("connect_timeout"); val > 0 { - p.ctl = val + p.connectTimeout = val } if val := v.GetDuration("keepalive.time"); val > 0 { @@ -138,11 +141,14 @@ func (p *Pool) reBalance(ctx context.Context) { keys := make(map[uint32]struct{}) + p.log.Info("re-balancing connections") + for i := range p.nodes { var ( idx = -1 exists bool err error + start = time.Now() conn = p.nodes[i].conn weight = p.nodes[i].weight ) @@ -151,7 +157,7 @@ func (p *Pool) reBalance(ctx context.Context) { p.log.Warn("empty connection, try to connect", zap.String("address", p.nodes[i].address)) - ctx, cancel := context.WithTimeout(ctx, p.ctl) + ctx, cancel := context.WithTimeout(ctx, p.connectTimeout) conn, err = grpc.DialContext(ctx, p.nodes[i].address, grpc.WithBlock(), grpc.WithInsecure(), @@ -161,6 +167,7 @@ func (p *Pool) reBalance(ctx context.Context) { if err != nil || conn == nil { p.log.Warn("skip, could not connect to node", zap.String("address", p.nodes[i].address), + zap.Duration("elapsed", time.Since(start)), zap.Error(err)) continue } @@ -169,43 +176,41 @@ func (p *Pool) reBalance(ctx context.Context) { p.log.Info("connected to node", zap.String("address", p.nodes[i].address)) } - switch st := conn.GetState(); st { - case connectivity.Idle, connectivity.Ready, connectivity.Connecting: - keys[weight] = struct{}{} - - for j := range p.conns[weight] { - if p.conns[weight][j] == conn { - exists = true - break - } + for j := range p.conns[weight] { + if p.conns[weight][j] == conn { + idx = j + exists = true + break } + } - if !exists { - p.conns[weight] = append(p.conns[weight], conn) - } - - // Problems with connection, try to remove from : - default: - for j := range p.conns[weight] { - if p.conns[weight][j] == conn { - idx = j - exists = true - break - } - } + // if something wrong with connection (bad state or unhealthy), try to close it and remove + if err = isAlive(ctx, p.log, conn); err != nil { + p.log.Warn("connection not alive", + zap.String("address", p.nodes[i].address), + zap.Error(err)) if exists { // remove from connections p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) } - if err := conn.Close(); err != nil { - p.log.Warn("could not close bad connection", zap.Error(err)) + if err = conn.Close(); err != nil { + p.log.Warn("could not close bad connection", + zap.String("address", p.nodes[i].address), + zap.Error(err)) } if p.nodes[i].conn != nil { p.nodes[i].conn = nil } + continue + } + + keys[weight] = struct{}{} + + if !exists { + p.conns[weight] = append(p.conns[weight], conn) } } @@ -223,7 +228,7 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) { p.RLock() defer p.RUnlock() - if p.cur != nil && isAlive(ctx, p.log, p.cur) { + if err := isAlive(ctx, p.log, p.cur); err == nil { return p.cur, nil } @@ -244,7 +249,11 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) { return nil, errNoHealthyConnections } -func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool { +func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) error { + if cur == nil { + return errEmptyConnection + } + switch st := cur.GetState(); st { case connectivity.Idle, connectivity.Ready, connectivity.Connecting: req := new(state.HealthRequest) @@ -254,11 +263,13 @@ func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool { if err != nil { log.Warn("could not fetch health-check", zap.Error(err)) - return false + return err + } else if !res.Healthy { + return errors.New(res.Status) } - return res.Healthy + return nil default: - return false + return errors.New(st.String()) } }