Merge pull request #1637 from nspcc-dev/broadcast

network: support non-blocking broadcast
This commit is contained in:
Roman Khimov 2020-12-25 14:50:34 +03:00 committed by GitHub
commit f11a223709
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 26 deletions

View file

@ -415,18 +415,18 @@ func (p *localPeer) EnqueueMessage(msg *Message) error {
if err != nil { if err != nil {
return err return err
} }
return p.EnqueuePacket(b) return p.EnqueuePacket(true, b)
} }
func (p *localPeer) EnqueuePacket(m []byte) error { func (p *localPeer) EnqueuePacket(block bool, m []byte) error {
return p.EnqueueHPPacket(m) return p.EnqueueHPPacket(block, m)
} }
func (p *localPeer) EnqueueP2PMessage(msg *Message) error { func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueMessage(msg) return p.EnqueueMessage(msg)
} }
func (p *localPeer) EnqueueP2PPacket(m []byte) error { func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.EnqueueHPPacket(m) return p.EnqueueHPPacket(true, m)
} }
func (p *localPeer) EnqueueHPPacket(m []byte) error { func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error {
msg := &Message{Network: netmode.UnitTestNet} msg := &Message{Network: netmode.UnitTestNet}
r := io.NewBinReaderFromBuf(m) r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r) err := msg.Decode(r)

View file

@ -216,7 +216,7 @@ func (m *Message) tryCompressPayload() error {
compressedPayload := buf.Bytes() compressedPayload := buf.Bytes()
if m.Flags&Compressed == 0 { if m.Flags&Compressed == 0 {
switch m.Payload.(type) { switch m.Payload.(type) {
case *payload.Headers, *payload.MerkleBlock, *payload.NullPayload, case *payload.Headers, *payload.MerkleBlock, payload.NullPayload,
*payload.Inventory: *payload.Inventory:
break break
default: default:

View file

@ -28,7 +28,7 @@ type Peer interface {
// can be shared with other queues (so that message marshalling can be // can be shared with other queues (so that message marshalling can be
// done once for all peers). Does nothing is the peer is not yet // done once for all peers). Does nothing is the peer is not yet
// completed handshaking. // completed handshaking.
EnqueuePacket([]byte) error EnqueuePacket(bool, []byte) error
// EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PMessage is a temporary wrapper that sends a message via
// EnqueueP2PPacket if there is no error in serializing it. // EnqueueP2PPacket if there is no error in serializing it.
@ -47,7 +47,7 @@ type Peer interface {
// EnqueueHPPacket is a blocking high priority packet enqueuer, it // EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts given packet into the high-priority // doesn't return until it puts given packet into the high-priority
// queue. // queue.
EnqueueHPPacket([]byte) error EnqueueHPPacket(bool, []byte) error
Version() *payload.Version Version() *payload.Version
LastBlockIndex() uint32 LastBlockIndex() uint32
Handshaked() bool Handshaked() bool

View file

@ -293,7 +293,28 @@ func (s *Server) run() {
addr := drop.peer.PeerAddr().String() addr := drop.peer.PeerAddr().String()
if drop.reason == errIdenticalID { if drop.reason == errIdenticalID {
s.discovery.RegisterBadAddr(addr) s.discovery.RegisterBadAddr(addr)
} else if drop.reason != errAlreadyConnected { } else if drop.reason == errAlreadyConnected {
// There is a race condition when peer can be disconnected twice for the this reason
// which can lead to no connections to peer at all. Here we check for such a possibility.
stillConnected := false
s.lock.RLock()
verDrop := drop.peer.Version()
addr := drop.peer.PeerAddr().String()
if verDrop != nil {
for peer := range s.peers {
ver := peer.Version()
// Already connected, drop this connection.
if ver != nil && ver.Nonce == verDrop.Nonce && peer.PeerAddr().String() == addr {
stillConnected = true
}
}
}
s.lock.RUnlock()
if !stillConnected {
s.discovery.UnregisterConnectedAddr(addr)
s.discovery.BackFill(addr)
}
} else {
s.discovery.UnregisterConnectedAddr(addr) s.discovery.UnregisterConnectedAddr(addr)
s.discovery.BackFill(addr) s.discovery.BackFill(addr)
} }
@ -474,7 +495,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
} }
} }
s.lock.RUnlock() s.lock.RUnlock()
return p.SendVersionAck(NewMessage(CMDVerack, nil)) return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload()))
} }
// handleBlockCmd processes the received block received from its peer. // handleBlockCmd processes the received block received from its peer.
@ -537,7 +558,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
return err return err
} }
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt) return p.EnqueueHPPacket(true, pkt)
} }
return p.EnqueueP2PPacket(pkt) return p.EnqueueP2PPacket(pkt)
} }
@ -599,7 +620,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
pkt, err := msg.Bytes() pkt, err := msg.Bytes()
if err == nil { if err == nil {
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
err = p.EnqueueHPPacket(pkt) err = p.EnqueueHPPacket(true, pkt)
} else { } else {
err = p.EnqueueP2PPacket(pkt) err = p.EnqueueP2PPacket(pkt)
} }
@ -943,7 +964,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
// iteratePeersWithSendMsg sends given message to all peers using two functions // iteratePeersWithSendMsg sends given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the // passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false). // peer is considered invalid if it returns false).
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) error, peerOK func(Peer) bool) { func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
// Get a copy of s.peers to avoid holding a lock while sending. // Get a copy of s.peers to avoid holding a lock while sending.
peers := s.Peers() peers := s.Peers()
if len(peers) == 0 { if len(peers) == 0 {
@ -953,15 +974,46 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, []byte) e
if err != nil { if err != nil {
return return
} }
success := make(map[Peer]bool, len(peers))
okCount := 0
sentCount := 0
for peer := range peers { for peer := range peers {
if peerOK != nil && !peerOK(peer) { if peerOK != nil && !peerOK(peer) {
success[peer] = false
continue
}
okCount++
if err := send(peer, false, pkt); err != nil {
continue continue
} }
if msg.Command == CMDGetAddr { if msg.Command == CMDGetAddr {
peer.AddGetAddrSent() peer.AddGetAddrSent()
} }
// Who cares about these messages anyway? success[peer] = true
_ = send(peer, pkt) sentCount++
}
// Send to at least 2/3 of good peers.
if 3*sentCount >= 2*okCount {
return
}
// Perform blocking send now.
for peer := range peers {
if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) {
continue
}
if err := send(peer, true, pkt); err != nil {
continue
}
if msg.Command == CMDGetAddr {
peer.AddGetAddrSent()
}
sentCount++
if 3*sentCount >= 2*okCount {
return
}
} }
} }

