From b36371ed94d191f81c76221052001aab92ac283d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 13 Oct 2020 14:16:06 +0300 Subject: [PATCH 1/2] network: an address should either be good or bad, but not both --- pkg/network/discovery.go | 2 ++ pkg/network/discovery_test.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 65b0b8f36..1daebe189 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -107,6 +107,7 @@ func (d *DefaultDiscovery) RegisterBadAddr(addr string) { } else { d.badAddrs[addr] = true delete(d.unconnectedAddrs, addr) + delete(d.goodAddrs, addr) } d.lock.Unlock() } @@ -150,6 +151,7 @@ func (d *DefaultDiscovery) GoodPeers() []string { func (d *DefaultDiscovery) RegisterGoodAddr(s string) { d.lock.Lock() d.goodAddrs[s] = true + delete(d.badAddrs, s) d.lock.Unlock() } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 9105cdc0b..9f4314a21 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -134,14 +134,14 @@ func TestDefaultDiscoverer(t *testing.T) { time.Sleep(time.Second) } assert.Equal(t, len(set1), len(d.BadPeers())) - assert.Equal(t, len(set1), len(d.GoodPeers())) + assert.Equal(t, 0, len(d.GoodPeers())) assert.Equal(t, 0, len(d.UnconnectedPeers())) // Re-adding bad addresses is a no-op. d.BackFill(set1...) assert.Equal(t, 0, len(d.UnconnectedPeers())) assert.Equal(t, len(set1), len(d.BadPeers())) - assert.Equal(t, len(set1), len(d.GoodPeers())) + assert.Equal(t, 0, len(d.GoodPeers())) require.Equal(t, 0, d.PoolCount()) // Close should work and subsequent RequestRemote is a no-op. From 78773df6eca64634e068005891f3a4cbb0348937 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 13 Oct 2020 16:30:10 +0300 Subject: [PATCH 2/2] network: try connecting to seeds indefinitely, use them with 0 pool If the node is to start with seeds unavailable it will try connecting to each of them three times, blacklist them and then sit forever waiting for something. It's not a good behavior, it should always try connecting to seeds if nothing else works. --- pkg/network/discovery.go | 33 +++++++++++++++++++++++++++++---- pkg/network/discovery_test.go | 25 ++++++++++++++++++++++++- pkg/network/server.go | 3 +-- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 1daebe189..90fde6647 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -28,6 +28,7 @@ type Discoverer interface { // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { + seeds []string transport Transporter lock sync.RWMutex closeMtx sync.RWMutex @@ -42,8 +43,9 @@ type DefaultDiscovery struct { } // NewDefaultDiscovery returns a new DefaultDiscovery. -func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { +func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ + seeds: addrs, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), @@ -195,15 +197,19 @@ func (d *DefaultDiscovery) Close() { // run is a goroutine that makes DefaultDiscovery process its queue to connect // to other nodes. func (d *DefaultDiscovery) run() { - var requested, r int + var requested, oldRequest, r int var ok bool for { - for requested, ok = <-d.requestCh; ok && requested > 0; requested-- { + if requested == 0 { + requested, ok = <-d.requestCh + } + oldRequest = requested + for ok && requested > 0 { select { case r, ok = <-d.requestCh: if requested <= r { - requested = r + 1 + requested = r } case addr := <-d.pool: d.lock.RLock() @@ -212,11 +218,30 @@ func (d *DefaultDiscovery) run() { updatePoolCountMetric(d.PoolCount()) if !addrIsConnected { go d.tryAddress(addr) + requested-- } + default: // Empty pool + d.lock.Lock() + for _, addr := range d.seeds { + if !d.connectedAddrs[addr] { + delete(d.badAddrs, addr) + d.unconnectedAddrs[addr] = connRetries + d.pushToPoolOrDrop(addr) + } + } + d.lock.Unlock() } } if !ok { return } + // Special case, no connections after all attempts. + d.lock.RLock() + connected := len(d.connectedAddrs) + d.lock.RUnlock() + if connected == 0 { + time.Sleep(d.dialTimeout) + requested = oldRequest + } } } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 9f4314a21..08123ba2a 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -34,7 +34,7 @@ func (ft *fakeTransp) Close() { func TestDefaultDiscoverer(t *testing.T) { ts := &fakeTransp{} ts.dialCh = make(chan string) - d := NewDefaultDiscovery(time.Second, ts) + d := NewDefaultDiscovery(nil, time.Second/2, ts) var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} sort.Strings(set1) @@ -148,3 +148,26 @@ func TestDefaultDiscoverer(t *testing.T) { d.Close() d.RequestRemote(42) } + +func TestSeedDiscovery(t *testing.T) { + var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"} + ts := &fakeTransp{} + ts.dialCh = make(chan string) + atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests. + sort.Strings(seeds) + + d := NewDefaultDiscovery(seeds, time.Second/10, ts) + + d.RequestRemote(len(seeds)) + dialled := make([]string, 0) + for i := 0; i < connRetries*2; i++ { + for range seeds { + select { + case a := <-ts.dialCh: + dialled = append(dialled, a) + case <-time.After(time.Second): + t.Fatalf("timeout expecting for transport dial") + } + } + } +} diff --git a/pkg/network/server.go b/pkg/network/server.go index c7836f933..d45162418 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -150,6 +150,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log) s.discovery = NewDefaultDiscovery( + s.Seeds, s.DialTimeout, s.transport, ) @@ -176,8 +177,6 @@ func (s *Server) Start(errChan chan error) { s.tryStartConsensus() - s.discovery.BackFill(s.Seeds...) - go s.broadcastTxLoop() go s.relayBlocksLoop() go s.bQueue.run()