neoneo-go/pkg/network/discovery.go
Roman Khimov b8c09f509f network: add random slight delay to connection attempts
Small (especially dockerized/virtualized) networks often start all nodes at
ones and then we see a lot of connection flapping in the log. This happens
because nodes try to connect to each other simultaneously, establish two
connections, then each one finds a duplicate and drops it, but this can be
different duplicate connections on other sides, so they retry and it all
happens for some time. Eventually everything settles, but we have a lot of
garbage in the log and a lot of useless attempts.

This random waiting timeout doesn't change the logic much, adds a minimal
delay, but increases chances for both nodes to establish a proper single
connection on both sides to only then see another one and drop it on both
sides as well. It leads to almost no flapping in small networks, doesn't
affect much bigger ones. The delay is close to unnoticeable especially if
there is something in the DB for node to process during startup.
2022-11-17 18:42:43 +03:00

322 lines
8.6 KiB
Go

package network
import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
)
const (
maxPoolSize = 10000
connRetries = 3
)
var (
// Maximum waiting time before connection attempt.
tryMaxWait = time.Second / 2
)
// Discoverer is an interface that is responsible for maintaining
// a healthy connection pool.
type Discoverer interface {
BackFill(...string)
GetFanOut() int
NetworkSize() int
PoolCount() int
RequestRemote(int)
RegisterSelf(AddressablePeer)
RegisterGood(AddressablePeer)
RegisterConnected(AddressablePeer)
UnregisterConnected(AddressablePeer, bool)
UnconnectedPeers() []string
BadPeers() []string
GoodPeers() []AddressWithCapabilities
}
// AddressWithCapabilities represents a node address with its capabilities.
type AddressWithCapabilities struct {
Address string
Capabilities capability.Capabilities
}
// DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct {
seeds map[string]string
transport Transporter
lock sync.RWMutex
dialTimeout time.Duration
badAddrs map[string]bool
connectedAddrs map[string]bool
handshakedAddrs map[string]bool
goodAddrs map[string]capability.Capabilities
unconnectedAddrs map[string]int
attempted map[string]bool
outstanding int32
optimalFanOut int32
networkSize int32
requestCh chan int
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
var seeds = make(map[string]string)
for i := range addrs {
seeds[addrs[i]] = ""
}
d := &DefaultDiscovery{
seeds: seeds,
transport: ts,
dialTimeout: dt,
badAddrs: make(map[string]bool),
connectedAddrs: make(map[string]bool),
handshakedAddrs: make(map[string]bool),
goodAddrs: make(map[string]capability.Capabilities),
unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int),
}
return d
}
func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Discoverer {
return NewDefaultDiscovery(addrs, dt, ts)
}
// BackFill implements the Discoverer interface and will backfill
// the pool with the given addresses.
func (d *DefaultDiscovery) BackFill(addrs ...string) {
d.lock.Lock()
d.backfill(addrs...)
d.lock.Unlock()
}
func (d *DefaultDiscovery) backfill(addrs ...string) {
for _, addr := range addrs {
if d.badAddrs[addr] || d.connectedAddrs[addr] || d.handshakedAddrs[addr] ||
d.unconnectedAddrs[addr] > 0 {
continue
}
d.pushToPoolOrDrop(addr)
}
d.updateNetSize()
}
// PoolCount returns the number of the available node addresses.
func (d *DefaultDiscovery) PoolCount() int {
d.lock.RLock()
defer d.lock.RUnlock()
return d.poolCount()
}
func (d *DefaultDiscovery) poolCount() int {
return len(d.unconnectedAddrs)
}
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
// is already full, it just drops it.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
if len(d.unconnectedAddrs) < maxPoolSize {
d.unconnectedAddrs[addr] = connRetries
}
}
// RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(requested int) {
outstanding := int(atomic.LoadInt32(&d.outstanding))
requested -= outstanding
for ; requested > 0; requested-- {
var nextAddr string
d.lock.Lock()
for addr := range d.unconnectedAddrs {
if !d.connectedAddrs[addr] && !d.handshakedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
if nextAddr == "" {
// Empty pool, try seeds.
for addr, ip := range d.seeds {
if ip == "" && !d.attempted[addr] {
nextAddr = addr
break
}
}
}
if nextAddr == "" {
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected (or attempted),
// we can end up in an infinite loop here, so drop the request.
break
}
d.attempted[nextAddr] = true
d.lock.Unlock()
atomic.AddInt32(&d.outstanding, 1)
go d.tryAddress(nextAddr)
}
}
// RegisterSelf registers the given Peer as a bad one, because it's our own node.
func (d *DefaultDiscovery) RegisterSelf(p AddressablePeer) {
var connaddr = p.ConnectionAddr()
d.lock.Lock()
delete(d.connectedAddrs, connaddr)
d.registerBad(connaddr, true)
d.registerBad(p.PeerAddr().String(), true)
d.lock.Unlock()
}
func (d *DefaultDiscovery) registerBad(addr string, force bool) {
_, isSeed := d.seeds[addr]
if isSeed {
if !force {
d.seeds[addr] = ""
} else {
d.seeds[addr] = "forever" // That's our own address, so never try connecting to it.
}
} else {
d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] <= 0 || force {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
}
}
d.updateNetSize()
}
// UnconnectedPeers returns all addresses of unconnected addrs.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.unconnectedAddrs))
for addr := range d.unconnectedAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// BadPeers returns all addresses of bad addrs.
func (d *DefaultDiscovery) BadPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.badAddrs))
for addr := range d.badAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// GoodPeers returns all addresses of known good peers (that at least once
// succeeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
d.lock.RLock()
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
for addr, cap := range d.goodAddrs {
addrs = append(addrs, AddressWithCapabilities{
Address: addr,
Capabilities: cap,
})
}
d.lock.RUnlock()
return addrs
}
// RegisterGood registers a known good connected peer that has passed
// handshake successfully.
func (d *DefaultDiscovery) RegisterGood(p AddressablePeer) {
var s = p.PeerAddr().String()
d.lock.Lock()
d.handshakedAddrs[s] = true
d.goodAddrs[s] = p.Version().Capabilities
delete(d.badAddrs, s)
d.lock.Unlock()
}
// UnregisterConnected tells the discoverer that this peer is no longer
// connected, but it is still considered a good one.
func (d *DefaultDiscovery) UnregisterConnected(p AddressablePeer, duplicate bool) {
var (
peeraddr = p.PeerAddr().String()
connaddr = p.ConnectionAddr()
)
d.lock.Lock()
delete(d.connectedAddrs, connaddr)
if !duplicate {
for addr, ip := range d.seeds {
if ip == peeraddr {
d.seeds[addr] = ""
break
}
}
delete(d.handshakedAddrs, peeraddr)
if _, ok := d.goodAddrs[peeraddr]; ok {
d.backfill(peeraddr)
}
}
d.lock.Unlock()
}
// RegisterConnected tells discoverer that the given peer is now connected.
func (d *DefaultDiscovery) RegisterConnected(p AddressablePeer) {
var addr = p.ConnectionAddr()
d.lock.Lock()
d.registerConnected(addr)
d.lock.Unlock()
}
func (d *DefaultDiscovery) registerConnected(addr string) {
delete(d.unconnectedAddrs, addr)
d.connectedAddrs[addr] = true
d.updateNetSize()
}
// GetFanOut returns the optimal number of nodes to broadcast packets to.
func (d *DefaultDiscovery) GetFanOut() int {
return int(atomic.LoadInt32(&d.optimalFanOut))
}
// NetworkSize returns the estimated network size.
func (d *DefaultDiscovery) NetworkSize() int {
return int(atomic.LoadInt32(&d.networkSize))
}
// updateNetSize updates network size estimation metric. Must be called under read lock.
func (d *DefaultDiscovery) updateNetSize() {
var netsize = len(d.handshakedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
if netsize == 2 { // log(1) == 0.
fanOut = 1 // But we still want to push messages to the peer.
}
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
atomic.StoreInt32(&d.networkSize, int32(netsize))
updateNetworkSizeMetric(netsize)
updatePoolCountMetric(d.poolCount())
}
func (d *DefaultDiscovery) tryAddress(addr string) {
var tout = rand.Int63n(int64(tryMaxWait))
time.Sleep(time.Duration(tout)) // Have a sleep before working hard.
p, err := d.transport.Dial(addr, d.dialTimeout)
atomic.AddInt32(&d.outstanding, -1)
d.lock.Lock()
delete(d.attempted, addr)
if err == nil {
if _, ok := d.seeds[addr]; ok {
d.seeds[addr] = p.PeerAddr().String()
}
d.registerConnected(addr)
} else {
d.registerBad(addr, false)
}
d.lock.Unlock()
if err != nil {
time.Sleep(d.dialTimeout)
d.RequestRemote(1)
}
}