network: try connecting to seeds indefinitely, use them with 0 pool

If the node is to start with seeds unavailable it will try connecting to each
of them three times, blacklist them and then sit forever waiting for
something. It's not a good behavior, it should always try connecting to seeds
if nothing else works.
This commit is contained in:
Roman Khimov 2020-10-13 16:30:10 +03:00
parent b36371ed94
commit 78773df6ec
3 changed files with 54 additions and 7 deletions

View file

@ -28,6 +28,7 @@ type Discoverer interface {
// DefaultDiscovery default implementation of the Discoverer interface. // DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct { type DefaultDiscovery struct {
seeds []string
transport Transporter transport Transporter
lock sync.RWMutex lock sync.RWMutex
closeMtx sync.RWMutex closeMtx sync.RWMutex
@ -42,8 +43,9 @@ type DefaultDiscovery struct {
} }
// NewDefaultDiscovery returns a new DefaultDiscovery. // NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
d := &DefaultDiscovery{ d := &DefaultDiscovery{
seeds: addrs,
transport: ts, transport: ts,
dialTimeout: dt, dialTimeout: dt,
badAddrs: make(map[string]bool), badAddrs: make(map[string]bool),
@ -195,15 +197,19 @@ func (d *DefaultDiscovery) Close() {
// run is a goroutine that makes DefaultDiscovery process its queue to connect // run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes. // to other nodes.
func (d *DefaultDiscovery) run() { func (d *DefaultDiscovery) run() {
var requested, r int var requested, oldRequest, r int
var ok bool var ok bool
for { for {
for requested, ok = <-d.requestCh; ok && requested > 0; requested-- { if requested == 0 {
requested, ok = <-d.requestCh
}
oldRequest = requested
for ok && requested > 0 {
select { select {
case r, ok = <-d.requestCh: case r, ok = <-d.requestCh:
if requested <= r { if requested <= r {
requested = r + 1 requested = r
} }
case addr := <-d.pool: case addr := <-d.pool:
d.lock.RLock() d.lock.RLock()
@ -212,11 +218,30 @@ func (d *DefaultDiscovery) run() {
updatePoolCountMetric(d.PoolCount()) updatePoolCountMetric(d.PoolCount())
if !addrIsConnected { if !addrIsConnected {
go d.tryAddress(addr) go d.tryAddress(addr)
requested--
} }
default: // Empty pool
d.lock.Lock()
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] {
delete(d.badAddrs, addr)
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
}
}
d.lock.Unlock()
} }
} }
if !ok { if !ok {
return return
} }
// Special case, no connections after all attempts.
d.lock.RLock()
connected := len(d.connectedAddrs)
d.lock.RUnlock()
if connected == 0 {
time.Sleep(d.dialTimeout)
requested = oldRequest
}
} }
} }

View file

@ -34,7 +34,7 @@ func (ft *fakeTransp) Close() {
func TestDefaultDiscoverer(t *testing.T) { func TestDefaultDiscoverer(t *testing.T) {
ts := &fakeTransp{} ts := &fakeTransp{}
ts.dialCh = make(chan string) ts.dialCh = make(chan string)
d := NewDefaultDiscovery(time.Second, ts) d := NewDefaultDiscovery(nil, time.Second/2, ts)
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
sort.Strings(set1) sort.Strings(set1)
@ -148,3 +148,26 @@ func TestDefaultDiscoverer(t *testing.T) {
d.Close() d.Close()
d.RequestRemote(42) d.RequestRemote(42)
} }
func TestSeedDiscovery(t *testing.T) {
var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
ts := &fakeTransp{}
ts.dialCh = make(chan string)
atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests.
sort.Strings(seeds)
d := NewDefaultDiscovery(seeds, time.Second/10, ts)
d.RequestRemote(len(seeds))
dialled := make([]string, 0)
for i := 0; i < connRetries*2; i++ {
for range seeds {
select {
case a := <-ts.dialCh:
dialled = append(dialled, a)
case <-time.After(time.Second):
t.Fatalf("timeout expecting for transport dial")
}
}
}
}

View file

@ -150,6 +150,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log) s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log)
s.discovery = NewDefaultDiscovery( s.discovery = NewDefaultDiscovery(
s.Seeds,
s.DialTimeout, s.DialTimeout,
s.transport, s.transport,
) )
@ -176,8 +177,6 @@ func (s *Server) Start(errChan chan error) {
s.tryStartConsensus() s.tryStartConsensus()
s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop() go s.broadcastTxLoop()
go s.relayBlocksLoop() go s.relayBlocksLoop()
go s.bQueue.run() go s.bQueue.run()