From e80c60a3b97cda3b80fe3dc999a18312f4a58c7c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 10 Oct 2022 22:48:06 +0300 Subject: [PATCH] network: rework broadcast logic We have a number of queues for different purposes: * regular broadcast queue * direct p2p queue * high-priority queue And two basic egress scenarios: * direct p2p messages (replies to requests in Server's handle* methods) * broadcasted messages Low priority broadcasted messages: * transaction inventories * block inventories * notary inventories * non-consensus extensibles High-priority broadcasted messages: * consensus extensibles * getdata transaction requests from consensus process * getaddr requests P2P messages are a bit more complicated, most of the time they use p2p queue, but extensible message requests/replies use HP queue. Server's handle* code is run from Peer's handleIncoming, every peer has this thread that handles incoming messages. When working with the peer it's important to reply to requests and blocking this thread until we send (queue) a reply is fine, if the peer is slow we just won't get anything new from it. The queue used is irrelevant wrt this issue. Broadcasted messages are radically different, we want them to be delivered to many peers, but we don't care about specific ones. If it's delivered to 2/3 of the peers we're fine, if it's delivered to more of them --- it's not an issue. But doing this fairly is not an easy thing, current code tries performing unblocked sends and if this doesn't yield enough results it then blocks (but has a timeout, we can't wait indefinitely). But it does so in sequential manner, once the peer is chosen the code will wait for it (and only it) until timeout happens. What can be done instead is an attempt to push the message to all of the peers simultaneously (or close to that). If they all deliver --- OK, if some block and wait then we can wait until _any_ of them pushes the message through (or global timeout happens, we still can't wait forever). If we have enough deliveries then we can cancel pending ones and it's again not an error if these canceled threads still do their job. This makes the system more dynamic and adds some substantial processing overhead, but it's a networking code, any of this overhead is much lower than the actual packet delivery time. It also allows to spread the load more fairly, if there is any spare queue it'll get the packet and release the broadcaster. On the next broadcast iteration another peer is more likely to be chosen just because it didn't get a message previously (and had some time to deliver already queued messages). It works perfectly in tests, with optimal networking conditions we have much better block times and TPS increases by 5-25%% depending on the scenario. I'd go as far as to say that it fixes the original problem of #2678, because in this particular scenario we have empty queues in ~100% of the cases and this new logic will likely lead to 100% fan out in this case (cancelation just won't happen fast enough). But when the load grows and there is some waiting in the queue it will optimize out the slowest links. --- pkg/network/helper_test.go | 14 +++++--- pkg/network/peer.go | 20 +++++++---- pkg/network/server.go | 68 +++++++++++++++++------------------- pkg/network/tcp_peer.go | 71 ++++++++++++++++++++------------------ 4 files changed, 92 insertions(+), 81 deletions(-) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 04b54959a..77db5f08a 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -1,6 +1,7 @@ package network import ( + "context" "fmt" "net" "sync" @@ -109,18 +110,21 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { if err != nil { return err } - return p.EnqueuePacket(true, b) + return p.EnqueueHPPacket(b) } -func (p *localPeer) EnqueuePacket(block bool, m []byte) error { - return p.EnqueueHPPacket(block, m) +func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { + return p.EnqueueHPPacket(m) } func (p *localPeer) EnqueueP2PMessage(msg *Message) error { return p.EnqueueMessage(msg) } func (p *localPeer) EnqueueP2PPacket(m []byte) error { - return p.EnqueueHPPacket(true, m) + return p.EnqueueHPPacket(m) } -func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error { +func (p *localPeer) BroadcastHPPacket(_ context.Context, m []byte) error { + return p.EnqueueHPPacket(m) +} +func (p *localPeer) EnqueueHPPacket(m []byte) error { msg := &Message{} r := io.NewBinReaderFromBuf(m) err := msg.Decode(r) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index ca0127d9d..b4cdcbf1f 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,6 +1,7 @@ package network import ( + "context" "net" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -23,12 +24,17 @@ type Peer interface { // EnqueuePacket if there is no error in serializing it. EnqueueMessage(*Message) error - // EnqueuePacket is a blocking packet enqueuer, it doesn't return until - // it puts the given packet into the queue. It accepts a slice of bytes that + // BroadcastPacket is a context-bound packet enqueuer, it either puts the + // given packet into the queue or exits with errors if the context expires + // or peer disconnects. It accepts a slice of bytes that // can be shared with other queues (so that message marshalling can be - // done once for all peers). It does nothing if the peer has not yet + // done once for all peers). It returns an error if the peer has not yet // completed handshaking. - EnqueuePacket(bool, []byte) error + BroadcastPacket(context.Context, []byte) error + + // BroadcastHPPacket is the same as BroadcastPacket, but uses a high-priority + // queue. + BroadcastHPPacket(context.Context, []byte) error // EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PPacket if there is no error in serializing it. @@ -40,14 +46,14 @@ type Peer interface { // done once for all peers). It does nothing if the peer has not yet // completed handshaking. This queue is intended to be used for unicast // peer to peer communication that is more important than broadcasts - // (handled by EnqueuePacket) but less important than high-priority - // messages (handled by EnqueueHPPacket). + // (handled by BroadcastPacket) but less important than high-priority + // messages (handled by EnqueueHPPacket and BroadcastHPPacket). EnqueueP2PPacket([]byte) error // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts the given packet into the high-priority // queue. - EnqueueHPPacket(bool, []byte) error + EnqueueHPPacket([]byte) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 5f8d4f4ea..240f8125f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1,6 +1,7 @@ package network import ( + "context" "crypto/rand" "encoding/binary" "errors" @@ -755,7 +756,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return err } if inv.Type == payload.ExtensibleType { - return p.EnqueueHPPacket(true, pkt) + return p.EnqueueHPPacket(pkt) } return p.EnqueueP2PPacket(pkt) } @@ -817,7 +818,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { pkt, err := msg.Bytes() if err == nil { if inv.Type == payload.ExtensibleType { - err = p.EnqueueHPPacket(true, pkt) + err = p.EnqueueHPPacket(pkt) } else { err = p.EnqueueP2PPacket(pkt) } @@ -1348,7 +1349,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) { // iteratePeersWithSendMsg sends the given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). -func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { +func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error, peerOK func(Peer) bool) { var deadN, peerN, sentN int // Get a copy of s.peers to avoid holding a lock while sending. @@ -1357,53 +1358,48 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b if peerN == 0 { return } - mrand.Shuffle(peerN, func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] - }) pkt, err := msg.Bytes() if err != nil { return } - // If true, this node isn't counted any more, either it's dead or we - // have already sent an Inv to it. - finished := make([]bool, peerN) - - // Try non-blocking sends first and only block if have to. - for _, blocking := range []bool{false, true} { - for i, peer := range peers { - // Send to 2/3 of good peers. - if 3*sentN >= 2*(peerN-deadN) { - return + var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. + var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + for _, peer := range peers { + go func(p Peer, ctx context.Context, pkt []byte) { + err := send(p, ctx, pkt) + if err == nil && msg.Command == CMDGetAddr { + p.AddGetAddrSent() } - if finished[i] { - continue - } - err := send(peer, blocking, pkt) - if err == nil { - if msg.Command == CMDGetAddr { - peer.AddGetAddrSent() - } - sentN++ - } else if !blocking && errors.Is(err, errBusy) { - // Can be retried. - continue - } else { - deadN++ - } - finished[i] = true + replies <- err + }(peer, ctx, pkt) + } + for r := range replies { + if r == nil { + sentN++ + } else { + deadN++ + } + if sentN+deadN == peerN { + break + } + // Send to 2/3 of good peers. + if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil { + cancel() } } + cancel() + close(replies) } // broadcastMessage sends the message to all available peers. func (s *Server) broadcastMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil) } // broadcastHPMessage sends the high-priority message to all available peers. func (s *Server) broadcastHPMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil) } // relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them @@ -1421,7 +1417,7 @@ mainloop: msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) // Filter out nodes that are more current (avoid spamming the network // during initial sync). - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, func(p Peer) bool { return p.Handshaked() && p.LastBlockIndex() < b.Index }) s.extensiblePool.RemoveStale(b.Index) @@ -1467,7 +1463,7 @@ func (s *Server) broadcastTxHashes(hs []util.Uint256) { // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, Peer.IsFullNode) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.IsFullNode) } // initStaleMemPools initializes mempools for stale tx/payload processing. diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 6d7dc8bf2..330e2268f 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -1,6 +1,7 @@ package network import ( + "context" "errors" "fmt" "net" @@ -31,7 +32,6 @@ const ( var ( errGone = errors.New("the peer is gone already") - errBusy = errors.New("peer is busy") errStateMismatch = errors.New("tried to send protocol message before handshake completed") errPingPong = errors.New("ping/pong timeout") errUnexpectedPong = errors.New("pong message wasn't expected") @@ -81,40 +81,45 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { } } -// putPacketIntoQueue puts the given message into the given queue if the peer has -// done handshaking. -func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error { +// putBroadcastPacketIntoQueue puts the given message into the given queue if +// the peer has done handshaking using the given context. +func (p *TCPPeer) putBroadcastPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error { if !p.Handshaked() { return errStateMismatch } - var ret error - if block { - timer := time.NewTimer(p.server.TimePerBlock / 2) - select { - case queue <- msg: - case <-p.done: - ret = errGone - case <-timer.C: - ret = errBusy - } - if !errors.Is(ret, errBusy) && !timer.Stop() { - <-timer.C - } - } else { - select { - case queue <- msg: - case <-p.done: - return errGone - default: - return errBusy - } + select { + case queue <- msg: + case <-p.done: + return errGone + case <-ctx.Done(): + return ctx.Err() } - return ret + return nil } -// EnqueuePacket implements the Peer interface. -func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error { - return p.putPacketIntoQueue(p.sendQ, block, msg) +// putPacketIntoQueue puts the given message into the given queue if the peer has +// done handshaking. +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { + if !p.Handshaked() { + return errStateMismatch + } + select { + case queue <- msg: + case <-p.done: + return errGone + } + return nil +} + +// BroadcastPacket implements the Peer interface. +func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error { + return p.putBroadcastPacketIntoQueue(ctx, p.sendQ, msg) +} + +// BroadcastHPPacket implements the Peer interface. It the peer is not yet +// handshaked it's a noop. +func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error { + return p.putBroadcastPacketIntoQueue(ctx, p.hpSendQ, msg) } // putMessageIntoQueue serializes the given Message and puts it into given queue if @@ -124,7 +129,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { if err != nil { return err } - return p.putPacketIntoQueue(queue, true, b) + return p.putPacketIntoQueue(queue, b) } // EnqueueMessage is a temporary wrapper that sends a message via @@ -135,7 +140,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error { // EnqueueP2PPacket implements the Peer interface. func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { - return p.putPacketIntoQueue(p.p2pSendQ, true, msg) + return p.putPacketIntoQueue(p.p2pSendQ, msg) } // EnqueueP2PMessage implements the Peer interface. @@ -145,8 +150,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. -func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error { - return p.putPacketIntoQueue(p.hpSendQ, block, msg) +func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { + return p.putPacketIntoQueue(p.hpSendQ, msg) } func (p *TCPPeer) writeMsg(msg *Message) error {