package network import ( "sync" "time" ) const ( maxPoolSize = 200 connRetries = 3 ) // Discoverer is an interface that is responsible for maintaining // a healthy connection pool. type Discoverer interface { BackFill(...string) Close() PoolCount() int RequestRemote(int) RegisterBadAddr(string) RegisterGoodAddr(string) RegisterConnectedAddr(string) UnregisterConnectedAddr(string) UnconnectedPeers() []string BadPeers() []string GoodPeers() []string } // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { seeds []string transport Transporter lock sync.RWMutex closeMtx sync.RWMutex dialTimeout time.Duration badAddrs map[string]bool connectedAddrs map[string]bool goodAddrs map[string]bool unconnectedAddrs map[string]int isDead bool requestCh chan int pool chan string } // NewDefaultDiscovery returns a new DefaultDiscovery. func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { d := &DefaultDiscovery{ seeds: addrs, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), connectedAddrs: make(map[string]bool), goodAddrs: make(map[string]bool), unconnectedAddrs: make(map[string]int), requestCh: make(chan int), pool: make(chan string, maxPoolSize), } go d.run() return d } // BackFill implements the Discoverer interface and will backfill the // the pool with the given addresses. func (d *DefaultDiscovery) BackFill(addrs ...string) { d.lock.Lock() for _, addr := range addrs { if d.badAddrs[addr] || d.connectedAddrs[addr] || d.unconnectedAddrs[addr] > 0 { continue } d.unconnectedAddrs[addr] = connRetries d.pushToPoolOrDrop(addr) } d.lock.Unlock() } // PoolCount returns the number of available node addresses. 
func (d *DefaultDiscovery) PoolCount() int {
	// The pool is a buffered channel, so its length is the number of queued
	// addresses.
	queued := len(d.pool)
	return queued
}

// pushToPoolOrDrop makes a non-blocking attempt to queue addr into the pool.
// On success the pool count metric is refreshed; when the pool is full the
// address is silently discarded.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
	select {
	case d.pool <- addr:
		// Queued successfully.
		updatePoolCountMetric(d.PoolCount())
	default:
		// Pool is full, drop the address.
	}
}

// RequestRemote asks the processing goroutine to establish connections with
// n nodes. It does nothing once the discoverer has been closed.
func (d *DefaultDiscovery) RequestRemote(n int) {
	d.closeMtx.RLock()
	defer d.closeMtx.RUnlock()

	if d.isDead {
		return
	}
	d.requestCh <- n
}

// RegisterBadAddr records a failed attempt for the given address. While the
// address still has attempts left it is queued again, otherwise it is marked
// as bad and removed from both the unconnected and the good sets.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	remaining := d.unconnectedAddrs[addr] - 1
	if remaining > 0 {
		d.unconnectedAddrs[addr] = remaining
		d.pushToPoolOrDrop(addr)
		return
	}

	d.badAddrs[addr] = true
	delete(d.unconnectedAddrs, addr)
	delete(d.goodAddrs, addr)
}

// UnconnectedPeers returns a snapshot of all unconnected addresses.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, 0, len(d.unconnectedAddrs))
	for a := range d.unconnectedAddrs {
		peers = append(peers, a)
	}
	return peers
}

// BadPeers returns a snapshot of all addresses marked as bad.
func (d *DefaultDiscovery) BadPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, 0, len(d.badAddrs))
	for a := range d.badAddrs {
		peers = append(peers, a)
	}
	return peers
}

// GoodPeers returns a snapshot of all addresses of known good peers (that at
// least once succeeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []string {
	d.lock.RLock()
	defer d.lock.RUnlock()

	peers := make([]string, 0, len(d.goodAddrs))
	for a := range d.goodAddrs {
		peers = append(peers, a)
	}
	return peers
}

// RegisterGoodAddr registers good known connected address that passed
// handshake successfully.
func (d *DefaultDiscovery) RegisterGoodAddr(s string) { d.lock.Lock() d.goodAddrs[s] = true delete(d.badAddrs, s) d.lock.Unlock() } // UnregisterConnectedAddr tells discoverer that this address is no longer // connected, but it still is considered as good one. func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { d.lock.Lock() delete(d.connectedAddrs, s) d.lock.Unlock() } // RegisterConnectedAddr tells discoverer that given address is now connected. func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { d.lock.Lock() delete(d.unconnectedAddrs, addr) d.connectedAddrs[addr] = true d.lock.Unlock() } func (d *DefaultDiscovery) tryAddress(addr string) { if err := d.transport.Dial(addr, d.dialTimeout); err != nil { d.RegisterBadAddr(addr) d.RequestRemote(1) } else { d.RegisterConnectedAddr(addr) } } // Close stops discoverer pool processing making discoverer almost useless. func (d *DefaultDiscovery) Close() { d.closeMtx.Lock() d.isDead = true d.closeMtx.Unlock() select { case <-d.requestCh: // Drain the channel if there is anything there. default: } close(d.requestCh) } // run is a goroutine that makes DefaultDiscovery process its queue to connect // to other nodes. func (d *DefaultDiscovery) run() { var requested, oldRequest, r int var ok bool for { if requested == 0 { requested, ok = <-d.requestCh } oldRequest = requested for ok && requested > 0 { select { case r, ok = <-d.requestCh: if requested <= r { requested = r } case addr := <-d.pool: d.lock.RLock() addrIsConnected := d.connectedAddrs[addr] d.lock.RUnlock() updatePoolCountMetric(d.PoolCount()) if !addrIsConnected { go d.tryAddress(addr) requested-- } default: // Empty pool d.lock.Lock() for _, addr := range d.seeds { if !d.connectedAddrs[addr] { delete(d.badAddrs, addr) d.unconnectedAddrs[addr] = connRetries d.pushToPoolOrDrop(addr) } } d.lock.Unlock() } } if !ok { return } // Special case, no connections after all attempts. 
d.lock.RLock() connected := len(d.connectedAddrs) d.lock.RUnlock() if connected == 0 { time.Sleep(d.dialTimeout) requested = oldRequest } } }