refactor reBalance and isAlive

This commit is contained in:
Evgeniy Kulikov 2020-02-14 13:05:51 +03:00
parent 17768b5bfe
commit 01e152774f
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2

81
pool.go
View file

@ -30,8 +30,8 @@ type (
Pool struct { Pool struct {
log *zap.Logger log *zap.Logger
ctl time.Duration connectTimeout time.Duration
opts keepalive.ClientParameters opts keepalive.ClientParameters
cur *grpc.ClientConn cur *grpc.ClientConn
@ -42,7 +42,10 @@ type (
} }
) )
var errNoHealthyConnections = errors.New("no active connections") var (
errEmptyConnection = errors.New("empty connection")
errNoHealthyConnections = errors.New("no active connections")
)
func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
p := &Pool{ p := &Pool{
@ -53,7 +56,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
conns: make(map[uint32][]*grpc.ClientConn), conns: make(map[uint32][]*grpc.ClientConn),
// fill with defaults: // fill with defaults:
ctl: time.Second * 15, connectTimeout: time.Second * 15,
opts: keepalive.ClientParameters{ opts: keepalive.ClientParameters{
Time: time.Second * 10, Time: time.Second * 10,
Timeout: time.Minute * 5, Timeout: time.Minute * 5,
@ -70,7 +73,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
l.Info("used random seed", zap.Uint64("seed", seed)) l.Info("used random seed", zap.Uint64("seed", seed))
if val := v.GetDuration("connect_timeout"); val > 0 { if val := v.GetDuration("connect_timeout"); val > 0 {
p.ctl = val p.connectTimeout = val
} }
if val := v.GetDuration("keepalive.time"); val > 0 { if val := v.GetDuration("keepalive.time"); val > 0 {
@ -138,11 +141,14 @@ func (p *Pool) reBalance(ctx context.Context) {
keys := make(map[uint32]struct{}) keys := make(map[uint32]struct{})
p.log.Info("re-balancing connections")
for i := range p.nodes { for i := range p.nodes {
var ( var (
idx = -1 idx = -1
exists bool exists bool
err error err error
start = time.Now()
conn = p.nodes[i].conn conn = p.nodes[i].conn
weight = p.nodes[i].weight weight = p.nodes[i].weight
) )
@ -151,7 +157,7 @@ func (p *Pool) reBalance(ctx context.Context) {
p.log.Warn("empty connection, try to connect", p.log.Warn("empty connection, try to connect",
zap.String("address", p.nodes[i].address)) zap.String("address", p.nodes[i].address))
ctx, cancel := context.WithTimeout(ctx, p.ctl) ctx, cancel := context.WithTimeout(ctx, p.connectTimeout)
conn, err = grpc.DialContext(ctx, p.nodes[i].address, conn, err = grpc.DialContext(ctx, p.nodes[i].address,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithInsecure(), grpc.WithInsecure(),
@ -161,6 +167,7 @@ func (p *Pool) reBalance(ctx context.Context) {
if err != nil || conn == nil { if err != nil || conn == nil {
p.log.Warn("skip, could not connect to node", p.log.Warn("skip, could not connect to node",
zap.String("address", p.nodes[i].address), zap.String("address", p.nodes[i].address),
zap.Duration("elapsed", time.Since(start)),
zap.Error(err)) zap.Error(err))
continue continue
} }
@ -169,43 +176,41 @@ func (p *Pool) reBalance(ctx context.Context) {
p.log.Info("connected to node", zap.String("address", p.nodes[i].address)) p.log.Info("connected to node", zap.String("address", p.nodes[i].address))
} }
switch st := conn.GetState(); st { for j := range p.conns[weight] {
case connectivity.Idle, connectivity.Ready, connectivity.Connecting: if p.conns[weight][j] == conn {
keys[weight] = struct{}{} idx = j
exists = true
for j := range p.conns[weight] { break
if p.conns[weight][j] == conn {
exists = true
break
}
} }
}
if !exists { // if something wrong with connection (bad state or unhealthy), try to close it and remove
p.conns[weight] = append(p.conns[weight], conn) if err = isAlive(ctx, p.log, conn); err != nil {
} p.log.Warn("connection not alive",
zap.String("address", p.nodes[i].address),
// Problems with connection, try to remove from : zap.Error(err))
default:
for j := range p.conns[weight] {
if p.conns[weight][j] == conn {
idx = j
exists = true
break
}
}
if exists { if exists {
// remove from connections // remove from connections
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
} }
if err := conn.Close(); err != nil { if err = conn.Close(); err != nil {
p.log.Warn("could not close bad connection", zap.Error(err)) p.log.Warn("could not close bad connection",
zap.String("address", p.nodes[i].address),
zap.Error(err))
} }
if p.nodes[i].conn != nil { if p.nodes[i].conn != nil {
p.nodes[i].conn = nil p.nodes[i].conn = nil
} }
continue
}
keys[weight] = struct{}{}
if !exists {
p.conns[weight] = append(p.conns[weight], conn)
} }
} }
@ -223,7 +228,7 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) {
p.RLock() p.RLock()
defer p.RUnlock() defer p.RUnlock()
if p.cur != nil && isAlive(ctx, p.log, p.cur) { if err := isAlive(ctx, p.log, p.cur); err == nil {
return p.cur, nil return p.cur, nil
} }
@ -244,7 +249,11 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) {
return nil, errNoHealthyConnections return nil, errNoHealthyConnections
} }
func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool { func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) error {
if cur == nil {
return errEmptyConnection
}
switch st := cur.GetState(); st { switch st := cur.GetState(); st {
case connectivity.Idle, connectivity.Ready, connectivity.Connecting: case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
req := new(state.HealthRequest) req := new(state.HealthRequest)
@ -254,11 +263,13 @@ func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool {
if err != nil { if err != nil {
log.Warn("could not fetch health-check", zap.Error(err)) log.Warn("could not fetch health-check", zap.Error(err))
return false return err
} else if !res.Healthy {
return errors.New(res.Status)
} }
return res.Healthy return nil
default: default:
return false return errors.New(st.String())
} }
} }