diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 9b52a2d91..716ba2afa 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -415,18 +415,18 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { if err != nil { return err } - return p.EnqueuePacket(b) + return p.EnqueuePacket(true, b) } -func (p *localPeer) EnqueuePacket(m []byte) error { - return p.EnqueueHPPacket(m) +func (p *localPeer) EnqueuePacket(block bool, m []byte) error { + return p.EnqueueHPPacket(block, m) } func (p *localPeer) EnqueueP2PMessage(msg *Message) error { return p.EnqueueMessage(msg) } func (p *localPeer) EnqueueP2PPacket(m []byte) error { - return p.EnqueueHPPacket(m) + return p.EnqueueHPPacket(true, m) } -func (p *localPeer) EnqueueHPPacket(m []byte) error { +func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error { msg := &Message{Network: netmode.UnitTestNet} r := io.NewBinReaderFromBuf(m) err := msg.Decode(r) diff --git a/pkg/network/message.go b/pkg/network/message.go index 3b73a5422..c8bf606ae 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -216,7 +216,7 @@ func (m *Message) tryCompressPayload() error { compressedPayload := buf.Bytes() if m.Flags&Compressed == 0 { switch m.Payload.(type) { - case *payload.Headers, *payload.MerkleBlock, *payload.NullPayload, + case *payload.Headers, *payload.MerkleBlock, payload.NullPayload, *payload.Inventory: break default: diff --git a/pkg/network/peer.go b/pkg/network/peer.go index fb06c59d2..b2df6ff69 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -28,7 +28,7 @@ type Peer interface { // can be shared with other queues (so that message marshalling can be // done once for all peers). Does nothing is the peer is not yet // completed handshaking. - EnqueuePacket([]byte) error + EnqueuePacket(bool, []byte) error // EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PPacket if there is no error in serializing it. @@ -47,7 +47,7 @@ type Peer interface { // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts given packet into the high-priority // queue. - EnqueueHPPacket([]byte) error + EnqueueHPPacket(bool, []byte) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 8beeea9d2..b8befac11 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -293,7 +293,28 @@ func (s *Server) run() { addr := drop.peer.PeerAddr().String() if drop.reason == errIdenticalID { s.discovery.RegisterBadAddr(addr) - } else if drop.reason != errAlreadyConnected { + } else if drop.reason == errAlreadyConnected { + // There is a race condition when peer can be disconnected twice for the this reason + // which can lead to no connections to peer at all. Here we check for such a possibility. + stillConnected := false + s.lock.RLock() + verDrop := drop.peer.Version() + addr := drop.peer.PeerAddr().String() + if verDrop != nil { + for peer := range s.peers { + ver := peer.Version() + // Already connected, drop this connection. + if ver != nil && ver.Nonce == verDrop.Nonce && peer.PeerAddr().String() == addr { + stillConnected = true + } + } + } + s.lock.RUnlock() + if !stillConnected { + s.discovery.UnregisterConnectedAddr(addr) + s.discovery.BackFill(addr) + } + } else { s.discovery.UnregisterConnectedAddr(addr) s.discovery.BackFill(addr) } @@ -474,7 +495,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { } } s.lock.RUnlock() - return p.SendVersionAck(NewMessage(CMDVerack, nil)) + return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload())) } // handleBlockCmd processes the received block received from its peer. @@ -537,7 +558,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return err } if inv.Type == payload.ConsensusType { - return p.EnqueueHPPacket(pkt) + return p.EnqueueHPPacket(true, pkt) } return p.EnqueueP2PPacket(pkt) } @@ -599,7 +620,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { pkt, err := msg.Bytes() if err == nil { if inv.Type == payload.ConsensusType { - err = p.EnqueueHPPacket(pkt) + err = p.EnqueueHPPacket(true, pkt) } else { err = p.EnqueueP2PPacket(pkt) } @@ -943,7 +964,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) { // iteratePeersWithSendMsg sends given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). -func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) error, peerOK func(Peer) bool) { +func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { // Get a copy of s.peers to avoid holding a lock while sending. peers := s.Peers() if len(peers) == 0 { @@ -953,15 +974,46 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) e if err != nil { return } + + success := make(map[Peer]bool, len(peers)) + okCount := 0 + sentCount := 0 for peer := range peers { if peerOK != nil && !peerOK(peer) { + success[peer] = false + continue + } + okCount++ + if err := send(peer, false, pkt); err != nil { continue } if msg.Command == CMDGetAddr { peer.AddGetAddrSent() } - // Who cares about these messages anyway? - _ = send(peer, pkt) + success[peer] = true + sentCount++ + } + + // Send to at least 2/3 of good peers. + if 3*sentCount >= 2*okCount { + return + } + + // Perform blocking send now. + for peer := range peers { + if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) { + continue + } + if err := send(peer, true, pkt); err != nil { + continue + } + if msg.Command == CMDGetAddr { + peer.AddGetAddrSent() + } + sentCount++ + if 3*sentCount >= 2*okCount { + return + } } } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 67329c161..f8c29d61d 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -30,6 +30,7 @@ const ( var ( errGone = errors.New("the peer is gone already") + errBusy = errors.New("peer is busy") errStateMismatch = errors.New("tried to send protocol message before handshake completed") errPingPong = errors.New("ping/pong timeout") errUnexpectedPong = errors.New("pong message wasn't expected") @@ -81,21 +82,31 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { // putPacketIntoQueue puts given message into the given queue if the peer has // done handshaking. -func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error { if !p.Handshaked() { return errStateMismatch } - select { - case queue <- msg: - case <-p.done: - return errGone + if block { + select { + case queue <- msg: + case <-p.done: + return errGone + } + } else { + select { + case queue <- msg: + case <-p.done: + return errGone + default: + return errBusy + } } return nil } // EnqueuePacket implements the Peer interface. -func (p *TCPPeer) EnqueuePacket(msg []byte) error { - return p.putPacketIntoQueue(p.sendQ, msg) +func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error { + return p.putPacketIntoQueue(p.sendQ, block, msg) } // putMessageIntoQueue serializes given Message and puts it into given queue if @@ -105,7 +116,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { if err != nil { return err } - return p.putPacketIntoQueue(queue, b) + return p.putPacketIntoQueue(queue, true, b) } // EnqueueMessage is a temporary wrapper that sends a message via @@ -116,7 +127,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error { // EnqueueP2PPacket implements the Peer interface. func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { - return p.putPacketIntoQueue(p.p2pSendQ, msg) + return p.putPacketIntoQueue(p.p2pSendQ, true, msg) } // EnqueueP2PMessage implements the Peer interface. @@ -126,8 +137,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. -func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { - return p.putPacketIntoQueue(p.hpSendQ, msg) +func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error { + return p.putPacketIntoQueue(p.hpSendQ, block, msg) } func (p *TCPPeer) writeMsg(msg *Message) error { @@ -184,6 +195,7 @@ func (p *TCPPeer) handleQueues() { var p2pSkipCounter uint32 const p2pSkipDivisor = 4 + var writeTimeout = time.Duration(p.server.chain.GetConfig().SecondsPerBlock) * time.Second for { var msg []byte @@ -217,6 +229,10 @@ func (p *TCPPeer) handleQueues() { case msg = <-p.sendQ: } } + err = p.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + if err != nil { + break + } _, err = p.conn.Write(msg) if err != nil { break