From cfb505801884586d943e270fd8be9d7b4c2aed0f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 21 Oct 2022 10:49:44 +0300 Subject: [PATCH] network: batch getdata replies This is not exactly the protocol-level batching as was tried in #1770 and proposed by neo-project/neo#2365, but it's a TCP-level change in that we now Write() a set of messages and given that Go sets up TCP sockets with TCP_NODELAY by default this is a substantial change, we have less packets generated with the same amount of data. It doesn't change anything on properly connected networks, but the ones with delays benefit from it a lot. This also improves queueing because we no longer generate 32 messages to deliver on transaction's GetData, it's just one stream of bytes with 32 messages inside. Do the same with GetBlocksByIndex, we can have a lot of messages there too. But don't forget about potential peer DoS attacks, if a peer is to request a lot of big blocks we need to flush them before we process the whole set. --- pkg/network/helper_test.go | 18 ++++++++++--- pkg/network/peer.go | 6 +++++ pkg/network/server.go | 52 ++++++++++++++++++++++++++++++-------- pkg/network/tcp_peer.go | 10 ++++++++ 4 files changed, 72 insertions(+), 14 deletions(-) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index d06b12847..a44ae9f00 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -2,6 +2,7 @@ package network import ( "context" + "errors" "fmt" "net" "sync" @@ -115,17 +116,25 @@ func (p *localPeer) Disconnect(err error) { } func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { + if len(m) == 0 { + return errors.New("empty msg") + } msg := &Message{} r := io.NewBinReaderFromBuf(m) - err := msg.Decode(r) - if err == nil { - p.messageHandler(p.t, msg) + for r.Len() > 0 { + err := msg.Decode(r) + if err == nil { + p.messageHandler(p.t, msg) + } } return nil } func (p *localPeer) EnqueueP2PMessage(msg *Message) error { return p.EnqueueHPMessage(msg) } +func (p *localPeer) EnqueueP2PPacket(m []byte) error { + return p.BroadcastPacket(context.TODO(), m) +} func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error { return p.BroadcastPacket(ctx, m) } @@ -133,6 +142,9 @@ func (p *localPeer) EnqueueHPMessage(msg *Message) error { p.messageHandler(p.t, msg) return nil } +func (p *localPeer) EnqueueHPPacket(m []byte) error { + return p.BroadcastPacket(context.TODO(), m) +} func (p *localPeer) Version() *payload.Version { return p.version } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 9854165d5..6dfcf16e0 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -39,10 +39,16 @@ type Peer interface { // (handled by BroadcastPacket) but less important than high-priority // messages (handled by EnqueueHPMessage). EnqueueP2PMessage(*Message) error + // EnqueueP2PPacket is similar to EnqueueP2PMessage, but accepts a slice of + // message(s) bytes. + EnqueueP2PPacket([]byte) error // EnqueueHPMessage is similar to EnqueueP2PMessage, but uses a high-priority // queue. EnqueueHPMessage(*Message) error + // EnqueueHPPacket is similar to EnqueueHPMessage, but accepts a slice of + // message(s) bytes. + EnqueueHPPacket([]byte) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 57aa06ffe..49bd63bc0 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -808,7 +808,15 @@ func (s *Server) handleMempoolCmd(p Peer) error { // handleInvCmd processes the received inventory. func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { - var notFound []util.Uint256 + var ( + err error + notFound []util.Uint256 + reply = io.NewBufBinWriter() + send = p.EnqueueP2PPacket + ) + if inv.Type == payload.ExtensibleType { + send = p.EnqueueHPPacket + } for _, hash := range inv.Hashes { var msg *Message @@ -839,19 +847,37 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { } } if msg != nil { - var err error - if inv.Type == payload.ExtensibleType { - err = p.EnqueueHPMessage(msg) - } else { - err = p.EnqueueP2PMessage(msg) - } + err = addMessageToPacket(reply, msg, send) if err != nil { return err } } } if len(notFound) != 0 { - return p.EnqueueP2PMessage(NewMessage(CMDNotFound, payload.NewInventory(inv.Type, notFound))) + err = addMessageToPacket(reply, NewMessage(CMDNotFound, payload.NewInventory(inv.Type, notFound)), send) + if err != nil { + return err + } + } + if reply.Len() == 0 { + return nil + } + return send(reply.Bytes()) +} + +// addMessageToPacket serializes given message into the given buffer and sends whole +// batch if it exceeds MaxSize/2 memory limit (to prevent DoS). +func addMessageToPacket(batch *io.BufBinWriter, msg *Message, send func([]byte) error) error { + err := msg.Encode(batch.BinWriter) + if err != nil { + return err + } + if batch.Len() > payload.MaxSize/2 { + err = send(batch.Bytes()) + if err != nil { + return err + } + batch.Reset() } return nil } @@ -945,6 +971,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error { // handleGetBlockByIndexCmd processes the getblockbyindex request. func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) error { + var reply = io.NewBufBinWriter() count := gbd.Count if gbd.Count < 0 || gbd.Count > payload.MaxHashesCount { count = payload.MaxHashesCount @@ -958,12 +985,15 @@ func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) if err != nil { break } - msg := NewMessage(CMDBlock, b) - if err = p.EnqueueP2PMessage(msg); err != nil { + err = addMessageToPacket(reply, NewMessage(CMDBlock, b), p.EnqueueP2PPacket) + if err != nil { return err } } - return nil + if reply.Len() == 0 { + return nil + } + return p.EnqueueP2PPacket(reply.Bytes()) } // handleGetHeadersCmd processes the getheaders request. diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 9244716cc..392934615 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -128,6 +128,16 @@ func (p *TCPPeer) EnqueueHPMessage(msg *Message) error { return p.putMsgIntoQueue(p.hpSendQ, msg) } +// EnqueueP2PPacket implements the Peer interface. +func (p *TCPPeer) EnqueueP2PPacket(b []byte) error { + return p.putPacketIntoQueue(context.Background(), p.p2pSendQ, b) +} + +// EnqueueHPPacket implements the Peer interface. +func (p *TCPPeer) EnqueueHPPacket(b []byte) error { + return p.putPacketIntoQueue(context.Background(), p.hpSendQ, b) +} + func (p *TCPPeer) writeMsg(msg *Message) error { b, err := msg.Bytes() if err != nil {