Merge pull request #2741 from nspcc-dev/separate-broadcast-queue-handling

Rework broadcast logic
This commit is contained in:
Roman Khimov 2022-10-12 16:33:27 +07:00 committed by GitHub
commit ec4983e88e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 84 deletions

View file

@ -1,6 +1,7 @@
package network
import (
"context"
"fmt"
"net"
"sync"
@ -109,18 +110,21 @@ func (p *localPeer) EnqueueMessage(msg *Message) error {
if err != nil {
return err
}
return p.EnqueuePacket(true, b)
return p.EnqueueHPPacket(b)
}
func (p *localPeer) EnqueuePacket(block bool, m []byte) error {
return p.EnqueueHPPacket(block, m)
func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueMessage(msg)
}
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.EnqueueHPPacket(true, m)
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error {
func (p *localPeer) BroadcastHPPacket(_ context.Context, m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{}
r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r)

View file

@ -1,6 +1,7 @@
package network
import (
"context"
"net"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
@ -19,16 +20,21 @@ type Peer interface {
PeerAddr() net.Addr
Disconnect(error)
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
// EnqueueMessage is a blocking packet enqueuer similar to EnqueueP2PMessage,
// but using the lowest priority queue.
EnqueueMessage(*Message) error
// EnqueuePacket is a blocking packet enqueuer, it doesn't return until
// it puts the given packet into the queue. It accepts a slice of bytes that
// BroadcastPacket is a context-bound packet enqueuer, it either puts the
// given packet into the queue or exits with errors if the context expires
// or peer disconnects. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). It does nothing if the peer has not yet
// done once for all peers). It returns an error if the peer has not yet
// completed handshaking.
EnqueuePacket(bool, []byte) error
BroadcastPacket(context.Context, []byte) error
// BroadcastHPPacket is the same as BroadcastPacket, but uses a high-priority
// queue.
BroadcastHPPacket(context.Context, []byte) error
// EnqueueP2PMessage is a temporary wrapper that sends a message via
// EnqueueP2PPacket if there is no error in serializing it.
@ -37,17 +43,17 @@ type Peer interface {
// EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until
// it puts the given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). It does nothing if the peer has not yet
// done once for all peers). It returns an error if the peer has not yet
// completed handshaking. This queue is intended to be used for unicast
// peer to peer communication that is more important than broadcasts
// (handled by EnqueuePacket) but less important than high-priority
// messages (handled by EnqueueHPPacket).
// (handled by BroadcastPacket) but less important than high-priority
// messages (handled by EnqueueHPPacket and BroadcastHPPacket).
EnqueueP2PPacket([]byte) error
// EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts the given packet into the high-priority
// queue.
EnqueueHPPacket(bool, []byte) error
EnqueueHPPacket([]byte) error
Version() *payload.Version
LastBlockIndex() uint32
Handshaked() bool

View file

@ -1,6 +1,7 @@
package network
import (
"context"
"crypto/rand"
"encoding/binary"
"errors"
@ -755,7 +756,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
return err
}
if inv.Type == payload.ExtensibleType {
return p.EnqueueHPPacket(true, pkt)
return p.EnqueueHPPacket(pkt)
}
return p.EnqueueP2PPacket(pkt)
}
@ -817,7 +818,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
pkt, err := msg.Bytes()
if err == nil {
if inv.Type == payload.ExtensibleType {
err = p.EnqueueHPPacket(true, pkt)
err = p.EnqueueHPPacket(pkt)
} else {
err = p.EnqueueP2PPacket(pkt)
}
@ -1348,7 +1349,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
// iteratePeersWithSendMsg sends the given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false).
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error, peerOK func(Peer) bool) {
var deadN, peerN, sentN int
// Get a copy of s.peers to avoid holding a lock while sending.
@ -1357,53 +1358,48 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
if peerN == 0 {
return
}
mrand.Shuffle(peerN, func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
pkt, err := msg.Bytes()
if err != nil {
return
}
// If true, this node isn't counted any more, either it's dead or we
// have already sent an Inv to it.
finished := make([]bool, peerN)
// Try non-blocking sends first and only block if have to.
for _, blocking := range []bool{false, true} {
for i, peer := range peers {
// Send to 2/3 of good peers.
if 3*sentN >= 2*(peerN-deadN) {
return
}
if finished[i] {
continue
}
err := send(peer, blocking, pkt)
if err == nil {
var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster.
var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
for _, peer := range peers {
go func(p Peer, ctx context.Context, pkt []byte) {
// Do this before packet is sent, reader thread can get the reply before this routine wakes up.
if msg.Command == CMDGetAddr {
peer.AddGetAddrSent()
p.AddGetAddrSent()
}
replies <- send(p, ctx, pkt)
}(peer, ctx, pkt)
}
for r := range replies {
if r == nil {
sentN++
} else if !blocking && errors.Is(err, errBusy) {
// Can be retried.
continue
} else {
deadN++
}
finished[i] = true
if sentN+deadN == peerN {
break
}
// Send to 2/3 of good peers.
if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil {
cancel()
}
}
cancel()
close(replies)
}
// broadcastMessage sends the message to all available peers.
func (s *Server) broadcastMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil)
s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil)
}
// broadcastHPMessage sends the high-priority message to all available peers.
func (s *Server) broadcastHPMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil)
s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil)
}
// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
@ -1421,7 +1417,7 @@ mainloop:
msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
// Filter out nodes that are more current (avoid spamming the network
// during initial sync).
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, func(p Peer) bool {
return p.Handshaked() && p.LastBlockIndex() < b.Index
})
s.extensiblePool.RemoveStale(b.Index)
@ -1467,7 +1463,7 @@ func (s *Server) broadcastTxHashes(hs []util.Uint256) {
// We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here.
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, Peer.IsFullNode)
s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.IsFullNode)
}
// initStaleMemPools initializes mempools for stale tx/payload processing.

