network: simplify discoverer, make it almost a lib

We already have two basic lists: connected and unconnected nodes, we don't
need an additional channel and we don't need a goroutine to handle it.
This commit is contained in:
Roman Khimov 2022-10-13 18:26:05 +03:00
parent c1ef326183
commit 215e8704f1
4 changed files with 55 additions and 110 deletions

View file

@ -10,7 +10,7 @@ import (
)
const (
maxPoolSize = 200
maxPoolSize = 10000
connRetries = 3
)
@ -18,7 +18,6 @@ const (
// a healthy connection pool.
type Discoverer interface {
BackFill(...string)
Close()
GetFanOut() int
PoolCount() int
RequestRemote(int)
@ -42,7 +41,6 @@ type DefaultDiscovery struct {
seeds []string
transport Transporter
lock sync.RWMutex
closeMtx sync.RWMutex
dialTimeout time.Duration
badAddrs map[string]bool
connectedAddrs map[string]bool
@ -50,10 +48,7 @@ type DefaultDiscovery struct {
unconnectedAddrs map[string]int
attempted map[string]bool
optimalFanOut int32
isDead bool
requestCh chan int
pool chan string
runExit chan struct{}
}
// NewDefaultDiscovery returns a new DefaultDiscovery.
@ -68,10 +63,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int),
pool: make(chan string, maxPoolSize),
runExit: make(chan struct{}),
}
go d.run()
return d
}
@ -93,7 +85,6 @@ func (d *DefaultDiscovery) backfill(addrs ...string) {
d.unconnectedAddrs[addr] > 0 {
continue
}
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
}
d.updateNetSize()
@ -101,40 +92,73 @@ func (d *DefaultDiscovery) backfill(addrs ...string) {
// PoolCount returns the number of the available node addresses.
func (d *DefaultDiscovery) PoolCount() int {
return len(d.pool)
d.lock.RLock()
defer d.lock.RUnlock()
return d.poolCount()
}
func (d *DefaultDiscovery) poolCount() int {
return len(d.unconnectedAddrs)
}
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
// is already full, it just drops it.
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
select {
case d.pool <- addr:
updatePoolCountMetric(d.PoolCount())
// ok, queued
default:
// whatever
if len(d.unconnectedAddrs) < maxPoolSize {
d.unconnectedAddrs[addr] = connRetries
}
}
// RequestRemote tries to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(n int) {
d.closeMtx.RLock()
if !d.isDead {
d.requestCh <- n
func (d *DefaultDiscovery) RequestRemote(requested int) {
for ; requested > 0; requested-- {
var nextAddr string
d.lock.Lock()
for addr := range d.unconnectedAddrs {
if !d.connectedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
if nextAddr == "" {
// Empty pool, try seeds.
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] && !d.attempted[addr] {
nextAddr = addr
break
}
}
}
if nextAddr == "" {
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected (or attempted),
// we can end up in an infinite loop here, so drop the request.
break
}
d.attempted[nextAddr] = true
d.lock.Unlock()
go d.tryAddress(nextAddr)
}
d.closeMtx.RUnlock()
}
// RegisterBadAddr registers the given address as a bad address.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
var isSeed bool
d.lock.Lock()
d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] > 0 {
d.pushToPoolOrDrop(addr)
} else {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
for _, seed := range d.seeds {
if addr == seed {
isSeed = true
break
}
}
if !isSeed {
d.unconnectedAddrs[addr]--
if d.unconnectedAddrs[addr] <= 0 {
d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
delete(d.goodAddrs, addr)
}
}
d.updateNetSize()
d.lock.Unlock()
@ -219,6 +243,7 @@ func (d *DefaultDiscovery) updateNetSize() {
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
updateNetworkSizeMetric(netsize)
updatePoolCountMetric(d.poolCount())
}
func (d *DefaultDiscovery) tryAddress(addr string) {
@ -231,76 +256,3 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
d.RequestRemote(1)
}
}
// Close stops discoverer pool processing, which makes the discoverer almost useless.
func (d *DefaultDiscovery) Close() {
d.closeMtx.Lock()
d.isDead = true
d.closeMtx.Unlock()
select {
case <-d.requestCh: // Drain the channel if there is anything there.
default:
}
close(d.requestCh)
<-d.runExit
}
// run is a goroutine that makes DefaultDiscovery process its queue to connect
// to other nodes.
func (d *DefaultDiscovery) run() {
var requested, oldRequest, r int
var ok bool
for {
if requested == 0 {
requested, ok = <-d.requestCh
}
oldRequest = requested
for ok && requested > 0 {
select {
case r, ok = <-d.requestCh:
if requested <= r {
requested = r
}
case addr := <-d.pool:
updatePoolCountMetric(d.PoolCount())
d.lock.Lock()
if !d.connectedAddrs[addr] && !d.attempted[addr] {
d.attempted[addr] = true
go d.tryAddress(addr)
requested--
}
d.lock.Unlock()
default: // Empty pool
var added int
d.lock.Lock()
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] {
delete(d.badAddrs, addr)
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
added++
}
}
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected,
// we can end up in an infinite loop here, so drop the request.
if added == 0 {
requested = 0
}
}
}
if !ok {
break
}
// Special case, no connections after all attempts.
d.lock.RLock()
connected := len(d.connectedAddrs)
d.lock.RUnlock()
if connected == 0 {
time.Sleep(d.dialTimeout)
requested = oldRequest
}
}
close(d.runExit)
}

View file

@ -157,7 +157,7 @@ func TestDefaultDiscoverer(t *testing.T) {
}
}
}
require.Equal(t, 0, d.PoolCount())
require.Eventually(t, func() bool { return d.PoolCount() == 0 }, 2*time.Second, 50*time.Millisecond)
sort.Strings(dialledBad)
for i := 0; i < len(set1); i++ {
for j := 0; j < connRetries; j++ {
@ -174,10 +174,6 @@ func TestDefaultDiscoverer(t *testing.T) {
assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, 0, len(d.GoodPeers()))
require.Equal(t, 0, d.PoolCount())
// Close should work and subsequent RequestRemote is a no-op.
d.Close()
d.RequestRemote(42)
}
func TestSeedDiscovery(t *testing.T) {

View file

@ -33,7 +33,6 @@ func (d *testDiscovery) BackFill(addrs ...string) {
defer d.Unlock()
d.backfill = append(d.backfill, addrs...)
}
func (d *testDiscovery) Close() {}
func (d *testDiscovery) PoolCount() int { return 0 }
func (d *testDiscovery) RegisterBadAddr(addr string) {
d.Lock()
@ -204,6 +203,5 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco
s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
newFakeTransp, newTestDiscovery)
require.NoError(t, err)
t.Cleanup(s.discovery.Close)
return s
}

View file

@ -261,7 +261,6 @@ func (s *Server) Start(errChan chan error) {
func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close()
s.discovery.Close()
for _, p := range s.getPeers(nil) {
p.Disconnect(errServerShutdown)
}