diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 9a6b96b23..f33e98980 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -49,6 +49,7 @@ type DefaultDiscovery struct { goodAddrs map[string]capability.Capabilities unconnectedAddrs map[string]int attempted map[string]bool + outstanding int32 optimalFanOut int32 networkSize int32 requestCh chan int @@ -119,6 +120,8 @@ func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) { // RequestRemote tries to establish a connection with n nodes. func (d *DefaultDiscovery) RequestRemote(requested int) { + outstanding := int(atomic.LoadInt32(&d.outstanding)) + requested -= outstanding for ; requested > 0; requested-- { var nextAddr string d.lock.Lock() @@ -146,6 +149,7 @@ func (d *DefaultDiscovery) RequestRemote(requested int) { } d.attempted[nextAddr] = true d.lock.Unlock() + atomic.AddInt32(&d.outstanding, 1) go d.tryAddress(nextAddr) } } @@ -291,6 +295,7 @@ func (d *DefaultDiscovery) updateNetSize() { func (d *DefaultDiscovery) tryAddress(addr string) { p, err := d.transport.Dial(addr, d.dialTimeout) + atomic.AddInt32(&d.outstanding, -1) d.lock.Lock() delete(d.attempted, addr) if err == nil {