Merge pull request #1483 from nspcc-dev/2.x-fix-empty-discovery-pool

2.x fix empty discovery pool
This commit is contained in:
Roman Khimov 2020-10-13 19:00:20 +03:00 committed by GitHub
commit d2ee2b5f9f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 9 deletions

View file

@ -28,6 +28,7 @@ type Discoverer interface {
// DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct {
seeds []string
transport Transporter
lock sync.RWMutex
closeMtx sync.RWMutex
@ -42,8 +43,9 @@ type DefaultDiscovery struct {
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
d := &DefaultDiscovery{
seeds: addrs,
transport: ts,
dialTimeout: dt,
badAddrs: make(map[string]bool),
@ -107,6 +109,7 @@ func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
} else {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
}
d.lock.Unlock()
}
@ -150,6 +153,7 @@ func (d *DefaultDiscovery) GoodPeers() []string {
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
d.lock.Lock()
d.goodAddrs[s] = true
delete(d.badAddrs, s)
d.lock.Unlock()
}
@ -193,15 +197,19 @@ func (d *DefaultDiscovery) Close() {
// run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes.
func (d *DefaultDiscovery) run() {
var requested, r int
var requested, oldRequest, r int
var ok bool
for {
for requested, ok = <-d.requestCh; ok && requested > 0; requested-- {
if requested == 0 {
requested, ok = <-d.requestCh
}
oldRequest = requested
for ok && requested > 0 {
select {
case r, ok = <-d.requestCh:
if requested <= r {
requested = r + 1
requested = r
}
case addr := <-d.pool:
d.lock.RLock()
@ -210,11 +218,30 @@ func (d *DefaultDiscovery) run() {
updatePoolCountMetric(d.PoolCount())
if !addrIsConnected {
go d.tryAddress(addr)
requested--
}
default: // Empty pool
d.lock.Lock()
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] {
delete(d.badAddrs, addr)
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
}
}
d.lock.Unlock()
}
}
if !ok {
return
}
// Special case, no connections after all attempts.
d.lock.RLock()
connected := len(d.connectedAddrs)
d.lock.RUnlock()
if connected == 0 {
time.Sleep(d.dialTimeout)
requested = oldRequest
}
}
}

View file

@ -34,7 +34,7 @@ func (ft *fakeTransp) Close() {
func TestDefaultDiscoverer(t *testing.T) {
ts := &fakeTransp{}
ts.dialCh = make(chan string)
d := NewDefaultDiscovery(time.Second, ts)
d := NewDefaultDiscovery(nil, time.Second/2, ts)
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
sort.Strings(set1)
@ -134,17 +134,40 @@ func TestDefaultDiscoverer(t *testing.T) {
time.Sleep(time.Second)
}
assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, len(set1), len(d.GoodPeers()))
assert.Equal(t, 0, len(d.GoodPeers()))
assert.Equal(t, 0, len(d.UnconnectedPeers()))
// Re-adding bad addresses is a no-op.
d.BackFill(set1...)
assert.Equal(t, 0, len(d.UnconnectedPeers()))
assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, len(set1), len(d.GoodPeers()))
assert.Equal(t, 0, len(d.GoodPeers()))
require.Equal(t, 0, d.PoolCount())
// Close should work and subsequent RequestRemote is a no-op.
d.Close()
d.RequestRemote(42)
}
func TestSeedDiscovery(t *testing.T) {
var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
ts := &fakeTransp{}
ts.dialCh = make(chan string)
atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests.
sort.Strings(seeds)
d := NewDefaultDiscovery(seeds, time.Second/10, ts)
d.RequestRemote(len(seeds))
dialled := make([]string, 0)
for i := 0; i < connRetries*2; i++ {
for range seeds {
select {
case a := <-ts.dialCh:
dialled = append(dialled, a)
case <-time.After(time.Second):
t.Fatalf("timeout expecting for transport dial")
}
}
}
}

View file

@ -150,6 +150,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log)
s.discovery = NewDefaultDiscovery(
s.Seeds,
s.DialTimeout,
s.transport,
)
@ -176,8 +177,6 @@ func (s *Server) Start(errChan chan error) {
s.tryStartConsensus()
s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run()