network: allow discoverer to reuse addresses

...and don't try to connect to the nodes we're already connected to.

Before this change we had a problem of discoverer throwing away good valid
addresses just because they are already known which lead to pool draining over
time (as address reuse was basically forbidden and getaddr may not get enough
new nodes).
This commit is contained in:
Roman Khimov 2019-09-13 19:51:58 +03:00
parent 46dc141c6c
commit 773ccc2b92
3 changed files with 62 additions and 16 deletions

View file

@ -16,22 +16,28 @@ type Discoverer interface {
PoolCount() int PoolCount() int
RequestRemote(int) RequestRemote(int)
RegisterBadAddr(string) RegisterBadAddr(string)
RegisterGoodAddr(string)
UnregisterConnectedAddr(string)
UnconnectedPeers() []string UnconnectedPeers() []string
BadPeers() []string BadPeers() []string
GoodPeers() []string
} }
// DefaultDiscovery default implementation of the Discoverer interface. // DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct { type DefaultDiscovery struct {
transport Transporter transport Transporter
dialTimeout time.Duration dialTimeout time.Duration
addrs map[string]bool
badAddrs map[string]bool badAddrs map[string]bool
connectedAddrs map[string]bool
goodAddrs map[string]bool
unconnectedAddrs map[string]int unconnectedAddrs map[string]int
requestCh chan int requestCh chan int
connectedCh chan string connectedCh chan string
backFill chan string backFill chan string
badAddrCh chan string badAddrCh chan string
pool chan string pool chan string
goodCh chan string
unconnectedCh chan string
} }
// NewDefaultDiscovery returns a new DefaultDiscovery. // NewDefaultDiscovery returns a new DefaultDiscovery.
@ -39,11 +45,14 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
d := &DefaultDiscovery{ d := &DefaultDiscovery{
transport: ts, transport: ts,
dialTimeout: dt, dialTimeout: dt,
addrs: make(map[string]bool),
badAddrs: make(map[string]bool), badAddrs: make(map[string]bool),
connectedAddrs: make(map[string]bool),
goodAddrs: make(map[string]bool),
unconnectedAddrs: make(map[string]int), unconnectedAddrs: make(map[string]int),
requestCh: make(chan int), requestCh: make(chan int),
connectedCh: make(chan string), connectedCh: make(chan string),
goodCh: make(chan string),
unconnectedCh: make(chan string),
backFill: make(chan string), backFill: make(chan string),
badAddrCh: make(chan string), badAddrCh: make(chan string),
pool: make(chan string, maxPoolSize), pool: make(chan string, maxPoolSize),
@ -105,6 +114,28 @@ func (d *DefaultDiscovery) BadPeers() []string {
return addrs return addrs
} }
// GoodPeers returns all addresses of known good peers (that at least once
// succeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []string {
addrs := make([]string, 0, len(d.goodAddrs))
for addr := range d.goodAddrs {
addrs = append(addrs, addr)
}
return addrs
}
// RegisterGoodAddr registers good known connected address that passed
// handshake successfuly.
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
d.goodCh <- s
}
// UnregisterConnectedAddr tells discoverer that this address is no longer
// connected, but it still is considered as good one.
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
d.unconnectedCh <- s
}
func (d *DefaultDiscovery) tryAddress(addr string) { func (d *DefaultDiscovery) tryAddress(addr string) {
if err := d.transport.Dial(addr, d.dialTimeout); err != nil { if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
d.badAddrCh <- addr d.badAddrCh <- addr
@ -124,10 +155,12 @@ func (d *DefaultDiscovery) requestToWork() {
requested = r requested = r
} }
case addr := <-d.pool: case addr := <-d.pool:
if !d.connectedAddrs[addr] {
go d.tryAddress(addr) go d.tryAddress(addr)
} }
} }
} }
}
} }
func (d *DefaultDiscovery) run() { func (d *DefaultDiscovery) run() {
@ -135,14 +168,12 @@ func (d *DefaultDiscovery) run() {
for { for {
select { select {
case addr := <-d.backFill: case addr := <-d.backFill:
if _, ok := d.badAddrs[addr]; ok { if d.badAddrs[addr] || d.connectedAddrs[addr] ||
d.unconnectedAddrs[addr] > 0 {
break break
} }
if _, ok := d.addrs[addr]; !ok {
d.addrs[addr] = true
d.unconnectedAddrs[addr] = connRetries d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr) d.pushToPoolOrDrop(addr)
}
case addr := <-d.badAddrCh: case addr := <-d.badAddrCh:
d.unconnectedAddrs[addr]-- d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] > 0 { if d.unconnectedAddrs[addr] > 0 {
@ -155,6 +186,15 @@ func (d *DefaultDiscovery) run() {
case addr := <-d.connectedCh: case addr := <-d.connectedCh:
delete(d.unconnectedAddrs, addr) delete(d.unconnectedAddrs, addr)
if !d.connectedAddrs[addr] {
d.connectedAddrs[addr] = true
}
case addr := <-d.goodCh:
if !d.goodAddrs[addr] {
d.goodAddrs[addr] = true
}
case addr := <-d.unconnectedCh:
delete(d.connectedAddrs, addr)
} }
} }
} }

View file

@ -96,9 +96,12 @@ type testDiscovery struct{}
func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {} func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string) {}
func (d testDiscovery) UnregisterConnectedAddr(string) {}
func (d testDiscovery) UnconnectedPeers() []string { return []string{} } func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
func (d testDiscovery) RequestRemote(n int) {} func (d testDiscovery) RequestRemote(n int) {}
func (d testDiscovery) BadPeers() []string { return []string{} } func (d testDiscovery) BadPeers() []string { return []string{} }
func (d testDiscovery) GoodPeers() []string { return []string{} }
type localTransport struct{} type localTransport struct{}

View file

@ -160,7 +160,9 @@ func (s *Server) run() {
"reason": drop.reason, "reason": drop.reason,
"peerCount": s.PeerCount(), "peerCount": s.PeerCount(),
}).Warn("peer disconnected") }).Warn("peer disconnected")
s.discovery.BackFill(drop.peer.NetAddr().String()) addr := drop.peer.NetAddr().String()
s.discovery.UnregisterConnectedAddr(addr)
s.discovery.BackFill(addr)
} }
// else the peer is already gone, which can happen // else the peer is already gone, which can happen
// because we have two goroutines sending signals here // because we have two goroutines sending signals here
@ -191,6 +193,7 @@ func (s *Server) startProtocol(p Peer) {
"id": p.Version().Nonce, "id": p.Version().Nonce,
}).Info("started protocol") }).Info("started protocol")
s.discovery.RegisterGoodAddr(p.NetAddr().String())
err := s.requestHeaders(p) err := s.requestHeaders(p)
if err != nil { if err != nil {
p.Disconnect(err) p.Disconnect(err)