diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 1daebe189..90fde6647 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -28,6 +28,7 @@ type Discoverer interface { // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { + seeds []string transport Transporter lock sync.RWMutex closeMtx sync.RWMutex @@ -42,8 +43,9 @@ type DefaultDiscovery struct { } // NewDefaultDiscovery returns a new DefaultDiscovery. -func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { +func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ + seeds: addrs, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), @@ -195,15 +197,19 @@ func (d *DefaultDiscovery) Close() { // run is a goroutine that makes DefaultDiscovery process its queue to connect // to other nodes. func (d *DefaultDiscovery) run() { - var requested, r int + var requested, oldRequest, r int var ok bool for { - for requested, ok = <-d.requestCh; ok && requested > 0; requested-- { + if requested == 0 { + requested, ok = <-d.requestCh + } + oldRequest = requested + for ok && requested > 0 { select { case r, ok = <-d.requestCh: if requested <= r { - requested = r + 1 + requested = r } case addr := <-d.pool: d.lock.RLock() @@ -212,11 +218,30 @@ func (d *DefaultDiscovery) run() { updatePoolCountMetric(d.PoolCount()) if !addrIsConnected { go d.tryAddress(addr) + requested-- } + default: // Empty pool + d.lock.Lock() + for _, addr := range d.seeds { + if !d.connectedAddrs[addr] { + delete(d.badAddrs, addr) + d.unconnectedAddrs[addr] = connRetries + d.pushToPoolOrDrop(addr) + } + } + d.lock.Unlock() } } if !ok { return } + // Special case, no connections after all attempts. + d.lock.RLock() + connected := len(d.connectedAddrs) + d.lock.RUnlock() + if connected == 0 { + time.Sleep(d.dialTimeout) + requested = oldRequest + } } } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 9f4314a21..08123ba2a 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -34,7 +34,7 @@ func (ft *fakeTransp) Close() { func TestDefaultDiscoverer(t *testing.T) { ts := &fakeTransp{} ts.dialCh = make(chan string) - d := NewDefaultDiscovery(time.Second, ts) + d := NewDefaultDiscovery(nil, time.Second/2, ts) var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} sort.Strings(set1) @@ -148,3 +148,26 @@ func TestDefaultDiscoverer(t *testing.T) { d.Close() d.RequestRemote(42) } + +func TestSeedDiscovery(t *testing.T) { + var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"} + ts := &fakeTransp{} + ts.dialCh = make(chan string) + atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests. + sort.Strings(seeds) + + d := NewDefaultDiscovery(seeds, time.Second/10, ts) + + d.RequestRemote(len(seeds)) + dialled := make([]string, 0) + for i := 0; i < connRetries*2; i++ { + for range seeds { + select { + case a := <-ts.dialCh: + dialled = append(dialled, a) + case <-time.After(time.Second): + t.Fatalf("timeout expecting for transport dial") + } + } + } +} diff --git a/pkg/network/server.go b/pkg/network/server.go index c7836f933..d45162418 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -150,6 +150,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log) s.discovery = NewDefaultDiscovery( + s.Seeds, s.DialTimeout, s.transport, ) @@ -176,8 +177,6 @@ func (s *Server) Start(errChan chan error) { s.tryStartConsensus() - s.discovery.BackFill(s.Seeds...) - go s.broadcastTxLoop() go s.relayBlocksLoop() go s.bQueue.run()