b8c09f509f
Small (especially dockerized/virtualized) networks often start all nodes at ones and then we see a lot of connection flapping in the log. This happens because nodes try to connect to each other simultaneously, establish two connections, then each one finds a duplicate and drops it, but this can be different duplicate connections on other sides, so they retry and it all happens for some time. Eventually everything settles, but we have a lot of garbage in the log and a lot of useless attempts. This random waiting timeout doesn't change the logic much, adds a minimal delay, but increases chances for both nodes to establish a proper single connection on both sides to only then see another one and drop it on both sides as well. It leads to almost no flapping in small networks, doesn't affect much bigger ones. The delay is close to unnoticeable especially if there is something in the DB for node to process during startup.
322 lines
8.6 KiB
Go
322 lines
8.6 KiB
Go
package network
|
|
|
|
import (
|
|
"math"
|
|
"math/rand"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
|
)
|
|
|
|
const (
|
|
maxPoolSize = 10000
|
|
connRetries = 3
|
|
)
|
|
|
|
var (
|
|
// Maximum waiting time before connection attempt.
|
|
tryMaxWait = time.Second / 2
|
|
)
|
|
|
|
// Discoverer is an interface that is responsible for maintaining
|
|
// a healthy connection pool.
|
|
type Discoverer interface {
|
|
BackFill(...string)
|
|
GetFanOut() int
|
|
NetworkSize() int
|
|
PoolCount() int
|
|
RequestRemote(int)
|
|
RegisterSelf(AddressablePeer)
|
|
RegisterGood(AddressablePeer)
|
|
RegisterConnected(AddressablePeer)
|
|
UnregisterConnected(AddressablePeer, bool)
|
|
UnconnectedPeers() []string
|
|
BadPeers() []string
|
|
GoodPeers() []AddressWithCapabilities
|
|
}
|
|
|
|
// AddressWithCapabilities represents a node address with its capabilities.
|
|
type AddressWithCapabilities struct {
|
|
Address string
|
|
Capabilities capability.Capabilities
|
|
}
|
|
|
|
// DefaultDiscovery default implementation of the Discoverer interface.
|
|
type DefaultDiscovery struct {
|
|
seeds map[string]string
|
|
transport Transporter
|
|
lock sync.RWMutex
|
|
dialTimeout time.Duration
|
|
badAddrs map[string]bool
|
|
connectedAddrs map[string]bool
|
|
handshakedAddrs map[string]bool
|
|
goodAddrs map[string]capability.Capabilities
|
|
unconnectedAddrs map[string]int
|
|
attempted map[string]bool
|
|
outstanding int32
|
|
optimalFanOut int32
|
|
networkSize int32
|
|
requestCh chan int
|
|
}
|
|
|
|
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
|
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
|
|
var seeds = make(map[string]string)
|
|
for i := range addrs {
|
|
seeds[addrs[i]] = ""
|
|
}
|
|
d := &DefaultDiscovery{
|
|
seeds: seeds,
|
|
transport: ts,
|
|
dialTimeout: dt,
|
|
badAddrs: make(map[string]bool),
|
|
connectedAddrs: make(map[string]bool),
|
|
handshakedAddrs: make(map[string]bool),
|
|
goodAddrs: make(map[string]capability.Capabilities),
|
|
unconnectedAddrs: make(map[string]int),
|
|
attempted: make(map[string]bool),
|
|
requestCh: make(chan int),
|
|
}
|
|
return d
|
|
}
|
|
|
|
func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Discoverer {
|
|
return NewDefaultDiscovery(addrs, dt, ts)
|
|
}
|
|
|
|
// BackFill implements the Discoverer interface and will backfill
|
|
// the pool with the given addresses.
|
|
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
|
d.lock.Lock()
|
|
d.backfill(addrs...)
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
func (d *DefaultDiscovery) backfill(addrs ...string) {
|
|
for _, addr := range addrs {
|
|
if d.badAddrs[addr] || d.connectedAddrs[addr] || d.handshakedAddrs[addr] ||
|
|
d.unconnectedAddrs[addr] > 0 {
|
|
continue
|
|
}
|
|
d.pushToPoolOrDrop(addr)
|
|
}
|
|
d.updateNetSize()
|
|
}
|
|
|
|
// PoolCount returns the number of the available node addresses.
|
|
func (d *DefaultDiscovery) PoolCount() int {
|
|
d.lock.RLock()
|
|
defer d.lock.RUnlock()
|
|
return d.poolCount()
|
|
}
|
|
|
|
func (d *DefaultDiscovery) poolCount() int {
|
|
return len(d.unconnectedAddrs)
|
|
}
|
|
|
|
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
|
|
// is already full, it just drops it.
|
|
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
|
|
if len(d.unconnectedAddrs) < maxPoolSize {
|
|
d.unconnectedAddrs[addr] = connRetries
|
|
}
|
|
}
|
|
|
|
// RequestRemote tries to establish a connection with n nodes.
|
|
func (d *DefaultDiscovery) RequestRemote(requested int) {
|
|
outstanding := int(atomic.LoadInt32(&d.outstanding))
|
|
requested -= outstanding
|
|
for ; requested > 0; requested-- {
|
|
var nextAddr string
|
|
d.lock.Lock()
|
|
for addr := range d.unconnectedAddrs {
|
|
if !d.connectedAddrs[addr] && !d.handshakedAddrs[addr] && !d.attempted[addr] {
|
|
nextAddr = addr
|
|
break
|
|
}
|
|
}
|
|
|
|
if nextAddr == "" {
|
|
// Empty pool, try seeds.
|
|
for addr, ip := range d.seeds {
|
|
if ip == "" && !d.attempted[addr] {
|
|
nextAddr = addr
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if nextAddr == "" {
|
|
d.lock.Unlock()
|
|
// The pool is empty, but all seed nodes are already connected (or attempted),
|
|
// we can end up in an infinite loop here, so drop the request.
|
|
break
|
|
}
|
|
d.attempted[nextAddr] = true
|
|
d.lock.Unlock()
|
|
atomic.AddInt32(&d.outstanding, 1)
|
|
go d.tryAddress(nextAddr)
|
|
}
|
|
}
|
|
|
|
// RegisterSelf registers the given Peer as a bad one, because it's our own node.
|
|
func (d *DefaultDiscovery) RegisterSelf(p AddressablePeer) {
|
|
var connaddr = p.ConnectionAddr()
|
|
d.lock.Lock()
|
|
delete(d.connectedAddrs, connaddr)
|
|
d.registerBad(connaddr, true)
|
|
d.registerBad(p.PeerAddr().String(), true)
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
func (d *DefaultDiscovery) registerBad(addr string, force bool) {
|
|
_, isSeed := d.seeds[addr]
|
|
if isSeed {
|
|
if !force {
|
|
d.seeds[addr] = ""
|
|
} else {
|
|
d.seeds[addr] = "forever" // That's our own address, so never try connecting to it.
|
|
}
|
|
} else {
|
|
d.unconnectedAddrs[addr]--
|
|
if d.unconnectedAddrs[addr] <= 0 || force {
|
|
d.badAddrs[addr] = true
|
|
delete(d.unconnectedAddrs, addr)
|
|
delete(d.goodAddrs, addr)
|
|
}
|
|
}
|
|
d.updateNetSize()
|
|
}
|
|
|
|
// UnconnectedPeers returns all addresses of unconnected addrs.
|
|
func (d *DefaultDiscovery) UnconnectedPeers() []string {
|
|
d.lock.RLock()
|
|
addrs := make([]string, 0, len(d.unconnectedAddrs))
|
|
for addr := range d.unconnectedAddrs {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
d.lock.RUnlock()
|
|
return addrs
|
|
}
|
|
|
|
// BadPeers returns all addresses of bad addrs.
|
|
func (d *DefaultDiscovery) BadPeers() []string {
|
|
d.lock.RLock()
|
|
addrs := make([]string, 0, len(d.badAddrs))
|
|
for addr := range d.badAddrs {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
d.lock.RUnlock()
|
|
return addrs
|
|
}
|
|
|
|
// GoodPeers returns all addresses of known good peers (that at least once
|
|
// succeeded handshaking with us).
|
|
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
|
|
d.lock.RLock()
|
|
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
|
|
for addr, cap := range d.goodAddrs {
|
|
addrs = append(addrs, AddressWithCapabilities{
|
|
Address: addr,
|
|
Capabilities: cap,
|
|
})
|
|
}
|
|
d.lock.RUnlock()
|
|
return addrs
|
|
}
|
|
|
|
// RegisterGood registers a known good connected peer that has passed
|
|
// handshake successfully.
|
|
func (d *DefaultDiscovery) RegisterGood(p AddressablePeer) {
|
|
var s = p.PeerAddr().String()
|
|
d.lock.Lock()
|
|
d.handshakedAddrs[s] = true
|
|
d.goodAddrs[s] = p.Version().Capabilities
|
|
delete(d.badAddrs, s)
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
// UnregisterConnected tells the discoverer that this peer is no longer
|
|
// connected, but it is still considered a good one.
|
|
func (d *DefaultDiscovery) UnregisterConnected(p AddressablePeer, duplicate bool) {
|
|
var (
|
|
peeraddr = p.PeerAddr().String()
|
|
connaddr = p.ConnectionAddr()
|
|
)
|
|
d.lock.Lock()
|
|
delete(d.connectedAddrs, connaddr)
|
|
if !duplicate {
|
|
for addr, ip := range d.seeds {
|
|
if ip == peeraddr {
|
|
d.seeds[addr] = ""
|
|
break
|
|
}
|
|
}
|
|
delete(d.handshakedAddrs, peeraddr)
|
|
if _, ok := d.goodAddrs[peeraddr]; ok {
|
|
d.backfill(peeraddr)
|
|
}
|
|
}
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
// RegisterConnected tells discoverer that the given peer is now connected.
|
|
func (d *DefaultDiscovery) RegisterConnected(p AddressablePeer) {
|
|
var addr = p.ConnectionAddr()
|
|
d.lock.Lock()
|
|
d.registerConnected(addr)
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
func (d *DefaultDiscovery) registerConnected(addr string) {
|
|
delete(d.unconnectedAddrs, addr)
|
|
d.connectedAddrs[addr] = true
|
|
d.updateNetSize()
|
|
}
|
|
|
|
// GetFanOut returns the optimal number of nodes to broadcast packets to.
|
|
func (d *DefaultDiscovery) GetFanOut() int {
|
|
return int(atomic.LoadInt32(&d.optimalFanOut))
|
|
}
|
|
|
|
// NetworkSize returns the estimated network size.
|
|
func (d *DefaultDiscovery) NetworkSize() int {
|
|
return int(atomic.LoadInt32(&d.networkSize))
|
|
}
|
|
|
|
// updateNetSize updates network size estimation metric. Must be called under read lock.
|
|
func (d *DefaultDiscovery) updateNetSize() {
|
|
var netsize = len(d.handshakedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
|
|
var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
|
|
if netsize == 2 { // log(1) == 0.
|
|
fanOut = 1 // But we still want to push messages to the peer.
|
|
}
|
|
|
|
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
|
|
atomic.StoreInt32(&d.networkSize, int32(netsize))
|
|
updateNetworkSizeMetric(netsize)
|
|
updatePoolCountMetric(d.poolCount())
|
|
}
|
|
|
|
func (d *DefaultDiscovery) tryAddress(addr string) {
|
|
var tout = rand.Int63n(int64(tryMaxWait))
|
|
time.Sleep(time.Duration(tout)) // Have a sleep before working hard.
|
|
p, err := d.transport.Dial(addr, d.dialTimeout)
|
|
atomic.AddInt32(&d.outstanding, -1)
|
|
d.lock.Lock()
|
|
delete(d.attempted, addr)
|
|
if err == nil {
|
|
if _, ok := d.seeds[addr]; ok {
|
|
d.seeds[addr] = p.PeerAddr().String()
|
|
}
|
|
d.registerConnected(addr)
|
|
} else {
|
|
d.registerBad(addr, false)
|
|
}
|
|
d.lock.Unlock()
|
|
if err != nil {
|
|
time.Sleep(d.dialTimeout)
|
|
d.RequestRemote(1)
|
|
}
|
|
}
|