From f2ffffddb7bc5c15b4ed6a82265a47101ccbae9f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 22 Jan 2020 11:01:13 +0300 Subject: [PATCH] network: rework broadcasting functions, tune priorities This gives more priority to anything related to consensus. --- pkg/network/server.go | 48 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 3ac65741b..ad67fd1d6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -662,7 +662,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) handleNewPayload(p *consensus.Payload) { - s.relayInventoryCmd(CMDInv, payload.ConsensusType, p.Hash()) + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.ConsensusType, []util.Uint256{p.Hash()})) + // It's high priority because it directly affects consensus process, + // even though it's just an inv. + s.broadcastHPMessage(msg) } // getLastBlockTime returns unix timestamp for the moment when the last block @@ -676,25 +679,44 @@ func (s *Server) requestTx(hashes ...util.Uint256) { return } - s.relayInventoryCmd(CMDGetData, payload.TXType, hashes...) + msg := s.MkMsg(CMDGetData, payload.NewInventory(payload.TXType, hashes)) + // It's high priority because it directly affects consensus process, + // even though it's getdata. + s.broadcastHPMessage(msg) } -func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, hashes ...util.Uint256) { - payload := payload.NewInventory(t, hashes) - msg := s.MkMsg(cmd, payload) - +// iteratePeersWithSendMsg sends given message to all peers using two functions +// passed, one is to send the message and the other is to filtrate peers (the +// peer is considered invalid if it returns false). +func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) error, peerOK func(Peer) bool) { + pkt, err := msg.Bytes() + if err != nil { + return + } + // Get a copy of s.peers to avoid holding a lock while sending. for peer := range s.Peers() { - if !peer.Handshaked() || !peer.Version().Relay { + if peerOK != nil && !peerOK(peer) { continue } // Who cares about these messages anyway? - _ = peer.EnqueueMessage(msg) + _ = send(peer, pkt) } } +// broadcastMessage sends the message to all available peers. +func (s *Server) broadcastMessage(msg *Message) { + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil) +} + +// broadcastHPMessage sends the high-priority message to all available peers. +func (s *Server) broadcastHPMessage(msg *Message) { + s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) +} + // relayBlock tells all the other connected nodes about the given block. func (s *Server) relayBlock(b *block.Block) { - s.relayInventoryCmd(CMDInv, payload.BlockType, b.Hash()) + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) + s.broadcastMessage(msg) } // RelayTxn a new transaction to the local node and the connected peers. @@ -716,7 +738,13 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { return RelayOutOfMemory } - s.relayInventoryCmd(CMDInv, payload.TXType, t.Hash()) + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) + + // We need to filter out non-relaying nodes, so plain broadcast + // functions don't fit here. + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.Version().Relay + }) return RelaySucceed }