View file

@ -1,6 +1,7 @@
package network
import (
"context"
"errors"
"fmt"
"net"
@ -31,7 +32,6 @@ const (
var (
errGone = errors.New("the peer is gone already")
errBusy = errors.New("peer is busy")
errStateMismatch = errors.New("tried to send protocol message before handshake completed")
errPingPong = errors.New("ping/pong timeout")
errUnexpectedPong = errors.New("pong message wasn't expected")
@ -81,40 +81,45 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
}
}
// putPacketIntoQueue puts the given message into the given queue if the peer has
// done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error {
// putBroadcastPacketIntoQueue puts the given message into the given queue if
// the peer has done handshaking using the given context.
func (p *TCPPeer) putBroadcastPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
var ret error
if block {
timer := time.NewTimer(p.server.TimePerBlock / 2)
select {
case queue <- msg:
case <-p.done:
ret = errGone
case <-timer.C:
ret = errBusy
}
if !errors.Is(ret, errBusy) && !timer.Stop() {
<-timer.C
}
} else {
select {
case queue <- msg:
case <-p.done:
return errGone
default:
return errBusy
case <-ctx.Done():
return ctx.Err()
}
}
return ret
return nil
}
// EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.sendQ, block, msg)
// putPacketIntoQueue puts the given message into the given queue if the peer has
// done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
select {
case queue <- msg:
case <-p.done:
return errGone
}
return nil
}
// BroadcastPacket implements the Peer interface.
func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error {
return p.putBroadcastPacketIntoQueue(ctx, p.sendQ, msg)
}
// BroadcastHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop.
func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error {
return p.putBroadcastPacketIntoQueue(ctx, p.hpSendQ, msg)
}
// putMessageIntoQueue serializes the given Message and puts it into given queue if
@ -124,7 +129,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
if err != nil {
return err
}
return p.putPacketIntoQueue(queue, true, b)
return p.putPacketIntoQueue(queue, b)
}
// EnqueueMessage is a temporary wrapper that sends a message via
@ -135,7 +140,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error {
// EnqueueP2PPacket implements the Peer interface.
func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, true, msg)
return p.putPacketIntoQueue(p.p2pSendQ, msg)
}
// EnqueueP2PMessage implements the Peer interface.
@ -145,8 +150,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
// EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.hpSendQ, block, msg)
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error {
return p.putPacketIntoQueue(p.hpSendQ, msg)
}
func (p *TCPPeer) writeMsg(msg *Message) error {