forked from TrueCloudLab/neoneo-go
Merge pull request #1486 from nspcc-dev/fix-empty-discovery-pool
Fix empty discovery pool
This commit is contained in:
commit
e5048b771e
3 changed files with 58 additions and 9 deletions
|
@ -36,6 +36,7 @@ type AddressWithCapabilities struct {
|
||||||
|
|
||||||
// DefaultDiscovery default implementation of the Discoverer interface.
|
// DefaultDiscovery default implementation of the Discoverer interface.
|
||||||
type DefaultDiscovery struct {
|
type DefaultDiscovery struct {
|
||||||
|
seeds []string
|
||||||
transport Transporter
|
transport Transporter
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
closeMtx sync.RWMutex
|
closeMtx sync.RWMutex
|
||||||
|
@ -50,8 +51,9 @@ type DefaultDiscovery struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
||||||
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
|
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
|
||||||
d := &DefaultDiscovery{
|
d := &DefaultDiscovery{
|
||||||
|
seeds: addrs,
|
||||||
transport: ts,
|
transport: ts,
|
||||||
dialTimeout: dt,
|
dialTimeout: dt,
|
||||||
badAddrs: make(map[string]bool),
|
badAddrs: make(map[string]bool),
|
||||||
|
@ -115,6 +117,7 @@ func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
|
||||||
} else {
|
} else {
|
||||||
d.badAddrs[addr] = true
|
d.badAddrs[addr] = true
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
|
delete(d.goodAddrs, addr)
|
||||||
}
|
}
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -161,6 +164,7 @@ func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
|
||||||
func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) {
|
func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
d.goodAddrs[s] = c
|
d.goodAddrs[s] = c
|
||||||
|
delete(d.badAddrs, s)
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,15 +208,19 @@ func (d *DefaultDiscovery) Close() {
|
||||||
// run is a goroutine that makes DefaultDiscovery process its queue to connect
|
// run is a goroutine that makes DefaultDiscovery process its queue to connect
|
||||||
// to other nodes.
|
// to other nodes.
|
||||||
func (d *DefaultDiscovery) run() {
|
func (d *DefaultDiscovery) run() {
|
||||||
var requested, r int
|
var requested, oldRequest, r int
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
for requested, ok = <-d.requestCh; ok && requested > 0; requested-- {
|
if requested == 0 {
|
||||||
|
requested, ok = <-d.requestCh
|
||||||
|
}
|
||||||
|
oldRequest = requested
|
||||||
|
for ok && requested > 0 {
|
||||||
select {
|
select {
|
||||||
case r, ok = <-d.requestCh:
|
case r, ok = <-d.requestCh:
|
||||||
if requested <= r {
|
if requested <= r {
|
||||||
requested = r + 1
|
requested = r
|
||||||
}
|
}
|
||||||
case addr := <-d.pool:
|
case addr := <-d.pool:
|
||||||
d.lock.RLock()
|
d.lock.RLock()
|
||||||
|
@ -221,11 +229,30 @@ func (d *DefaultDiscovery) run() {
|
||||||
updatePoolCountMetric(d.PoolCount())
|
updatePoolCountMetric(d.PoolCount())
|
||||||
if !addrIsConnected {
|
if !addrIsConnected {
|
||||||
go d.tryAddress(addr)
|
go d.tryAddress(addr)
|
||||||
|
requested--
|
||||||
}
|
}
|
||||||
|
default: // Empty pool
|
||||||
|
d.lock.Lock()
|
||||||
|
for _, addr := range d.seeds {
|
||||||
|
if !d.connectedAddrs[addr] {
|
||||||
|
delete(d.badAddrs, addr)
|
||||||
|
d.unconnectedAddrs[addr] = connRetries
|
||||||
|
d.pushToPoolOrDrop(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Special case, no connections after all attempts.
|
||||||
|
d.lock.RLock()
|
||||||
|
connected := len(d.connectedAddrs)
|
||||||
|
d.lock.RUnlock()
|
||||||
|
if connected == 0 {
|
||||||
|
time.Sleep(d.dialTimeout)
|
||||||
|
requested = oldRequest
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ func (ft *fakeTransp) Close() {
|
||||||
func TestDefaultDiscoverer(t *testing.T) {
|
func TestDefaultDiscoverer(t *testing.T) {
|
||||||
ts := &fakeTransp{}
|
ts := &fakeTransp{}
|
||||||
ts.dialCh = make(chan string)
|
ts.dialCh = make(chan string)
|
||||||
d := NewDefaultDiscovery(time.Second, ts)
|
d := NewDefaultDiscovery(nil, time.Second/2, ts)
|
||||||
|
|
||||||
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
|
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
|
||||||
sort.Strings(set1)
|
sort.Strings(set1)
|
||||||
|
@ -153,17 +153,40 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
assert.Equal(t, len(set1), len(d.BadPeers()))
|
assert.Equal(t, len(set1), len(d.BadPeers()))
|
||||||
assert.Equal(t, len(set1), len(d.GoodPeers()))
|
assert.Equal(t, 0, len(d.GoodPeers()))
|
||||||
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
||||||
|
|
||||||
// Re-adding bad addresses is a no-op.
|
// Re-adding bad addresses is a no-op.
|
||||||
d.BackFill(set1...)
|
d.BackFill(set1...)
|
||||||
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
||||||
assert.Equal(t, len(set1), len(d.BadPeers()))
|
assert.Equal(t, len(set1), len(d.BadPeers()))
|
||||||
assert.Equal(t, len(set1), len(d.GoodPeers()))
|
assert.Equal(t, 0, len(d.GoodPeers()))
|
||||||
require.Equal(t, 0, d.PoolCount())
|
require.Equal(t, 0, d.PoolCount())
|
||||||
|
|
||||||
// Close should work and subsequent RequestRemote is a no-op.
|
// Close should work and subsequent RequestRemote is a no-op.
|
||||||
d.Close()
|
d.Close()
|
||||||
d.RequestRemote(42)
|
d.RequestRemote(42)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSeedDiscovery(t *testing.T) {
|
||||||
|
var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
|
||||||
|
ts := &fakeTransp{}
|
||||||
|
ts.dialCh = make(chan string)
|
||||||
|
atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests.
|
||||||
|
sort.Strings(seeds)
|
||||||
|
|
||||||
|
d := NewDefaultDiscovery(seeds, time.Second/10, ts)
|
||||||
|
|
||||||
|
d.RequestRemote(len(seeds))
|
||||||
|
dialled := make([]string, 0)
|
||||||
|
for i := 0; i < connRetries*2; i++ {
|
||||||
|
for range seeds {
|
||||||
|
select {
|
||||||
|
case a := <-ts.dialCh:
|
||||||
|
dialled = append(dialled, a)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("timeout expecting for transport dial")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -151,6 +151,7 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo
|
||||||
|
|
||||||
s.transport = NewTCPTransport(s, net.JoinHostPort(config.Address, strconv.Itoa(int(config.Port))), s.log)
|
s.transport = NewTCPTransport(s, net.JoinHostPort(config.Address, strconv.Itoa(int(config.Port))), s.log)
|
||||||
s.discovery = NewDefaultDiscovery(
|
s.discovery = NewDefaultDiscovery(
|
||||||
|
s.Seeds,
|
||||||
s.DialTimeout,
|
s.DialTimeout,
|
||||||
s.transport,
|
s.transport,
|
||||||
)
|
)
|
||||||
|
@ -171,8 +172,6 @@ func (s *Server) Start(errChan chan error) {
|
||||||
|
|
||||||
s.tryStartConsensus()
|
s.tryStartConsensus()
|
||||||
|
|
||||||
s.discovery.BackFill(s.Seeds...)
|
|
||||||
|
|
||||||
go s.broadcastTxLoop()
|
go s.broadcastTxLoop()
|
||||||
go s.relayBlocksLoop()
|
go s.relayBlocksLoop()
|
||||||
go s.bQueue.run()
|
go s.bQueue.run()
|
||||||
|
|
Loading…
Reference in a new issue