Merge pull request #1861 from nspcc-dev/fix-too-many-connections-attempted

Fix too many connections attempted
This commit is contained in:
Roman Khimov 2021-03-26 13:05:03 +03:00 committed by GitHub
commit 67a01f4719
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 6 deletions

View file

@ -45,6 +45,7 @@ type DefaultDiscovery struct {
connectedAddrs map[string]bool connectedAddrs map[string]bool
goodAddrs map[string]capability.Capabilities goodAddrs map[string]capability.Capabilities
unconnectedAddrs map[string]int unconnectedAddrs map[string]int
attempted map[string]bool
isDead bool isDead bool
requestCh chan int requestCh chan int
pool chan string pool chan string
@ -60,6 +61,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
connectedAddrs: make(map[string]bool), connectedAddrs: make(map[string]bool),
goodAddrs: make(map[string]capability.Capabilities), goodAddrs: make(map[string]capability.Capabilities),
unconnectedAddrs: make(map[string]int), unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int), requestCh: make(chan int),
pool: make(chan string, maxPoolSize), pool: make(chan string, maxPoolSize),
} }
@ -189,7 +191,11 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
} }
func (d *DefaultDiscovery) tryAddress(addr string) { func (d *DefaultDiscovery) tryAddress(addr string) {
if err := d.transport.Dial(addr, d.dialTimeout); err != nil { err := d.transport.Dial(addr, d.dialTimeout)
d.lock.Lock()
delete(d.attempted, addr)
d.lock.Unlock()
if err != nil {
d.RegisterBadAddr(addr) d.RegisterBadAddr(addr)
d.RequestRemote(1) d.RequestRemote(1)
} }
@ -225,14 +231,14 @@ func (d *DefaultDiscovery) run() {
requested = r requested = r
} }
case addr := <-d.pool: case addr := <-d.pool:
d.lock.RLock()
addrIsConnected := d.connectedAddrs[addr]
d.lock.RUnlock()
updatePoolCountMetric(d.PoolCount()) updatePoolCountMetric(d.PoolCount())
if !addrIsConnected { d.lock.Lock()
if !d.connectedAddrs[addr] && !d.attempted[addr] {
d.attempted[addr] = true
go d.tryAddress(addr) go d.tryAddress(addr)
requested-- requested--
} }
d.lock.Unlock()
default: // Empty pool default: // Empty pool
var added int var added int
d.lock.Lock() d.lock.Lock()

View file

@ -916,9 +916,11 @@ func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
if !p.CanProcessAddr() { if !p.CanProcessAddr() {
return errors.New("unexpected addr received") return errors.New("unexpected addr received")
} }
dups := make(map[string]bool)
for _, a := range addrs.Addrs { for _, a := range addrs.Addrs {
addr, err := a.GetTCPAddress() addr, err := a.GetTCPAddress()
if err == nil { if err == nil && !dups[addr] {
dups[addr] = true
s.discovery.BackFill(addr) s.discovery.BackFill(addr)
} }
} }