network: rework broadcasting functions, tune priorities

This gives more priority to anything related to consensus.
This commit is contained in:
Roman Khimov 2020-01-22 11:01:13 +03:00
parent 34b863d645
commit f2ffffddb7

View file

@ -662,7 +662,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
} }
func (s *Server) handleNewPayload(p *consensus.Payload) { func (s *Server) handleNewPayload(p *consensus.Payload) {
s.relayInventoryCmd(CMDInv, payload.ConsensusType, p.Hash()) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.ConsensusType, []util.Uint256{p.Hash()}))
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s.broadcastHPMessage(msg)
} }
// getLastBlockTime returns unix timestamp for the moment when the last block // getLastBlockTime returns unix timestamp for the moment when the last block
@ -676,25 +679,44 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
return return
} }
s.relayInventoryCmd(CMDGetData, payload.TXType, hashes...) msg := s.MkMsg(CMDGetData, payload.NewInventory(payload.TXType, hashes))
// It's high priority because it directly affects consensus process,
// even though it's getdata.
s.broadcastHPMessage(msg)
} }
func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, hashes ...util.Uint256) { // iteratePeersWithSendMsg sends given message to all peers using two functions
payload := payload.NewInventory(t, hashes) // passed, one is to send the message and the other is to filtrate peers (the
msg := s.MkMsg(cmd, payload) // peer is considered invalid if it returns false).
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) error, peerOK func(Peer) bool) {
pkt, err := msg.Bytes()
if err != nil {
return
}
// Get a copy of s.peers to avoid holding a lock while sending.
for peer := range s.Peers() { for peer := range s.Peers() {
if !peer.Handshaked() || !peer.Version().Relay { if peerOK != nil && !peerOK(peer) {
continue continue
} }
// Who cares about these messages anyway? // Who cares about these messages anyway?
_ = peer.EnqueueMessage(msg) _ = send(peer, pkt)
} }
} }
// broadcastMessage sends the message to all available peers.
func (s *Server) broadcastMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil)
}
// broadcastHPMessage sends the high-priority message to all available peers.
func (s *Server) broadcastHPMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil)
}
// relayBlock tells all the other connected nodes about the given block. // relayBlock tells all the other connected nodes about the given block.
func (s *Server) relayBlock(b *block.Block) { func (s *Server) relayBlock(b *block.Block) {
s.relayInventoryCmd(CMDInv, payload.BlockType, b.Hash()) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
s.broadcastMessage(msg)
} }
// RelayTxn a new transaction to the local node and the connected peers. // RelayTxn a new transaction to the local node and the connected peers.
@ -716,7 +738,13 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
return RelayOutOfMemory return RelayOutOfMemory
} }
s.relayInventoryCmd(CMDInv, payload.TXType, t.Hash()) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()}))
// We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here.
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
return p.Handshaked() && p.Version().Relay
})
return RelaySucceed return RelaySucceed
} }