From f78bd6474f2d0131b0750959e9c3bd80d45c8e66 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Aug 2021 21:55:34 +0300
Subject: [PATCH 1/9] network: handle incoming message in a separate goroutine
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Network communication takes time. Handling some messages (like a
transaction) also takes time. We can share this time by making the
handler a separate goroutine, so while a message is being handled the
receiver can already get and parse the next one.

It doesn't improve the metrics a lot, but I still think it makes sense,
and in some scenarios this can be even more beneficial than the numbers
below suggest.

e41fc2fd1b8826c52a8ec221985a0a990356319d, 4 nodes, 10 workers

RPS    6732.979 6396.160 6759.624 6246.398 6589.841 ≈ 6545 ± 3.02%
TPS    6491.062 5984.190 6275.652 5867.477 6360.797 ≈ 6196 ± 3.77%
CPU %  42.053 43.515 44.768 40.344 44.112 ≈ 43.0 ± 3.69%
Mem MB 2564.130 2744.236 2636.267 2589.505 2765.926 ≈ 2660 ± 3.06%

Patched:

RPS    6902.296 6465.662 6856.044 6785.515 6157.024 ≈ 6633 ± 4.26% ↑ 1.34%
TPS    6468.431 6218.867 6610.565 6288.596 5790.556 ≈ 6275 ± 4.44% ↑ 1.28%
CPU %  50.231 42.925 49.481 48.396 42.662 ≈ 46.7 ± 7.01% ↑ 8.60%
Mem MB 2856.841 2684.103 2756.195 2733.485 2422.787 ≈ 2691 ± 5.40% ↑ 1.17%
---
 pkg/network/tcp_peer.go | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go
index 85678d297..8ff47a18c 100644
--- a/pkg/network/tcp_peer.go
+++ b/pkg/network/tcp_peer.go
@@ -26,6 +26,7 @@ const (
 	requestQueueSize   = 32
 	p2pMsgQueueSize    = 16
 	hpRequestQueueSize = 4
+	incomingQueueSize  = 1 // Each message can be up to 32MB in size.
 )
 
 var (
@@ -57,6 +58,7 @@ type TCPPeer struct {
 	sendQ    chan []byte
 	p2pSendQ chan []byte
 	hpSendQ  chan []byte
+	incoming chan *Message
 
 	// track outstanding getaddr requests.
 	getAddrSent atomic.Int32
@@ -75,6 +77,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
 		sendQ:    make(chan []byte, requestQueueSize),
 		p2pSendQ: make(chan []byte, p2pMsgQueueSize),
 		hpSendQ:  make(chan []byte, hpRequestQueueSize),
+		incoming: make(chan *Message, incomingQueueSize),
 	}
 }
 
@@ -158,6 +161,7 @@ func (p *TCPPeer) handleConn() {
 	p.server.register <- p
 
 	go p.handleQueues()
+	go p.handleIncoming()
 	// When a new peer is connected we send out our version immediately.
 	err = p.SendVersion()
 	if err == nil {
@@ -172,12 +176,22 @@ func (p *TCPPeer) handleConn() {
 			} else if err != nil {
 				break
 			}
-			if err = p.server.handleMessage(p, msg); err != nil {
-				if p.Handshaked() {
-					err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
-				}
-				break
+			p.incoming <- msg
+		}
+	}
+	close(p.incoming)
+	p.Disconnect(err)
+}
+
+func (p *TCPPeer) handleIncoming() {
+	var err error
+	for msg := range p.incoming {
+		err = p.server.handleMessage(p, msg)
+		if err != nil {
+			if p.Handshaked() {
+				err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
			}
+			break
 		}
 	}
 	p.Disconnect(err)

From 7fc153ed2a8071481078c7e8bf5d4fa44e8f0037 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Aug 2021 22:28:16 +0300
Subject: [PATCH 2/9] network: only ask mempool for intersections with received Inv
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Most of the time on a healthy network we see new transactions appearing
that are not yet present in the mempool. Once they get into the mempool
we don't ask for them again when some other peer sends an Inv with them.
Then these transactions are usually added into a block, removed from
the mempool, and no one actually sends them to us again. Some stale
nodes can do that, but it's not very likely to happen.

At the same time, a full chain HasTransaction() query is quite expensive
at the receiving end, so if we can avoid doing it, that's always good.
Technically this allows resending an old transaction: it will be
re-requested and an attempt to add it to the mempool will be made, but
that will inevitably fail because the same HasTransaction() check is
done there too. One can try to maliciously flood the node with stale
transactions, but that doesn't differ from flooding it with any other
invalid transactions, so no new attack vector is added.

Baseline, 4 nodes with 10 workers:

RPS    6902.296 6465.662 6856.044 6785.515 6157.024 ≈ 6633 ± 4.26%
TPS    6468.431 6218.867 6610.565 6288.596 5790.556 ≈ 6275 ± 4.44%
CPU %  50.231 42.925 49.481 48.396 42.662 ≈ 46.7 ± 7.01%
Mem MB 2856.841 2684.103 2756.195 2733.485 2422.787 ≈ 2691 ± 5.40%

Patched:

RPS    7176.784 7014.511 6139.663 7191.280 7080.852 ≈ 6921 ± 5.72% ↑ 4.34%
TPS    6945.409 6562.756 5927.050 6681.187 6821.794 ≈ 6588 ± 5.38% ↑ 4.99%
CPU %  44.400 43.842 40.418 49.211 49.370 ≈ 45.4 ± 7.53% ↓ 2.78%
Mem MB 2693.414 2640.602 2472.007 2731.482 2707.879 ≈ 2649 ± 3.53% ↓ 1.56%
---
 pkg/network/server.go      | 9 +++++----
 pkg/network/server_test.go | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 74e9b5502..5b8032de8 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -68,6 +68,7 @@ type (
 		chain             blockchainer.Blockchainer
 		bQueue            *blockQueue
 		consensus         consensus.Service
+		mempool           *mempool.Pool
 		notaryRequestPool *mempool.Pool
 		extensiblePool    *extpool.Pool
 		notaryFeer        NotaryFeer
@@ -138,6 +139,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
 		unregister:       make(chan peerDrop),
 		peers:            make(map[Peer]bool),
 		syncReached:      atomic.NewBool(false),
+		mempool:          chain.GetMemPool(),
 		extensiblePool:   extpool.New(chain, config.ExtensiblePoolSize),
 		log:              log,
 		transactions:     make(chan *transaction.Transaction, 64),
@@ -655,7 +657,7 @@ func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
 func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
 	reqHashes := make([]util.Uint256, 0)
 	var typExists = map[payload.InventoryType]func(util.Uint256) bool{
-		payload.TXType:    s.chain.HasTransaction,
+		payload.TXType:    s.mempool.ContainsKey,
 		payload.BlockType: s.chain.HasBlock,
 		payload.ExtensibleType: func(h util.Uint256) bool {
 			cp := s.extensiblePool.Get(h)
@@ -688,7 +690,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
 // handleMempoolCmd handles getmempool command.
 func (s *Server) handleMempoolCmd(p Peer) error {
-	txs := s.chain.GetMemPool().GetVerifiedTransactions()
+	txs := s.mempool.GetVerifiedTransactions()
 	hs := make([]util.Uint256, 0, payload.MaxHashesCount)
 	for i := range txs {
 		hs = append(hs, txs[i].Hash())
 	}
@@ -1247,8 +1249,7 @@ func (s *Server) initStaleMemPools() {
 		threshold = cfg.ValidatorsCount * 2
 	}
 
-	mp := s.chain.GetMemPool()
-	mp.SetResendThreshold(uint32(threshold), s.broadcastTX)
+	s.mempool.SetResendThreshold(uint32(threshold), s.broadcastTX)
 	if s.chain.P2PSigExtensionsEnabled() {
 		s.notaryRequestPool.SetResendThreshold(uint32(threshold), s.broadcastP2PNotaryRequestPayload)
 	}
diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go
index d70618b54..0c9c9a358 100644
--- a/pkg/network/server_test.go
+++ b/pkg/network/server_test.go
@@ -694,7 +694,7 @@ func TestInv(t *testing.T) {
 	})
 	t.Run("transaction", func(t *testing.T) {
 		tx := newDummyTx()
-		s.chain.(*fakechain.FakeChain).PutTx(tx)
+		require.NoError(t, s.chain.GetMemPool().Add(tx, s.chain))
 		hs := []util.Uint256{random.Uint256(), tx.Hash(), random.Uint256()}
 		s.testHandleMessage(t, p, CMDInv, &payload.Inventory{
 			Type: payload.TXType,

From 119b4200acebd90ca09abf91f9549dce6567ccc9 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Aug 2021 22:43:31 +0300
Subject: [PATCH 3/9] network: add fail-fast route for tx double processing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a transaction spreads through the network, many nodes are likely to
get it at roughly the same time, and they will rebroadcast it at roughly
the same time too. As we have a number of peers it's quite likely that
we'd get an Inv with the same transaction from multiple peers
simultaneously. We will ask them for this transaction (independently!)
and, again, we're likely to get it at roughly the same time. So we can
easily end up with multiple threads processing the same transaction.
Only one will succeed, but we can easily avoid doing that work in the
first place, saving some CPU cycles for other things.

Notice that we can't do it _before_ receiving a transaction, because
nothing guarantees that the peer will respond to our transaction
request, so communication overhead is unavoidable at the moment. But
saving on processing already gives quite interesting results.
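
The fix boils down to a tiny "in-flight" set guarded by a mutex that is
checked before any verification work starts. A minimal standalone sketch
of the pattern (inFlight/tryAdd/done are illustrative names, not the
actual Server API added below):

    package network

    import (
        "sync"

        "github.com/nspcc-dev/neo-go/pkg/util"
    )

    // inFlight drops concurrent deliveries of the same transaction early.
    type inFlight struct {
        mtx sync.Mutex
        m   map[util.Uint256]struct{}
    }

    // tryAdd returns false if h is already being processed by some goroutine.
    func (f *inFlight) tryAdd(h util.Uint256) bool {
        f.mtx.Lock()
        defer f.mtx.Unlock()
        if _, ok := f.m[h]; ok {
            return false
        }
        f.m[h] = struct{}{}
        return true
    }

    // done removes h from the set once processing is finished.
    func (f *inFlight) done(h util.Uint256) {
        f.mtx.Lock()
        delete(f.m, h)
        f.mtx.Unlock()
    }

A handler then does "if !tryAdd(h) { return nil }", processes the
transaction and calls done(h), which is essentially what handleTxCmd
does in the diff below with the new txInLock/txInMap fields (plus a
mempool.ContainsKey() shortcut).
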
Baseline, four nodes with 10 workers:

RPS    7176.784 7014.511 6139.663 7191.280 7080.852 ≈ 6921 ± 5.72%
TPS    6945.409 6562.756 5927.050 6681.187 6821.794 ≈ 6588 ± 5.38%
CPU %  44.400 43.842 40.418 49.211 49.370 ≈ 45.4 ± 7.53%
Mem MB 2693.414 2640.602 2472.007 2731.482 2707.879 ≈ 2649 ± 3.53%

Patched:

RPS ≈  7791.675 7996.559 7834.504 7746.705 7891.614 ≈ 7852 ± 1.10% ↑ 13.45%
TPS ≈  7241.497 7711.765 7520.211 7425.890 7334.443 ≈ 7447 ± 2.17% ↑ 13.04%
CPU %  29.853 39.936 39.945 36.371 39.999 ≈ 37.2 ± 10.57% ↓ 18.06%
Mem MB 2749.635 2791.609 2828.610 2910.431 2863.344 ≈ 2829 ± 1.97% ↑ 6.80%
---
 pkg/network/server.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 5b8032de8..e03c1e6bc 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -74,6 +74,9 @@ type (
 		notaryFeer        NotaryFeer
 		notaryModule      *notary.Notary
 
+		txInLock sync.Mutex
+		txInMap  map[util.Uint256]struct{}
+
 		lock  sync.RWMutex
 		peers map[Peer]bool
 
@@ -137,6 +140,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
 		quit:             make(chan struct{}),
 		register:         make(chan Peer),
 		unregister:       make(chan peerDrop),
+		txInMap:          make(map[util.Uint256]struct{}),
 		peers:            make(map[Peer]bool),
 		syncReached:      atomic.NewBool(false),
 		mempool:          chain.GetMemPool(),
@@ -876,10 +880,21 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
 func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
 	// It's OK for it to fail for various reasons like tx already existing
 	// in the pool.
+	s.txInLock.Lock()
+	_, ok := s.txInMap[tx.Hash()]
+	if ok || s.mempool.ContainsKey(tx.Hash()) {
+		s.txInLock.Unlock()
+		return nil
+	}
+	s.txInMap[tx.Hash()] = struct{}{}
+	s.txInLock.Unlock()
 	if s.verifyAndPoolTX(tx) == nil {
 		s.consensus.OnTransaction(tx)
 		s.broadcastTX(tx, nil)
 	}
+	s.txInLock.Lock()
+	delete(s.txInMap, tx.Hash())
+	s.txInLock.Unlock()
 	return nil
 }
 

From b55c75d59d784e82feeb94098b5a9218d8d10e2f Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Thu, 5 Aug 2021 23:59:53 +0300
Subject: [PATCH 4/9] network: hide Peers, make it return a slice

A slice is a bit more efficient, we don't need a map for Peers() users,
and the method is not really interesting to outside users, so better
hide it.
---
 pkg/network/server.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index e03c1e6bc..b4ea1c073 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -288,7 +288,7 @@ func (s *Server) Shutdown() {
 	s.transport.Close()
 	s.discovery.Close()
 	s.consensus.Shutdown()
-	for p := range s.Peers() {
+	for _, p := range s.getPeers() {
 		p.Disconnect(errServerShutdown)
 	}
 	s.bQueue.discard()
@@ -431,7 +431,7 @@ func (s *Server) runProto() {
 		case <-pingTimer.C:
 			if s.chain.BlockHeight() == prevHeight {
 				// Get a copy of s.peers to avoid holding a lock while sending.
-				for peer := range s.Peers() {
+				for _, peer := range s.getPeers() {
 					_ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.chain.BlockHeight(), s.id)))
 				}
 			}
@@ -489,15 +489,15 @@ func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
 	s.notaryRequestPool.UnsubscribeFromTransactions(ch)
 }
 
-// Peers returns the current list of peers connected to
+// getPeers returns current list of peers connected to
 // the server.
-func (s *Server) Peers() map[Peer]bool {
+func (s *Server) getPeers() []Peer {
 	s.lock.RLock()
 	defer s.lock.RUnlock()
 
-	peers := make(map[Peer]bool, len(s.peers))
-	for k, v := range s.peers {
-		peers[k] = v
+	peers := make([]Peer, 0, len(s.peers))
+	for k := range s.peers {
+		peers = append(peers, k)
 	}
 
 	return peers
@@ -1142,7 +1142,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
 // peer is considered invalid if it returns false).
 func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
 	// Get a copy of s.peers to avoid holding a lock while sending.
-	peers := s.Peers()
+	peers := s.getPeers()
 	if len(peers) == 0 {
 		return
 	}
@@ -1154,7 +1154,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	success := make(map[Peer]bool, len(peers))
 	okCount := 0
 	sentCount := 0
-	for peer := range peers {
+	for _, peer := range peers {
 		if peerOK != nil && !peerOK(peer) {
 			success[peer] = false
 			continue
@@ -1176,7 +1176,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	}
 
 	// Perform blocking send now.
-	for peer := range peers {
+	for _, peer := range peers {
 		if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) {
 			continue
 		}

From d51db20405cce39debba9689a4e31865c633b58e Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 6 Aug 2021 00:03:46 +0300
Subject: [PATCH 5/9] network: randomize peer iteration order
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

While map iteration in getPeers() is non-deterministic, it's not really
random enough for our purposes (usually maps have only 2-3 iteration
paths through them), and we need to fill our peers' queues more
uniformly. Believe it or not, it does affect performance metrics.

Baseline (four nodes, 10 workers):

RPS ≈  7791.675 7996.559 7834.504 7746.705 7891.614 ≈ 7852 ± 1.10%
TPS ≈  7241.497 7711.765 7520.211 7425.890 7334.443 ≈ 7447 ± 2.17%
CPU %  29.853 39.936 39.945 36.371 39.999 ≈ 37.2 ± 10.57%
Mem MB 2749.635 2791.609 2828.610 2910.431 2863.344 ≈ 2829 ± 1.97%

Patched:

RPS    8180.760 8137.822 7858.358 7820.011 8051.076 ≈ 8010 ± 2.04% ↑ 2.01%
TPS    7819.831 7521.172 7519.023 7242.965 7426.000 ≈ 7506 ± 2.78% ↑ 0.79%
CPU %  41.983 38.775 40.606 39.375 35.537 ≈ 39.3 ± 6.15% ↑ 5.65%
Mem MB 2947.189 2743.658 2896.688 2813.276 2863.108 ≈ 2853 ± 2.74% ↑ 0.85%
---
 pkg/network/server.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index b4ea1c073..0eb6e2a77 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -1146,6 +1146,9 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	if len(peers) == 0 {
 		return
 	}
+	mrand.Shuffle(len(peers), func(i, j int) {
+		peers[i], peers[j] = peers[j], peers[i]
+	})
 	pkt, err := msg.Bytes()
 	if err != nil {
 		return

From de6f4987f6418d7a5fe9026573797e83eaefea7f Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 6 Aug 2021 13:32:43 +0300
Subject: [PATCH 6/9] network: microoptimize iteratePeersWithSendMsg()

Now that s.getPeers() returns a slice we can use a slice for `success`
too, since maps are more expensive.
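
The "maps are more expensive" part is easy to sanity-check with a
throwaway micro-benchmark along these lines (not part of the patch; int
keys stand in for the Peer interface and 10 models a typical peer
count):

    package network

    import "testing"

    // Allocate and fill a per-call "already sent" set, the map way.
    func BenchmarkSuccessMap(b *testing.B) {
        for i := 0; i < b.N; i++ {
            m := make(map[int]bool, 10)
            for p := 0; p < 10; p++ {
                m[p] = true
            }
        }
    }

    // The same thing with an index-aligned bool slice.
    func BenchmarkSuccessSlice(b *testing.B) {
        for i := 0; i < b.N; i++ {
            s := make([]bool, 10)
            for p := 0; p < 10; p++ {
                s[p] = true
            }
        }
    }

The map has to hash every key and maintain buckets, while the slice is a
single small allocation indexed directly, which is why `success` becomes
a []bool aligned with the peers slice in the diff below.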
---
 pkg/network/server.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 0eb6e2a77..a4311a521 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -1154,12 +1154,11 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 		return
 	}
 
-	success := make(map[Peer]bool, len(peers))
+	success := make([]bool, len(peers))
 	okCount := 0
 	sentCount := 0
-	for _, peer := range peers {
+	for i, peer := range peers {
 		if peerOK != nil && !peerOK(peer) {
-			success[peer] = false
 			continue
 		}
 		okCount++
@@ -1169,7 +1168,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 		if msg.Command == CMDGetAddr {
 			peer.AddGetAddrSent()
 		}
-		success[peer] = true
+		success[i] = true
 		sentCount++
 	}
 
@@ -1179,8 +1178,8 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	}
 
 	// Perform blocking send now.
-	for _, peer := range peers {
-		if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) {
+	for i, peer := range peers {
+		if success[i] || (peerOK != nil && !peerOK(peer)) {
 			continue
 		}
 		if err := send(peer, true, pkt); err != nil {

From 80f3ec2312a1996e2871c5bef1ef13cb8742e166 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 6 Aug 2021 15:04:13 +0300
Subject: [PATCH 7/9] network: move peer filtering to getPeers()

It doesn't change much: we can't magically get more valid peers, and if
some die while we're iterating we'd detect that by an error returned
from send().
---
 pkg/network/server.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index a4311a521..2818d5de7 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -288,7 +288,7 @@ func (s *Server) Shutdown() {
 	s.transport.Close()
 	s.discovery.Close()
 	s.consensus.Shutdown()
-	for _, p := range s.getPeers() {
+	for _, p := range s.getPeers(nil) {
 		p.Disconnect(errServerShutdown)
 	}
 	s.bQueue.discard()
@@ -431,7 +431,7 @@ func (s *Server) runProto() {
 		case <-pingTimer.C:
 			if s.chain.BlockHeight() == prevHeight {
 				// Get a copy of s.peers to avoid holding a lock while sending.
-				for _, peer := range s.getPeers() {
+				for _, peer := range s.getPeers(nil) {
 					_ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.chain.BlockHeight(), s.id)))
 				}
 			}
@@ -489,14 +489,17 @@ func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
 	s.notaryRequestPool.UnsubscribeFromTransactions(ch)
 }
 
-// getPeers returns current list of peers connected to
-// the server.
-func (s *Server) getPeers() []Peer {
+// getPeers returns current list of peers connected to the server filtered by
+// isOK function if it's given.
+func (s *Server) getPeers(isOK func(Peer) bool) []Peer {
 	s.lock.RLock()
 	defer s.lock.RUnlock()
 
 	peers := make([]Peer, 0, len(s.peers))
 	for k := range s.peers {
+		if isOK != nil && !isOK(k) {
+			continue
+		}
 		peers = append(peers, k)
 	}
 
@@ -1142,7 +1145,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
 // peer is considered invalid if it returns false).
 func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
 	// Get a copy of s.peers to avoid holding a lock while sending.
-	peers := s.getPeers()
+	peers := s.getPeers(peerOK)
 	if len(peers) == 0 {
 		return
 	}
@@ -1158,9 +1161,6 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	okCount := 0
 	sentCount := 0
 	for i, peer := range peers {
-		if peerOK != nil && !peerOK(peer) {
-			continue
-		}
 		okCount++
 		if err := send(peer, false, pkt); err != nil {
 			continue
@@ -1179,7 +1179,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 
 	// Perform blocking send now.
 	for i, peer := range peers {
-		if success[i] || (peerOK != nil && !peerOK(peer)) {
+		if success[i] {
 			continue
 		}
 		if err := send(peer, true, pkt); err != nil {

From 966a16e80eb7149e2559d231d7decbdf80b6e53c Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 6 Aug 2021 15:25:41 +0300
Subject: [PATCH 8/9] network: keep track of dead peers in iteratePeersWithSendMsg()

send() can return errStateMismatch, errGone and errBusy. errGone means
the peer is dead and will never be active again, so it makes no sense to
retry sends to it. errStateMismatch is technically "not yet ready", but
we can't wait for it either: no one knows how long the handshake will
take to complete. So only errBusy means we can retry.

So keep track of dead peers and adjust the retry counting accordingly.
---
 pkg/network/server.go | 43 +++++++++++++++++++++++++++----------------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 2818d5de7..268abb175 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -1144,12 +1144,15 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
 // passed, one is to send the message and the other is to filtrate peers (the
 // peer is considered invalid if it returns false).
 func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
+	var deadN, peerN, sentN int
+
 	// Get a copy of s.peers to avoid holding a lock while sending.
 	peers := s.getPeers(peerOK)
-	if len(peers) == 0 {
+	peerN = len(peers)
+	if peerN == 0 {
 		return
 	}
-	mrand.Shuffle(len(peers), func(i, j int) {
+	mrand.Shuffle(peerN, func(i, j int) {
 		peers[i], peers[j] = peers[j], peers[i]
 	})
 	pkt, err := msg.Bytes()
@@ -1157,39 +1160,47 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 		return
 	}
 
-	success := make([]bool, len(peers))
-	okCount := 0
-	sentCount := 0
+	// If true, this node isn't counted any more, either it's dead or we
+	// have already sent an Inv to it.
+	finished := make([]bool, peerN)
+
 	for i, peer := range peers {
-		okCount++
-		if err := send(peer, false, pkt); err != nil {
+		err := send(peer, false, pkt)
+		switch err {
+		case nil:
+			if msg.Command == CMDGetAddr {
+				peer.AddGetAddrSent()
+			}
+			sentN++
+		case errBusy:
 			continue
+		default:
+			deadN++
 		}
-		if msg.Command == CMDGetAddr {
-			peer.AddGetAddrSent()
-		}
-		success[i] = true
-		sentCount++
+		finished[i] = true
 	}
 
 	// Send to at least 2/3 of good peers.
-	if 3*sentCount >= 2*okCount {
+	if 3*sentN >= 2*(peerN-deadN) {
 		return
 	}
 
 	// Perform blocking send now.
 	for i, peer := range peers {
-		if success[i] {
+		if finished[i] {
 			continue
 		}
 		if err := send(peer, true, pkt); err != nil {
+			if err != errBusy {
+				deadN++
+			}
 			continue
 		}
 		if msg.Command == CMDGetAddr {
 			peer.AddGetAddrSent()
 		}
-		sentCount++
-		if 3*sentCount >= 2*okCount {
+		sentN++
+		if 3*sentN >= 2*(peerN-deadN) {
 			return
 		}
 	}

From 7bb82f1f99c4896453b1ea71fcf99a5c28a255da Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 6 Aug 2021 15:36:46 +0300
Subject: [PATCH 9/9] network: merge two loops in iteratePeersWithSendMsg, send to 2/3
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Refactor the code and be fine with sending to just 2/3 of proper peers.
Previously that was an edge case, but it can also be a normal thing to
do, as broadcasting to everyone is obviously too expensive and excessive
(hi, #608).

Baseline (four nodes, 10 workers):

RPS    8180.760 8137.822 7858.358 7820.011 8051.076 ≈ 8010 ± 2.04%
TPS    7819.831 7521.172 7519.023 7242.965 7426.000 ≈ 7506 ± 2.78%
CPU %  41.983 38.775 40.606 39.375 35.537 ≈ 39.3 ± 6.15%
Mem MB 2947.189 2743.658 2896.688 2813.276 2863.108 ≈ 2853 ± 2.74%

Patched:

RPS    9714.567 9676.102 9358.609 9371.408 9301.372 ≈ 9484 ± 2.05% ↑ 18.40%
TPS    8809.796 8796.854 8534.754 8661.158 8426.162 ≈ 8646 ± 1.92% ↑ 15.19%
CPU %  44.980 45.018 33.640 29.645 43.830 ≈ 39.4 ± 18.41% ↑ 0.25%
Mem MB 2989.078 2976.577 2306.185 2351.929 2910.479 ≈ 2707 ± 12.80% ↓ 5.12%

There is a nuance with this patch, however. While it typically works the
way outlined above, sometimes it works like this:

RPS ≈ 6734.368
TPS ≈ 6299.332
CPU ≈ 25.552%
Mem ≈ 2706.046MB

And that's because the log looks like this:

DeltaTime, TransactionsCount, TPS
5014, 44212, 8817.710
5163, 49690, 9624.249
5166, 49523, 9586.334
5189, 49693, 9576.604
5198, 49339, 9491.920
5147, 49559, 9628.716
5192, 49680, 9568.567
5163, 49750, 9635.871
5183, 49189, 9490.450
5159, 49653, 9624.540
5167, 47945, 9279.079
5179, 2051, 396.022
5015, 4, 0.798
5004, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5004, 0, 0.000
5003, 2925, 584.649
5040, 49099, 9741.865
5161, 49718, 9633.404
5170, 49228, 9521.857
5179, 49773, 9610.543
5167, 47253, 9145.152
5202, 49788, 9570.934
5177, 47704, 9214.603
5209, 46610, 8947.975
5249, 49156, 9364.831
5163, 18284, 3541.352
5072, 174, 34.306

On a network with 4 CNs and 1 RPC node there is a 1/256 probability that
a block won't be broadcast to the RPC node, so it won't see it until the
ping timeout kicks in. While it doesn't see the block it can't accept
new incoming transactions, so the bench basically gets stuck. To me
that's an acceptable trade-off, because normal networks are much larger
than that and the effect of this patch is way more important there, but
still that's what we have and we need to take it into account.
---
 pkg/network/server.go | 55 ++++++++++++++++-----------------------------------
 1 file changed, 20 insertions(+), 35 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 268abb175..0fdfd2a16 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -1164,44 +1164,29 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
 	// have already sent an Inv to it.
 	finished := make([]bool, peerN)
 
-	for i, peer := range peers {
-		err := send(peer, false, pkt)
-		switch err {
-		case nil:
-			if msg.Command == CMDGetAddr {
-				peer.AddGetAddrSent()
+	// Try non-blocking sends first and only block if have to.
+	for _, blocking := range []bool{false, true} {
+		for i, peer := range peers {
+			// Send to 2/3 of good peers.
+			if 3*sentN >= 2*(peerN-deadN) {
+				return
 			}
-			sentN++
-		case errBusy:
-			continue
-		default:
-			deadN++
-		}
-		finished[i] = true
-	}
-
-	// Send to at least 2/3 of good peers.
-	if 3*sentN >= 2*(peerN-deadN) {
-		return
-	}
-
-	// Perform blocking send now.
-	for i, peer := range peers {
-		if finished[i] {
-			continue
-		}
-		if err := send(peer, true, pkt); err != nil {
-			if err != errBusy {
+			if finished[i] {
+				continue
+			}
+			err := send(peer, blocking, pkt)
+			switch err {
+			case nil:
+				if msg.Command == CMDGetAddr {
+					peer.AddGetAddrSent()
+				}
+				sentN++
+			case errBusy: // Can be retried.
+				continue
+			default:
 				deadN++
 			}
-			continue
-		}
-		if msg.Command == CMDGetAddr {
-			peer.AddGetAddrSent()
-		}
-		sentN++
-		if 3*sentN >= 2*(peerN-deadN) {
-			return
+			finished[i] = true
 		}
 	}
 }
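
A note on the 1/256 figure above: it presumably comes from the 2/3
threshold being applied by each CN independently. In this setup every CN
has 4 peers (3 other CNs plus the RPC node), and 3*sentN >= 2*4 is first
satisfied at sentN = 3, so after the random shuffle each CN skips exactly
one peer, which happens to be the RPC node with probability 1/4; the
block is missed only when all 4 CNs skip it. A tiny standalone program
(benchmark topology only, nothing from the patch itself) spelling out
that arithmetic:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // Each of the 4 CNs skips one of its 4 peers at random, so it skips
        // the RPC node with probability 1/4.
        pSkip := 1.0 / 4.0
        // The RPC node misses the block only if all 4 CNs skip it.
        pMiss := math.Pow(pSkip, 4)
        fmt.Printf("P(RPC node misses a block) = %v = 1/%v\n", pMiss, 1/pMiss)
        // Prints: P(RPC node misses a block) = 0.00390625 = 1/256
    }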