network: introduce peer sending queues

Two queues for high-priority and ordinary messages. Fixes #590. These queues
are deliberately made small to avoid buffer bloat problem, there is gonna be
another queueing layer above them to compensate for that. The queues are
designed to be synchronous in enqueueing, async capabilities are to be added
layer above later.
This commit is contained in:
Roman Khimov 2020-01-16 21:16:31 +03:00
parent 7f0882767c
commit 0ba6b2a754
6 changed files with 173 additions and 53 deletions

View file

@ -15,6 +15,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys" "github.com/CityOfZion/neo-go/pkg/crypto/keys"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
@ -179,8 +180,24 @@ func (p *localPeer) PeerAddr() net.Addr {
} }
func (p *localPeer) StartProtocol() {} func (p *localPeer) StartProtocol() {}
func (p *localPeer) Disconnect(err error) {} func (p *localPeer) Disconnect(err error) {}
func (p *localPeer) WriteMsg(msg *Message) error {
func (p *localPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueuePacket(b)
}
func (p *localPeer) EnqueuePacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{}
r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r)
if err == nil {
p.messageHandler(p.t, msg) p.messageHandler(p.t, msg)
}
return nil return nil
} }
func (p *localPeer) Version() *payload.Version { func (p *localPeer) Version() *payload.Version {
@ -197,10 +214,12 @@ func (p *localPeer) HandleVersion(v *payload.Version) error {
return nil return nil
} }
func (p *localPeer) SendVersion(m *Message) error { func (p *localPeer) SendVersion(m *Message) error {
return p.WriteMsg(m) _ = p.EnqueueMessage(m)
return nil
} }
func (p *localPeer) SendVersionAck(m *Message) error { func (p *localPeer) SendVersionAck(m *Message) error {
return p.WriteMsg(m) _ = p.EnqueueMessage(m)
return nil
} }
func (p *localPeer) HandleVersionAck() error { func (p *localPeer) HandleVersionAck() error {
p.handshaked = true p.handshaked = true

View file

@ -226,6 +226,18 @@ func (m *Message) Encode(br *io.BinWriter) error {
return nil return nil
} }
// Bytes serializes a Message into the new allocated buffer and returns it.
func (m *Message) Bytes() ([]byte, error) {
w := io.NewBufBinWriter()
if err := m.Encode(w.BinWriter); err != nil {
return nil, err
}
if w.Err != nil {
return nil, w.Err
}
return w.Bytes(), nil
}
// convert a command (string) to a byte slice filled with 0 bytes till // convert a command (string) to a byte slice filled with 0 bytes till
// size 12. // size 12.
func cmdToByteArray(cmd CommandType) [cmdSize]byte { func cmdToByteArray(cmd CommandType) [cmdSize]byte {

View file

@ -18,7 +18,22 @@ type Peer interface {
// before that it returns the same address as RemoteAddr. // before that it returns the same address as RemoteAddr.
PeerAddr() net.Addr PeerAddr() net.Addr
Disconnect(error) Disconnect(error)
WriteMsg(msg *Message) error
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
EnqueueMessage(*Message) error
// EnqueuePacket is a blocking packet enqueuer, it doesn't return until
// it puts given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). Does nothing is the peer is not yet
// completed handshaking.
EnqueuePacket([]byte) error
// EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts given packet into the high-priority
// queue.
EnqueueHPPacket([]byte) error
Version() *payload.Version Version() *payload.Version
LastBlockIndex() uint32 LastBlockIndex() uint32
UpdateLastBlockIndex(lbIndex uint32) UpdateLastBlockIndex(lbIndex uint32)

View file

@ -364,7 +364,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
// handlePing processes ping request. // handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error { func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) return p.EnqueueMessage(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
} }
// handlePing processes pong request. // handlePing processes pong request.
@ -400,43 +400,49 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
} }
} }
if len(reqHashes) > 0 { if len(reqHashes) > 0 {
payload := payload.NewInventory(inv.Type, reqHashes) msg := NewMessage(s.Net, CMDGetData, payload.NewInventory(inv.Type, reqHashes))
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
return p.EnqueuePacket(pkt)
} }
return nil return nil
} }
// handleInvCmd processes the received inventory. // handleInvCmd processes the received inventory.
func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
for _, hash := range inv.Hashes {
var msg *Message
switch inv.Type { switch inv.Type {
case payload.TXType: case payload.TXType:
for _, hash := range inv.Hashes {
tx, _, err := s.chain.GetTransaction(hash) tx, _, err := s.chain.GetTransaction(hash)
if err == nil { if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDTX, tx)) msg = NewMessage(s.Net, CMDTX, tx)
if err != nil {
return err
}
}
} }
case payload.BlockType: case payload.BlockType:
for _, hash := range inv.Hashes {
b, err := s.chain.GetBlock(hash) b, err := s.chain.GetBlock(hash)
if err == nil { if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDBlock, b)) msg = NewMessage(s.Net, CMDBlock, b)
}
case payload.ConsensusType:
if cp := s.consensus.GetPayload(hash); cp != nil {
msg = NewMessage(s.Net, CMDConsensus, cp)
}
}
if msg != nil {
pkt, err := msg.Bytes()
if err != nil { if err != nil {
return err return err
} }
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
} }
} return p.EnqueuePacket(pkt)
case payload.ConsensusType:
for _, hash := range inv.Hashes {
if cp := s.consensus.GetPayload(hash); cp != nil {
if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil {
return err
}
}
} }
} }
return nil return nil
@ -468,7 +474,8 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
return nil return nil
} }
payload := payload.NewInventory(payload.BlockType, blockHashes) payload := payload.NewInventory(payload.BlockType, blockHashes)
return p.WriteMsg(NewMessage(s.Net, CMDInv, payload)) msg := NewMessage(s.Net, CMDInv, payload)
return p.EnqueueMessage(msg)
} }
// handleGetHeadersCmd processes the getheaders request. // handleGetHeadersCmd processes the getheaders request.
@ -497,7 +504,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
if len(resp.Hdrs) == 0 { if len(resp.Hdrs) == 0 {
return nil return nil
} }
return p.WriteMsg(NewMessage(s.Net, CMDHeaders, &resp)) msg := NewMessage(s.Net, CMDHeaders, &resp)
return p.EnqueueMessage(msg)
} }
// handleConsensusCmd processes received consensus payload. // handleConsensusCmd processes received consensus payload.
@ -538,7 +546,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr) netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
} }
return p.WriteMsg(NewMessage(s.Net, CMDAddr, alist)) return p.EnqueueMessage(NewMessage(s.Net, CMDAddr, alist))
} }
// requestHeaders sends a getheaders message to the peer. // requestHeaders sends a getheaders message to the peer.
@ -546,7 +554,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
func (s *Server) requestHeaders(p Peer) error { func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()} start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{}) payload := payload.NewGetBlocks(start, util.Uint256{})
return p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) return p.EnqueueMessage(NewMessage(s.Net, CMDGetHeaders, payload))
} }
// requestBlocks sends a getdata message to the peer // requestBlocks sends a getdata message to the peer
@ -565,7 +573,7 @@ func (s *Server) requestBlocks(p Peer) error {
} }
if len(hashes) > 0 { if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes) payload := payload.NewInventory(payload.BlockType, hashes)
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.Version().StartHeight { } else if s.chain.HeaderHeight() < p.Version().StartHeight {
return s.requestHeaders(p) return s.requestHeaders(p)
} }
@ -667,7 +675,7 @@ func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, has
continue continue
} }
// Who cares about these messages anyway? // Who cares about these messages anyway?
_ = peer.WriteMsg(msg) _ = peer.EnqueueMessage(msg)
} }
} }

