network: broadcast to log-dependent number of nodes

Fixes #608.
Roman Khimov 2022-10-12 22:57:49 +03:00
parent dc62046019
commit 631f166709
4 changed files with 30 additions and 5 deletions

pkg/network/discovery.go

@@ -1,7 +1,9 @@
 package network
 
 import (
+	"math"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/nspcc-dev/neo-go/pkg/network/capability"
@@ -17,6 +19,7 @@ const (
 type Discoverer interface {
 	BackFill(...string)
 	Close()
+	GetFanOut() int
 	PoolCount() int
 	RequestRemote(int)
 	RegisterBadAddr(string)
@@ -46,6 +49,7 @@ type DefaultDiscovery struct {
 	goodAddrs        map[string]capability.Capabilities
 	unconnectedAddrs map[string]int
 	attempted        map[string]bool
+	optimalFanOut    int32
 	isDead           bool
 	requestCh        chan int
 	pool             chan string
@@ -196,9 +200,21 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
 	d.lock.Unlock()
 }
 
+// GetFanOut returns the optimal number of nodes to broadcast packets to.
+func (d *DefaultDiscovery) GetFanOut() int {
+	return int(atomic.LoadInt32(&d.optimalFanOut))
+}
+
 // updateNetSize updates network size estimation metric. Must be called under read lock.
 func (d *DefaultDiscovery) updateNetSize() {
-	updateNetworkSizeMetric(len(d.connectedAddrs) + len(d.unconnectedAddrs))
+	var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
+	var fanOut = 2.5 * math.Log(float64(netsize-1))                   // -1 for the number of potential peers.
+	if netsize == 2 { // log(1) == 0.
+		fanOut = 1 // But we still want to push messages to the peer.
+	}
+
+	atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
+	updateNetworkSizeMetric(netsize)
 }
 
 func (d *DefaultDiscovery) tryAddress(addr string) {
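
The numbers this produces are easy to tabulate. Below is a minimal standalone sketch of the same estimation (the optimalFanOut helper is hypothetical, written only to show the values; in the package the result lives in the atomic optimalFanOut field above):

package main

import (
	"fmt"
	"math"
)

// optimalFanOut mirrors updateNetSize above: 2.5*ln(netsize-1), rounded to
// the nearest integer, with a floor of 1 for a two-node network.
// Hypothetical helper, not part of the package.
func optimalFanOut(netsize int) int {
	fanOut := 2.5 * math.Log(float64(netsize-1))
	if netsize == 2 { // log(1) == 0, but the single peer still needs messages.
		fanOut = 1
	}
	return int(fanOut + 0.5) // Truncating conversion, hence +0.5.
}

func main() {
	for _, n := range []int{2, 3, 10, 100, 1000, 10000} {
		fmt.Printf("estimated netsize %5d -> fan-out %2d\n", n, optimalFanOut(n))
	}
}

The result grows slowly: 5 recipients for a 10-node estimate, 11 for 100 nodes, 17 for 1000 and 23 for 10000, instead of a fixed fraction of all connected peers.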

pkg/network/discovery_test.go

@@ -74,6 +74,7 @@ func TestDefaultDiscoverer(t *testing.T) {
 		assert.Equal(t, 0, len(d.BadPeers()))
 		require.Equal(t, set1, set1D)
 	}
+	require.Equal(t, 2, d.GetFanOut())
 	// Request should make goroutines dial our addresses draining the pool.
 	d.RequestRemote(len(set1))
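
The expected fan-out of 2 follows directly from the formula in discovery.go: at this point the discoverer knows two addresses besides the node itself, so netsize is 3 and int(2.5*ln(2)+0.5) = int(2.23) = 2, while a four-node estimate would already yield int(2.5*ln(3)+0.5) = 3.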

pkg/network/helper_test.go

@@ -40,6 +40,11 @@ func (d *testDiscovery) RegisterBadAddr(addr string) {
 	defer d.Unlock()
 	d.bad = append(d.bad, addr)
 }
+func (d *testDiscovery) GetFanOut() int {
+	d.Lock()
+	defer d.Unlock()
+	return len(d.connected) + len(d.backfill)
+}
 func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
 func (d *testDiscovery) RegisterConnectedAddr(addr string) {
 	d.Lock()
View file

@@ -1363,8 +1363,12 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error) {
 		return
 	}
 
-	var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster.
-	var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
+	var (
+		// Optimal number of recipients.
+		enoughN     = s.discovery.GetFanOut()
+		replies     = make(chan error, peerN) // Cache is there just to make goroutines exit faster.
+		ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
+	)
 	for _, peer := range peers {
 		go func(p Peer, ctx context.Context, pkt []byte) {
 			// Do this before packet is sent, reader thread can get the reply before this routine wakes up.
@@ -1383,8 +1387,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error) {
 		if sentN+deadN == peerN {
 			break
 		}
-		// Send to 2/3 of good peers.
-		if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil {
+		if sentN >= enoughN && ctx.Err() == nil {
 			cancel()
 		}
 	}
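
The cancellation threshold above is where the fan-out takes effect: instead of waiting for 2/3 of live peers, the broadcast stops once the log-based estimate is satisfied. A sketch of the two conditions side by side (enoughSent is a hypothetical helper, not part of the server):

// enoughSent contrasts the old and new early-cancel conditions of the
// broadcast loop; fanOut would come from Discoverer.GetFanOut().
func enoughSent(sentN, deadN, peerN, fanOut int) (oldRule, newRule bool) {
	oldRule = 3*sentN >= 2*(peerN-deadN) // Send to 2/3 of good peers.
	newRule = sentN >= fanOut            // Send to the optimal number of peers.
	return
}

With 100 connected peers and none dead, the old rule cancelled only after 67 successful sends; with a log-based fan-out of about 11 for a network of that size, the context is cancelled much earlier, and sends not yet in flight observe the cancelled context.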