Merge pull request #691 from nspcc-dev/network-conn-overflow-and-shutdown

Network conn overflow and shutdown
This commit is contained in:
Roman Khimov 2020-02-28 18:16:29 +03:00 committed by GitHub
commit 36197056b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 9 deletions

View file

@ -14,6 +14,7 @@ const (
// a healthy connection pool. // a healthy connection pool.
type Discoverer interface { type Discoverer interface {
BackFill(...string) BackFill(...string)
Close()
PoolCount() int PoolCount() int
RequestRemote(int) RequestRemote(int)
RegisterBadAddr(string) RegisterBadAddr(string)
@ -34,6 +35,7 @@ type DefaultDiscovery struct {
connectedAddrs map[string]bool connectedAddrs map[string]bool
goodAddrs map[string]bool goodAddrs map[string]bool
unconnectedAddrs map[string]int unconnectedAddrs map[string]int
isDead bool
requestCh chan int requestCh chan int
pool chan string pool chan string
} }
@ -88,7 +90,11 @@ func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
// RequestRemote tries to establish a connection with n nodes. // RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(n int) { func (d *DefaultDiscovery) RequestRemote(n int) {
d.requestCh <- n d.lock.RLock()
if !d.isDead {
d.requestCh <- n
}
d.lock.RUnlock()
} }
// RegisterBadAddr registers the given address as a bad address. // RegisterBadAddr registers the given address as a bad address.
@ -171,15 +177,28 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
} }
} }
// Close stops discoverer pool processing making discoverer almost useless.
func (d *DefaultDiscovery) Close() {
d.lock.Lock()
d.isDead = true
d.lock.Unlock()
select {
case <-d.requestCh: // Drain the channel if there is anything there.
default:
}
close(d.requestCh)
}
// run is a goroutine that makes DefaultDiscovery process its queue to connect // run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes. // to other nodes.
func (d *DefaultDiscovery) run() { func (d *DefaultDiscovery) run() {
var requested int var requested, r int
var ok bool
for { for {
for requested = <-d.requestCh; requested > 0; requested-- { for requested, ok = <-d.requestCh; ok && requested > 0; requested-- {
select { select {
case r := <-d.requestCh: case r, ok = <-d.requestCh:
if requested <= r { if requested <= r {
requested = r + 1 requested = r + 1
} }
@ -193,5 +212,8 @@ func (d *DefaultDiscovery) run() {
} }
} }
} }
if !ok {
return
}
} }
} }

View file

@ -143,4 +143,8 @@ func TestDefaultDiscoverer(t *testing.T) {
assert.Equal(t, len(set1), len(d.BadPeers())) assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, len(set1), len(d.GoodPeers())) assert.Equal(t, len(set1), len(d.GoodPeers()))
require.Equal(t, 0, d.PoolCount()) require.Equal(t, 0, d.PoolCount())
// Close should work and subsequent RequestRemote is a no-op.
d.Close()
d.RequestRemote(42)
} }

View file

@ -140,6 +140,7 @@ func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
type testDiscovery struct{} type testDiscovery struct{}
func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) Close() {}
func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {} func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string) {} func (d testDiscovery) RegisterGoodAddr(string) {}

View file

@ -183,6 +183,11 @@ func (s *Server) Start(errChan chan error) {
// Shutdown disconnects all peers and stops listening. // Shutdown disconnects all peers and stops listening.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close()
s.discovery.Close()
for p := range s.peers {
p.Disconnect(errServerShutdown)
}
s.bQueue.discard() s.bQueue.discard()
close(s.quit) close(s.quit)
} }
@ -224,10 +229,6 @@ func (s *Server) run() {
} }
select { select {
case <-s.quit: case <-s.quit:
s.transport.Close()
for p := range s.peers {
p.Disconnect(errServerShutdown)
}
return return
case p := <-s.register: case p := <-s.register:
s.lock.Lock() s.lock.Lock()
@ -239,7 +240,8 @@ func (s *Server) run() {
s.lock.RLock() s.lock.RLock()
// Pick a random peer and drop connection to it. // Pick a random peer and drop connection to it.
for peer := range s.peers { for peer := range s.peers {
peer.Disconnect(errMaxPeers) // It will send us unregister signal.
go peer.Disconnect(errMaxPeers)
break break
} }
s.lock.RUnlock() s.lock.RUnlock()