diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go
index 08e59f2ee..221db32ee 100644
--- a/pkg/network/discovery.go
+++ b/pkg/network/discovery.go
@@ -10,7 +10,7 @@ import (
 )
 
 const (
-	maxPoolSize = 200
+	maxPoolSize = 10000
 	connRetries = 3
 )
 
@@ -18,7 +18,6 @@ const (
 // a healthy connection pool.
 type Discoverer interface {
 	BackFill(...string)
-	Close()
 	GetFanOut() int
 	PoolCount() int
 	RequestRemote(int)
@@ -42,7 +41,6 @@ type DefaultDiscovery struct {
 	seeds            []string
 	transport        Transporter
 	lock             sync.RWMutex
-	closeMtx         sync.RWMutex
 	dialTimeout      time.Duration
 	badAddrs         map[string]bool
 	connectedAddrs   map[string]bool
@@ -50,10 +48,7 @@ type DefaultDiscovery struct {
 	unconnectedAddrs map[string]int
 	attempted        map[string]bool
 	optimalFanOut    int32
-	isDead           bool
 	requestCh        chan int
-	pool             chan string
-	runExit          chan struct{}
 }
 
 // NewDefaultDiscovery returns a new DefaultDiscovery.
@@ -68,10 +63,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
 		unconnectedAddrs: make(map[string]int),
 		attempted:        make(map[string]bool),
 		requestCh:        make(chan int),
-		pool:             make(chan string, maxPoolSize),
-		runExit:          make(chan struct{}),
 	}
-	go d.run()
 	return d
 }
 
@@ -93,7 +85,6 @@ func (d *DefaultDiscovery) backfill(addrs ...string) {
 			d.unconnectedAddrs[addr] > 0 {
 			continue
 		}
-		d.unconnectedAddrs[addr] = connRetries
 		d.pushToPoolOrDrop(addr)
 	}
 	d.updateNetSize()
@@ -101,40 +92,73 @@ func (d *DefaultDiscovery) backfill(addrs ...string) {
 
 // PoolCount returns the number of the available node addresses.
 func (d *DefaultDiscovery) PoolCount() int {
-	return len(d.pool)
+	d.lock.RLock()
+	defer d.lock.RUnlock()
+	return d.poolCount()
+}
+
+func (d *DefaultDiscovery) poolCount() int {
+	return len(d.unconnectedAddrs)
 }
 
 // pushToPoolOrDrop tries to push the address given into the pool, but if the pool
 // is already full, it just drops it.
 func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
-	select {
-	case d.pool <- addr:
-		updatePoolCountMetric(d.PoolCount())
-		// ok, queued
-	default:
-		// whatever
+	if len(d.unconnectedAddrs) < maxPoolSize {
+		d.unconnectedAddrs[addr] = connRetries
 	}
 }
 
 // RequestRemote tries to establish a connection with n nodes.
-func (d *DefaultDiscovery) RequestRemote(n int) {
-	d.closeMtx.RLock()
-	if !d.isDead {
-		d.requestCh <- n
+func (d *DefaultDiscovery) RequestRemote(requested int) {
+	for ; requested > 0; requested-- {
+		var nextAddr string
+		d.lock.Lock()
+		for addr := range d.unconnectedAddrs {
+			if !d.connectedAddrs[addr] && !d.attempted[addr] {
+				nextAddr = addr
+				break
+			}
+		}
+
+		if nextAddr == "" {
+			// Empty pool, try seeds.
+			for _, addr := range d.seeds {
+				if !d.connectedAddrs[addr] && !d.attempted[addr] {
+					nextAddr = addr
+					break
+				}
+			}
+		}
+		if nextAddr == "" {
+			d.lock.Unlock()
+			// The pool is empty, but all seed nodes are already connected (or attempted),
+			// we can end up in an infinite loop here, so drop the request.
+			break
+		}
+		d.attempted[nextAddr] = true
+		d.lock.Unlock()
+		go d.tryAddress(nextAddr)
 	}
-	d.closeMtx.RUnlock()
 }
 
 // RegisterBadAddr registers the given address as a bad address.
 func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
+	var isSeed bool
 	d.lock.Lock()
-	d.unconnectedAddrs[addr]--
-	if d.unconnectedAddrs[addr] > 0 {
-		d.pushToPoolOrDrop(addr)
-	} else {
-		d.badAddrs[addr] = true
-		delete(d.unconnectedAddrs, addr)
-		delete(d.goodAddrs, addr)
+	for _, seed := range d.seeds {
+		if addr == seed {
+			isSeed = true
+			break
+		}
+	}
+	if !isSeed {
+		d.unconnectedAddrs[addr]--
+		if d.unconnectedAddrs[addr] <= 0 {
+			d.badAddrs[addr] = true
+			delete(d.unconnectedAddrs, addr)
+			delete(d.goodAddrs, addr)
+		}
 	}
 	d.updateNetSize()
 	d.lock.Unlock()
@@ -219,6 +243,7 @@ func (d *DefaultDiscovery) updateNetSize() {
 
 	atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
 	updateNetworkSizeMetric(netsize)
+	updatePoolCountMetric(d.poolCount())
 }
 
 func (d *DefaultDiscovery) tryAddress(addr string) {
@@ -231,76 +256,3 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
 		d.RequestRemote(1)
 	}
 }
-
-// Close stops discoverer pool processing, which makes the discoverer almost useless.
-func (d *DefaultDiscovery) Close() {
-	d.closeMtx.Lock()
-	d.isDead = true
-	d.closeMtx.Unlock()
-	select {
-	case <-d.requestCh: // Drain the channel if there is anything there.
-	default:
-	}
-	close(d.requestCh)
-	<-d.runExit
-}
-
-// run is a goroutine that makes DefaultDiscovery process its queue to connect
-// to other nodes.
-func (d *DefaultDiscovery) run() {
-	var requested, oldRequest, r int
-	var ok bool
-
-	for {
-		if requested == 0 {
-			requested, ok = <-d.requestCh
-		}
-		oldRequest = requested
-		for ok && requested > 0 {
-			select {
-			case r, ok = <-d.requestCh:
-				if requested <= r {
-					requested = r
-				}
-			case addr := <-d.pool:
-				updatePoolCountMetric(d.PoolCount())
-				d.lock.Lock()
-				if !d.connectedAddrs[addr] && !d.attempted[addr] {
-					d.attempted[addr] = true
-					go d.tryAddress(addr)
-					requested--
-				}
-				d.lock.Unlock()
-			default: // Empty pool
-				var added int
-				d.lock.Lock()
-				for _, addr := range d.seeds {
-					if !d.connectedAddrs[addr] {
-						delete(d.badAddrs, addr)
-						d.unconnectedAddrs[addr] = connRetries
-						d.pushToPoolOrDrop(addr)
-						added++
-					}
-				}
-				d.lock.Unlock()
-				// The pool is empty, but all seed nodes are already connected,
-				// we can end up in an infinite loop here, so drop the request.
-				if added == 0 {
-					requested = 0
-				}
-			}
-		}
-		if !ok {
-			break
-		}
-		// Special case, no connections after all attempts.
-		d.lock.RLock()
-		connected := len(d.connectedAddrs)
-		d.lock.RUnlock()
-		if connected == 0 {
-			time.Sleep(d.dialTimeout)
-			requested = oldRequest
-		}
-	}
-	close(d.runExit)
-}
diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go
index 047aceb9b..fdcea6af1 100644
--- a/pkg/network/discovery_test.go
+++ b/pkg/network/discovery_test.go
@@ -157,7 +157,7 @@ func TestDefaultDiscoverer(t *testing.T) {
 			}
 		}
 	}
-	require.Equal(t, 0, d.PoolCount())
+	require.Eventually(t, func() bool { return d.PoolCount() == 0 }, 2*time.Second, 50*time.Millisecond)
 	sort.Strings(dialledBad)
 	for i := 0; i < len(set1); i++ {
 		for j := 0; j < connRetries; j++ {
@@ -174,10 +174,6 @@ func TestDefaultDiscoverer(t *testing.T) {
 	assert.Equal(t, len(set1), len(d.BadPeers()))
 	assert.Equal(t, 0, len(d.GoodPeers()))
 	require.Equal(t, 0, d.PoolCount())
-
-	// Close should work and subsequent RequestRemote is a no-op.
-	d.Close()
-	d.RequestRemote(42)
 }
 
 func TestSeedDiscovery(t *testing.T) {
diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go
index 8be8339f5..5692482cf 100644
--- a/pkg/network/helper_test.go
+++ b/pkg/network/helper_test.go
@@ -33,7 +33,6 @@ func (d *testDiscovery) BackFill(addrs ...string) {
 	defer d.Unlock()
 	d.backfill = append(d.backfill, addrs...)
 }
-func (d *testDiscovery) Close() {}
 func (d *testDiscovery) PoolCount() int { return 0 }
 func (d *testDiscovery) RegisterBadAddr(addr string) {
 	d.Lock()
@@ -204,6 +203,5 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco
 	s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
 		newFakeTransp, newTestDiscovery)
 	require.NoError(t, err)
-	t.Cleanup(s.discovery.Close)
 	return s
 }
diff --git a/pkg/network/server.go b/pkg/network/server.go
index b33901543..542a934b6 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -261,7 +261,6 @@ func (s *Server) Start(errChan chan error) {
 func (s *Server) Shutdown() {
 	s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
 	s.transport.Close()
-	s.discovery.Close()
 	for _, p := range s.getPeers(nil) {
 		p.Disconnect(errServerShutdown)
 	}