From dc6204601927d346fb1812bf2482457895f6b3f5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 12 Oct 2022 22:29:55 +0300 Subject: [PATCH 1/8] network: add network size estimation metric --- pkg/network/discovery.go | 9 +++++++++ pkg/network/prometheus.go | 13 +++++++++++++ 2 files changed, 22 insertions(+) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index e6580139d..e95d5db1f 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -87,6 +87,7 @@ func (d *DefaultDiscovery) BackFill(addrs ...string) { d.unconnectedAddrs[addr] = connRetries d.pushToPoolOrDrop(addr) } + d.updateNetSize() d.lock.Unlock() } @@ -127,6 +128,7 @@ func (d *DefaultDiscovery) RegisterBadAddr(addr string) { delete(d.unconnectedAddrs, addr) delete(d.goodAddrs, addr) } + d.updateNetSize() d.lock.Unlock() } @@ -181,6 +183,7 @@ func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { d.lock.Lock() delete(d.connectedAddrs, s) + d.updateNetSize() d.lock.Unlock() } @@ -189,9 +192,15 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { d.lock.Lock() delete(d.unconnectedAddrs, addr) d.connectedAddrs[addr] = true + d.updateNetSize() d.lock.Unlock() } +// updateNetSize updates network size estimation metric. Must be called under read lock. +func (d *DefaultDiscovery) updateNetSize() { + updateNetworkSizeMetric(len(d.connectedAddrs) + len(d.unconnectedAddrs)) +} + func (d *DefaultDiscovery) tryAddress(addr string) { err := d.transport.Dial(addr, d.dialTimeout) d.lock.Lock() diff --git a/pkg/network/prometheus.go b/pkg/network/prometheus.go index dab35c44c..31088e3f7 100644 --- a/pkg/network/prometheus.go +++ b/pkg/network/prometheus.go @@ -6,6 +6,14 @@ import ( // Metric used in monitoring service. var ( + estimatedNetworkSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Help: "Estimated network size", + Name: "network_size", + Namespace: "neogo", + }, + ) + peersConnected = prometheus.NewGauge( prometheus.GaugeOpts{ Help: "Number of connected peers", @@ -42,6 +50,7 @@ var ( func init() { prometheus.MustRegister( + estimatedNetworkSize, peersConnected, servAndNodeVersion, poolCount, @@ -49,6 +58,10 @@ func init() { ) } +func updateNetworkSizeMetric(sz int) { + estimatedNetworkSize.Set(float64(sz)) +} + func updateBlockQueueLenMetric(bqLen int) { blockQueueLength.Set(float64(bqLen)) } From 631f166709ecc82253a3f711ef5cf51a07fbc7ac Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 12 Oct 2022 22:57:49 +0300 Subject: [PATCH 2/8] network: broadcast to log-dependent number of nodes Fixes #608. 
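The heuristic behind this change is easy to read in isolation: the discoverer estimates the network size from the addresses it knows about and derives a gossip fan-out of roughly 2.5·ln(n). Below is a minimal standalone sketch of that calculation; the helper name and the sample sizes are illustrative, but the formula is the one the hunks below add to updateNetSize():

package main

import (
	"fmt"
	"math"
)

// optimalFanOut mirrors the 2.5*log(n) heuristic from this patch: netsize counts
// connected plus unconnected known addresses plus the node itself.
func optimalFanOut(netsize int) int {
	fanOut := 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
	if netsize == 2 {                            // log(1) == 0, but the single known peer should still be reached.
		fanOut = 1
	}
	return int(fanOut + 0.5) // Truncating conversion, hence +0.5.
}

func main() {
	for _, n := range []int{2, 10, 100, 1000} {
		fmt.Printf("netsize=%4d -> fan-out=%d\n", n, optimalFanOut(n))
	}
}

A couple of hundred known addresses thus give a fan-out of about 13, and iteratePeersWithSendMsg stops once that many peers have been served instead of the previous fixed 2/3 of all peers.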
--- pkg/network/discovery.go | 18 +++++++++++++++++- pkg/network/discovery_test.go | 1 + pkg/network/helper_test.go | 5 +++++ pkg/network/server.go | 11 +++++++---- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index e95d5db1f..25204a0f9 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -1,7 +1,9 @@ package network import ( + "math" "sync" + "sync/atomic" "time" "github.com/nspcc-dev/neo-go/pkg/network/capability" @@ -17,6 +19,7 @@ const ( type Discoverer interface { BackFill(...string) Close() + GetFanOut() int PoolCount() int RequestRemote(int) RegisterBadAddr(string) @@ -46,6 +49,7 @@ type DefaultDiscovery struct { goodAddrs map[string]capability.Capabilities unconnectedAddrs map[string]int attempted map[string]bool + optimalFanOut int32 isDead bool requestCh chan int pool chan string @@ -196,9 +200,21 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { d.lock.Unlock() } +// GetFanOut returns the optimal number of nodes to broadcast packets to. +func (d *DefaultDiscovery) GetFanOut() int { + return int(atomic.LoadInt32(&d.optimalFanOut)) +} + // updateNetSize updates network size estimation metric. Must be called under read lock. func (d *DefaultDiscovery) updateNetSize() { - updateNetworkSizeMetric(len(d.connectedAddrs) + len(d.unconnectedAddrs)) + var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. + var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers. + if netsize == 2 { // log(1) == 0. + fanOut = 1 // But we still want to push messages to the peer. + } + + atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5. + updateNetworkSizeMetric(netsize) } func (d *DefaultDiscovery) tryAddress(addr string) { diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index f4a7a56a8..c1d871bed 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -74,6 +74,7 @@ func TestDefaultDiscoverer(t *testing.T) { assert.Equal(t, 0, len(d.BadPeers())) require.Equal(t, set1, set1D) } + require.Equal(t, 2, d.GetFanOut()) // Request should make goroutines dial our addresses draining the pool. d.RequestRemote(len(set1)) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 77db5f08a..8be8339f5 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -40,6 +40,11 @@ func (d *testDiscovery) RegisterBadAddr(addr string) { defer d.Unlock() d.bad = append(d.bad, addr) } +func (d *testDiscovery) GetFanOut() int { + d.Lock() + defer d.Unlock() + return len(d.connected) + len(d.backfill) +} func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {} func (d *testDiscovery) RegisterConnectedAddr(addr string) { d.Lock() diff --git a/pkg/network/server.go b/pkg/network/server.go index 99a7291cd..0e505db16 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1363,8 +1363,12 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C return } - var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. - var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + var ( + // Optimal number of recipients. + enoughN = s.discovery.GetFanOut() + replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. 
+ ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + ) for _, peer := range peers { go func(p Peer, ctx context.Context, pkt []byte) { // Do this before packet is sent, reader thread can get the reply before this routine wakes up. @@ -1383,8 +1387,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C if sentN+deadN == peerN { break } - // Send to 2/3 of good peers. - if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil { + if sentN >= enoughN && ctx.Err() == nil { cancel() } } From c1ef3261836efb1ae7d67195a7c7b0ac2d6bd9f5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 13 Oct 2022 15:47:55 +0300 Subject: [PATCH 3/8] network: re-add addresses to the pool on UnregisterConnectedAddr That's what we do anyway, but this way we can be a bit more efficient. --- pkg/network/discovery.go | 8 ++++++-- pkg/network/discovery_test.go | 5 ++--- pkg/network/server.go | 2 -- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 25204a0f9..08e59f2ee 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -83,6 +83,11 @@ func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Disco // the pool with the given addresses. func (d *DefaultDiscovery) BackFill(addrs ...string) { d.lock.Lock() + d.backfill(addrs...) + d.lock.Unlock() +} + +func (d *DefaultDiscovery) backfill(addrs ...string) { for _, addr := range addrs { if d.badAddrs[addr] || d.connectedAddrs[addr] || d.unconnectedAddrs[addr] > 0 { @@ -92,7 +97,6 @@ func (d *DefaultDiscovery) BackFill(addrs ...string) { d.pushToPoolOrDrop(addr) } d.updateNetSize() - d.lock.Unlock() } // PoolCount returns the number of the available node addresses. @@ -187,7 +191,7 @@ func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { d.lock.Lock() delete(d.connectedAddrs, s) - d.updateNetSize() + d.backfill(s) d.lock.Unlock() } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index c1d871bed..047aceb9b 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -132,14 +132,13 @@ func TestDefaultDiscoverer(t *testing.T) { for _, addr := range set1 { d.UnregisterConnectedAddr(addr) } - assert.Equal(t, 0, len(d.UnconnectedPeers())) + assert.Equal(t, 2, len(d.UnconnectedPeers())) // They're re-added automatically. assert.Equal(t, 0, len(d.BadPeers())) assert.Equal(t, len(set1), len(d.GoodPeers())) - require.Equal(t, 0, d.PoolCount()) + require.Equal(t, 2, d.PoolCount()) // Now make Dial() fail and wait to see addresses in the bad list. atomic.StoreInt32(&ts.retFalse, 1) - d.BackFill(set1...) 
assert.Equal(t, len(set1), d.PoolCount()) set1D := d.UnconnectedPeers() sort.Strings(set1D) diff --git a/pkg/network/server.go b/pkg/network/server.go index 0e505db16..b33901543 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -439,11 +439,9 @@ func (s *Server) run() { s.lock.RUnlock() if !stillConnected { s.discovery.UnregisterConnectedAddr(addr) - s.discovery.BackFill(addr) } } else { s.discovery.UnregisterConnectedAddr(addr) - s.discovery.BackFill(addr) } updatePeersConnectedMetric(s.PeerCount()) } else { From 215e8704f1f10294092a2d4cf651d2c667fa5282 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 13 Oct 2022 18:26:05 +0300 Subject: [PATCH 4/8] network: simplify discoverer, make it almost a lib We already have two basic lists: connected and unconnected nodes, we don't need an additional channel and we don't need a goroutine to handle it. --- pkg/network/discovery.go | 156 ++++++++++++---------------------- pkg/network/discovery_test.go | 6 +- pkg/network/helper_test.go | 2 - pkg/network/server.go | 1 - 4 files changed, 55 insertions(+), 110 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 08e59f2ee..221db32ee 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -10,7 +10,7 @@ import ( ) const ( - maxPoolSize = 200 + maxPoolSize = 10000 connRetries = 3 ) @@ -18,7 +18,6 @@ const ( // a healthy connection pool. type Discoverer interface { BackFill(...string) - Close() GetFanOut() int PoolCount() int RequestRemote(int) @@ -42,7 +41,6 @@ type DefaultDiscovery struct { seeds []string transport Transporter lock sync.RWMutex - closeMtx sync.RWMutex dialTimeout time.Duration badAddrs map[string]bool connectedAddrs map[string]bool @@ -50,10 +48,7 @@ type DefaultDiscovery struct { unconnectedAddrs map[string]int attempted map[string]bool optimalFanOut int32 - isDead bool requestCh chan int - pool chan string - runExit chan struct{} } // NewDefaultDiscovery returns a new DefaultDiscovery. @@ -68,10 +63,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa unconnectedAddrs: make(map[string]int), attempted: make(map[string]bool), requestCh: make(chan int), - pool: make(chan string, maxPoolSize), - runExit: make(chan struct{}), } - go d.run() return d } @@ -93,7 +85,6 @@ func (d *DefaultDiscovery) backfill(addrs ...string) { d.unconnectedAddrs[addr] > 0 { continue } - d.unconnectedAddrs[addr] = connRetries d.pushToPoolOrDrop(addr) } d.updateNetSize() @@ -101,40 +92,73 @@ func (d *DefaultDiscovery) backfill(addrs ...string) { // PoolCount returns the number of the available node addresses. func (d *DefaultDiscovery) PoolCount() int { - return len(d.pool) + d.lock.RLock() + defer d.lock.RUnlock() + return d.poolCount() +} + +func (d *DefaultDiscovery) poolCount() int { + return len(d.unconnectedAddrs) } // pushToPoolOrDrop tries to push the address given into the pool, but if the pool // is already full, it just drops it. func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) { - select { - case d.pool <- addr: - updatePoolCountMetric(d.PoolCount()) - // ok, queued - default: - // whatever + if len(d.unconnectedAddrs) < maxPoolSize { + d.unconnectedAddrs[addr] = connRetries } } // RequestRemote tries to establish a connection with n nodes. 
-func (d *DefaultDiscovery) RequestRemote(n int) { - d.closeMtx.RLock() - if !d.isDead { - d.requestCh <- n +func (d *DefaultDiscovery) RequestRemote(requested int) { + for ; requested > 0; requested-- { + var nextAddr string + d.lock.Lock() + for addr := range d.unconnectedAddrs { + if !d.connectedAddrs[addr] && !d.attempted[addr] { + nextAddr = addr + break + } + } + + if nextAddr == "" { + // Empty pool, try seeds. + for _, addr := range d.seeds { + if !d.connectedAddrs[addr] && !d.attempted[addr] { + nextAddr = addr + break + } + } + } + if nextAddr == "" { + d.lock.Unlock() + // The pool is empty, but all seed nodes are already connected (or attempted), + // we can end up in an infinite loop here, so drop the request. + break + } + d.attempted[nextAddr] = true + d.lock.Unlock() + go d.tryAddress(nextAddr) } - d.closeMtx.RUnlock() } // RegisterBadAddr registers the given address as a bad address. func (d *DefaultDiscovery) RegisterBadAddr(addr string) { + var isSeed bool d.lock.Lock() - d.unconnectedAddrs[addr]-- - if d.unconnectedAddrs[addr] > 0 { - d.pushToPoolOrDrop(addr) - } else { - d.badAddrs[addr] = true - delete(d.unconnectedAddrs, addr) - delete(d.goodAddrs, addr) + for _, seed := range d.seeds { + if addr == seed { + isSeed = true + break + } + } + if !isSeed { + d.unconnectedAddrs[addr]-- + if d.unconnectedAddrs[addr] <= 0 { + d.badAddrs[addr] = true + delete(d.unconnectedAddrs, addr) + delete(d.goodAddrs, addr) + } } d.updateNetSize() d.lock.Unlock() @@ -219,6 +243,7 @@ func (d *DefaultDiscovery) updateNetSize() { atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5. updateNetworkSizeMetric(netsize) + updatePoolCountMetric(d.poolCount()) } func (d *DefaultDiscovery) tryAddress(addr string) { @@ -231,76 +256,3 @@ func (d *DefaultDiscovery) tryAddress(addr string) { d.RequestRemote(1) } } - -// Close stops discoverer pool processing, which makes the discoverer almost useless. -func (d *DefaultDiscovery) Close() { - d.closeMtx.Lock() - d.isDead = true - d.closeMtx.Unlock() - select { - case <-d.requestCh: // Drain the channel if there is anything there. - default: - } - close(d.requestCh) - <-d.runExit -} - -// run is a goroutine that makes DefaultDiscovery process its queue to connect -// to other nodes. -func (d *DefaultDiscovery) run() { - var requested, oldRequest, r int - var ok bool - - for { - if requested == 0 { - requested, ok = <-d.requestCh - } - oldRequest = requested - for ok && requested > 0 { - select { - case r, ok = <-d.requestCh: - if requested <= r { - requested = r - } - case addr := <-d.pool: - updatePoolCountMetric(d.PoolCount()) - d.lock.Lock() - if !d.connectedAddrs[addr] && !d.attempted[addr] { - d.attempted[addr] = true - go d.tryAddress(addr) - requested-- - } - d.lock.Unlock() - default: // Empty pool - var added int - d.lock.Lock() - for _, addr := range d.seeds { - if !d.connectedAddrs[addr] { - delete(d.badAddrs, addr) - d.unconnectedAddrs[addr] = connRetries - d.pushToPoolOrDrop(addr) - added++ - } - } - d.lock.Unlock() - // The pool is empty, but all seed nodes are already connected, - // we can end up in an infinite loop here, so drop the request. - if added == 0 { - requested = 0 - } - } - } - if !ok { - break - } - // Special case, no connections after all attempts. 
- d.lock.RLock() - connected := len(d.connectedAddrs) - d.lock.RUnlock() - if connected == 0 { - time.Sleep(d.dialTimeout) - requested = oldRequest - } - } - close(d.runExit) -} diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 047aceb9b..fdcea6af1 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -157,7 +157,7 @@ func TestDefaultDiscoverer(t *testing.T) { } } } - require.Equal(t, 0, d.PoolCount()) + require.Eventually(t, func() bool { return d.PoolCount() == 0 }, 2*time.Second, 50*time.Millisecond) sort.Strings(dialledBad) for i := 0; i < len(set1); i++ { for j := 0; j < connRetries; j++ { @@ -174,10 +174,6 @@ func TestDefaultDiscoverer(t *testing.T) { assert.Equal(t, len(set1), len(d.BadPeers())) assert.Equal(t, 0, len(d.GoodPeers())) require.Equal(t, 0, d.PoolCount()) - - // Close should work and subsequent RequestRemote is a no-op. - d.Close() - d.RequestRemote(42) } func TestSeedDiscovery(t *testing.T) { diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 8be8339f5..5692482cf 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -33,7 +33,6 @@ func (d *testDiscovery) BackFill(addrs ...string) { defer d.Unlock() d.backfill = append(d.backfill, addrs...) } -func (d *testDiscovery) Close() {} func (d *testDiscovery) PoolCount() int { return 0 } func (d *testDiscovery) RegisterBadAddr(addr string) { d.Lock() @@ -204,6 +203,5 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) require.NoError(t, err) - t.Cleanup(s.discovery.Close) return s } diff --git a/pkg/network/server.go b/pkg/network/server.go index b33901543..542a934b6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -261,7 +261,6 @@ func (s *Server) Start(errChan chan error) { func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() - s.discovery.Close() for _, p := range s.getPeers(nil) { p.Disconnect(errServerShutdown) } From 3ed140abbf8fec1af2889cf4041129d02da69347 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 13 Oct 2022 18:49:24 +0300 Subject: [PATCH 5/8] config: raise mainnet/testnet MinPeers to 10 That's the configuration C# node has by default and 5 is really a very small number of peers. 
--- config/protocol.mainnet.yml | 2 +- config/protocol.testnet.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/protocol.mainnet.yml b/config/protocol.mainnet.yml index 5325d6afe..b511485ec 100644 --- a/config/protocol.mainnet.yml +++ b/config/protocol.mainnet.yml @@ -69,7 +69,7 @@ ApplicationConfiguration: PingTimeout: 90 MaxPeers: 100 AttemptConnPeers: 20 - MinPeers: 5 + MinPeers: 10 Oracle: Enabled: false AllowedContentTypes: diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index c5cc9ec0f..30696bc06 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -72,7 +72,7 @@ ApplicationConfiguration: PingTimeout: 90 MaxPeers: 100 AttemptConnPeers: 20 - MinPeers: 5 + MinPeers: 10 Oracle: Enabled: false AllowedContentTypes: From c17b2afab5723b0b6d2bbdc408683512d58f18cf Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 13 Oct 2022 22:14:14 +0300 Subject: [PATCH 6/8] network: add BroadcastFactor to control gossip, fix #2678 --- docs/node-configuration.md | 1 + pkg/config/application_config.go | 9 ++++++--- pkg/network/server.go | 9 +++++++++ pkg/network/server_config.go | 4 ++++ 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/node-configuration.md b/docs/node-configuration.md index fc4fb9340..453b6fa5b 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -19,6 +19,7 @@ node-related settings described in the table below. | Address | `string` | `0.0.0.0` | Node address that P2P protocol handler binds to. | | AnnouncedPort | `uint16` | Same as `NodePort` | Node port which should be used to announce node's port on P2P layer, it can differ from the `NodePort` the node is bound to (for example, if your node is behind NAT). | | AttemptConnPeers | `int` | `20` | Number of connection to try to establish when the connection count drops below the `MinPeers` value.| +| BroadcastFactor | `int` | `0` | Multiplier that is used to determine the number of optimal gossip fan-out peer number for broadcasted messages (0-100). By default it's zero, node uses the most optimized value depending on the estimated network size (`2.5×log(size)`), so the node may have 20 peers and calculate that it needs to broadcast messages to just 10 of them. With BroadcastFactor set to 100 it will always send messages to all peers, any value in-between 0 and 100 is used for weighted calculation, for example if it's 30 then 13 neighbors will be used in the previous case. | | DBConfiguration | [DB Configuration](#DB-Configuration) | | Describes configuration for database. See the [DB Configuration](#DB-Configuration) section for details. | | DialTimeout | `int64` | `0` | Maximum duration a single dial may take in seconds. | | ExtensiblePoolSize | `int` | `20` | Maximum amount of the extensible payloads from a single sender stored in a local pool. | diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index ac6db77af..bee9d461f 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -6,9 +6,11 @@ import ( // ApplicationConfiguration config specific to the node. type ApplicationConfiguration struct { - Address string `yaml:"Address"` - AnnouncedNodePort uint16 `yaml:"AnnouncedPort"` - AttemptConnPeers int `yaml:"AttemptConnPeers"` + Address string `yaml:"Address"` + AnnouncedNodePort uint16 `yaml:"AnnouncedPort"` + AttemptConnPeers int `yaml:"AttemptConnPeers"` + // BroadcastFactor is the factor (0-100) controlling gossip fan-out number optimization. 
+ BroadcastFactor int `yaml:"BroadcastFactor"` DBConfiguration dbconfig.DBConfiguration `yaml:"DBConfiguration"` DialTimeout int64 `yaml:"DialTimeout"` LogPath string `yaml:"LogPath"` @@ -36,6 +38,7 @@ func (a *ApplicationConfiguration) EqualsButServices(o *ApplicationConfiguration if a.Address != o.Address || a.AnnouncedNodePort != o.AnnouncedNodePort || a.AttemptConnPeers != o.AttemptConnPeers || + a.BroadcastFactor != o.BroadcastFactor || a.DBConfiguration != o.DBConfiguration || a.DialTimeout != o.DialTimeout || a.ExtensiblePoolSize != o.ExtensiblePoolSize || diff --git a/pkg/network/server.go b/pkg/network/server.go index 542a934b6..6cef30e0c 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -35,6 +35,7 @@ const ( defaultAttemptConnPeers = 20 defaultMaxPeers = 100 defaultExtensiblePoolSize = 20 + defaultBroadcastFactor = 0 maxBlockBatch = 200 minPoolCount = 30 ) @@ -222,6 +223,13 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.AttemptConnPeers = defaultAttemptConnPeers } + if s.BroadcastFactor < 0 || s.BroadcastFactor > 100 { + s.log.Info("bad BroadcastFactor configured, using the default value", + zap.Int("configured", s.BroadcastFactor), + zap.Int("actual", defaultBroadcastFactor)) + s.BroadcastFactor = defaultBroadcastFactor + } + s.transport = newTransport(s) s.discovery = newDiscovery( s.Seeds, @@ -1366,6 +1374,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) ) + enoughN = (enoughN*(100-s.BroadcastFactor) + peerN*s.BroadcastFactor) / 100 for _, peer := range peers { go func(p Peer, ctx context.Context, pkt []byte) { // Do this before packet is sent, reader thread can get the reply before this routine wakes up. diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index e46f52900..c3eb3baa4 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -78,6 +78,9 @@ type ( // ExtensiblePoolSize is the size of the pool for extensible payloads from a single sender. ExtensiblePoolSize int + + // BroadcastFactor is the factor (0-100) for fan-out optimization. + BroadcastFactor int } ) @@ -107,5 +110,6 @@ func NewServerConfig(cfg config.Config) ServerConfig { P2PNotaryCfg: appConfig.P2PNotary, StateRootCfg: appConfig.StateRoot, ExtensiblePoolSize: appConfig.ExtensiblePoolSize, + BroadcastFactor: appConfig.BroadcastFactor, } } From 851cbc7dab4f40c9c9095642678c249334f3ae68 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 13 Oct 2022 22:53:20 +0300 Subject: [PATCH 7/8] network: implement adaptive peer requests When the network is big enough, MinPeers may be suboptimal for good network connectivity, but if we know the network size we can do some estimation on the number of sufficient peers. 
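To put illustrative numbers on this (they are not part of the patch): with an estimated network size of about 1000 nodes, GetFanOut() returns 17, so the run() loop below aims for optimalN = 34 peers; a node that already holds 20 connections with AttemptConnPeers = 20 then requests only the 14 connections it is actually missing, and only while optimalN stays below both MaxPeers and the estimated network size.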
--- pkg/network/discovery.go | 8 ++++++++ pkg/network/helper_test.go | 5 +++++ pkg/network/server.go | 22 ++++++++++++++++++++-- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 221db32ee..cb4e32555 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -19,6 +19,7 @@ const ( type Discoverer interface { BackFill(...string) GetFanOut() int + NetworkSize() int PoolCount() int RequestRemote(int) RegisterBadAddr(string) @@ -48,6 +49,7 @@ type DefaultDiscovery struct { unconnectedAddrs map[string]int attempted map[string]bool optimalFanOut int32 + networkSize int32 requestCh chan int } @@ -233,6 +235,11 @@ func (d *DefaultDiscovery) GetFanOut() int { return int(atomic.LoadInt32(&d.optimalFanOut)) } +// NetworkSize returns the estimated network size. +func (d *DefaultDiscovery) NetworkSize() int { + return int(atomic.LoadInt32(&d.networkSize)) +} + // updateNetSize updates network size estimation metric. Must be called under read lock. func (d *DefaultDiscovery) updateNetSize() { var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. @@ -242,6 +249,7 @@ func (d *DefaultDiscovery) updateNetSize() { } atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5. + atomic.StoreInt32(&d.networkSize, int32(netsize)) updateNetworkSizeMetric(netsize) updatePoolCountMetric(d.poolCount()) } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 5692482cf..00c83a7d0 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -40,6 +40,11 @@ func (d *testDiscovery) RegisterBadAddr(addr string) { d.bad = append(d.bad, addr) } func (d *testDiscovery) GetFanOut() int { + d.Lock() + defer d.Unlock() + return (len(d.connected) + len(d.backfill)) * 2 / 3 +} +func (d *testDiscovery) NetworkSize() int { d.Lock() defer d.Unlock() return len(d.connected) + len(d.backfill) diff --git a/pkg/network/server.go b/pkg/network/server.go index 6cef30e0c..888bc44b4 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -387,10 +387,28 @@ func (s *Server) ConnectedPeers() []string { // while itself dealing with peers management (handling connects/disconnects). func (s *Server) run() { go s.runProto() - for { - if s.PeerCount() < s.MinPeers { + for loopCnt := 0; ; loopCnt++ { + var ( + netSize = s.discovery.NetworkSize() + // "Optimal" number of peers. + optimalN = s.discovery.GetFanOut() * 2 + // Real number of peers. + peerN = s.PeerCount() + ) + + if peerN < s.MinPeers { + // Starting up or going below the minimum -> quickly get many new peers. s.discovery.RequestRemote(s.AttemptConnPeers) + } else if s.MinPeers > 0 && loopCnt%s.MinPeers == 0 && optimalN > peerN && optimalN < s.MaxPeers && optimalN < netSize { + // Having some number of peers, but probably can get some more, the network is big. 
+			// It also allows to start picking up new peers proactively, before we suddenly have fewer than MinPeers of them.
+			var connN = s.AttemptConnPeers
+			if connN > optimalN-peerN {
+				connN = optimalN - peerN
+			}
+			s.discovery.RequestRemote(connN)
+		}
+
 		if s.discovery.PoolCount() < minPoolCount {
 			s.broadcastHPMessage(NewMessage(CMDGetAddr, payload.NewNullPayload()))
 		}

From 65f0fadddbcfdf4ae2750f6c34346e05edb23db1 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 14 Oct 2022 10:16:50 +0300
Subject: [PATCH 8/8] network: register peer only if it's not a duplicate

---
 pkg/network/server.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 888bc44b4..6bef2c817 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -670,7 +670,6 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
 		return errInvalidNetwork
 	}
 	peerAddr := p.PeerAddr().String()
-	s.discovery.RegisterConnectedAddr(peerAddr)
 	s.lock.RLock()
 	for peer := range s.peers {
 		if p == peer {
@@ -684,6 +683,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
 		}
 	}
 	s.lock.RUnlock()
+	s.discovery.RegisterConnectedAddr(peerAddr)
 	return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload()))
 }
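Taken together, the fan-out estimation from patch 2 and the BroadcastFactor knob from patch 6 determine how many neighbors actually receive a broadcast. Here is a small self-contained sketch of that weighting; the helper name and the sample numbers are illustrative, while the expression itself matches the one added to iteratePeersWithSendMsg:

package main

import "fmt"

// weightedFanOut blends the estimated optimal fan-out with the full peer count,
// as enoughN = (enoughN*(100-factor) + peerN*factor) / 100 does in server.go.
func weightedFanOut(fanOut, peerN, factor int) int {
	return (fanOut*(100-factor) + peerN*factor) / 100
}

func main() {
	// The example from docs/node-configuration.md: 20 connected peers, optimal fan-out of 10.
	for _, factor := range []int{0, 30, 100} {
		fmt.Printf("BroadcastFactor=%3d -> messages pushed to %2d of 20 peers\n",
			factor, weightedFanOut(10, 20, factor))
	}
}

A factor of 0 keeps the pure log-based estimate, 100 restores flooding to every peer, and values in between trade bandwidth for faster propagation.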