From 8028e08abc2e4f9877026ab6fb4a6e920c0e302d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 13 Oct 2020 14:16:06 +0300 Subject: [PATCH 1/2] network: an address should either be good or bad, but not both --- pkg/network/discovery.go | 2 ++ pkg/network/discovery_test.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index cf21618c1..4be7d8375 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -115,6 +115,7 @@ func (d *DefaultDiscovery) RegisterBadAddr(addr string) { } else { d.badAddrs[addr] = true delete(d.unconnectedAddrs, addr) + delete(d.goodAddrs, addr) } d.lock.Unlock() } @@ -161,6 +162,7 @@ func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities { func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) { d.lock.Lock() d.goodAddrs[s] = c + delete(d.badAddrs, s) d.lock.Unlock() } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 94af43d83..753142de5 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -153,14 +153,14 @@ func TestDefaultDiscoverer(t *testing.T) { time.Sleep(time.Second) } assert.Equal(t, len(set1), len(d.BadPeers())) - assert.Equal(t, len(set1), len(d.GoodPeers())) + assert.Equal(t, 0, len(d.GoodPeers())) assert.Equal(t, 0, len(d.UnconnectedPeers())) // Re-adding bad addresses is a no-op. d.BackFill(set1...) assert.Equal(t, 0, len(d.UnconnectedPeers())) assert.Equal(t, len(set1), len(d.BadPeers())) - assert.Equal(t, len(set1), len(d.GoodPeers())) + assert.Equal(t, 0, len(d.GoodPeers())) require.Equal(t, 0, d.PoolCount()) // Close should work and subsequent RequestRemote is a no-op. From 38a22b44b2334d4cd82f79ed7b41541f05fab6e6 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 13 Oct 2020 16:30:10 +0300 Subject: [PATCH 2/2] network: try connecting to seeds indefinitely, use them with 0 pool If the node is to start with seeds unavailable it will try connecting to each of them three times, blacklist them and then sit forever waiting for something. It's not a good behavior, it should always try connecting to seeds if nothing else works. --- pkg/network/discovery.go | 33 +++++++++++++++++++++++++++++---- pkg/network/discovery_test.go | 25 ++++++++++++++++++++++++- pkg/network/server.go | 3 +-- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 4be7d8375..b17c3a7ed 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -36,6 +36,7 @@ type AddressWithCapabilities struct { // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { + seeds []string transport Transporter lock sync.RWMutex closeMtx sync.RWMutex @@ -50,8 +51,9 @@ type DefaultDiscovery struct { } // NewDefaultDiscovery returns a new DefaultDiscovery. -func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { +func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ + seeds: addrs, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), @@ -206,15 +208,19 @@ func (d *DefaultDiscovery) Close() { // run is a goroutine that makes DefaultDiscovery process its queue to connect // to other nodes. func (d *DefaultDiscovery) run() { - var requested, r int + var requested, oldRequest, r int var ok bool for { - for requested, ok = <-d.requestCh; ok && requested > 0; requested-- { + if requested == 0 { + requested, ok = <-d.requestCh + } + oldRequest = requested + for ok && requested > 0 { select { case r, ok = <-d.requestCh: if requested <= r { - requested = r + 1 + requested = r } case addr := <-d.pool: d.lock.RLock() @@ -223,11 +229,30 @@ func (d *DefaultDiscovery) run() { updatePoolCountMetric(d.PoolCount()) if !addrIsConnected { go d.tryAddress(addr) + requested-- } + default: // Empty pool + d.lock.Lock() + for _, addr := range d.seeds { + if !d.connectedAddrs[addr] { + delete(d.badAddrs, addr) + d.unconnectedAddrs[addr] = connRetries + d.pushToPoolOrDrop(addr) + } + } + d.lock.Unlock() } } if !ok { return } + // Special case, no connections after all attempts. + d.lock.RLock() + connected := len(d.connectedAddrs) + d.lock.RUnlock() + if connected == 0 { + time.Sleep(d.dialTimeout) + requested = oldRequest + } } } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 753142de5..8b829ddba 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -38,7 +38,7 @@ func (ft *fakeTransp) Close() { func TestDefaultDiscoverer(t *testing.T) { ts := &fakeTransp{} ts.dialCh = make(chan string) - d := NewDefaultDiscovery(time.Second, ts) + d := NewDefaultDiscovery(nil, time.Second/2, ts) var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} sort.Strings(set1) @@ -167,3 +167,26 @@ func TestDefaultDiscoverer(t *testing.T) { d.Close() d.RequestRemote(42) } + +func TestSeedDiscovery(t *testing.T) { + var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"} + ts := &fakeTransp{} + ts.dialCh = make(chan string) + atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests. + sort.Strings(seeds) + + d := NewDefaultDiscovery(seeds, time.Second/10, ts) + + d.RequestRemote(len(seeds)) + dialled := make([]string, 0) + for i := 0; i < connRetries*2; i++ { + for range seeds { + select { + case a := <-ts.dialCh: + dialled = append(dialled, a) + case <-time.After(time.Second): + t.Fatalf("timeout expecting for transport dial") + } + } + } +} diff --git a/pkg/network/server.go b/pkg/network/server.go index e8feac478..fd7b64a5e 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -151,6 +151,7 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo s.transport = NewTCPTransport(s, net.JoinHostPort(config.Address, strconv.Itoa(int(config.Port))), s.log) s.discovery = NewDefaultDiscovery( + s.Seeds, s.DialTimeout, s.transport, ) @@ -171,8 +172,6 @@ func (s *Server) Start(errChan chan error) { s.tryStartConsensus() - s.discovery.BackFill(s.Seeds...) - go s.broadcastTxLoop() go s.relayBlocksLoop() go s.bQueue.run()