diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 85678d297..8ff47a18c 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -26,6 +26,7 @@ const ( requestQueueSize = 32 p2pMsgQueueSize = 16 hpRequestQueueSize = 4 + incomingQueueSize = 1 // Each message can be up to 32MB in size. ) var ( @@ -57,6 +58,7 @@ type TCPPeer struct { sendQ chan []byte p2pSendQ chan []byte hpSendQ chan []byte + incoming chan *Message // track outstanding getaddr requests. getAddrSent atomic.Int32 @@ -75,6 +77,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { sendQ: make(chan []byte, requestQueueSize), p2pSendQ: make(chan []byte, p2pMsgQueueSize), hpSendQ: make(chan []byte, hpRequestQueueSize), + incoming: make(chan *Message, incomingQueueSize), } } @@ -158,6 +161,7 @@ func (p *TCPPeer) handleConn() { p.server.register <- p go p.handleQueues() + go p.handleIncoming() // When a new peer is connected we send out our version immediately. err = p.SendVersion() if err == nil { @@ -172,12 +176,22 @@ func (p *TCPPeer) handleConn() { } else if err != nil { break } - if err = p.server.handleMessage(p, msg); err != nil { - if p.Handshaked() { - err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err) - } - break + p.incoming <- msg + } + } + close(p.incoming) + p.Disconnect(err) +} + +func (p *TCPPeer) handleIncoming() { + var err error + for msg := range p.incoming { + err = p.server.handleMessage(p, msg) + if err != nil { + if p.Handshaked() { + err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err) } + break } } p.Disconnect(err)