diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 221db32ee..cb4e32555 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -19,6 +19,7 @@ const ( type Discoverer interface { BackFill(...string) GetFanOut() int + NetworkSize() int PoolCount() int RequestRemote(int) RegisterBadAddr(string) @@ -48,6 +49,7 @@ type DefaultDiscovery struct { unconnectedAddrs map[string]int attempted map[string]bool optimalFanOut int32 + networkSize int32 requestCh chan int } @@ -233,6 +235,11 @@ func (d *DefaultDiscovery) GetFanOut() int { return int(atomic.LoadInt32(&d.optimalFanOut)) } +// NetworkSize returns the estimated network size. +func (d *DefaultDiscovery) NetworkSize() int { + return int(atomic.LoadInt32(&d.networkSize)) +} + // updateNetSize updates network size estimation metric. Must be called under read lock. func (d *DefaultDiscovery) updateNetSize() { var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. @@ -242,6 +249,7 @@ func (d *DefaultDiscovery) updateNetSize() { } atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5. + atomic.StoreInt32(&d.networkSize, int32(netsize)) updateNetworkSizeMetric(netsize) updatePoolCountMetric(d.poolCount()) } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 5692482cf..00c83a7d0 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -40,6 +40,11 @@ func (d *testDiscovery) RegisterBadAddr(addr string) { d.bad = append(d.bad, addr) } func (d *testDiscovery) GetFanOut() int { + d.Lock() + defer d.Unlock() + return (len(d.connected) + len(d.backfill)) * 2 / 3 +} +func (d *testDiscovery) NetworkSize() int { d.Lock() defer d.Unlock() return len(d.connected) + len(d.backfill) diff --git a/pkg/network/server.go b/pkg/network/server.go index 6cef30e0c..888bc44b4 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -387,10 +387,28 @@ func (s *Server) ConnectedPeers() []string { // while itself dealing with peers management (handling connects/disconnects). func (s *Server) run() { go s.runProto() - for { - if s.PeerCount() < s.MinPeers { + for loopCnt := 0; ; loopCnt++ { + var ( + netSize = s.discovery.NetworkSize() + // "Optimal" number of peers. + optimalN = s.discovery.GetFanOut() * 2 + // Real number of peers. + peerN = s.PeerCount() + ) + + if peerN < s.MinPeers { + // Starting up or going below the minimum -> quickly get many new peers. s.discovery.RequestRemote(s.AttemptConnPeers) + } else if s.MinPeers > 0 && loopCnt%s.MinPeers == 0 && optimalN > peerN && optimalN < s.MaxPeers && optimalN < netSize { + // Having some number of peers, but probably can get some more, the network is big. + // It also allows to start picking up new peers proactively, before we suddenly have optimalN-peerN { + connN = optimalN - peerN + } + s.discovery.RequestRemote(connN) } + if s.discovery.PoolCount() < minPoolCount { s.broadcastHPMessage(NewMessage(CMDGetAddr, payload.NewNullPayload())) }