network: batch getdata replies

This is not exactly the protocol-level batching as was tried in #1770 and
proposed by neo-project/neo#2365, but it's a TCP-level change in that we now
Write() a set of messages and given that Go sets up TCP sockets with
TCP_NODELAY by default this is a substantial change, we have less packets
generated with the same amount of data. It doesn't change anything on properly
connected networks, but the ones with delays benefit from it a lot.

This also improves queueing because we no longer generate 32 messages to
deliver on transaction's GetData, it's just one stream of bytes with 32
messages inside.

Do the same with GetBlocksByIndex, we can have a lot of messages there too.

But don't forget about potential peer DoS attacks, if a peer is to request a
lot of big blocks we need to flush them before we process the whole set.
This commit is contained in:
Roman Khimov 2022-10-21 10:49:44 +03:00
parent bf4636f70a
commit cfb5058018
4 changed files with 72 additions and 14 deletions

View file

@ -2,6 +2,7 @@ package network
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -115,17 +116,25 @@ func (p *localPeer) Disconnect(err error) {
} }
func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error {
if len(m) == 0 {
return errors.New("empty msg")
}
msg := &Message{} msg := &Message{}
r := io.NewBinReaderFromBuf(m) r := io.NewBinReaderFromBuf(m)
for r.Len() > 0 {
err := msg.Decode(r) err := msg.Decode(r)
if err == nil { if err == nil {
p.messageHandler(p.t, msg) p.messageHandler(p.t, msg)
} }
}
return nil return nil
} }
func (p *localPeer) EnqueueP2PMessage(msg *Message) error { func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueHPMessage(msg) return p.EnqueueHPMessage(msg)
} }
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.BroadcastPacket(context.TODO(), m)
}
func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error { func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error {
return p.BroadcastPacket(ctx, m) return p.BroadcastPacket(ctx, m)
} }
@ -133,6 +142,9 @@ func (p *localPeer) EnqueueHPMessage(msg *Message) error {
p.messageHandler(p.t, msg) p.messageHandler(p.t, msg)
return nil return nil
} }
func (p *localPeer) EnqueueHPPacket(m []byte) error {
return p.BroadcastPacket(context.TODO(), m)
}
func (p *localPeer) Version() *payload.Version { func (p *localPeer) Version() *payload.Version {
return p.version return p.version
} }

View file

@ -39,10 +39,16 @@ type Peer interface {
// (handled by BroadcastPacket) but less important than high-priority // (handled by BroadcastPacket) but less important than high-priority
// messages (handled by EnqueueHPMessage). // messages (handled by EnqueueHPMessage).
EnqueueP2PMessage(*Message) error EnqueueP2PMessage(*Message) error
// EnqueueP2PPacket is similar to EnqueueP2PMessage, but accepts a slice of
// message(s) bytes.
EnqueueP2PPacket([]byte) error
// EnqueueHPMessage is similar to EnqueueP2PMessage, but uses a high-priority // EnqueueHPMessage is similar to EnqueueP2PMessage, but uses a high-priority
// queue. // queue.
EnqueueHPMessage(*Message) error EnqueueHPMessage(*Message) error
// EnqueueHPPacket is similar to EnqueueHPMessage, but accepts a slice of
// message(s) bytes.
EnqueueHPPacket([]byte) error
Version() *payload.Version Version() *payload.Version
LastBlockIndex() uint32 LastBlockIndex() uint32
Handshaked() bool Handshaked() bool

View file

@ -808,7 +808,15 @@ func (s *Server) handleMempoolCmd(p Peer) error {
// handleInvCmd processes the received inventory. // handleInvCmd processes the received inventory.
func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
var notFound []util.Uint256 var (
err error
notFound []util.Uint256
reply = io.NewBufBinWriter()
send = p.EnqueueP2PPacket
)
if inv.Type == payload.ExtensibleType {
send = p.EnqueueHPPacket
}
for _, hash := range inv.Hashes { for _, hash := range inv.Hashes {
var msg *Message var msg *Message
@ -839,19 +847,37 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
} }
} }
if msg != nil { if msg != nil {
var err error err = addMessageToPacket(reply, msg, send)
if inv.Type == payload.ExtensibleType {
err = p.EnqueueHPMessage(msg)
} else {
err = p.EnqueueP2PMessage(msg)
}
if err != nil { if err != nil {
return err return err
} }
} }
} }
if len(notFound) != 0 { if len(notFound) != 0 {
return p.EnqueueP2PMessage(NewMessage(CMDNotFound, payload.NewInventory(inv.Type, notFound))) err = addMessageToPacket(reply, NewMessage(CMDNotFound, payload.NewInventory(inv.Type, notFound)), send)
if err != nil {
return err
}
}
if reply.Len() == 0 {
return nil
}
return send(reply.Bytes())
}
// addMessageToPacket serializes given message into the given buffer and sends whole
// batch if it exceeds MaxSize/2 memory limit (to prevent DoS).
func addMessageToPacket(batch *io.BufBinWriter, msg *Message, send func([]byte) error) error {
err := msg.Encode(batch.BinWriter)
if err != nil {
return err
}
if batch.Len() > payload.MaxSize/2 {
err = send(batch.Bytes())
if err != nil {
return err
}
batch.Reset()
} }
return nil return nil
} }
@ -945,6 +971,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
// handleGetBlockByIndexCmd processes the getblockbyindex request. // handleGetBlockByIndexCmd processes the getblockbyindex request.
func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) error { func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) error {
var reply = io.NewBufBinWriter()
count := gbd.Count count := gbd.Count
if gbd.Count < 0 || gbd.Count > payload.MaxHashesCount { if gbd.Count < 0 || gbd.Count > payload.MaxHashesCount {
count = payload.MaxHashesCount count = payload.MaxHashesCount
@ -958,13 +985,16 @@ func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex)
if err != nil { if err != nil {
break break
} }
msg := NewMessage(CMDBlock, b) err = addMessageToPacket(reply, NewMessage(CMDBlock, b), p.EnqueueP2PPacket)
if err = p.EnqueueP2PMessage(msg); err != nil { if err != nil {
return err return err
} }
} }
if reply.Len() == 0 {
return nil return nil
} }
return p.EnqueueP2PPacket(reply.Bytes())
}
// handleGetHeadersCmd processes the getheaders request. // handleGetHeadersCmd processes the getheaders request.
func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error { func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error {

View file

@ -128,6 +128,16 @@ func (p *TCPPeer) EnqueueHPMessage(msg *Message) error {
return p.putMsgIntoQueue(p.hpSendQ, msg) return p.putMsgIntoQueue(p.hpSendQ, msg)
} }
// EnqueueP2PPacket implements the Peer interface.
func (p *TCPPeer) EnqueueP2PPacket(b []byte) error {
return p.putPacketIntoQueue(context.Background(), p.p2pSendQ, b)
}
// EnqueueHPPacket implements the Peer interface.
func (p *TCPPeer) EnqueueHPPacket(b []byte) error {
return p.putPacketIntoQueue(context.Background(), p.hpSendQ, b)
}
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {
b, err := msg.Bytes() b, err := msg.Bytes()
if err != nil { if err != nil {