network: try connecting to seeds indefinitely, use them with 0 pool

If the node is to start with seeds unavailable it will try connecting to each
of them three times, blacklist them and then sit forever waiting for
something. It's not a good behavior, it should always try connecting to seeds
if nothing else works.
This commit is contained in:
Roman Khimov 2020-10-13 16:30:10 +03:00
parent 8028e08abc
commit 38a22b44b2
3 changed files with 54 additions and 7 deletions

View file

@ -36,6 +36,7 @@ type AddressWithCapabilities struct {
// DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct {
seeds []string
transport Transporter
lock sync.RWMutex
closeMtx sync.RWMutex
@ -50,8 +51,9 @@ type DefaultDiscovery struct {
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
d := &DefaultDiscovery{
seeds: addrs,
transport: ts,
dialTimeout: dt,
badAddrs: make(map[string]bool),
@ -206,15 +208,19 @@ func (d *DefaultDiscovery) Close() {
// run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes.
func (d *DefaultDiscovery) run() {
var requested, r int
var requested, oldRequest, r int
var ok bool
for {
for requested, ok = <-d.requestCh; ok && requested > 0; requested-- {
if requested == 0 {
requested, ok = <-d.requestCh
}
oldRequest = requested
for ok && requested > 0 {
select {
case r, ok = <-d.requestCh:
if requested <= r {
requested = r + 1
requested = r
}
case addr := <-d.pool:
d.lock.RLock()
@ -223,11 +229,30 @@ func (d *DefaultDiscovery) run() {
updatePoolCountMetric(d.PoolCount())
if !addrIsConnected {
go d.tryAddress(addr)
requested--
}
default: // Empty pool
d.lock.Lock()
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] {
delete(d.badAddrs, addr)
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
}
}
d.lock.Unlock()
}
}
if !ok {
return
}
// Special case, no connections after all attempts.
d.lock.RLock()
connected := len(d.connectedAddrs)
d.lock.RUnlock()
if connected == 0 {
time.Sleep(d.dialTimeout)
requested = oldRequest
}
}
}

View file

@ -38,7 +38,7 @@ func (ft *fakeTransp) Close() {
func TestDefaultDiscoverer(t *testing.T) {
ts := &fakeTransp{}
ts.dialCh = make(chan string)
d := NewDefaultDiscovery(time.Second, ts)
d := NewDefaultDiscovery(nil, time.Second/2, ts)
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
sort.Strings(set1)
@ -167,3 +167,26 @@ func TestDefaultDiscoverer(t *testing.T) {
d.Close()
d.RequestRemote(42)
}
func TestSeedDiscovery(t *testing.T) {
var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
ts := &fakeTransp{}
ts.dialCh = make(chan string)
atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests.
sort.Strings(seeds)
d := NewDefaultDiscovery(seeds, time.Second/10, ts)
d.RequestRemote(len(seeds))
dialled := make([]string, 0)
for i := 0; i < connRetries*2; i++ {
for range seeds {
select {
case a := <-ts.dialCh:
dialled = append(dialled, a)
case <-time.After(time.Second):
t.Fatalf("timeout expecting for transport dial")
}
}
}
}

View file

@ -151,6 +151,7 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo
s.transport = NewTCPTransport(s, net.JoinHostPort(config.Address, strconv.Itoa(int(config.Port))), s.log)
s.discovery = NewDefaultDiscovery(
s.Seeds,
s.DialTimeout,
s.transport,
)
@ -171,8 +172,6 @@ func (s *Server) Start(errChan chan error) {
s.tryStartConsensus()
s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run()