diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index f137f71c5..cbc6464a5 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -16,22 +16,28 @@ type Discoverer interface { PoolCount() int RequestRemote(int) RegisterBadAddr(string) + RegisterGoodAddr(string) + UnregisterConnectedAddr(string) UnconnectedPeers() []string BadPeers() []string + GoodPeers() []string } // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { transport Transporter dialTimeout time.Duration - addrs map[string]bool badAddrs map[string]bool + connectedAddrs map[string]bool + goodAddrs map[string]bool unconnectedAddrs map[string]int requestCh chan int connectedCh chan string backFill chan string badAddrCh chan string pool chan string + goodCh chan string + unconnectedCh chan string } // NewDefaultDiscovery returns a new DefaultDiscovery. @@ -39,11 +45,14 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ transport: ts, dialTimeout: dt, - addrs: make(map[string]bool), badAddrs: make(map[string]bool), + connectedAddrs: make(map[string]bool), + goodAddrs: make(map[string]bool), unconnectedAddrs: make(map[string]int), requestCh: make(chan int), connectedCh: make(chan string), + goodCh: make(chan string), + unconnectedCh: make(chan string), backFill: make(chan string), badAddrCh: make(chan string), pool: make(chan string, maxPoolSize), @@ -105,6 +114,28 @@ func (d *DefaultDiscovery) BadPeers() []string { return addrs } +// GoodPeers returns all addresses of known good peers (that at least once +// succeded handshaking with us). +func (d *DefaultDiscovery) GoodPeers() []string { + addrs := make([]string, 0, len(d.goodAddrs)) + for addr := range d.goodAddrs { + addrs = append(addrs, addr) + } + return addrs +} + +// RegisterGoodAddr registers good known connected address that passed +// handshake successfuly. +func (d *DefaultDiscovery) RegisterGoodAddr(s string) { + d.goodCh <- s +} + +// UnregisterConnectedAddr tells discoverer that this address is no longer +// connected, but it still is considered as good one. +func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { + d.unconnectedCh <- s +} + func (d *DefaultDiscovery) tryAddress(addr string) { if err := d.transport.Dial(addr, d.dialTimeout); err != nil { d.badAddrCh <- addr @@ -124,7 +155,9 @@ func (d *DefaultDiscovery) requestToWork() { requested = r } case addr := <-d.pool: - go d.tryAddress(addr) + if !d.connectedAddrs[addr] { + go d.tryAddress(addr) + } } } } @@ -135,14 +168,12 @@ func (d *DefaultDiscovery) run() { for { select { case addr := <-d.backFill: - if _, ok := d.badAddrs[addr]; ok { + if d.badAddrs[addr] || d.connectedAddrs[addr] || + d.unconnectedAddrs[addr] > 0 { break } - if _, ok := d.addrs[addr]; !ok { - d.addrs[addr] = true - d.unconnectedAddrs[addr] = connRetries - d.pushToPoolOrDrop(addr) - } + d.unconnectedAddrs[addr] = connRetries + d.pushToPoolOrDrop(addr) case addr := <-d.badAddrCh: d.unconnectedAddrs[addr]-- if d.unconnectedAddrs[addr] > 0 { @@ -155,6 +186,15 @@ func (d *DefaultDiscovery) run() { case addr := <-d.connectedCh: delete(d.unconnectedAddrs, addr) + if !d.connectedAddrs[addr] { + d.connectedAddrs[addr] = true + } + case addr := <-d.goodCh: + if !d.goodAddrs[addr] { + d.goodAddrs[addr] = true + } + case addr := <-d.unconnectedCh: + delete(d.connectedAddrs, addr) } } } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index adff3331a..bcbdff5df 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -93,12 +93,15 @@ func (chain testChain) Verify(*transaction.Transaction) error { type testDiscovery struct{} -func (d testDiscovery) BackFill(addrs ...string) {} -func (d testDiscovery) PoolCount() int { return 0 } -func (d testDiscovery) RegisterBadAddr(string) {} -func (d testDiscovery) UnconnectedPeers() []string { return []string{} } -func (d testDiscovery) RequestRemote(n int) {} -func (d testDiscovery) BadPeers() []string { return []string{} } +func (d testDiscovery) BackFill(addrs ...string) {} +func (d testDiscovery) PoolCount() int { return 0 } +func (d testDiscovery) RegisterBadAddr(string) {} +func (d testDiscovery) RegisterGoodAddr(string) {} +func (d testDiscovery) UnregisterConnectedAddr(string) {} +func (d testDiscovery) UnconnectedPeers() []string { return []string{} } +func (d testDiscovery) RequestRemote(n int) {} +func (d testDiscovery) BadPeers() []string { return []string{} } +func (d testDiscovery) GoodPeers() []string { return []string{} } type localTransport struct{} diff --git a/pkg/network/server.go b/pkg/network/server.go index 61cf86da7..ccf876587 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -160,7 +160,9 @@ func (s *Server) run() { "reason": drop.reason, "peerCount": s.PeerCount(), }).Warn("peer disconnected") - s.discovery.BackFill(drop.peer.NetAddr().String()) + addr := drop.peer.NetAddr().String() + s.discovery.UnregisterConnectedAddr(addr) + s.discovery.BackFill(addr) } // else the peer is already gone, which can happen // because we have two goroutines sending signals here @@ -191,6 +193,7 @@ func (s *Server) startProtocol(p Peer) { "id": p.Version().Nonce, }).Info("started protocol") + s.discovery.RegisterGoodAddr(p.NetAddr().String()) err := s.requestHeaders(p) if err != nil { p.Disconnect(err)