diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 4be7d8375..b17c3a7ed 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -36,6 +36,7 @@ type AddressWithCapabilities struct { // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { + seeds []string transport Transporter lock sync.RWMutex closeMtx sync.RWMutex @@ -50,8 +51,9 @@ type DefaultDiscovery struct { } // NewDefaultDiscovery returns a new DefaultDiscovery. -func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { +func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ + seeds: addrs, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), @@ -206,15 +208,19 @@ func (d *DefaultDiscovery) Close() { // run is a goroutine that makes DefaultDiscovery process its queue to connect // to other nodes. func (d *DefaultDiscovery) run() { - var requested, r int + var requested, oldRequest, r int var ok bool for { - for requested, ok = <-d.requestCh; ok && requested > 0; requested-- { + if requested == 0 { + requested, ok = <-d.requestCh + } + oldRequest = requested + for ok && requested > 0 { select { case r, ok = <-d.requestCh: if requested <= r { - requested = r + 1 + requested = r } case addr := <-d.pool: d.lock.RLock() @@ -223,11 +229,30 @@ func (d *DefaultDiscovery) run() { updatePoolCountMetric(d.PoolCount()) if !addrIsConnected { go d.tryAddress(addr) + requested-- } + default: // Empty pool + d.lock.Lock() + for _, addr := range d.seeds { + if !d.connectedAddrs[addr] { + delete(d.badAddrs, addr) + d.unconnectedAddrs[addr] = connRetries + d.pushToPoolOrDrop(addr) + } + } + d.lock.Unlock() } } if !ok { return } + // Special case, no connections after all attempts. + d.lock.RLock() + connected := len(d.connectedAddrs) + d.lock.RUnlock() + if connected == 0 { + time.Sleep(d.dialTimeout) + requested = oldRequest + } } } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 753142de5..8b829ddba 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -38,7 +38,7 @@ func (ft *fakeTransp) Close() { func TestDefaultDiscoverer(t *testing.T) { ts := &fakeTransp{} ts.dialCh = make(chan string) - d := NewDefaultDiscovery(time.Second, ts) + d := NewDefaultDiscovery(nil, time.Second/2, ts) var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} sort.Strings(set1) @@ -167,3 +167,26 @@ func TestDefaultDiscoverer(t *testing.T) { d.Close() d.RequestRemote(42) } + +func TestSeedDiscovery(t *testing.T) { + var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"} + ts := &fakeTransp{} + ts.dialCh = make(chan string) + atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests. + sort.Strings(seeds) + + d := NewDefaultDiscovery(seeds, time.Second/10, ts) + + d.RequestRemote(len(seeds)) + dialled := make([]string, 0) + for i := 0; i < connRetries*2; i++ { + for range seeds { + select { + case a := <-ts.dialCh: + dialled = append(dialled, a) + case <-time.After(time.Second): + t.Fatalf("timeout expecting for transport dial") + } + } + } +} diff --git a/pkg/network/server.go b/pkg/network/server.go index e8feac478..fd7b64a5e 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -151,6 +151,7 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo s.transport = NewTCPTransport(s, net.JoinHostPort(config.Address, strconv.Itoa(int(config.Port))), s.log) s.discovery = NewDefaultDiscovery( + s.Seeds, s.DialTimeout, s.transport, ) @@ -171,8 +172,6 @@ func (s *Server) Start(errChan chan error) { s.tryStartConsensus() - s.discovery.BackFill(s.Seeds...) - go s.broadcastTxLoop() go s.relayBlocksLoop() go s.bQueue.run()