773ccc2b92
...and don't try to connect to the nodes we're already connected to. Before this change we had a problem of discoverer throwing away good valid addresses just because they are already known which lead to pool draining over time (as address reuse was basically forbidden and getaddr may not get enough new nodes).
200 lines
4.9 KiB
Go
200 lines
4.9 KiB
Go
package network
|
|
|
|
import (
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
maxPoolSize = 200
|
|
connRetries = 3
|
|
)
|
|
|
|
// Discoverer is an interface that is responsible for maintaining
|
|
// a healthy connection pool.
|
|
type Discoverer interface {
|
|
BackFill(...string)
|
|
PoolCount() int
|
|
RequestRemote(int)
|
|
RegisterBadAddr(string)
|
|
RegisterGoodAddr(string)
|
|
UnregisterConnectedAddr(string)
|
|
UnconnectedPeers() []string
|
|
BadPeers() []string
|
|
GoodPeers() []string
|
|
}
|
|
|
|
// DefaultDiscovery default implementation of the Discoverer interface.
|
|
type DefaultDiscovery struct {
|
|
transport Transporter
|
|
dialTimeout time.Duration
|
|
badAddrs map[string]bool
|
|
connectedAddrs map[string]bool
|
|
goodAddrs map[string]bool
|
|
unconnectedAddrs map[string]int
|
|
requestCh chan int
|
|
connectedCh chan string
|
|
backFill chan string
|
|
badAddrCh chan string
|
|
pool chan string
|
|
goodCh chan string
|
|
unconnectedCh chan string
|
|
}
|
|
|
|
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
|
func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
|
|
d := &DefaultDiscovery{
|
|
transport: ts,
|
|
dialTimeout: dt,
|
|
badAddrs: make(map[string]bool),
|
|
connectedAddrs: make(map[string]bool),
|
|
goodAddrs: make(map[string]bool),
|
|
unconnectedAddrs: make(map[string]int),
|
|
requestCh: make(chan int),
|
|
connectedCh: make(chan string),
|
|
goodCh: make(chan string),
|
|
unconnectedCh: make(chan string),
|
|
backFill: make(chan string),
|
|
badAddrCh: make(chan string),
|
|
pool: make(chan string, maxPoolSize),
|
|
}
|
|
go d.run()
|
|
return d
|
|
}
|
|
|
|
// BackFill implements the Discoverer interface and will backfill the
|
|
// the pool with the given addresses.
|
|
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
|
for _, addr := range addrs {
|
|
d.backFill <- addr
|
|
}
|
|
}
|
|
|
|
// PoolCount returns the number of available node addresses.
|
|
func (d *DefaultDiscovery) PoolCount() int {
|
|
return len(d.pool)
|
|
}
|
|
|
|
// pushToPoolOrDrop tries to push address given into the pool, but if the pool
|
|
// is already full, it just drops it
|
|
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
|
|
select {
|
|
case d.pool <- addr:
|
|
// ok, queued
|
|
default:
|
|
// whatever
|
|
}
|
|
}
|
|
|
|
// RequestRemote will try to establish a connection with n nodes.
|
|
func (d *DefaultDiscovery) RequestRemote(n int) {
|
|
d.requestCh <- n
|
|
}
|
|
|
|
// RegisterBadAddr registers the given address as a bad address.
|
|
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
|
|
d.badAddrCh <- addr
|
|
d.RequestRemote(1)
|
|
}
|
|
|
|
// UnconnectedPeers returns all addresses of unconnected addrs.
|
|
func (d *DefaultDiscovery) UnconnectedPeers() []string {
|
|
addrs := make([]string, 0, len(d.unconnectedAddrs))
|
|
for addr := range d.unconnectedAddrs {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// BadPeers returns all addresses of bad addrs.
|
|
func (d *DefaultDiscovery) BadPeers() []string {
|
|
addrs := make([]string, 0, len(d.badAddrs))
|
|
for addr := range d.badAddrs {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// GoodPeers returns all addresses of known good peers (that at least once
|
|
// succeded handshaking with us).
|
|
func (d *DefaultDiscovery) GoodPeers() []string {
|
|
addrs := make([]string, 0, len(d.goodAddrs))
|
|
for addr := range d.goodAddrs {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// RegisterGoodAddr registers good known connected address that passed
|
|
// handshake successfuly.
|
|
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
|
|
d.goodCh <- s
|
|
}
|
|
|
|
// UnregisterConnectedAddr tells discoverer that this address is no longer
|
|
// connected, but it still is considered as good one.
|
|
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
|
|
d.unconnectedCh <- s
|
|
}
|
|
|
|
func (d *DefaultDiscovery) tryAddress(addr string) {
|
|
if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
|
|
d.badAddrCh <- addr
|
|
} else {
|
|
d.connectedCh <- addr
|
|
}
|
|
}
|
|
|
|
func (d *DefaultDiscovery) requestToWork() {
|
|
var requested int
|
|
|
|
for {
|
|
for requested = <-d.requestCh; requested > 0; requested-- {
|
|
select {
|
|
case r := <-d.requestCh:
|
|
if requested < r {
|
|
requested = r
|
|
}
|
|
case addr := <-d.pool:
|
|
if !d.connectedAddrs[addr] {
|
|
go d.tryAddress(addr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *DefaultDiscovery) run() {
|
|
go d.requestToWork()
|
|
for {
|
|
select {
|
|
case addr := <-d.backFill:
|
|
if d.badAddrs[addr] || d.connectedAddrs[addr] ||
|
|
d.unconnectedAddrs[addr] > 0 {
|
|
break
|
|
}
|
|
d.unconnectedAddrs[addr] = connRetries
|
|
d.pushToPoolOrDrop(addr)
|
|
case addr := <-d.badAddrCh:
|
|
d.unconnectedAddrs[addr]--
|
|
if d.unconnectedAddrs[addr] > 0 {
|
|
d.pushToPoolOrDrop(addr)
|
|
} else {
|
|
d.badAddrs[addr] = true
|
|
delete(d.unconnectedAddrs, addr)
|
|
}
|
|
d.RequestRemote(1)
|
|
|
|
case addr := <-d.connectedCh:
|
|
delete(d.unconnectedAddrs, addr)
|
|
if !d.connectedAddrs[addr] {
|
|
d.connectedAddrs[addr] = true
|
|
}
|
|
case addr := <-d.goodCh:
|
|
if !d.goodAddrs[addr] {
|
|
d.goodAddrs[addr] = true
|
|
}
|
|
case addr := <-d.unconnectedCh:
|
|
delete(d.connectedAddrs, addr)
|
|
}
|
|
}
|
|
}
|