network: add Close() to discoverer, shut it down on exit

This commit is contained in:
Roman Khimov 2020-02-24 15:39:31 +03:00
parent e2116e4c3f
commit 77624a8847
4 changed files with 32 additions and 4 deletions

View file

@ -14,6 +14,7 @@ const (
// a healthy connection pool. // a healthy connection pool.
type Discoverer interface { type Discoverer interface {
BackFill(...string) BackFill(...string)
Close()
PoolCount() int PoolCount() int
RequestRemote(int) RequestRemote(int)
RegisterBadAddr(string) RegisterBadAddr(string)
@ -34,6 +35,7 @@ type DefaultDiscovery struct {
connectedAddrs map[string]bool connectedAddrs map[string]bool
goodAddrs map[string]bool goodAddrs map[string]bool
unconnectedAddrs map[string]int unconnectedAddrs map[string]int
isDead bool
requestCh chan int requestCh chan int
pool chan string pool chan string
} }
@ -88,8 +90,12 @@ func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
// RequestRemote tries to establish a connection with n nodes. // RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(n int) { func (d *DefaultDiscovery) RequestRemote(n int) {
d.lock.RLock()
if !d.isDead {
d.requestCh <- n d.requestCh <- n
} }
d.lock.RUnlock()
}
// RegisterBadAddr registers the given address as a bad address. // RegisterBadAddr registers the given address as a bad address.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) { func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
@ -171,15 +177,28 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
} }
} }
// Close stops discoverer pool processing making discoverer almost useless.
func (d *DefaultDiscovery) Close() {
d.lock.Lock()
d.isDead = true
d.lock.Unlock()
select {
case <-d.requestCh: // Drain the channel if there is anything there.
default:
}
close(d.requestCh)
}
// run is a goroutine that makes DefaultDiscovery process its queue to connect // run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes. // to other nodes.
func (d *DefaultDiscovery) run() { func (d *DefaultDiscovery) run() {
var requested int var requested, r int
var ok bool
for { for {
for requested = <-d.requestCh; requested > 0; requested-- { for requested, ok = <-d.requestCh; ok && requested > 0; requested-- {
select { select {
case r := <-d.requestCh: case r, ok = <-d.requestCh:
if requested <= r { if requested <= r {
requested = r + 1 requested = r + 1
} }
@ -193,5 +212,8 @@ func (d *DefaultDiscovery) run() {
} }
} }
} }
if !ok {
return
}
} }
} }

View file

@ -143,4 +143,8 @@ func TestDefaultDiscoverer(t *testing.T) {
assert.Equal(t, len(set1), len(d.BadPeers())) assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, len(set1), len(d.GoodPeers())) assert.Equal(t, len(set1), len(d.GoodPeers()))
require.Equal(t, 0, d.PoolCount()) require.Equal(t, 0, d.PoolCount())
// Close should work and subsequent RequestRemote is a no-op.
d.Close()
d.RequestRemote(42)
} }

View file

@ -140,6 +140,7 @@ func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
type testDiscovery struct{} type testDiscovery struct{}
func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) Close() {}
func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {} func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string) {} func (d testDiscovery) RegisterGoodAddr(string) {}

View file

@ -184,6 +184,7 @@ func (s *Server) Start(errChan chan error) {
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.bQueue.discard() s.bQueue.discard()
s.discovery.Close()
close(s.quit) close(s.quit)
} }