diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 3f76a5b30..da58f3b5c 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -193,6 +193,12 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { func (p *localPeer) EnqueuePacket(m []byte) error { return p.EnqueueHPPacket(m) } +func (p *localPeer) EnqueueP2PMessage(msg *Message) error { + return p.EnqueueMessage(msg) +} +func (p *localPeer) EnqueueP2PPacket(m []byte) error { + return p.EnqueueHPPacket(m) +} func (p *localPeer) EnqueueHPPacket(m []byte) error { msg := &Message{} r := io.NewBinReaderFromBuf(m) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 332a4f99a..323127b4e 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -30,6 +30,20 @@ type Peer interface { // completed handshaking. EnqueuePacket([]byte) error + // EnqueueP2PMessage is a temporary wrapper that sends a message via + // EnqueueP2PPacket if there is no error in serializing it. + EnqueueP2PMessage(*Message) error + + // EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until + // it puts given packet into the queue. It accepts a slice of bytes that + // can be shared with other queues (so that message marshalling can be + // done once for all peers). Does nothing is the peer is not yet + // completed handshaking. This queue is intended to be used for unicast + // peer to peer communication that is more important than broadcasts + // (handled by EnqueuePacket), but less important than high-priority + // messages (handled by EnqueueHPPacket). + EnqueueP2PPacket([]byte) error + // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts given packet into the high-priority // queue. diff --git a/pkg/network/server.go b/pkg/network/server.go index 104c72bff..db5867254 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -388,7 +388,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { // handlePing processes ping request. func (s *Server) handlePing(p Peer, ping *payload.Ping) error { - return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id))) + return p.EnqueueP2PMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id))) } // handlePing processes pong request. @@ -430,7 +430,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { if inv.Type == payload.ConsensusType { return p.EnqueueHPPacket(pkt) } - return p.EnqueuePacket(pkt) + return p.EnqueueP2PPacket(pkt) } return nil } @@ -462,7 +462,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { if inv.Type == payload.ConsensusType { err = p.EnqueueHPPacket(pkt) } else { - err = p.EnqueuePacket(pkt) + err = p.EnqueueP2PPacket(pkt) } } if err != nil { @@ -500,7 +500,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error { } payload := payload.NewInventory(payload.BlockType, blockHashes) msg := s.MkMsg(CMDInv, payload) - return p.EnqueueMessage(msg) + return p.EnqueueP2PMessage(msg) } // handleGetHeadersCmd processes the getheaders request. @@ -530,7 +530,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error { return nil } msg := s.MkMsg(CMDHeaders, &resp) - return p.EnqueueMessage(msg) + return p.EnqueueP2PMessage(msg) } // handleConsensusCmd processes received consensus payload. @@ -571,7 +571,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { netaddr, _ := net.ResolveTCPAddr("tcp", addr) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) } - return p.EnqueueMessage(s.MkMsg(CMDAddr, alist)) + return p.EnqueueP2PMessage(s.MkMsg(CMDAddr, alist)) } // requestHeaders sends a getheaders message to the peer. @@ -579,7 +579,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { func (s *Server) requestHeaders(p Peer) error { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload)) + return p.EnqueueP2PMessage(s.MkMsg(CMDGetHeaders, payload)) } // requestBlocks sends a getdata message to the peer @@ -598,7 +598,7 @@ func (s *Server) requestBlocks(p Peer) error { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - return p.EnqueueMessage(s.MkMsg(CMDGetData, payload)) + return p.EnqueueP2PMessage(s.MkMsg(CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 80ce6c695..84fb00c4c 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -22,6 +22,7 @@ const ( verAckReceived requestQueueSize = 32 + p2pMsgQueueSize = 16 hpRequestQueueSize = 4 ) @@ -47,9 +48,10 @@ type TCPPeer struct { finale sync.Once handShake handShakeStage - done chan struct{} - sendQ chan []byte - hpSendQ chan []byte + done chan struct{} + sendQ chan []byte + p2pSendQ chan []byte + hpSendQ chan []byte wg sync.WaitGroup @@ -61,41 +63,60 @@ type TCPPeer struct { // NewTCPPeer returns a TCPPeer structure based on the given connection. func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { return &TCPPeer{ - conn: conn, - server: s, - done: make(chan struct{}), - sendQ: make(chan []byte, requestQueueSize), - hpSendQ: make(chan []byte, hpRequestQueueSize), + conn: conn, + server: s, + done: make(chan struct{}), + sendQ: make(chan []byte, requestQueueSize), + p2pSendQ: make(chan []byte, p2pMsgQueueSize), + hpSendQ: make(chan []byte, hpRequestQueueSize), } } +// putPacketIntoQueue puts given message into the given queue if the peer has +// done handshaking. +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { + if !p.Handshaked() { + return errStateMismatch + } + queue <- msg + return nil +} + // EnqueuePacket implements the Peer interface. func (p *TCPPeer) EnqueuePacket(msg []byte) error { - if !p.Handshaked() { - return errStateMismatch + return p.putPacketIntoQueue(p.sendQ, msg) +} + +// putMessageIntoQueue serializes given Message and puts it into given queue if +// the peer has done handshaking. +func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { + b, err := msg.Bytes() + if err != nil { + return err } - p.sendQ <- msg - return nil + return p.putPacketIntoQueue(queue, b) } // EnqueueMessage is a temporary wrapper that sends a message via // EnqueuePacket if there is no error in serializing it. func (p *TCPPeer) EnqueueMessage(msg *Message) error { - b, err := msg.Bytes() - if err != nil { - return err - } - return p.EnqueuePacket(b) + return p.putMsgIntoQueue(p.sendQ, msg) +} + +// EnqueueP2PPacket implements the Peer interface. +func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { + return p.putPacketIntoQueue(p.p2pSendQ, msg) +} + +// EnqueueP2PMessage implements the Peer interface. +func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { + return p.putMsgIntoQueue(p.p2pSendQ, msg) } // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { - if !p.Handshaked() { - return errStateMismatch - } - p.hpSendQ <- msg - return nil + return p.putPacketIntoQueue(p.hpSendQ, msg) } func (p *TCPPeer) writeMsg(msg *Message) error { @@ -158,13 +179,24 @@ func (p *TCPPeer) handleQueues() { default: } - // If there is no message in the hp queue, block until one + if msg == nil { + // Then look at the p2p queue. + select { + case <-p.done: + return + case msg = <-p.hpSendQ: + case msg = <-p.p2pSendQ: + default: + } + } + // If there is no message in HP or P2P queues, block until one // appears in any of the queues. if msg == nil { select { case <-p.done: return case msg = <-p.hpSendQ: + case msg = <-p.p2pSendQ: case msg = <-p.sendQ: } }