View file

@ -19,6 +19,9 @@ const (
versionReceived versionReceived
verAckSent verAckSent
verAckReceived verAckReceived
requestQueueSize = 32
hpRequestQueueSize = 4
) )
var ( var (
@ -42,6 +45,8 @@ type TCPPeer struct {
handShake handShakeStage handShake handShakeStage
done chan struct{} done chan struct{}
sendQ chan []byte
hpSendQ chan []byte
wg sync.WaitGroup wg sync.WaitGroup
@ -55,26 +60,47 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
conn: conn, conn: conn,
server: s, server: s,
done: make(chan struct{}), done: make(chan struct{}),
sendQ: make(chan []byte, requestQueueSize),
hpSendQ: make(chan []byte, hpRequestQueueSize),
} }
} }
// WriteMsg implements the Peer interface. This will write/encode the message // EnqueuePacket implements the Peer interface.
// to the underlying connection, this only works for messages other than Version func (p *TCPPeer) EnqueuePacket(msg []byte) error {
// or VerAck.
func (p *TCPPeer) WriteMsg(msg *Message) error {
if !p.Handshaked() { if !p.Handshaked() {
return errStateMismatch return errStateMismatch
} }
return p.writeMsg(msg) p.sendQ <- msg
return nil
}
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
func (p *TCPPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueuePacket(b)
}
// EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
p.hpSendQ <- msg
return nil
} }
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {
w := io.NewBufBinWriter() b, err := msg.Bytes()
if err := msg.Encode(w.BinWriter); err != nil { if err != nil {
return err return err
} }
_, err := p.conn.Write(w.Bytes()) _, err = p.conn.Write(b)
return err return err
} }
@ -86,6 +112,7 @@ func (p *TCPPeer) handleConn() {
p.server.register <- p p.server.register <- p
go p.handleQueues()
// When a new peer is connected we send out our version immediately. // When a new peer is connected we send out our version immediately.
err = p.server.sendVersion(p) err = p.server.sendVersion(p)
if err == nil { if err == nil {
@ -108,6 +135,40 @@ func (p *TCPPeer) handleConn() {
p.Disconnect(err) p.Disconnect(err)
} }
// handleQueues is a goroutine that is started automatically to handle
// send queues.
func (p *TCPPeer) handleQueues() {
var err error
for {
var msg []byte
// This one is to give priority to the hp queue
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
default:
}
// If there is no message in the hp queue, block until one
// appears in any of the queues.
if msg == nil {
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.sendQ:
}
}
_, err = p.conn.Write(msg)
if err != nil {
break
}
}
p.Disconnect(err)
}
// StartProtocol starts a long running background loop that interacts // StartProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer. It's only good to run after the // every ProtoTickInterval with the peer. It's only good to run after the
// handshake. // handshake.
@ -136,7 +197,12 @@ func (p *TCPPeer) StartProtocol() {
case <-p.done: case <-p.done:
return return
case m := <-p.server.addrReq: case m := <-p.server.addrReq:
err = p.WriteMsg(m) var pkt []byte
pkt, err = m.Bytes()
if err == nil {
err = p.EnqueueHPPacket(pkt)
}
case <-timer.C: case <-timer.C:
// Try to sync in headers and block with the peer if his block height is higher then ours. // Try to sync in headers and block with the peer if his block height is higher then ours.
if p.LastBlockIndex() > p.server.chain.BlockHeight() { if p.LastBlockIndex() > p.server.chain.BlockHeight() {
@ -153,7 +219,7 @@ func (p *TCPPeer) StartProtocol() {
diff := uint32(time.Now().UTC().Unix()) - block.Timestamp diff := uint32(time.Now().UTC().Unix()) - block.Timestamp
if diff > uint32(p.server.PingInterval/time.Second) { if diff > uint32(p.server.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1) p.UpdatePingSent(p.GetPingSent() + 1)
err = p.WriteMsg(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
} }
} }
} }

