From bcf77c3c421aa6d678ee19fe065851dd039ddc56 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 12 Oct 2022 15:46:58 +0300 Subject: [PATCH] network: filter out not-yet-ready nodes when broadcasting They can fail right in the getPeers or they can fail later when packet send is attempted. Of course they can complete handshake in-between these events, but most likely they won't and we'll waste more resources on this attempt. So rule out bad peers immediately. --- pkg/network/helper_test.go | 6 +++--- pkg/network/server.go | 4 ++-- pkg/network/server_test.go | 34 +++++++++++++++++----------------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index a44b1a038..0022a9eac 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -71,7 +71,7 @@ type localPeer struct { server *Server version *payload.Version lastBlockIndex uint32 - handshaked bool + handshaked int32 // TODO: use atomic.Bool after #2626. isFullNode bool t *testing.T messageHandler func(t *testing.T, msg *Message) @@ -147,7 +147,7 @@ func (p *localPeer) SendVersionAck(m *Message) error { return nil } func (p *localPeer) HandleVersionAck() error { - p.handshaked = true + atomic.StoreInt32(&p.handshaked, 1) return nil } func (p *localPeer) SetPingTimer() { @@ -165,7 +165,7 @@ func (p *localPeer) HandlePong(pong *payload.Ping) error { } func (p *localPeer) Handshaked() bool { - return p.handshaked + return atomic.LoadInt32(&p.handshaked) != 0 } func (p *localPeer) IsFullNode() bool { diff --git a/pkg/network/server.go b/pkg/network/server.go index 44943b484..8ef0f6eea 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1388,12 +1388,12 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C // broadcastMessage sends the message to all available peers. func (s *Server) broadcastMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.Handshaked) } // broadcastHPMessage sends the high-priority message to all available peers. func (s *Server) broadcastHPMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, Peer.Handshaked) } // relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 578e82f7e..2e4b7cf8e 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -372,7 +372,7 @@ func TestServerNotSendsVerack(t *testing.T) { func (s *Server) testHandleMessage(t *testing.T, p Peer, cmd CommandType, pl payload.Payload) *Server { if p == nil { p = newLocalPeer(t, s) - p.(*localPeer).handshaked = true + p.(*localPeer).handshaked = 1 } msg := NewMessage(cmd, pl) require.NoError(t, s.handleMessage(p, msg)) @@ -419,7 +419,7 @@ func TestConsensus(t *testing.T) { atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 s.register <- p require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) @@ -491,7 +491,7 @@ func (s *Server) testHandleGetData(t *testing.T, invType payload.InventoryType, var recvNotFound atomic.Bool p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { switch msg.Command { case CMDTX, CMDBlock, CMDExtensible, CMDP2PNotaryRequest: @@ -587,7 +587,7 @@ func TestGetBlocks(t *testing.T) { } var actual []util.Uint256 p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDInv { actual = msg.Payload.(*payload.Inventory).Hashes @@ -614,7 +614,7 @@ func TestGetBlockByIndex(t *testing.T) { var expected []*block.Block var actual []*block.Block p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDBlock { actual = append(actual, msg.Payload.(*block.Block)) @@ -652,7 +652,7 @@ func TestGetHeaders(t *testing.T) { var actual *payload.Headers p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDHeaders { actual = msg.Payload.(*payload.Headers) @@ -690,7 +690,7 @@ func TestInv(t *testing.T) { var actual []util.Uint256 p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDGetData { actual = msg.Payload.(*payload.Inventory).Hashes @@ -752,7 +752,7 @@ func TestHandleGetMPTData(t *testing.T) { t.Run("P2PStateExchange extensions off", func(t *testing.T) { s := startTestServer(t) p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 msg := NewMessage(CMDGetMPTData, &payload.MPTInventory{ Hashes: []util.Uint256{{1, 2, 3}}, }) @@ -776,7 +776,7 @@ func TestHandleGetMPTData(t *testing.T) { Nodes: [][]byte{node}, // no duplicates expected } p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { switch msg.Command { case CMDMPTData: @@ -809,7 +809,7 @@ func TestHandleMPTData(t *testing.T) { t.Run("P2PStateExchange extensions off", func(t *testing.T) { s := startTestServer(t) p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 msg := NewMessage(CMDMPTData, &payload.MPTData{ Nodes: [][]byte{{1, 2, 3}}, }) @@ -829,7 +829,7 @@ func TestHandleMPTData(t *testing.T) { startWithCleanup(t, s) p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 msg := NewMessage(CMDMPTData, &payload.MPTData{ Nodes: expected, }) @@ -842,7 +842,7 @@ func TestRequestMPTNodes(t *testing.T) { var actual []util.Uint256 p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDGetMPTData { actual = append(actual, msg.Payload.(*payload.MPTInventory).Hashes...) @@ -887,7 +887,7 @@ func TestRequestTx(t *testing.T) { var actual []util.Uint256 p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDGetData { actual = append(actual, msg.Payload.(*payload.Inventory).Hashes...) @@ -938,7 +938,7 @@ func TestAddrs(t *testing.T) { } p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.getAddrSent = 1 pl := &payload.AddressList{ Addrs: []*payload.AddressAndTime{ @@ -990,7 +990,7 @@ func TestMemPool(t *testing.T) { var actual []util.Uint256 p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.messageHandler = func(t *testing.T, msg *Message) { if msg.Command == CMDInv { actual = append(actual, msg.Payload.(*payload.Inventory).Hashes...) @@ -1070,12 +1070,12 @@ func TestTryInitStateSync(t *testing.T) { s := startTestServer(t) for _, h := range []uint32{10, 8, 7, 4, 11, 4} { p := newLocalPeer(t, s) - p.handshaked = true + p.handshaked = 1 p.lastBlockIndex = h s.register <- p } p := newLocalPeer(t, s) - p.handshaked = false // one disconnected peer to check it won't be taken into attention + p.handshaked = 0 // one disconnected peer to check it won't be taken into attention p.lastBlockIndex = 5 s.register <- p require.Eventually(t, func() bool { return 7 == s.PeerCount() }, time.Second, time.Millisecond*10)