View file

@ -30,6 +30,7 @@ const (
var ( var (
errGone = errors.New("the peer is gone already") errGone = errors.New("the peer is gone already")
errBusy = errors.New("peer is busy")
errStateMismatch = errors.New("tried to send protocol message before handshake completed") errStateMismatch = errors.New("tried to send protocol message before handshake completed")
errPingPong = errors.New("ping/pong timeout") errPingPong = errors.New("ping/pong timeout")
errUnexpectedPong = errors.New("pong message wasn't expected") errUnexpectedPong = errors.New("pong message wasn't expected")
@ -81,21 +82,31 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
// putPacketIntoQueue puts given message into the given queue if the peer has // putPacketIntoQueue puts given message into the given queue if the peer has
// done handshaking. // done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error {
if !p.Handshaked() { if !p.Handshaked() {
return errStateMismatch return errStateMismatch
} }
select { if block {
case queue <- msg: select {
case <-p.done: case queue <- msg:
return errGone case <-p.done:
return errGone
}
} else {
select {
case queue <- msg:
case <-p.done:
return errGone
default:
return errBusy
}
} }
return nil return nil
} }
// EnqueuePacket implements the Peer interface. // EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(msg []byte) error { func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.sendQ, msg) return p.putPacketIntoQueue(p.sendQ, block, msg)
} }
// putMessageIntoQueue serializes given Message and puts it into given queue if // putMessageIntoQueue serializes given Message and puts it into given queue if
@ -105,7 +116,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
if err != nil { if err != nil {
return err return err
} }
return p.putPacketIntoQueue(queue, b) return p.putPacketIntoQueue(queue, true, b)
} }
// EnqueueMessage is a temporary wrapper that sends a message via // EnqueueMessage is a temporary wrapper that sends a message via
@ -116,7 +127,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error {
// EnqueueP2PPacket implements the Peer interface. // EnqueueP2PPacket implements the Peer interface.
func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, msg) return p.putPacketIntoQueue(p.p2pSendQ, true, msg)
} }
// EnqueueP2PMessage implements the Peer interface. // EnqueueP2PMessage implements the Peer interface.
@ -126,8 +137,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
// EnqueueHPPacket implements the Peer interface. It the peer is not yet // EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop. // handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.hpSendQ, msg) return p.putPacketIntoQueue(p.hpSendQ, block, msg)
} }
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {
@ -184,6 +195,7 @@ func (p *TCPPeer) handleQueues() {
var p2pSkipCounter uint32 var p2pSkipCounter uint32
const p2pSkipDivisor = 4 const p2pSkipDivisor = 4
var writeTimeout = time.Duration(p.server.chain.GetConfig().SecondsPerBlock) * time.Second
for { for {
var msg []byte var msg []byte
@ -217,6 +229,10 @@ func (p *TCPPeer) handleQueues() {
case msg = <-p.sendQ: case msg = <-p.sendQ:
} }
} }
err = p.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err != nil {
break
}
_, err = p.conn.Write(msg) _, err = p.conn.Write(msg)
if err != nil { if err != nil {
break break