View file

@ -30,8 +30,8 @@ func TestPeerHandshake(t *testing.T) {
require.Equal(t, false, tcpC.Handshaked()) require.Equal(t, false, tcpC.Handshaked())
// No ordinary messages can be written. // No ordinary messages can be written.
require.Error(t, tcpS.WriteMsg(&Message{})) require.Error(t, tcpS.EnqueueMessage(&Message{}))
require.Error(t, tcpC.WriteMsg(&Message{})) require.Error(t, tcpC.EnqueueMessage(&Message{}))
// Try to mess with VersionAck on both client and server, it should fail. // Try to mess with VersionAck on both client and server, it should fail.
require.Error(t, tcpS.SendVersionAck(&Message{})) require.Error(t, tcpS.SendVersionAck(&Message{}))
@ -80,6 +80,6 @@ func TestPeerHandshake(t *testing.T) {
require.Error(t, tcpS.SendVersionAck(&Message{})) require.Error(t, tcpS.SendVersionAck(&Message{}))
// Now regular messaging can proceed. // Now regular messaging can proceed.
require.NoError(t, tcpS.WriteMsg(&Message{})) require.NoError(t, tcpS.EnqueueMessage(&Message{}))
require.NoError(t, tcpC.WriteMsg(&Message{})) require.NoError(t, tcpC.EnqueueMessage(&Message{}))
} }