diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 04b54959a..77db5f08a 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -1,6 +1,7 @@ package network import ( + "context" "fmt" "net" "sync" @@ -109,18 +110,21 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { if err != nil { return err } - return p.EnqueuePacket(true, b) + return p.EnqueueHPPacket(b) } -func (p *localPeer) EnqueuePacket(block bool, m []byte) error { - return p.EnqueueHPPacket(block, m) +func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { + return p.EnqueueHPPacket(m) } func (p *localPeer) EnqueueP2PMessage(msg *Message) error { return p.EnqueueMessage(msg) } func (p *localPeer) EnqueueP2PPacket(m []byte) error { - return p.EnqueueHPPacket(true, m) + return p.EnqueueHPPacket(m) } -func (p *localPeer) EnqueueHPPacket(_ bool, m []byte) error { +func (p *localPeer) BroadcastHPPacket(_ context.Context, m []byte) error { + return p.EnqueueHPPacket(m) +} +func (p *localPeer) EnqueueHPPacket(m []byte) error { msg := &Message{} r := io.NewBinReaderFromBuf(m) err := msg.Decode(r) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index ca0127d9d..b4cdcbf1f 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,6 +1,7 @@ package network import ( + "context" "net" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -23,12 +24,17 @@ type Peer interface { // EnqueuePacket if there is no error in serializing it. EnqueueMessage(*Message) error - // EnqueuePacket is a blocking packet enqueuer, it doesn't return until - // it puts the given packet into the queue. It accepts a slice of bytes that + // BroadcastPacket is a context-bound packet enqueuer, it either puts the + // given packet into the queue or exits with errors if the context expires + // or peer disconnects. It accepts a slice of bytes that // can be shared with other queues (so that message marshalling can be - // done once for all peers). It does nothing if the peer has not yet + // done once for all peers). It returns an error if the peer has not yet // completed handshaking. - EnqueuePacket(bool, []byte) error + BroadcastPacket(context.Context, []byte) error + + // BroadcastHPPacket is the same as BroadcastPacket, but uses a high-priority + // queue. + BroadcastHPPacket(context.Context, []byte) error // EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PPacket if there is no error in serializing it. @@ -40,14 +46,14 @@ type Peer interface { // done once for all peers). It does nothing if the peer has not yet // completed handshaking. This queue is intended to be used for unicast // peer to peer communication that is more important than broadcasts - // (handled by EnqueuePacket) but less important than high-priority - // messages (handled by EnqueueHPPacket). + // (handled by BroadcastPacket) but less important than high-priority + // messages (handled by EnqueueHPPacket and BroadcastHPPacket). EnqueueP2PPacket([]byte) error // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts the given packet into the high-priority // queue. - EnqueueHPPacket(bool, []byte) error + EnqueueHPPacket([]byte) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 5f8d4f4ea..240f8125f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1,6 +1,7 @@ package network import ( + "context" "crypto/rand" "encoding/binary" "errors" @@ -755,7 +756,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return err } if inv.Type == payload.ExtensibleType { - return p.EnqueueHPPacket(true, pkt) + return p.EnqueueHPPacket(pkt) } return p.EnqueueP2PPacket(pkt) } @@ -817,7 +818,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { pkt, err := msg.Bytes() if err == nil { if inv.Type == payload.ExtensibleType { - err = p.EnqueueHPPacket(true, pkt) + err = p.EnqueueHPPacket(pkt) } else { err = p.EnqueueP2PPacket(pkt) } @@ -1348,7 +1349,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) { // iteratePeersWithSendMsg sends the given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). -func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { +func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error, peerOK func(Peer) bool) { var deadN, peerN, sentN int // Get a copy of s.peers to avoid holding a lock while sending. @@ -1357,53 +1358,48 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b if peerN == 0 { return } - mrand.Shuffle(peerN, func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] - }) pkt, err := msg.Bytes() if err != nil { return } - // If true, this node isn't counted any more, either it's dead or we - // have already sent an Inv to it. - finished := make([]bool, peerN) - - // Try non-blocking sends first and only block if have to. - for _, blocking := range []bool{false, true} { - for i, peer := range peers { - // Send to 2/3 of good peers. - if 3*sentN >= 2*(peerN-deadN) { - return + var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster. + var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2) + for _, peer := range peers { + go func(p Peer, ctx context.Context, pkt []byte) { + err := send(p, ctx, pkt) + if err == nil && msg.Command == CMDGetAddr { + p.AddGetAddrSent() } - if finished[i] { - continue - } - err := send(peer, blocking, pkt) - if err == nil { - if msg.Command == CMDGetAddr { - peer.AddGetAddrSent() - } - sentN++ - } else if !blocking && errors.Is(err, errBusy) { - // Can be retried. - continue - } else { - deadN++ - } - finished[i] = true + replies <- err + }(peer, ctx, pkt) + } + for r := range replies { + if r == nil { + sentN++ + } else { + deadN++ + } + if sentN+deadN == peerN { + break + } + // Send to 2/3 of good peers. + if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil { + cancel() } } + cancel() + close(replies) } // broadcastMessage sends the message to all available peers. func (s *Server) broadcastMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil) } // broadcastHPMessage sends the high-priority message to all available peers. func (s *Server) broadcastHPMessage(msg *Message) { - s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil) } // relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them @@ -1421,7 +1417,7 @@ mainloop: msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) // Filter out nodes that are more current (avoid spamming the network // during initial sync). - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, func(p Peer) bool { return p.Handshaked() && p.LastBlockIndex() < b.Index }) s.extensiblePool.RemoveStale(b.Index) @@ -1467,7 +1463,7 @@ func (s *Server) broadcastTxHashes(hs []util.Uint256) { // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, Peer.IsFullNode) + s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.IsFullNode) } // initStaleMemPools initializes mempools for stale tx/payload processing. diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 6d7dc8bf2..330e2268f 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -1,6 +1,7 @@ package network import ( + "context" "errors" "fmt" "net" @@ -31,7 +32,6 @@ const ( var ( errGone = errors.New("the peer is gone already") - errBusy = errors.New("peer is busy") errStateMismatch = errors.New("tried to send protocol message before handshake completed") errPingPong = errors.New("ping/pong timeout") errUnexpectedPong = errors.New("pong message wasn't expected") @@ -81,40 +81,45 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { } } -// putPacketIntoQueue puts the given message into the given queue if the peer has -// done handshaking. -func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error { +// putBroadcastPacketIntoQueue puts the given message into the given queue if +// the peer has done handshaking using the given context. +func (p *TCPPeer) putBroadcastPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error { if !p.Handshaked() { return errStateMismatch } - var ret error - if block { - timer := time.NewTimer(p.server.TimePerBlock / 2) - select { - case queue <- msg: - case <-p.done: - ret = errGone - case <-timer.C: - ret = errBusy - } - if !errors.Is(ret, errBusy) && !timer.Stop() { - <-timer.C - } - } else { - select { - case queue <- msg: - case <-p.done: - return errGone - default: - return errBusy - } + select { + case queue <- msg: + case <-p.done: + return errGone + case <-ctx.Done(): + return ctx.Err() } - return ret + return nil } -// EnqueuePacket implements the Peer interface. -func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error { - return p.putPacketIntoQueue(p.sendQ, block, msg) +// putPacketIntoQueue puts the given message into the given queue if the peer has +// done handshaking. +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { + if !p.Handshaked() { + return errStateMismatch + } + select { + case queue <- msg: + case <-p.done: + return errGone + } + return nil +} + +// BroadcastPacket implements the Peer interface. +func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error { + return p.putBroadcastPacketIntoQueue(ctx, p.sendQ, msg) +} + +// BroadcastHPPacket implements the Peer interface. It the peer is not yet +// handshaked it's a noop. +func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error { + return p.putBroadcastPacketIntoQueue(ctx, p.hpSendQ, msg) } // putMessageIntoQueue serializes the given Message and puts it into given queue if @@ -124,7 +129,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { if err != nil { return err } - return p.putPacketIntoQueue(queue, true, b) + return p.putPacketIntoQueue(queue, b) } // EnqueueMessage is a temporary wrapper that sends a message via @@ -135,7 +140,7 @@ func (p *TCPPeer) EnqueueMessage(msg *Message) error { // EnqueueP2PPacket implements the Peer interface. func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { - return p.putPacketIntoQueue(p.p2pSendQ, true, msg) + return p.putPacketIntoQueue(p.p2pSendQ, msg) } // EnqueueP2PMessage implements the Peer interface. @@ -145,8 +150,8 @@ func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. -func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error { - return p.putPacketIntoQueue(p.hpSendQ, block, msg) +func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { + return p.putPacketIntoQueue(p.hpSendQ, msg) } func (p *TCPPeer) writeMsg(msg *Message) error {