neoneo-go/pkg/network/discovery.go
Roman Khimov 78773df6ec network: try connecting to seeds indefinitely, use them with 0 pool
If the node is to start with seeds unavailable it will try connecting to each
of them three times, blacklist them and then sit forever waiting for
something. It's not a good behavior, it should always try connecting to seeds
if nothing else works.
2020-10-13 18:00:31 +03:00

247 lines
5.9 KiB
Go

package network
import (
"sync"
"time"
)
const (
	// maxPoolSize is the capacity of the buffered channel holding addresses
	// that wait for a connection attempt.
	maxPoolSize = 200
	// connRetries is the number of connection attempts an address gets
	// before it is registered as bad.
	connRetries = 3
)
// Discoverer is an interface that is responsible for maintaining
// a healthy connection pool.
type Discoverer interface {
	// BackFill offers the given addresses as candidates for future
	// connection attempts.
	BackFill(...string)
	// Close stops the discoverer.
	Close()
	// PoolCount returns the number of addresses available for connection
	// attempts.
	PoolCount() int
	// RequestRemote asks the discoverer to try connecting to the given
	// number of nodes.
	RequestRemote(int)
	// RegisterBadAddr reports a failed connection attempt to the address.
	RegisterBadAddr(string)
	// RegisterGoodAddr reports that the address passed the handshake
	// successfully.
	RegisterGoodAddr(string)
	// RegisterConnectedAddr reports that the address is now connected.
	RegisterConnectedAddr(string)
	// UnregisterConnectedAddr reports that the address is no longer
	// connected.
	UnregisterConnectedAddr(string)
	// UnconnectedPeers returns the addresses tracked as unconnected.
	UnconnectedPeers() []string
	// BadPeers returns the addresses registered as bad.
	BadPeers() []string
	// GoodPeers returns the addresses registered as good.
	GoodPeers() []string
}
// DefaultDiscovery is the default implementation of the Discoverer interface.
type DefaultDiscovery struct {
	// seeds are the addresses given to the constructor; run puts them back
	// into the pool whenever the pool is found empty.
	seeds []string
	// transport is used to dial addresses taken from the pool.
	transport Transporter
	// lock protects the address maps below.
	lock sync.RWMutex
	// closeMtx protects isDead and orders Close against RequestRemote.
	closeMtx sync.RWMutex
	// dialTimeout is passed to transport.Dial and is also the pause run makes
	// when there are no connected addresses after processing a request.
	dialTimeout time.Duration
	// badAddrs is the set of addresses that ran out of connection retries.
	badAddrs map[string]bool
	// connectedAddrs is the set of currently connected addresses.
	connectedAddrs map[string]bool
	// goodAddrs is the set of addresses registered via RegisterGoodAddr.
	goodAddrs map[string]bool
	// unconnectedAddrs maps a not yet connected address to the number of
	// connection retries it has left.
	unconnectedAddrs map[string]int
	// isDead is set by Close; RequestRemote does nothing once it is true.
	isDead bool
	// requestCh carries connection requests from RequestRemote to run.
	requestCh chan int
	// pool is the queue of addresses waiting for a connection attempt.
	pool chan string
}
// NewDefaultDiscovery creates a DefaultDiscovery that uses addrs as seed
// addresses, dt as the dial timeout and ts as the transport for dialing.
// The processing goroutine (run) is started before the value is returned.
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
	disc := new(DefaultDiscovery)

	// Caller-provided configuration.
	disc.seeds = addrs
	disc.dialTimeout = dt
	disc.transport = ts

	// Address bookkeeping.
	disc.unconnectedAddrs = make(map[string]int)
	disc.connectedAddrs = make(map[string]bool)
	disc.goodAddrs = make(map[string]bool)
	disc.badAddrs = make(map[string]bool)

	// Communication with the processing goroutine.
	disc.pool = make(chan string, maxPoolSize)
	disc.requestCh = make(chan int)

	go disc.run()
	return disc
}
// BackFill implements the Discoverer interface and will backfill the
// pool with the given addresses. Addresses that are already known as bad,
// connected or waiting for a connection attempt are skipped.
//
// An address is recorded as unconnected only when it was actually queued.
// If the pool is full the address is dropped completely, so it neither
// lingers in the unconnected set without ever being dialed nor blocks
// a later BackFill from offering it again.
func (d *DefaultDiscovery) BackFill(addrs ...string) {
	d.lock.Lock()
	defer d.lock.Unlock()
	for _, addr := range addrs {
		if d.badAddrs[addr] || d.connectedAddrs[addr] ||
			d.unconnectedAddrs[addr] > 0 {
			continue
		}
		// Non-blocking send: the lock is held, so we must never wait for
		// free space in the pool here.
		select {
		case d.pool <- addr:
			d.unconnectedAddrs[addr] = connRetries
			updatePoolCountMetric(d.PoolCount())
		default:
			// The pool is full, forget about this address.
		}
	}
}
// PoolCount reports how many node addresses are currently queued in the
// pool waiting for a connection attempt.
func (d *DefaultDiscovery) PoolCount() int {
	queued := len(d.pool)
	return queued
}
// pushToPoolOrDrop performs a non-blocking enqueue of addr into the pool.
// On success the pool size metric is refreshed; if the pool has no free
// space the address is silently discarded.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
	select {
	default:
		// No room left in the pool, the address is dropped.
	case d.pool <- addr:
		// Queued, publish the new pool size.
		updatePoolCountMetric(len(d.pool))
	}
}
// RequestRemote hands a request for n connections over to the processing
// goroutine. It does nothing once the discoverer has been closed. The read
// lock is held for the whole duration of the send.
func (d *DefaultDiscovery) RequestRemote(n int) {
	d.closeMtx.RLock()
	defer d.closeMtx.RUnlock()

	if d.isDead {
		return
	}
	d.requestCh <- n
}
// RegisterBadAddr accounts for a failed connection attempt to addr. While
// the address still has retries left it is queued again; otherwise it is
// marked as bad and removed from both the unconnected and the good sets.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	retriesLeft := d.unconnectedAddrs[addr] - 1
	if retriesLeft > 0 {
		d.unconnectedAddrs[addr] = retriesLeft
		d.pushToPoolOrDrop(addr)
		return
	}

	// Out of retries.
	delete(d.unconnectedAddrs, addr)
	delete(d.goodAddrs, addr)
	d.badAddrs[addr] = true
}
// UnconnectedPeers returns a snapshot (in no particular order) of the
// addresses currently tracked as unconnected.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, len(d.unconnectedAddrs))
	i := 0
	for peer := range d.unconnectedAddrs {
		peers[i] = peer
		i++
	}
	return peers
}
// BadPeers returns a snapshot (in no particular order) of the addresses
// currently registered as bad.
func (d *DefaultDiscovery) BadPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, len(d.badAddrs))
	i := 0
	for peer := range d.badAddrs {
		peers[i] = peer
		i++
	}
	return peers
}
// GoodPeers returns a snapshot (in no particular order) of the known good
// peers, i.e. those that completed a handshake with us at least once.
func (d *DefaultDiscovery) GoodPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, len(d.goodAddrs))
	i := 0
	for peer := range d.goodAddrs {
		peers[i] = peer
		i++
	}
	return peers
}
// RegisterGoodAddr records s as a good address (one that successfully
// passed the handshake) and clears its bad status, if any.
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	delete(d.badAddrs, s)
	d.goodAddrs[s] = true
}
// UnregisterConnectedAddr removes s from the set of connected addresses.
// Its status as a good address is left untouched.
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	delete(d.connectedAddrs, s)
}
// RegisterConnectedAddr marks addr as connected and stops tracking it as
// an unconnected address.
func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	d.connectedAddrs[addr] = true
	delete(d.unconnectedAddrs, addr)
}
// tryAddress dials addr using the configured transport and dial timeout.
// A successful dial registers the address as connected; a failed one
// registers it as bad and requests one more connection as a replacement.
func (d *DefaultDiscovery) tryAddress(addr string) {
	dialErr := d.transport.Dial(addr, d.dialTimeout)
	if dialErr == nil {
		d.RegisterConnectedAddr(addr)
		return
	}
	d.RegisterBadAddr(addr)
	d.RequestRemote(1)
}
// Close stops discoverer pool processing making discoverer almost useless.
// It marks the discoverer as dead, so that RequestRemote becomes a no-op,
// and closes the request channel, which makes the run goroutine return.
//
// Close is safe to call more than once and from several goroutines: only
// the first call closes the channel, subsequent calls return immediately
// instead of panicking on a double close.
func (d *DefaultDiscovery) Close() {
	// The exclusive lock is acquired only after every in-flight
	// RequestRemote (which sends while holding the read lock) has finished,
	// so nobody can be sending on requestCh while it is being closed.
	d.closeMtx.Lock()
	defer d.closeMtx.Unlock()
	if d.isDead {
		return
	}
	d.isDead = true
	select {
	case <-d.requestCh: // Drain the channel if there is anything there.
	default:
	}
	close(d.requestCh)
}
// run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes. It returns once requestCh is closed.
func (d *DefaultDiscovery) run() {
	// requested is the number of connection attempts still to be started,
	// oldRequest is the value requested had right before the inner loop and
	// r is a scratch variable for requests received inside that loop.
	var requested, oldRequest, r int
	// ok turns false when requestCh is closed.
	var ok bool
	for {
		// Block waiting for a new request only when nothing is pending.
		if requested == 0 {
			requested, ok = <-d.requestCh
		}
		oldRequest = requested
		for ok && requested > 0 {
			select {
			case r, ok = <-d.requestCh:
				// A new request replaces the pending one only when it is
				// not smaller than what is still pending.
				if requested <= r {
					requested = r
				}
			case addr := <-d.pool:
				d.lock.RLock()
				addrIsConnected := d.connectedAddrs[addr]
				d.lock.RUnlock()
				updatePoolCountMetric(d.PoolCount())
				// Already connected addresses are skipped without being
				// counted against the request.
				if !addrIsConnected {
					go d.tryAddress(addr)
					requested--
				}
			default: // Empty pool
				// Neither a request nor an address is available: put every
				// seed that is not connected back into the pool, clearing
				// its bad status and resetting its retry counter.
				d.lock.Lock()
				for _, addr := range d.seeds {
					if !d.connectedAddrs[addr] {
						delete(d.badAddrs, addr)
						d.unconnectedAddrs[addr] = connRetries
						d.pushToPoolOrDrop(addr)
					}
				}
				d.lock.Unlock()
			}
		}
		// requestCh was closed, stop processing.
		if !ok {
			return
		}
		// Special case, no connections after all attempts.
		d.lock.RLock()
		connected := len(d.connectedAddrs)
		d.lock.RUnlock()
		if connected == 0 {
			// Pause for dialTimeout and then repeat the same request.
			time.Sleep(d.dialTimeout)
			requested = oldRequest
		}
	}
}