network: handle incoming messages in a separate goroutine

Network communication takes time, and handling some messages (like
transactions) takes time too. We can overlap the two by moving message
handling into a separate goroutine: while one message is being handled,
the receiver can already read and parse the next one.
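
The pattern, reduced to a self-contained Go sketch (the Message type,
the loop bodies and the names here are placeholders standing in for the
real code, not the neo-go API):

package main

import (
	"fmt"
	"sync"
)

// Message stands in for a decoded wire message.
type Message struct{ ID int }

func main() {
	// A 1-slot buffer is all the overlap needs: one message being
	// handled, one already decoded and waiting.
	incoming := make(chan *Message, 1)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // handler goroutine (cf. handleIncoming below)
		defer wg.Done()
		for msg := range incoming {
			fmt.Println("handling", msg.ID) // stand-in for real work
		}
	}()

	for i := 0; i < 3; i++ { // receive loop (cf. handleConn below)
		incoming <- &Message{ID: i} // hand off, go back to reading
	}
	close(incoming) // no more messages; lets the handler exit
	wg.Wait()
}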

It doesn't improve the metrics a lot, but I still think it makes sense,
and in some scenarios this can be more beneficial than these numbers
suggest.

Reference (e41fc2fd1b, 4 nodes, 10 workers):

RPS    6732.979 6396.160 6759.624 6246.398 6589.841 ≈ 6545   ± 3.02%
TPS    6491.062 5984.190 6275.652 5867.477 6360.797 ≈ 6196   ± 3.77%
CPU %    42.053   43.515   44.768   40.344   44.112 ≈   43.0 ± 3.69%
Mem MB 2564.130 2744.236 2636.267 2589.505 2765.926 ≈ 2660   ± 3.06%

Patched:

RPS    6902.296 6465.662 6856.044 6785.515 6157.024 ≈ 6633   ± 4.26% ↑ 1.34%
TPS    6468.431 6218.867 6610.565 6288.596 5790.556 ≈ 6275   ± 4.44% ↑ 1.28%
CPU %    50.231   42.925   49.481   48.396   42.662 ≈   46.7 ± 7.01% ↑ 8.60%
Mem MB 2856.841 2684.103 2756.195 2733.485 2422.787 ≈ 2691   ± 5.40% ↑ 1.17%
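
For reference, the summary columns are the mean of the five runs and the
population standard deviation expressed as a percentage of that mean;
the sketch below (not part of the benchmark tooling) reproduces the
first RPS row:

package main

import (
	"fmt"
	"math"
)

func main() {
	// First RPS row of the reference run.
	rps := []float64{6732.979, 6396.160, 6759.624, 6246.398, 6589.841}
	var sum float64
	for _, v := range rps {
		sum += v
	}
	mean := sum / float64(len(rps))
	var ss float64
	for _, v := range rps {
		ss += (v - mean) * (v - mean)
	}
	sd := math.Sqrt(ss / float64(len(rps))) // population stddev
	fmt.Printf("≈ %.0f ± %.2f%%\n", mean, 100*sd/mean)
	// Output: ≈ 6545 ± 3.02%
}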
Roman Khimov 2021-08-03 21:55:34 +03:00
parent 95e1f5f77b
commit f78bd6474f

@@ -26,6 +26,7 @@ const (
 	requestQueueSize   = 32
 	p2pMsgQueueSize    = 16
 	hpRequestQueueSize = 4
+	incomingQueueSize  = 1 // Each message can be up to 32MB in size.
 )
 
 var (
@@ -57,6 +58,7 @@ type TCPPeer struct {
 	sendQ    chan []byte
 	p2pSendQ chan []byte
 	hpSendQ  chan []byte
+	incoming chan *Message
 
 	// track outstanding getaddr requests.
 	getAddrSent atomic.Int32
@@ -75,6 +77,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
 		sendQ:    make(chan []byte, requestQueueSize),
 		p2pSendQ: make(chan []byte, p2pMsgQueueSize),
 		hpSendQ:  make(chan []byte, hpRequestQueueSize),
+		incoming: make(chan *Message, incomingQueueSize),
 	}
 }
 
@@ -158,6 +161,7 @@ func (p *TCPPeer) handleConn() {
 	p.server.register <- p
 
 	go p.handleQueues()
+	go p.handleIncoming()
 	// When a new peer is connected we send out our version immediately.
 	err = p.SendVersion()
 	if err == nil {
@@ -172,14 +176,24 @@ func (p *TCPPeer) handleConn() {
 		} else if err != nil {
 			break
 		}
-		if err = p.server.handleMessage(p, msg); err != nil {
+		p.incoming <- msg
+	}
+	close(p.incoming)
+	p.Disconnect(err)
+}
+
+func (p *TCPPeer) handleIncoming() {
+	var err error
+	for msg := range p.incoming {
+		err = p.server.handleMessage(p, msg)
+		if err != nil {
 			if p.Handshaked() {
 				err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
 			}
 			break
 		}
 	}
 	p.Disconnect(err)
 }