network: introduce peer-to-peer message queue

This one is designed to give more priority to direct nodes communication, that
is that their messaging would have more priority than generic broadcasts. It
should improve consensus process under TX pressure and allow to handle
pings in time (preventing disconnects).
This commit is contained in:
Roman Khimov 2020-01-23 19:40:40 +03:00
parent 72e4eb7172
commit 9eafec0d1d
4 changed files with 83 additions and 31 deletions

View file

@ -193,6 +193,12 @@ func (p *localPeer) EnqueueMessage(msg *Message) error {
func (p *localPeer) EnqueuePacket(m []byte) error { func (p *localPeer) EnqueuePacket(m []byte) error {
return p.EnqueueHPPacket(m) return p.EnqueueHPPacket(m)
} }
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueMessage(msg)
}
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error { func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{} msg := &Message{}
r := io.NewBinReaderFromBuf(m) r := io.NewBinReaderFromBuf(m)

View file

@ -30,6 +30,20 @@ type Peer interface {
// completed handshaking. // completed handshaking.
EnqueuePacket([]byte) error EnqueuePacket([]byte) error
// EnqueueP2PMessage is a temporary wrapper that sends a message via
// EnqueueP2PPacket if there is no error in serializing it.
EnqueueP2PMessage(*Message) error
// EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until
// it puts given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). Does nothing is the peer is not yet
// completed handshaking. This queue is intended to be used for unicast
// peer to peer communication that is more important than broadcasts
// (handled by EnqueuePacket), but less important than high-priority
// messages (handled by EnqueueHPPacket).
EnqueueP2PPacket([]byte) error
// EnqueueHPPacket is a blocking high priority packet enqueuer, it // EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts given packet into the high-priority // doesn't return until it puts given packet into the high-priority
// queue. // queue.

View file

@ -388,7 +388,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
// handlePing processes ping request. // handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error { func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id))) return p.EnqueueP2PMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id)))
} }
// handlePing processes pong request. // handlePing processes pong request.
@ -430,7 +430,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt) return p.EnqueueHPPacket(pkt)
} }
return p.EnqueuePacket(pkt) return p.EnqueueP2PPacket(pkt)
} }
return nil return nil
} }
@ -462,7 +462,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
err = p.EnqueueHPPacket(pkt) err = p.EnqueueHPPacket(pkt)
} else { } else {
err = p.EnqueuePacket(pkt) err = p.EnqueueP2PPacket(pkt)
} }
} }
if err != nil { if err != nil {
@ -500,7 +500,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
} }
payload := payload.NewInventory(payload.BlockType, blockHashes) payload := payload.NewInventory(payload.BlockType, blockHashes)
msg := s.MkMsg(CMDInv, payload) msg := s.MkMsg(CMDInv, payload)
return p.EnqueueMessage(msg) return p.EnqueueP2PMessage(msg)
} }
// handleGetHeadersCmd processes the getheaders request. // handleGetHeadersCmd processes the getheaders request.
@ -530,7 +530,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
return nil return nil
} }
msg := s.MkMsg(CMDHeaders, &resp) msg := s.MkMsg(CMDHeaders, &resp)
return p.EnqueueMessage(msg) return p.EnqueueP2PMessage(msg)
} }
// handleConsensusCmd processes received consensus payload. // handleConsensusCmd processes received consensus payload.
@ -571,7 +571,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr) netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
} }
return p.EnqueueMessage(s.MkMsg(CMDAddr, alist)) return p.EnqueueP2PMessage(s.MkMsg(CMDAddr, alist))
} }
// requestHeaders sends a getheaders message to the peer. // requestHeaders sends a getheaders message to the peer.
@ -579,7 +579,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
func (s *Server) requestHeaders(p Peer) error { func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()} start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{}) payload := payload.NewGetBlocks(start, util.Uint256{})
return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload)) return p.EnqueueP2PMessage(s.MkMsg(CMDGetHeaders, payload))
} }
// requestBlocks sends a getdata message to the peer // requestBlocks sends a getdata message to the peer
@ -598,7 +598,7 @@ func (s *Server) requestBlocks(p Peer) error {
} }
if len(hashes) > 0 { if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes) payload := payload.NewInventory(payload.BlockType, hashes)
return p.EnqueueMessage(s.MkMsg(CMDGetData, payload)) return p.EnqueueP2PMessage(s.MkMsg(CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.LastBlockIndex() { } else if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p) return s.requestHeaders(p)
} }

View file

@ -22,6 +22,7 @@ const (
verAckReceived verAckReceived
requestQueueSize = 32 requestQueueSize = 32
p2pMsgQueueSize = 16
hpRequestQueueSize = 4 hpRequestQueueSize = 4
) )
@ -49,6 +50,7 @@ type TCPPeer struct {
done chan struct{} done chan struct{}
sendQ chan []byte sendQ chan []byte
p2pSendQ chan []byte
hpSendQ chan []byte hpSendQ chan []byte
wg sync.WaitGroup wg sync.WaitGroup
@ -65,37 +67,56 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
server: s, server: s,
done: make(chan struct{}), done: make(chan struct{}),
sendQ: make(chan []byte, requestQueueSize), sendQ: make(chan []byte, requestQueueSize),
p2pSendQ: make(chan []byte, p2pMsgQueueSize),
hpSendQ: make(chan []byte, hpRequestQueueSize), hpSendQ: make(chan []byte, hpRequestQueueSize),
} }
} }
// EnqueuePacket implements the Peer interface. // putPacketIntoQueue puts given message into the given queue if the peer has
func (p *TCPPeer) EnqueuePacket(msg []byte) error { // done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error {
if !p.Handshaked() { if !p.Handshaked() {
return errStateMismatch return errStateMismatch
} }
p.sendQ <- msg queue <- msg
return nil return nil
} }
// EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(msg []byte) error {
return p.putPacketIntoQueue(p.sendQ, msg)
}
// putMessageIntoQueue serializes given Message and puts it into given queue if
// the peer has done handshaking.
func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.putPacketIntoQueue(queue, b)
}
// EnqueueMessage is a temporary wrapper that sends a message via // EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it. // EnqueuePacket if there is no error in serializing it.
func (p *TCPPeer) EnqueueMessage(msg *Message) error { func (p *TCPPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes() return p.putMsgIntoQueue(p.sendQ, msg)
if err != nil { }
return err
} // EnqueueP2PPacket implements the Peer interface.
return p.EnqueuePacket(b) func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, msg)
}
// EnqueueP2PMessage implements the Peer interface.
func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
return p.putMsgIntoQueue(p.p2pSendQ, msg)
} }
// EnqueueHPPacket implements the Peer interface. It the peer is not yet // EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop. // handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { func (p *TCPPeer) EnqueueHPPacket(msg []byte) error {
if !p.Handshaked() { return p.putPacketIntoQueue(p.hpSendQ, msg)
return errStateMismatch
}
p.hpSendQ <- msg
return nil
} }
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {
@ -158,13 +179,24 @@ func (p *TCPPeer) handleQueues() {
default: default:
} }
// If there is no message in the hp queue, block until one if msg == nil {
// Then look at the p2p queue.
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
default:
}
}
// If there is no message in HP or P2P queues, block until one
// appears in any of the queues. // appears in any of the queues.
if msg == nil { if msg == nil {
select { select {
case <-p.done: case <-p.done:
return return
case msg = <-p.hpSendQ: case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
case msg = <-p.sendQ: case msg = <-p.sendQ:
} }
} }