neoneo-go/pkg/network/discovery.go
Roman Khimov 773ccc2b92 network: allow discoverer to reuse addresses
...and don't try to connect to the nodes we're already connected to.

Before this change we had a problem of the discoverer throwing away good valid
addresses just because they were already known, which led to the pool draining
over time (as address reuse was basically forbidden and getaddr may not get
enough new nodes).
2019-09-16 16:32:04 +03:00

200 lines
4.9 KiB
Go

package network
import (
"time"
)
// maxPoolSize is the capacity of the buffered channel that holds addresses
// waiting for a connection attempt.
const maxPoolSize = 200

// connRetries is the retry counter given to every newly backfilled address;
// it is decremented each time the address is reported as bad.
const connRetries = 3
// Discoverer describes a component that keeps track of peer addresses and is
// responsible for maintaining a healthy connection pool.
type Discoverer interface {
	// BackFill offers the given addresses as candidates for the pool.
	BackFill(addrs ...string)
	// PoolCount reports how many addresses are currently available.
	PoolCount() int
	// RequestRemote asks for connection attempts to n nodes.
	RequestRemote(n int)
	// RegisterBadAddr reports a failure for the given address.
	RegisterBadAddr(addr string)
	// RegisterGoodAddr reports that the given address passed the handshake.
	RegisterGoodAddr(addr string)
	// UnregisterConnectedAddr reports that the given address is no longer
	// connected.
	UnregisterConnectedAddr(addr string)
	// UnconnectedPeers lists known addresses that are not connected.
	UnconnectedPeers() []string
	// BadPeers lists addresses considered bad.
	BadPeers() []string
	// GoodPeers lists addresses considered good.
	GoodPeers() []string
}
// DefaultDiscovery is the default implementation of the Discoverer interface.
// Its address maps are updated by the run goroutine in response to messages
// arriving on the channels below.
type DefaultDiscovery struct {
// transport is used to dial candidate addresses.
transport Transporter
// dialTimeout is passed to every transport.Dial call.
dialTimeout time.Duration
// badAddrs holds addresses that ran out of connection retries.
badAddrs map[string]bool
// connectedAddrs holds addresses that were dialed successfully and have not
// been unregistered since.
connectedAddrs map[string]bool
// goodAddrs holds addresses reported through RegisterGoodAddr.
goodAddrs map[string]bool
// unconnectedAddrs maps a known, not yet connected address to the number of
// retries it has left.
unconnectedAddrs map[string]int
// requestCh carries the number of connections requested via RequestRemote.
requestCh chan int
// connectedCh carries addresses that were dialed successfully.
connectedCh chan string
// backFill carries candidate addresses submitted via BackFill.
backFill chan string
// badAddrCh carries addresses that failed to dial or were reported as bad.
badAddrCh chan string
// pool is the buffered queue of addresses awaiting a connection attempt.
pool chan string
// goodCh carries addresses reported via RegisterGoodAddr.
goodCh chan string
// unconnectedCh carries addresses reported via UnregisterConnectedAddr.
unconnectedCh chan string
}
// NewDefaultDiscovery builds a DefaultDiscovery that dials addresses through
// ts using dt as the dial timeout, and starts its background event loop
// before returning it.
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
	disc := new(DefaultDiscovery)
	disc.transport = ts
	disc.dialTimeout = dt

	// Address books, all starting out empty.
	disc.badAddrs = map[string]bool{}
	disc.connectedAddrs = map[string]bool{}
	disc.goodAddrs = map[string]bool{}
	disc.unconnectedAddrs = map[string]int{}

	// Every event channel is unbuffered; only the pool has a capacity.
	disc.requestCh = make(chan int)
	disc.connectedCh = make(chan string)
	disc.goodCh = make(chan string)
	disc.unconnectedCh = make(chan string)
	disc.backFill = make(chan string)
	disc.badAddrCh = make(chan string)
	disc.pool = make(chan string, maxPoolSize)

	go disc.run()
	return disc
}
// BackFill implements the Discoverer interface: it hands each of the given
// addresses, in order, over to the discovery loop as a pool candidate. Every
// hand-over is a blocking channel send.
func (d *DefaultDiscovery) BackFill(addrs ...string) {
	for i := 0; i < len(addrs); i++ {
		d.backFill <- addrs[i]
	}
}
// PoolCount returns the number of node addresses currently queued in the pool
// and waiting for a connection attempt.
func (d *DefaultDiscovery) PoolCount() int {
return len(d.pool)
}
// pushToPoolOrDrop makes a single non-blocking attempt to queue addr in the
// pool. When the pool has no free slot the address is silently discarded.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
	select {
	default:
		// Pool is full: drop the address.
	case d.pool <- addr:
		// Queued for a later connection attempt.
	}
}
// RequestRemote asks the discoverer to try to establish connections with n
// nodes. The request is delivered with a blocking channel send, so the call
// returns only once the request has been received.
func (d *DefaultDiscovery) RequestRemote(n int) {
d.requestCh <- n
}
// RegisterBadAddr reports the given address as a bad one and then requests
// one more connection to compensate for it. Both steps are blocking channel
// sends.
//
// NOTE(review): the run loop itself calls RequestRemote(1) after processing
// every bad address, so a single call here results in two connection
// requests — confirm that this is intended.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
d.badAddrCh <- addr
d.RequestRemote(1)
}
// UnconnectedPeers returns the addresses of all known peers that are not
// connected yet. The result is a fresh, never-nil slice in no particular
// order.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
	// NOTE(review): this map is written by the run goroutine and read here
	// without any synchronization.
	pending := d.unconnectedAddrs
	peers := make([]string, 0, len(pending))
	for peer := range pending {
		peers = append(peers, peer)
	}
	return peers
}
// BadPeers returns the addresses of all peers currently marked as bad. The
// result is a fresh, never-nil slice in no particular order.
func (d *DefaultDiscovery) BadPeers() []string {
	// NOTE(review): this map is written by the run goroutine and read here
	// without any synchronization.
	rejected := d.badAddrs
	peers := make([]string, 0, len(rejected))
	for peer := range rejected {
		peers = append(peers, peer)
	}
	return peers
}
// GoodPeers returns the addresses of all known good peers (those that
// succeeded in handshaking with us at least once). The result is a fresh,
// never-nil slice in no particular order.
func (d *DefaultDiscovery) GoodPeers() []string {
	// NOTE(review): this map is written by the run goroutine and read here
	// without any synchronization.
	trusted := d.goodAddrs
	peers := make([]string, 0, len(trusted))
	for peer := range trusted {
		peers = append(peers, peer)
	}
	return peers
}
// RegisterGoodAddr registers a known good connected address that passed the
// handshake successfully. The address is delivered to the discovery loop with
// a blocking channel send.
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
d.goodCh <- s
}
// UnregisterConnectedAddr tells the discoverer that this address is no longer
// connected; it is only removed from the set of connected addresses, so it is
// still considered a good one. The address is delivered to the discovery loop
// with a blocking channel send.
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
d.unconnectedCh <- s
}
// tryAddress dials addr through the transport using the configured dial
// timeout and reports the outcome to the discovery loop: the address goes to
// badAddrCh when the dial fails and to connectedCh when it succeeds.
func (d *DefaultDiscovery) tryAddress(addr string) {
	outcome := d.connectedCh
	if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
		outcome = d.badAddrCh
	}
	outcome <- addr
}
// requestToWork is the worker loop that turns connection requests into dial
// attempts. It waits for a request on requestCh and then takes addresses from
// the pool, starting a tryAddress goroutine for each one, until the requested
// number of attempts has been started. It never returns.
//
// A request arriving while another one is still being served is merged into
// it: the larger of the two outstanding counts wins.
//
// The outstanding count is decremented only when a dial attempt is actually
// started. Merging a request or skipping an already connected address does
// not use up a slot; otherwise two back-to-back requests for one connection
// could cancel each other out without a single address being tried.
func (d *DefaultDiscovery) requestToWork() {
	for {
		requested := <-d.requestCh
		for requested > 0 {
			select {
			case r := <-d.requestCh:
				if requested < r {
					requested = r
				}
			case addr := <-d.pool:
				// NOTE(review): connectedAddrs is written by the run
				// goroutine and read here without synchronization.
				if d.connectedAddrs[addr] {
					// Already connected, nothing to dial; keep looking.
					continue
				}
				go d.tryAddress(addr)
				requested--
			}
		}
	}
}
// run is the discoverer's event loop. It starts the requestToWork worker and
// then processes, one at a time, the events arriving on the discoverer's
// channels, updating the address maps accordingly. It never returns.
//
//   - backFill: an address that is not bad, connected or already pending is
//     queued in the pool with connRetries retries.
//   - badAddrCh: the address loses one retry; it is queued again while
//     retries remain and is marked bad otherwise. One more connection is
//     requested in either case.
//   - connectedCh: the address is moved from the pending set to the
//     connected set.
//   - goodCh: the address is recorded as good.
//   - unconnectedCh: the address is removed from the connected set.
//
// An address is tracked in unconnectedAddrs only while it really sits in the
// pool (or is being dialed). If the pool is full the address is forgotten
// rather than recorded, since a recorded but never queued address would never
// be dialed and would also be rejected by every later BackFill.
func (d *DefaultDiscovery) run() {
	go d.requestToWork()

	// enqueue makes one non-blocking attempt to put addr into the pool and
	// reports whether the address was actually queued.
	enqueue := func(addr string) bool {
		select {
		case d.pool <- addr:
			return true
		default:
			return false
		}
	}

	for {
		select {
		case addr := <-d.backFill:
			if d.badAddrs[addr] || d.connectedAddrs[addr] ||
				d.unconnectedAddrs[addr] > 0 {
				break
			}
			if enqueue(addr) {
				d.unconnectedAddrs[addr] = connRetries
			}
		case addr := <-d.badAddrCh:
			retries := d.unconnectedAddrs[addr] - 1
			switch {
			case retries <= 0:
				d.badAddrs[addr] = true
				delete(d.unconnectedAddrs, addr)
			case enqueue(addr):
				d.unconnectedAddrs[addr] = retries
			default:
				// No room for a retry: forget the address so that it can
				// be backfilled again later.
				delete(d.unconnectedAddrs, addr)
			}
			d.RequestRemote(1)
		case addr := <-d.connectedCh:
			delete(d.unconnectedAddrs, addr)
			d.connectedAddrs[addr] = true
		case addr := <-d.goodCh:
			d.goodAddrs[addr] = true
		case addr := <-d.unconnectedCh:
			delete(d.connectedAddrs, addr)
		}
	}
}