refactoring connectionpool: add healthcheck, move getConnection and first call reBalance outside constructor
This commit is contained in:
parent
35e2378afc
commit
7790684726
1 changed files with 16 additions and 9 deletions
25
pool.go
25
pool.go
|
@ -46,15 +46,18 @@ type (
|
||||||
nodes []*node
|
nodes []*node
|
||||||
keys []uint32
|
keys []uint32
|
||||||
conns map[uint32][]*node
|
conns map[uint32][]*node
|
||||||
|
|
||||||
|
unhealthy *atomic.Error
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
errBootstrapping = errors.New("bootstrapping")
|
||||||
errEmptyConnection = errors.New("empty connection")
|
errEmptyConnection = errors.New("empty connection")
|
||||||
errNoHealthyConnections = errors.New("no active connections")
|
errNoHealthyConnections = errors.New("no active connections")
|
||||||
)
|
)
|
||||||
|
|
||||||
func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error) {
|
func newPool(l *zap.Logger, v *viper.Viper) *Pool {
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
log: l,
|
log: l,
|
||||||
Mutex: new(sync.Mutex),
|
Mutex: new(sync.Mutex),
|
||||||
|
@ -74,6 +77,8 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error)
|
||||||
Timeout: defaultKeepaliveTimeout,
|
Timeout: defaultKeepaliveTimeout,
|
||||||
PermitWithoutStream: true,
|
PermitWithoutStream: true,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
unhealthy: atomic.NewError(errBootstrapping),
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, 8)
|
buf := make([]byte, 8)
|
||||||
|
@ -126,11 +131,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error)
|
||||||
zap.Uint32("weight", p.nodes[i].weight))
|
zap.Uint32("weight", p.nodes[i].weight))
|
||||||
}
|
}
|
||||||
|
|
||||||
p.reBalance(ctx)
|
return p
|
||||||
|
|
||||||
_, err := p.getConnection(ctx)
|
|
||||||
|
|
||||||
return p, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) close() {
|
func (p *Pool) close() {
|
||||||
|
@ -150,7 +151,12 @@ func (p *Pool) close() {
|
||||||
|
|
||||||
func (p *Pool) reBalance(ctx context.Context) {
|
func (p *Pool) reBalance(ctx context.Context) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer func() {
|
||||||
|
p.Unlock()
|
||||||
|
|
||||||
|
_, err := p.getConnection(ctx)
|
||||||
|
p.unhealthy.Store(err)
|
||||||
|
}()
|
||||||
|
|
||||||
keys := make(map[uint32]struct{})
|
keys := make(map[uint32]struct{})
|
||||||
|
|
||||||
|
@ -166,8 +172,9 @@ func (p *Pool) reBalance(ctx context.Context) {
|
||||||
weight = p.nodes[i].weight
|
weight = p.nodes[i].weight
|
||||||
)
|
)
|
||||||
|
|
||||||
if ctx.Err() != nil {
|
if err = ctx.Err(); err != nil {
|
||||||
p.log.Warn("something went wrong", zap.Error(ctx.Err()))
|
p.log.Warn("something went wrong", zap.Error(err))
|
||||||
|
p.unhealthy.Store(err)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue