network: introduce peer sending queues

Two queues for high-priority and ordinary messages. Fixes #590. These queues
are deliberately kept small to avoid the bufferbloat problem; another queueing
layer above them will compensate for that. The queues are synchronous on the
enqueueing side; asynchronous capabilities are to be added a layer above later.
Roman Khimov 2020-01-16 21:16:31 +03:00
parent 7f0882767c
commit 0ba6b2a754
6 changed files with 173 additions and 53 deletions
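
The design described above boils down to two small buffered channels per peer
and a single writer goroutine draining them. Here is a minimal standalone
sketch of that pattern; it is not the commit's code, the names (peerQueues,
enqueue, enqueueHP, drain) are invented for illustration, and only the
standard library is used. Enqueueing blocks once the small buffer is full
(that is the synchronous, backpressure-friendly part), and the writer checks
the high-priority channel before the ordinary one on every iteration.

package main

import (
	"fmt"
	"time"
)

// peerQueues models one peer's outgoing queues: a small ordinary queue and an
// even smaller high-priority one.
type peerQueues struct {
	sendQ   chan []byte
	hpSendQ chan []byte
	done    chan struct{}
}

func newPeerQueues() *peerQueues {
	return &peerQueues{
		sendQ:   make(chan []byte, 32), // deliberately small buffers
		hpSendQ: make(chan []byte, 4),
		done:    make(chan struct{}),
	}
}

// enqueue blocks until the packet fits into the ordinary queue; this is the
// synchronous behaviour the commit message talks about.
func (q *peerQueues) enqueue(pkt []byte) { q.sendQ <- pkt }

// enqueueHP blocks until the packet fits into the high-priority queue.
func (q *peerQueues) enqueueHP(pkt []byte) { q.hpSendQ <- pkt }

// drain is the single writer: it peeks at the high-priority queue first and
// only falls back to the ordinary queue when nothing high-priority is pending.
func (q *peerQueues) drain(write func([]byte)) {
	for {
		var pkt []byte
		select { // non-blocking check of the HP queue first
		case <-q.done:
			return
		case pkt = <-q.hpSendQ:
		default:
		}
		if pkt == nil { // nothing high-priority, wait on both queues
			select {
			case <-q.done:
				return
			case pkt = <-q.hpSendQ:
			case pkt = <-q.sendQ:
			}
		}
		write(pkt)
	}
}

func main() {
	q := newPeerQueues()
	go q.drain(func(pkt []byte) {
		fmt.Printf("sent: %s\n", pkt)
		time.Sleep(10 * time.Millisecond) // simulate a slow connection
	})
	q.enqueue([]byte("inv"))
	q.enqueue([]byte("getdata"))
	q.enqueueHP([]byte("consensus")) // checked first on the writer's next pass
	time.Sleep(100 * time.Millisecond)
	close(q.done)
}

The TCPPeer.handleQueues loop further down in this commit follows the same
shape, with p.conn.Write as the writer.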

@@ -15,6 +15,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm"
@@ -179,8 +180,24 @@ func (p *localPeer) PeerAddr() net.Addr {
}
func (p *localPeer) StartProtocol() {}
func (p *localPeer) Disconnect(err error) {}
func (p *localPeer) WriteMsg(msg *Message) error {
func (p *localPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueuePacket(b)
}
func (p *localPeer) EnqueuePacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{}
r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r)
if err == nil {
p.messageHandler(p.t, msg)
}
return nil
}
func (p *localPeer) Version() *payload.Version {
@@ -197,10 +214,12 @@ func (p *localPeer) HandleVersion(v *payload.Version) error {
return nil
}
func (p *localPeer) SendVersion(m *Message) error {
return p.WriteMsg(m)
_ = p.EnqueueMessage(m)
return nil
}
func (p *localPeer) SendVersionAck(m *Message) error {
return p.WriteMsg(m)
_ = p.EnqueueMessage(m)
return nil
}
func (p *localPeer) HandleVersionAck() error {
p.handshaked = true

@@ -226,6 +226,18 @@ func (m *Message) Encode(br *io.BinWriter) error {
return nil
}
// Bytes serializes a Message into a newly allocated buffer and returns it.
func (m *Message) Bytes() ([]byte, error) {
w := io.NewBufBinWriter()
if err := m.Encode(w.BinWriter); err != nil {
return nil, err
}
if w.Err != nil {
return nil, w.Err
}
return w.Bytes(), nil
}
// convert a command (string) to a byte slice filled with 0 bytes till
// size 12.
func cmdToByteArray(cmd CommandType) [cmdSize]byte {

@@ -18,7 +18,22 @@ type Peer interface {
// before that it returns the same address as RemoteAddr.
PeerAddr() net.Addr
Disconnect(error)
WriteMsg(msg *Message) error
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
EnqueueMessage(*Message) error
// EnqueuePacket is a blocking packet enqueuer, it doesn't return until
// it puts the given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). It returns an error if the peer has not yet
// completed handshaking.
EnqueuePacket([]byte) error
// EnqueueHPPacket is a blocking high-priority packet enqueuer, it
// doesn't return until it puts the given packet into the high-priority
// queue.
EnqueueHPPacket([]byte) error
Version() *payload.Version
LastBlockIndex() uint32
UpdateLastBlockIndex(lbIndex uint32)
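
The comments above stress that EnqueuePacket takes a pre-serialized byte slice
so that a message can be marshalled once and the same slice shared across all
peers' queues. A short hedged usage sketch follows; the broadcast helper is
hypothetical (not part of the commit) and assumes it lives in the same package
as Peer and Message.

// broadcast is a hypothetical helper illustrating the intended use of the
// interface: serialize once, enqueue the shared slice everywhere, and route
// high-priority traffic (e.g. consensus) through the HP queue.
func broadcast(peers []Peer, msg *Message, highPrio bool) error {
	pkt, err := msg.Bytes() // marshal once for all peers
	if err != nil {
		return err
	}
	for _, p := range peers {
		if highPrio {
			_ = p.EnqueueHPPacket(pkt) // blocks until queued
		} else {
			_ = p.EnqueuePacket(pkt) // not-yet-handshaked peers just reject it
		}
	}
	return nil
}

The server handlers in the next file do essentially this, routing
ConsensusType traffic through EnqueueHPPacket and everything else through
EnqueuePacket.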

@@ -364,7 +364,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
// handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
return p.EnqueueMessage(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
}
// handlePong processes the pong request.
@@ -400,43 +400,49 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
}
}
if len(reqHashes) > 0 {
payload := payload.NewInventory(inv.Type, reqHashes)
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
msg := NewMessage(s.Net, CMDGetData, payload.NewInventory(inv.Type, reqHashes))
pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
return p.EnqueuePacket(pkt)
}
return nil
}
// handleGetDataCmd processes the received getdata request.
func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
for _, hash := range inv.Hashes {
var msg *Message
switch inv.Type {
case payload.TXType:
for _, hash := range inv.Hashes {
tx, _, err := s.chain.GetTransaction(hash)
if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDTX, tx))
if err != nil {
return err
}
}
msg = NewMessage(s.Net, CMDTX, tx)
}
case payload.BlockType:
for _, hash := range inv.Hashes {
b, err := s.chain.GetBlock(hash)
if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDBlock, b))
msg = NewMessage(s.Net, CMDBlock, b)
}
case payload.ConsensusType:
if cp := s.consensus.GetPayload(hash); cp != nil {
msg = NewMessage(s.Net, CMDConsensus, cp)
}
}
if msg != nil {
pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
}
case payload.ConsensusType:
for _, hash := range inv.Hashes {
if cp := s.consensus.GetPayload(hash); cp != nil {
if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil {
return err
}
}
return p.EnqueuePacket(pkt)
}
}
return nil
@@ -468,7 +474,8 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
return nil
}
payload := payload.NewInventory(payload.BlockType, blockHashes)
return p.WriteMsg(NewMessage(s.Net, CMDInv, payload))
msg := NewMessage(s.Net, CMDInv, payload)
return p.EnqueueMessage(msg)
}
// handleGetHeadersCmd processes the getheaders request.
@@ -497,7 +504,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
if len(resp.Hdrs) == 0 {
return nil
}
return p.WriteMsg(NewMessage(s.Net, CMDHeaders, &resp))
msg := NewMessage(s.Net, CMDHeaders, &resp)
return p.EnqueueMessage(msg)
}
// handleConsensusCmd processes received consensus payload.
@@ -538,7 +546,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
}
return p.WriteMsg(NewMessage(s.Net, CMDAddr, alist))
return p.EnqueueMessage(NewMessage(s.Net, CMDAddr, alist))
}
// requestHeaders sends a getheaders message to the peer.
@@ -546,7 +554,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{})
return p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload))
return p.EnqueueMessage(NewMessage(s.Net, CMDGetHeaders, payload))
}
// requestBlocks sends a getdata message to the peer
@@ -565,7 +573,7 @@ func (s *Server) requestBlocks(p Peer) error {
}
if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes)
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.Version().StartHeight {
return s.requestHeaders(p)
}
@@ -667,7 +675,7 @@ func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, has
continue
}
// Who cares about these messages anyway?
_ = peer.WriteMsg(msg)
_ = peer.EnqueueMessage(msg)
}
}

@@ -19,6 +19,9 @@ const (
versionReceived
verAckSent
verAckReceived
requestQueueSize = 32
hpRequestQueueSize = 4
)
var (
@@ -42,6 +45,8 @@ type TCPPeer struct {
handShake handShakeStage
done chan struct{}
sendQ chan []byte
hpSendQ chan []byte
wg sync.WaitGroup
@@ -55,26 +60,47 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
conn: conn,
server: s,
done: make(chan struct{}),
sendQ: make(chan []byte, requestQueueSize),
hpSendQ: make(chan []byte, hpRequestQueueSize),
}
}
// WriteMsg implements the Peer interface. This will write/encode the message
// to the underlying connection, this only works for messages other than Version
// or VerAck.
func (p *TCPPeer) WriteMsg(msg *Message) error {
// EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
return p.writeMsg(msg)
p.sendQ <- msg
return nil
}
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
func (p *TCPPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueuePacket(b)
}
// EnqueueHPPacket implements the Peer interface. It returns an error if
// the peer has not yet handshaked.
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
p.hpSendQ <- msg
return nil
}
func (p *TCPPeer) writeMsg(msg *Message) error {
w := io.NewBufBinWriter()
if err := msg.Encode(w.BinWriter); err != nil {
b, err := msg.Bytes()
if err != nil {
return err
}
_, err := p.conn.Write(w.Bytes())
_, err = p.conn.Write(b)
return err
}
@@ -86,6 +112,7 @@ func (p *TCPPeer) handleConn() {
p.server.register <- p
go p.handleQueues()
// When a new peer is connected we send out our version immediately.
err = p.server.sendVersion(p)
if err == nil {
@@ -108,6 +135,40 @@ func (p *TCPPeer) handleConn() {
p.Disconnect(err)
}
// handleQueues is a goroutine that is started automatically to handle
// send queues.
func (p *TCPPeer) handleQueues() {
var err error
for {
var msg []byte
// This one is to give priority to the hp queue
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
default:
}
// If there is no message in the hp queue, block until one
// appears in any of the queues.
if msg == nil {
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.sendQ:
}
}
_, err = p.conn.Write(msg)
if err != nil {
break
}
}
p.Disconnect(err)
}
// StartProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer. It's only good to run after the
// handshake.
@@ -136,7 +197,12 @@ func (p *TCPPeer) StartProtocol() {
case <-p.done:
return
case m := <-p.server.addrReq:
err = p.WriteMsg(m)
var pkt []byte
pkt, err = m.Bytes()
if err == nil {
err = p.EnqueueHPPacket(pkt)
}
case <-timer.C:
// Try to sync in headers and blocks with the peer if its block height is higher than ours.
if p.LastBlockIndex() > p.server.chain.BlockHeight() {
@@ -153,7 +219,7 @@ func (p *TCPPeer) StartProtocol() {
diff := uint32(time.Now().UTC().Unix()) - block.Timestamp
if diff > uint32(p.server.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1)
err = p.WriteMsg(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
}
}
}

@@ -30,8 +30,8 @@ func TestPeerHandshake(t *testing.T) {
require.Equal(t, false, tcpC.Handshaked())
// No ordinary messages can be written.
require.Error(t, tcpS.WriteMsg(&Message{}))
require.Error(t, tcpC.WriteMsg(&Message{}))
require.Error(t, tcpS.EnqueueMessage(&Message{}))
require.Error(t, tcpC.EnqueueMessage(&Message{}))
// Try to mess with VersionAck on both client and server, it should fail.
require.Error(t, tcpS.SendVersionAck(&Message{}))
@@ -80,6 +80,6 @@ func TestPeerHandshake(t *testing.T) {
require.Error(t, tcpS.SendVersionAck(&Message{}))
// Now regular messaging can proceed.
require.NoError(t, tcpS.WriteMsg(&Message{}))
require.NoError(t, tcpC.WriteMsg(&Message{}))
require.NoError(t, tcpS.EnqueueMessage(&Message{}))
require.NoError(t, tcpC.EnqueueMessage(&Message{}))
}