network: filter out not-yet-ready nodes when broadcasting

They can fail right in the getPeers or they can fail later when packet send
is attempted. Of course they can complete handshake in-between these events,
but most likely they won't and we'll waste more resources on this attempt. So
rule out bad peers immediately.
This commit is contained in:
Roman Khimov 2022-10-12 15:46:58 +03:00
parent 137f2cb192
commit bcf77c3c42
3 changed files with 22 additions and 22 deletions

View file

@ -71,7 +71,7 @@ type localPeer struct {
server *Server
version *payload.Version
lastBlockIndex uint32
handshaked bool
handshaked int32 // TODO: use atomic.Bool after #2626.
isFullNode bool
t *testing.T
messageHandler func(t *testing.T, msg *Message)
@ -147,7 +147,7 @@ func (p *localPeer) SendVersionAck(m *Message) error {
return nil
}
func (p *localPeer) HandleVersionAck() error {
p.handshaked = true
atomic.StoreInt32(&p.handshaked, 1)
return nil
}
func (p *localPeer) SetPingTimer() {
@ -165,7 +165,7 @@ func (p *localPeer) HandlePong(pong *payload.Ping) error {
}
func (p *localPeer) Handshaked() bool {
return p.handshaked
return atomic.LoadInt32(&p.handshaked) != 0
}
func (p *localPeer) IsFullNode() bool {

View file

@ -1388,12 +1388,12 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C
// broadcastMessage sends the message to all available peers.
func (s *Server) broadcastMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil)
s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.Handshaked)
}
// broadcastHPMessage sends the high-priority message to all available peers.
func (s *Server) broadcastHPMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil)
s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, Peer.Handshaked)
}
// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them

View file

@ -372,7 +372,7 @@ func TestServerNotSendsVerack(t *testing.T) {
func (s *Server) testHandleMessage(t *testing.T, p Peer, cmd CommandType, pl payload.Payload) *Server {
if p == nil {
p = newLocalPeer(t, s)
p.(*localPeer).handshaked = true
p.(*localPeer).handshaked = 1
}
msg := NewMessage(cmd, pl)
require.NoError(t, s.handleMessage(p, msg))
@ -419,7 +419,7 @@ func TestConsensus(t *testing.T) {
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
s.register <- p
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
@ -491,7 +491,7 @@ func (s *Server) testHandleGetData(t *testing.T, invType payload.InventoryType,
var recvNotFound atomic.Bool
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
switch msg.Command {
case CMDTX, CMDBlock, CMDExtensible, CMDP2PNotaryRequest:
@ -587,7 +587,7 @@ func TestGetBlocks(t *testing.T) {
}
var actual []util.Uint256
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDInv {
actual = msg.Payload.(*payload.Inventory).Hashes
@ -614,7 +614,7 @@ func TestGetBlockByIndex(t *testing.T) {
var expected []*block.Block
var actual []*block.Block
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDBlock {
actual = append(actual, msg.Payload.(*block.Block))
@ -652,7 +652,7 @@ func TestGetHeaders(t *testing.T) {
var actual *payload.Headers
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDHeaders {
actual = msg.Payload.(*payload.Headers)
@ -690,7 +690,7 @@ func TestInv(t *testing.T) {
var actual []util.Uint256
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDGetData {
actual = msg.Payload.(*payload.Inventory).Hashes
@ -752,7 +752,7 @@ func TestHandleGetMPTData(t *testing.T) {
t.Run("P2PStateExchange extensions off", func(t *testing.T) {
s := startTestServer(t)
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
msg := NewMessage(CMDGetMPTData, &payload.MPTInventory{
Hashes: []util.Uint256{{1, 2, 3}},
})
@ -776,7 +776,7 @@ func TestHandleGetMPTData(t *testing.T) {
Nodes: [][]byte{node}, // no duplicates expected
}
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
switch msg.Command {
case CMDMPTData:
@ -809,7 +809,7 @@ func TestHandleMPTData(t *testing.T) {
t.Run("P2PStateExchange extensions off", func(t *testing.T) {
s := startTestServer(t)
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
msg := NewMessage(CMDMPTData, &payload.MPTData{
Nodes: [][]byte{{1, 2, 3}},
})
@ -829,7 +829,7 @@ func TestHandleMPTData(t *testing.T) {
startWithCleanup(t, s)
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
msg := NewMessage(CMDMPTData, &payload.MPTData{
Nodes: expected,
})
@ -842,7 +842,7 @@ func TestRequestMPTNodes(t *testing.T) {
var actual []util.Uint256
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDGetMPTData {
actual = append(actual, msg.Payload.(*payload.MPTInventory).Hashes...)
@ -887,7 +887,7 @@ func TestRequestTx(t *testing.T) {
var actual []util.Uint256
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDGetData {
actual = append(actual, msg.Payload.(*payload.Inventory).Hashes...)
@ -938,7 +938,7 @@ func TestAddrs(t *testing.T) {
}
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.getAddrSent = 1
pl := &payload.AddressList{
Addrs: []*payload.AddressAndTime{
@ -990,7 +990,7 @@ func TestMemPool(t *testing.T) {
var actual []util.Uint256
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.messageHandler = func(t *testing.T, msg *Message) {
if msg.Command == CMDInv {
actual = append(actual, msg.Payload.(*payload.Inventory).Hashes...)
@ -1070,12 +1070,12 @@ func TestTryInitStateSync(t *testing.T) {
s := startTestServer(t)
for _, h := range []uint32{10, 8, 7, 4, 11, 4} {
p := newLocalPeer(t, s)
p.handshaked = true
p.handshaked = 1
p.lastBlockIndex = h
s.register <- p
}
p := newLocalPeer(t, s)
p.handshaked = false // one disconnected peer to check it won't be taken into attention
p.handshaked = 0 // one disconnected peer to check it won't be taken into attention
p.lastBlockIndex = 5
s.register <- p
require.Eventually(t, func() bool { return 7 == s.PeerCount() }, time.Second, time.Millisecond*10)