discovery: make pool management more reliable
Just drop excessive addresses, otherwise we can block for no good reason.
This commit is contained in:
parent
85f19936dd
commit
b4e284f301
1 changed files with 13 additions and 5 deletions
|
@ -55,9 +55,6 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
|
||||||
// BackFill implements the Discoverer interface and will backfill the
|
// BackFill implements the Discoverer interface and will backfill the
|
||||||
// the pool with the given addresses.
|
// the pool with the given addresses.
|
||||||
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
||||||
if len(d.pool) == maxPoolSize {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
d.backFill <- addr
|
d.backFill <- addr
|
||||||
}
|
}
|
||||||
|
@ -68,6 +65,17 @@ func (d *DefaultDiscovery) PoolCount() int {
|
||||||
return len(d.pool)
|
return len(d.pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pushToPoolOrDrop tries to push address given into the pool, but if the pool
|
||||||
|
// is already full, it just drops it
|
||||||
|
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
|
||||||
|
select {
|
||||||
|
case d.pool <- addr:
|
||||||
|
// ok, queued
|
||||||
|
default:
|
||||||
|
// whatever
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// RequestRemote will try to establish a connection with n nodes.
|
// RequestRemote will try to establish a connection with n nodes.
|
||||||
func (d *DefaultDiscovery) RequestRemote(n int) {
|
func (d *DefaultDiscovery) RequestRemote(n int) {
|
||||||
d.requestCh <- n
|
d.requestCh <- n
|
||||||
|
@ -145,12 +153,12 @@ func (d *DefaultDiscovery) run() {
|
||||||
if _, ok := d.addrs[addr]; !ok {
|
if _, ok := d.addrs[addr]; !ok {
|
||||||
d.addrs[addr] = true
|
d.addrs[addr] = true
|
||||||
d.unconnectedAddrs[addr] = connRetries
|
d.unconnectedAddrs[addr] = connRetries
|
||||||
d.pool <- addr
|
d.pushToPoolOrDrop(addr)
|
||||||
}
|
}
|
||||||
case addr := <-d.badAddrCh:
|
case addr := <-d.badAddrCh:
|
||||||
d.unconnectedAddrs[addr]--
|
d.unconnectedAddrs[addr]--
|
||||||
if d.unconnectedAddrs[addr] > 0 {
|
if d.unconnectedAddrs[addr] > 0 {
|
||||||
d.pool <- addr
|
d.pushToPoolOrDrop(addr)
|
||||||
} else {
|
} else {
|
||||||
d.badAddrs[addr] = true
|
d.badAddrs[addr] = true
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
|
|
Loading…
Reference in a new issue