neo-go/pkg/network/discovery.go

268 lines
7.1 KiB
Go
Raw Normal View History

package network
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
)
const (
maxPoolSize = 10000
connRetries = 3
)
// Discoverer is an interface that is responsible for maintaining
// a healthy connection pool.
type Discoverer interface {
BackFill(...string)
GetFanOut() int
NetworkSize() int
PoolCount() int
RequestRemote(int)
RegisterBadAddr(string)
RegisterGoodAddr(string, capability.Capabilities)
RegisterConnectedAddr(string)
UnregisterConnectedAddr(string)
UnconnectedPeers() []string
BadPeers() []string
GoodPeers() []AddressWithCapabilities
}
// AddressWithCapabilities represents a node address with its capabilities.
type AddressWithCapabilities struct {
Address string
Capabilities capability.Capabilities
}
// DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct {
seeds []string
transport Transporter
lock sync.RWMutex
dialTimeout time.Duration
badAddrs map[string]bool
connectedAddrs map[string]bool
goodAddrs map[string]capability.Capabilities
unconnectedAddrs map[string]int
attempted map[string]bool
optimalFanOut int32
networkSize int32
requestCh chan int
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
d := &DefaultDiscovery{
seeds: addrs,
transport: ts,
dialTimeout: dt,
badAddrs: make(map[string]bool),
connectedAddrs: make(map[string]bool),
goodAddrs: make(map[string]capability.Capabilities),
unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int),
}
return d
}
func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Discoverer {
return NewDefaultDiscovery(addrs, dt, ts)
}
// BackFill implements the Discoverer interface and will backfill
// the pool with the given addresses.
func (d *DefaultDiscovery) BackFill(addrs ...string) {
d.lock.Lock()
d.backfill(addrs...)
d.lock.Unlock()
}
func (d *DefaultDiscovery) backfill(addrs ...string) {
for _, addr := range addrs {
if d.badAddrs[addr] || d.connectedAddrs[addr] ||
d.unconnectedAddrs[addr] > 0 {
continue
}
d.pushToPoolOrDrop(addr)
}
d.updateNetSize()
}
// PoolCount returns the number of the available node addresses.
func (d *DefaultDiscovery) PoolCount() int {
d.lock.RLock()
defer d.lock.RUnlock()
return d.poolCount()
}
func (d *DefaultDiscovery) poolCount() int {
return len(d.unconnectedAddrs)
}
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
// is already full, it just drops it.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
if len(d.unconnectedAddrs) < maxPoolSize {
d.unconnectedAddrs[addr] = connRetries
}
}
2019-10-22 14:56:03 +00:00
// RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(requested int) {
for ; requested > 0; requested-- {
var nextAddr string
d.lock.Lock()
for addr := range d.unconnectedAddrs {
if !d.connectedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
if nextAddr == "" {
// Empty pool, try seeds.
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
}
if nextAddr == "" {
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected (or attempted),
// we can end up in an infinite loop here, so drop the request.
break
}
d.attempted[nextAddr] = true
d.lock.Unlock()
go d.tryAddress(nextAddr)
}
}
// RegisterBadAddr registers the given address as a bad address.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
var isSeed bool
d.lock.Lock()
for _, seed := range d.seeds {
if addr == seed {
isSeed = true
break
}
}
if !isSeed {
d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] <= 0 {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
}
}
d.updateNetSize()
d.lock.Unlock()
}
// UnconnectedPeers returns all addresses of unconnected addrs.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.unconnectedAddrs))
for addr := range d.unconnectedAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// BadPeers returns all addresses of bad addrs.
func (d *DefaultDiscovery) BadPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.badAddrs))
for addr := range d.badAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// GoodPeers returns all addresses of known good peers (that at least once
// succeeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
d.lock.RLock()
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
for addr, cap := range d.goodAddrs {
addrs = append(addrs, AddressWithCapabilities{
Address: addr,
Capabilities: cap,
})
}
d.lock.RUnlock()
return addrs
}
// RegisterGoodAddr registers a known good connected address that has passed
// handshake successfully.
func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) {
d.lock.Lock()
d.goodAddrs[s] = c
delete(d.badAddrs, s)
d.lock.Unlock()
}
// UnregisterConnectedAddr tells the discoverer that this address is no longer
// connected, but it is still considered a good one.
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
d.lock.Lock()
delete(d.connectedAddrs, s)
d.backfill(s)
d.lock.Unlock()
}
// RegisterConnectedAddr tells discoverer that the given address is now connected.
func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
d.lock.Lock()
delete(d.unconnectedAddrs, addr)
d.connectedAddrs[addr] = true
d.updateNetSize()
d.lock.Unlock()
}
// GetFanOut returns the optimal number of nodes to broadcast packets to.
func (d *DefaultDiscovery) GetFanOut() int {
return int(atomic.LoadInt32(&d.optimalFanOut))
}
// NetworkSize returns the estimated network size.
func (d *DefaultDiscovery) NetworkSize() int {
return int(atomic.LoadInt32(&d.networkSize))
}
// updateNetSize updates network size estimation metric. Must be called under read lock.
func (d *DefaultDiscovery) updateNetSize() {
var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
if netsize == 2 { // log(1) == 0.
fanOut = 1 // But we still want to push messages to the peer.
}
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
atomic.StoreInt32(&d.networkSize, int32(netsize))
updateNetworkSizeMetric(netsize)
updatePoolCountMetric(d.poolCount())
}
func (d *DefaultDiscovery) tryAddress(addr string) {
err := d.transport.Dial(addr, d.dialTimeout)
d.lock.Lock()
delete(d.attempted, addr)
d.lock.Unlock()
if err != nil {
d.RegisterBadAddr(addr)
time.Sleep(d.dialTimeout)
d.RequestRemote(1)
}
}