Merge pull request #2020 from nspcc-dev/fix-infinite-loop-in-discoverer
Fix infinite loop in discoverer
This commit is contained in:
commit
0888dda6a2
2 changed files with 19 additions and 7 deletions
|
@ -37,6 +37,7 @@ type DefaultDiscovery struct {
|
||||||
connectedAddrs map[string]bool
|
connectedAddrs map[string]bool
|
||||||
goodAddrs map[string]bool
|
goodAddrs map[string]bool
|
||||||
unconnectedAddrs map[string]int
|
unconnectedAddrs map[string]int
|
||||||
|
attempted map[string]bool
|
||||||
isDead bool
|
isDead bool
|
||||||
requestCh chan int
|
requestCh chan int
|
||||||
pool chan string
|
pool chan string
|
||||||
|
@ -52,6 +53,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
|
||||||
connectedAddrs: make(map[string]bool),
|
connectedAddrs: make(map[string]bool),
|
||||||
goodAddrs: make(map[string]bool),
|
goodAddrs: make(map[string]bool),
|
||||||
unconnectedAddrs: make(map[string]int),
|
unconnectedAddrs: make(map[string]int),
|
||||||
|
attempted: make(map[string]bool),
|
||||||
requestCh: make(chan int),
|
requestCh: make(chan int),
|
||||||
pool: make(chan string, maxPoolSize),
|
pool: make(chan string, maxPoolSize),
|
||||||
}
|
}
|
||||||
|
@ -174,11 +176,13 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DefaultDiscovery) tryAddress(addr string) {
|
func (d *DefaultDiscovery) tryAddress(addr string) {
|
||||||
if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
|
err := d.transport.Dial(addr, d.dialTimeout)
|
||||||
|
d.lock.Lock()
|
||||||
|
delete(d.attempted, addr)
|
||||||
|
d.lock.Unlock()
|
||||||
|
if err != nil {
|
||||||
d.RegisterBadAddr(addr)
|
d.RegisterBadAddr(addr)
|
||||||
d.RequestRemote(1)
|
d.RequestRemote(1)
|
||||||
} else {
|
|
||||||
d.RegisterConnectedAddr(addr)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,24 +216,31 @@ func (d *DefaultDiscovery) run() {
|
||||||
requested = r
|
requested = r
|
||||||
}
|
}
|
||||||
case addr := <-d.pool:
|
case addr := <-d.pool:
|
||||||
d.lock.RLock()
|
|
||||||
addrIsConnected := d.connectedAddrs[addr]
|
|
||||||
d.lock.RUnlock()
|
|
||||||
updatePoolCountMetric(d.PoolCount())
|
updatePoolCountMetric(d.PoolCount())
|
||||||
if !addrIsConnected {
|
d.lock.Lock()
|
||||||
|
if !d.connectedAddrs[addr] && !d.attempted[addr] {
|
||||||
|
d.attempted[addr] = true
|
||||||
go d.tryAddress(addr)
|
go d.tryAddress(addr)
|
||||||
requested--
|
requested--
|
||||||
}
|
}
|
||||||
|
d.lock.Unlock()
|
||||||
default: // Empty pool
|
default: // Empty pool
|
||||||
|
var added int
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
for _, addr := range d.seeds {
|
for _, addr := range d.seeds {
|
||||||
if !d.connectedAddrs[addr] {
|
if !d.connectedAddrs[addr] {
|
||||||
delete(d.badAddrs, addr)
|
delete(d.badAddrs, addr)
|
||||||
d.unconnectedAddrs[addr] = connRetries
|
d.unconnectedAddrs[addr] = connRetries
|
||||||
d.pushToPoolOrDrop(addr)
|
d.pushToPoolOrDrop(addr)
|
||||||
|
added++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
|
// The pool is empty, but all seed nodes are already connected,
|
||||||
|
// we can end up in an infinite loop here, so drop the request.
|
||||||
|
if added == 0 {
|
||||||
|
requested = 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -59,6 +59,7 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case a := <-ts.dialCh:
|
case a := <-ts.dialCh:
|
||||||
dialled = append(dialled, a)
|
dialled = append(dialled, a)
|
||||||
|
d.RegisterConnectedAddr(a)
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
t.Fatalf("timeout expecting for transport dial")
|
t.Fatalf("timeout expecting for transport dial")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue