neo-go/pkg/network/discovery.go

323 lines
8.6 KiB
Go
Raw Normal View History

package network
import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
)
const (
maxPoolSize = 10000
connRetries = 3
)
var (
// Maximum waiting time before connection attempt.
tryMaxWait = time.Second / 2
)
// Discoverer is an interface that is responsible for maintaining
// a healthy connection pool.
type Discoverer interface {
BackFill(...string)
GetFanOut() int
NetworkSize() int
PoolCount() int
RequestRemote(int)
RegisterSelf(AddressablePeer)
RegisterGood(AddressablePeer)
RegisterConnected(AddressablePeer)
UnregisterConnected(AddressablePeer, bool)
UnconnectedPeers() []string
BadPeers() []string
GoodPeers() []AddressWithCapabilities
}
// AddressWithCapabilities represents a node address with its capabilities.
type AddressWithCapabilities struct {
Address string
Capabilities capability.Capabilities
}
// DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct {
seeds map[string]string
transport Transporter
lock sync.RWMutex
dialTimeout time.Duration
badAddrs map[string]bool
connectedAddrs map[string]bool
handshakedAddrs map[string]bool
goodAddrs map[string]capability.Capabilities
unconnectedAddrs map[string]int
attempted map[string]bool
outstanding int32
optimalFanOut int32
networkSize int32
requestCh chan int
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery {
var seeds = make(map[string]string)
for i := range addrs {
seeds[addrs[i]] = ""
}
d := &DefaultDiscovery{
seeds: seeds,
transport: ts,
dialTimeout: dt,
badAddrs: make(map[string]bool),
connectedAddrs: make(map[string]bool),
handshakedAddrs: make(map[string]bool),
goodAddrs: make(map[string]capability.Capabilities),
unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int),
}
return d
}
func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Discoverer {
return NewDefaultDiscovery(addrs, dt, ts)
}
// BackFill implements the Discoverer interface and will backfill
// the pool with the given addresses.
func (d *DefaultDiscovery) BackFill(addrs ...string) {
d.lock.Lock()
d.backfill(addrs...)
d.lock.Unlock()
}
func (d *DefaultDiscovery) backfill(addrs ...string) {
for _, addr := range addrs {
if d.badAddrs[addr] || d.connectedAddrs[addr] || d.handshakedAddrs[addr] ||
d.unconnectedAddrs[addr] > 0 {
continue
}
d.pushToPoolOrDrop(addr)
}
d.updateNetSize()
}
// PoolCount returns the number of the available node addresses.
func (d *DefaultDiscovery) PoolCount() int {
d.lock.RLock()
defer d.lock.RUnlock()
return d.poolCount()
}
func (d *DefaultDiscovery) poolCount() int {
return len(d.unconnectedAddrs)
}
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
// is already full, it just drops it.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
if len(d.unconnectedAddrs) < maxPoolSize {
d.unconnectedAddrs[addr] = connRetries
}
}
2019-10-22 14:56:03 +00:00
// RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(requested int) {
outstanding := int(atomic.LoadInt32(&d.outstanding))
requested -= outstanding
for ; requested > 0; requested-- {
var nextAddr string
d.lock.Lock()
for addr := range d.unconnectedAddrs {
if !d.connectedAddrs[addr] && !d.handshakedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
if nextAddr == "" {
// Empty pool, try seeds.
for addr, ip := range d.seeds {
if ip == "" && !d.attempted[addr] {
nextAddr = addr
break
}
}
}
if nextAddr == "" {
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected (or attempted),
// we can end up in an infinite loop here, so drop the request.
break
}
d.attempted[nextAddr] = true
d.lock.Unlock()
atomic.AddInt32(&d.outstanding, 1)
go d.tryAddress(nextAddr)
}
}
// RegisterSelf registers the given Peer as a bad one, because it's our own node.
func (d *DefaultDiscovery) RegisterSelf(p AddressablePeer) {
var connaddr = p.ConnectionAddr()
d.lock.Lock()
delete(d.connectedAddrs, connaddr)
d.registerBad(connaddr, true)
d.registerBad(p.PeerAddr().String(), true)
d.lock.Unlock()
}
func (d *DefaultDiscovery) registerBad(addr string, force bool) {
_, isSeed := d.seeds[addr]
if isSeed {
if !force {
d.seeds[addr] = ""
} else {
d.seeds[addr] = "forever" // That's our own address, so never try connecting to it.
}
} else {
d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] <= 0 || force {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
}
}
d.updateNetSize()
}
// UnconnectedPeers returns all addresses of unconnected addrs.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.unconnectedAddrs))
for addr := range d.unconnectedAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// BadPeers returns all addresses of bad addrs.
func (d *DefaultDiscovery) BadPeers() []string {
d.lock.RLock()
addrs := make([]string, 0, len(d.badAddrs))
for addr := range d.badAddrs {
addrs = append(addrs, addr)
}
d.lock.RUnlock()
return addrs
}
// GoodPeers returns all addresses of known good peers (that at least once
// succeeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
d.lock.RLock()
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
for addr, cap := range d.goodAddrs {
addrs = append(addrs, AddressWithCapabilities{
Address: addr,
Capabilities: cap,
})
}
d.lock.RUnlock()
return addrs
}
// RegisterGood registers a known good connected peer that has passed
// handshake successfully.
func (d *DefaultDiscovery) RegisterGood(p AddressablePeer) {
var s = p.PeerAddr().String()
d.lock.Lock()
d.handshakedAddrs[s] = true
d.goodAddrs[s] = p.Version().Capabilities
delete(d.badAddrs, s)
d.lock.Unlock()
}
// UnregisterConnected tells the discoverer that this peer is no longer
// connected, but it is still considered a good one.
func (d *DefaultDiscovery) UnregisterConnected(p AddressablePeer, duplicate bool) {
var (
peeraddr = p.PeerAddr().String()
connaddr = p.ConnectionAddr()
)
d.lock.Lock()
delete(d.connectedAddrs, connaddr)
if !duplicate {
for addr, ip := range d.seeds {
if ip == peeraddr {
d.seeds[addr] = ""
break
}
}
delete(d.handshakedAddrs, peeraddr)
if _, ok := d.goodAddrs[peeraddr]; ok {
d.backfill(peeraddr)
}
}
d.lock.Unlock()
}
// RegisterConnected tells discoverer that the given peer is now connected.
func (d *DefaultDiscovery) RegisterConnected(p AddressablePeer) {
var addr = p.ConnectionAddr()
d.lock.Lock()
d.registerConnected(addr)
d.lock.Unlock()
}
func (d *DefaultDiscovery) registerConnected(addr string) {
delete(d.unconnectedAddrs, addr)
d.connectedAddrs[addr] = true
d.updateNetSize()
}
// GetFanOut returns the optimal number of nodes to broadcast packets to.
func (d *DefaultDiscovery) GetFanOut() int {
return int(atomic.LoadInt32(&d.optimalFanOut))
}
// NetworkSize returns the estimated network size.
func (d *DefaultDiscovery) NetworkSize() int {
return int(atomic.LoadInt32(&d.networkSize))
}
// updateNetSize updates network size estimation metric. Must be called under read lock.
func (d *DefaultDiscovery) updateNetSize() {
var netsize = len(d.handshakedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
if netsize == 2 { // log(1) == 0.
fanOut = 1 // But we still want to push messages to the peer.
}
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
atomic.StoreInt32(&d.networkSize, int32(netsize))
updateNetworkSizeMetric(netsize)
updatePoolCountMetric(d.poolCount())
}
func (d *DefaultDiscovery) tryAddress(addr string) {
var tout = rand.Int63n(int64(tryMaxWait))
time.Sleep(time.Duration(tout)) // Have a sleep before working hard.
p, err := d.transport.Dial(addr, d.dialTimeout)
atomic.AddInt32(&d.outstanding, -1)
d.lock.Lock()
delete(d.attempted, addr)
if err == nil {
if _, ok := d.seeds[addr]; ok {
d.seeds[addr] = p.PeerAddr().String()
}
d.registerConnected(addr)
} else {
d.registerBad(addr, false)
}
d.lock.Unlock()
if err != nil {
time.Sleep(d.dialTimeout)
d.RequestRemote(1)
}
}