diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 9b52a2d91..716ba2afa 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -415,18 +415,18 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { if err != nil { return err } - return p.EnqueuePacket(b) + return p.EnqueuePacket(true, b) } -func (p *localPeer) EnqueuePacket(m []byte) error { - return p.EnqueueHPPacket(m) +func (p *localPeer) EnqueuePacket(block bool, m []byte) error { + return p.EnqueueHPPacket(block, m) } func (p *localPeer) EnqueueP2PMessage(msg *Message) error { return p.EnqueueMessage(msg) } func (p *localPeer) EnqueueP2PPacket(m []byte) error { - return p.EnqueueHPPacket(m) + return p.EnqueueHPPacket(true, m) } -func (p *localPeer) EnqueueHPPacket(m []byte) error { +func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error { msg := &Message{Network: netmode.UnitTestNet} r := io.NewBinReaderFromBuf(m) err := msg.Decode(r) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index fb06c59d2..b2df6ff69 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -28,7 +28,7 @@ type Peer interface { // can be shared with other queues (so that message marshalling can be // done once for all peers). Does nothing is the peer is not yet // completed handshaking. - EnqueuePacket([]byte) error + EnqueuePacket(bool, []byte) error // EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PPacket if there is no error in serializing it. @@ -47,7 +47,7 @@ type Peer interface { // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts given packet into the high-priority // queue. - EnqueueHPPacket([]byte) error + EnqueueHPPacket(bool, []byte) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 8beeea9d2..e4721f1e0 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -537,7 +537,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return err } if inv.Type == payload.ConsensusType { - return p.EnqueueHPPacket(pkt) + return p.EnqueueHPPacket(true, pkt) } return p.EnqueueP2PPacket(pkt) } @@ -599,7 +599,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { pkt, err := msg.Bytes() if err == nil { if inv.Type == payload.ConsensusType { - err = p.EnqueueHPPacket(pkt) + err = p.EnqueueHPPacket(true, pkt) } else { err = p.EnqueueP2PPacket(pkt) } @@ -943,7 +943,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) { // iteratePeersWithSendMsg sends given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). -func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) error, peerOK func(Peer) bool) { +func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { // Get a copy of s.peers to avoid holding a lock while sending. peers := s.Peers() if len(peers) == 0 { @@ -953,15 +953,46 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) e if err != nil { return } + + success := make(map[Peer]bool, len(peers)) + okCount := 0 + sentCount := 0 for peer := range peers { if peerOK != nil && !peerOK(peer) { + success[peer] = false + continue + } + okCount++ + if err := send(peer, false, pkt); err != nil { continue } if msg.Command == CMDGetAddr { peer.AddGetAddrSent() } - // Who cares about these messages anyway? - _ = send(peer, pkt) + success[peer] = true + sentCount++ + } + + // Send to at least 2/3 of good peers. + if 3*sentCount >= 2*okCount { + return + } + + // Perform blocking send now. + for peer := range peers { + if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) { + continue + } + if err := send(peer, true, pkt); err != nil { + continue + } + if msg.Command == CMDGetAddr { + peer.AddGetAddrSent() + } + sentCount++ + if 3*sentCount >= 2*okCount { + return + } } } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 67329c161..f55a3954b 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -30,6 +30,7 @@ const ( var ( errGone = errors.New("the peer is gone already") + errBusy = errors.New("peer is busy") errStateMismatch = errors.New("tried to send protocol message before handshake completed") errPingPong = errors.New("ping/pong timeout") errUnexpectedPong = errors.New("pong message wasn't expected") @@ -81,21 +82,31 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { // putPacketIntoQueue puts given message into the given queue if the peer has // done handshaking. -func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error { if !p.Handshaked() { return errStateMismatch } - select { - case queue <- msg: - case <-p.done: - return errGone + if block { + select { + case queue <- msg: + case <-p.done: + return errGone + } + } else { + select { + case queue <- msg: + case <-p.done: + return errGone + default: + return errBusy + } } return nil } // EnqueuePacket implements the Peer interface. -func (p *TCPPeer) EnqueuePacket(msg []byte) error { - return p.putPacketIntoQueue(p.sendQ, msg) +func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error { + return p.putPacketIntoQueue(p.sendQ, block, msg) } // putMessageIntoQueue serializes given Message and puts it into given queue if @@ -105,7 +116,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { if err != nil { return err } - return p.putPacketIntoQueue(queue, b) + return p.putPacketIntoQueue(queue, true, b) } // EnqueueMessage is a temporary wrapper that sends a message via @@ -116,7 +127,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error { // EnqueueP2PPacket implements the Peer interface. func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { - return p.putPacketIntoQueue(p.p2pSendQ, msg) + return p.putPacketIntoQueue(p.p2pSendQ, true, msg) } // EnqueueP2PMessage implements the Peer interface. @@ -126,8 +137,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. -func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { - return p.putPacketIntoQueue(p.hpSendQ, msg) +func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error { + return p.putPacketIntoQueue(p.hpSendQ, block, msg) } func (p *TCPPeer) writeMsg(msg *Message) error {