diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index e95d5db1f..25204a0f9 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -1,7 +1,9 @@ package network import ( + "math" "sync" + "sync/atomic" "time" "github.com/nspcc-dev/neo-go/pkg/network/capability" @@ -17,6 +19,7 @@ const ( type Discoverer interface { BackFill(...string) Close() + GetFanOut() int PoolCount() int RequestRemote(int) RegisterBadAddr(string) @@ -46,6 +49,7 @@ type DefaultDiscovery struct { goodAddrs map[string]capability.Capabilities unconnectedAddrs map[string]int attempted map[string]bool + optimalFanOut int32 isDead bool requestCh chan int pool chan string @@ -196,9 +200,21 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { d.lock.Unlock() } +// GetFanOut returns the optimal number of nodes to broadcast packets to. +func (d *DefaultDiscovery) GetFanOut() int { + return int(atomic.LoadInt32(&d.optimalFanOut)) +} + // updateNetSize updates network size estimation metric. Must be called under read lock. func (d *DefaultDiscovery) updateNetSize() { - updateNetworkSizeMetric(len(d.connectedAddrs) + len(d.unconnectedAddrs)) + var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. + var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers. + if netsize == 2 { // log(1) == 0. + fanOut = 1 // But we still want to push messages to the peer. + } + + atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5. + updateNetworkSizeMetric(netsize) } func (d *DefaultDiscovery) tryAddress(addr string) { diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index f4a7a56a8..c1d871bed 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -74,6 +74,7 @@ func TestDefaultDiscoverer(t *testing.T) { assert.Equal(t, 0, len(d.BadPeers())) require.Equal(t, set1, set1D) } + require.Equal(t, 2, d.GetFanOut()) // Request should make goroutines dial our addresses draining the pool. d.RequestRemote(len(set1)) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 77db5f08a..8be8339f5 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -40,6 +40,11 @@ func (d *testDiscovery) RegisterBadAddr(addr string) { defer d.Unlock() d.bad = append(d.bad, addr) } +func (d *testDiscovery) GetFanOut() int { + d.Lock() + defer d.Unlock() + return len(d.connected) + len(d.backfill) +} func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {} func (d *testDiscovery) RegisterConnectedAddr(addr string) { d.Lock() diff --git a/pkg/network/server.go b/pkg/network/server.go index 99a7291cd..0e505db16 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1363,8 +1363,12 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C return } - var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. - var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + var ( + // Optimal number of recipients. + enoughN = s.discovery.GetFanOut() + replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. + ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + ) for _, peer := range peers { go func(p Peer, ctx context.Context, pkt []byte) { // Do this before packet is sent, reader thread can get the reply before this routine wakes up. @@ -1383,8 +1387,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C if sentN+deadN == peerN { break } - // Send to 2/3 of good peers. - if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil { + if sentN >= enoughN && ctx.Err() == nil { cancel() } }