Merge pull request #639 from nspcc-dev/networking-improvements

Networking improvements
This commit is contained in:
Roman Khimov 2020-02-03 16:26:04 +03:00 committed by GitHub
commit 9374c5e7c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 132 additions and 45 deletions

View file

@ -18,6 +18,7 @@ type Discoverer interface {
RequestRemote(int) RequestRemote(int)
RegisterBadAddr(string) RegisterBadAddr(string)
RegisterGoodAddr(string) RegisterGoodAddr(string)
RegisterConnectedAddr(string)
UnregisterConnectedAddr(string) UnregisterConnectedAddr(string)
UnconnectedPeers() []string UnconnectedPeers() []string
BadPeers() []string BadPeers() []string
@ -153,8 +154,8 @@ func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
d.lock.Unlock() d.lock.Unlock()
} }
// registerConnectedAddr tells discoverer that given address is now connected. // RegisterConnectedAddr tells discoverer that given address is now connected.
func (d *DefaultDiscovery) registerConnectedAddr(addr string) { func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
d.lock.Lock() d.lock.Lock()
delete(d.unconnectedAddrs, addr) delete(d.unconnectedAddrs, addr)
d.connectedAddrs[addr] = true d.connectedAddrs[addr] = true
@ -166,7 +167,7 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
d.RegisterBadAddr(addr) d.RegisterBadAddr(addr)
d.RequestRemote(1) d.RequestRemote(1)
} else { } else {
d.registerConnectedAddr(addr) d.RegisterConnectedAddr(addr)
} }
} }

View file

@ -136,6 +136,7 @@ func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {} func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string) {} func (d testDiscovery) RegisterGoodAddr(string) {}
func (d testDiscovery) RegisterConnectedAddr(string) {}
func (d testDiscovery) UnregisterConnectedAddr(string) {} func (d testDiscovery) UnregisterConnectedAddr(string) {}
func (d testDiscovery) UnconnectedPeers() []string { return []string{} } func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
func (d testDiscovery) RequestRemote(n int) {} func (d testDiscovery) RequestRemote(n int) {}
@ -193,6 +194,12 @@ func (p *localPeer) EnqueueMessage(msg *Message) error {
func (p *localPeer) EnqueuePacket(m []byte) error { func (p *localPeer) EnqueuePacket(m []byte) error {
return p.EnqueueHPPacket(m) return p.EnqueueHPPacket(m)
} }
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueMessage(msg)
}
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error { func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{} msg := &Message{}
r := io.NewBinReaderFromBuf(m) r := io.NewBinReaderFromBuf(m)

View file

@ -30,6 +30,20 @@ type Peer interface {
// completed handshaking. // completed handshaking.
EnqueuePacket([]byte) error EnqueuePacket([]byte) error
// EnqueueP2PMessage is a temporary wrapper that sends a message via
// EnqueueP2PPacket if there is no error in serializing it.
EnqueueP2PMessage(*Message) error
// EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until
// it puts given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). Does nothing is the peer is not yet
// completed handshaking. This queue is intended to be used for unicast
// peer to peer communication that is more important than broadcasts
// (handled by EnqueuePacket), but less important than high-priority
// messages (handled by EnqueueHPPacket).
EnqueueP2PPacket([]byte) error
// EnqueueHPPacket is a blocking high priority packet enqueuer, it // EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts given packet into the high-priority // doesn't return until it puts given packet into the high-priority
// queue. // queue.

View file

@ -214,8 +214,8 @@ func (s *Server) run() {
s.lock.Lock() s.lock.Lock()
s.peers[p] = true s.peers[p] = true
s.lock.Unlock() s.lock.Unlock()
s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr()))
peerCount := s.PeerCount() peerCount := s.PeerCount()
s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr()), zap.Int("peerCount", peerCount))
if peerCount > s.MaxPeers { if peerCount > s.MaxPeers {
s.lock.RLock() s.lock.RLock()
// Pick a random peer and drop connection to it. // Pick a random peer and drop connection to it.
@ -239,7 +239,7 @@ func (s *Server) run() {
addr := drop.peer.PeerAddr().String() addr := drop.peer.PeerAddr().String()
if drop.reason == errIdenticalID { if drop.reason == errIdenticalID {
s.discovery.RegisterBadAddr(addr) s.discovery.RegisterBadAddr(addr)
} else { } else if drop.reason != errAlreadyConnected {
s.discovery.UnregisterConnectedAddr(addr) s.discovery.UnregisterConnectedAddr(addr)
s.discovery.BackFill(addr) s.discovery.BackFill(addr)
} }
@ -348,10 +348,15 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
return errIdenticalID return errIdenticalID
} }
peerAddr := p.PeerAddr().String() peerAddr := p.PeerAddr().String()
s.discovery.RegisterConnectedAddr(peerAddr)
s.lock.RLock() s.lock.RLock()
for peer := range s.peers { for peer := range s.peers {
if p == peer {
continue
}
ver := peer.Version()
// Already connected, drop this connection. // Already connected, drop this connection.
if peer.Handshaked() && peer.PeerAddr().String() == peerAddr && peer.Version().Nonce == version.Nonce { if ver != nil && ver.Nonce == version.Nonce && peer.PeerAddr().String() == peerAddr {
s.lock.RUnlock() s.lock.RUnlock()
return errAlreadyConnected return errAlreadyConnected
} }
@ -384,7 +389,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
// handlePing processes ping request. // handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error { func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) return p.EnqueueP2PMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id)))
} }
// handlePing processes pong request. // handlePing processes pong request.
@ -426,7 +431,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt) return p.EnqueueHPPacket(pkt)
} }
return p.EnqueuePacket(pkt) return p.EnqueueP2PPacket(pkt)
} }
return nil return nil
} }
@ -458,7 +463,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
if inv.Type == payload.ConsensusType { if inv.Type == payload.ConsensusType {
err = p.EnqueueHPPacket(pkt) err = p.EnqueueHPPacket(pkt)
} else { } else {
err = p.EnqueuePacket(pkt) err = p.EnqueueP2PPacket(pkt)
} }
} }
if err != nil { if err != nil {
@ -496,7 +501,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
} }
payload := payload.NewInventory(payload.BlockType, blockHashes) payload := payload.NewInventory(payload.BlockType, blockHashes)
msg := s.MkMsg(CMDInv, payload) msg := s.MkMsg(CMDInv, payload)
return p.EnqueueMessage(msg) return p.EnqueueP2PMessage(msg)
} }
// handleGetHeadersCmd processes the getheaders request. // handleGetHeadersCmd processes the getheaders request.
@ -526,7 +531,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
return nil return nil
} }
msg := s.MkMsg(CMDHeaders, &resp) msg := s.MkMsg(CMDHeaders, &resp)
return p.EnqueueMessage(msg) return p.EnqueueP2PMessage(msg)
} }
// handleConsensusCmd processes received consensus payload. // handleConsensusCmd processes received consensus payload.
@ -539,10 +544,12 @@ func (s *Server) handleConsensusCmd(cp *consensus.Payload) error {
// handleTxCmd processes received transaction. // handleTxCmd processes received transaction.
// It never returns an error. // It never returns an error.
func (s *Server) handleTxCmd(tx *transaction.Transaction) error { func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.consensus.OnTransaction(tx)
// It's OK for it to fail for various reasons like tx already existing // It's OK for it to fail for various reasons like tx already existing
// in the pool. // in the pool.
_ = s.RelayTxn(tx) if s.verifyAndPoolTX(tx) == RelaySucceed {
s.consensus.OnTransaction(tx)
go s.broadcastTX(tx)
}
return nil return nil
} }
@ -567,7 +574,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr) netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
} }
return p.EnqueueMessage(s.MkMsg(CMDAddr, alist)) return p.EnqueueP2PMessage(s.MkMsg(CMDAddr, alist))
} }
// requestHeaders sends a getheaders message to the peer. // requestHeaders sends a getheaders message to the peer.
@ -575,7 +582,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
func (s *Server) requestHeaders(p Peer) error { func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()} start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{}) payload := payload.NewGetBlocks(start, util.Uint256{})
return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload)) return p.EnqueueP2PMessage(s.MkMsg(CMDGetHeaders, payload))
} }
// requestBlocks sends a getdata message to the peer // requestBlocks sends a getdata message to the peer
@ -594,7 +601,7 @@ func (s *Server) requestBlocks(p Peer) error {
} }
if len(hashes) > 0 { if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes) payload := payload.NewInventory(payload.BlockType, hashes)
return p.EnqueueMessage(s.MkMsg(CMDGetData, payload)) return p.EnqueueP2PMessage(s.MkMsg(CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.LastBlockIndex() { } else if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p) return s.requestHeaders(p)
} }
@ -603,6 +610,10 @@ func (s *Server) requestBlocks(p Peer) error {
// handleMessage processes the given message. // handleMessage processes the given message.
func (s *Server) handleMessage(peer Peer, msg *Message) error { func (s *Server) handleMessage(peer Peer, msg *Message) error {
s.log.Debug("got msg",
zap.Stringer("addr", peer.RemoteAddr()),
zap.String("type", string(msg.CommandType())))
// Make sure both server and peer are operating on // Make sure both server and peer are operating on
// the same network. // the same network.
if msg.Magic != s.Net { if msg.Magic != s.Net {
@ -727,9 +738,8 @@ func (s *Server) relayBlock(b *block.Block) {
s.broadcastMessage(msg) s.broadcastMessage(msg)
} }
// RelayTxn a new transaction to the local node and the connected peers. // verifyAndPoolTX verifies the TX and adds it to the local mempool.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159 func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {
func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
if t.Type == transaction.MinerType { if t.Type == transaction.MinerType {
return RelayInvalid return RelayInvalid
} }
@ -745,7 +755,21 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok {
return RelayOutOfMemory return RelayOutOfMemory
} }
return RelaySucceed
}
// RelayTxn a new transaction to the local node and the connected peers.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
ret := s.verifyAndPoolTX(t)
if ret == RelaySucceed {
s.broadcastTX(t)
}
return ret
}
// broadcastTX broadcasts an inventory message about new transaction.
func (s *Server) broadcastTX(t *transaction.Transaction) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()}))
// We need to filter out non-relaying nodes, so plain broadcast // We need to filter out non-relaying nodes, so plain broadcast
@ -753,6 +777,4 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
return p.Handshaked() && p.Version().Relay return p.Handshaked() && p.Version().Relay
}) })
return RelaySucceed
} }

View file

@ -2,6 +2,7 @@ package network
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"strconv" "strconv"
"sync" "sync"
@ -21,6 +22,7 @@ const (
verAckReceived verAckReceived
requestQueueSize = 32 requestQueueSize = 32
p2pMsgQueueSize = 16
hpRequestQueueSize = 4 hpRequestQueueSize = 4
) )
@ -48,6 +50,7 @@ type TCPPeer struct {
done chan struct{} done chan struct{}
sendQ chan []byte sendQ chan []byte
p2pSendQ chan []byte
hpSendQ chan []byte hpSendQ chan []byte
wg sync.WaitGroup wg sync.WaitGroup
@ -64,37 +67,56 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
server: s, server: s,
done: make(chan struct{}), done: make(chan struct{}),
sendQ: make(chan []byte, requestQueueSize), sendQ: make(chan []byte, requestQueueSize),
p2pSendQ: make(chan []byte, p2pMsgQueueSize),
hpSendQ: make(chan []byte, hpRequestQueueSize), hpSendQ: make(chan []byte, hpRequestQueueSize),
} }
} }
// EnqueuePacket implements the Peer interface. // putPacketIntoQueue puts given message into the given queue if the peer has
func (p *TCPPeer) EnqueuePacket(msg []byte) error { // done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error {
if !p.Handshaked() { if !p.Handshaked() {
return errStateMismatch return errStateMismatch
} }
p.sendQ <- msg queue <- msg
return nil return nil
} }
// EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(msg []byte) error {
return p.putPacketIntoQueue(p.sendQ, msg)
}
// putMessageIntoQueue serializes given Message and puts it into given queue if
// the peer has done handshaking.
func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.putPacketIntoQueue(queue, b)
}
// EnqueueMessage is a temporary wrapper that sends a message via // EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it. // EnqueuePacket if there is no error in serializing it.
func (p *TCPPeer) EnqueueMessage(msg *Message) error { func (p *TCPPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes() return p.putMsgIntoQueue(p.sendQ, msg)
if err != nil { }
return err
} // EnqueueP2PPacket implements the Peer interface.
return p.EnqueuePacket(b) func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, msg)
}
// EnqueueP2PMessage implements the Peer interface.
func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
return p.putMsgIntoQueue(p.p2pSendQ, msg)
} }
// EnqueueHPPacket implements the Peer interface. It the peer is not yet // EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop. // handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { func (p *TCPPeer) EnqueueHPPacket(msg []byte) error {
if !p.Handshaked() { return p.putPacketIntoQueue(p.hpSendQ, msg)
return errStateMismatch
}
p.hpSendQ <- msg
return nil
} }
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {
@ -131,6 +153,9 @@ func (p *TCPPeer) handleConn() {
break break
} }
if err = p.server.handleMessage(p, msg); err != nil { if err = p.server.handleMessage(p, msg); err != nil {
if p.Handshaked() {
err = fmt.Errorf("handling %s message: %v", msg.CommandType(), err)
}
break break
} }
} }
@ -142,6 +167,11 @@ func (p *TCPPeer) handleConn() {
// send queues. // send queues.
func (p *TCPPeer) handleQueues() { func (p *TCPPeer) handleQueues() {
var err error var err error
// p2psend queue shares its time with send queue in around
// ((p2pSkipDivisor - 1) * 2 + 1)/1 ratio, ratio because the third
// select can still choose p2psend over send.
var p2pSkipCounter uint32
const p2pSkipDivisor = 4
for { for {
var msg []byte var msg []byte
@ -154,13 +184,25 @@ func (p *TCPPeer) handleQueues() {
default: default:
} }
// If there is no message in the hp queue, block until one // Skip this select every p2pSkipDivisor iteration.
if msg == nil && p2pSkipCounter%p2pSkipDivisor != 0 {
// Then look at the p2p queue.
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
default:
}
}
// If there is no message in HP or P2P queues, block until one
// appears in any of the queues. // appears in any of the queues.
if msg == nil { if msg == nil {
select { select {
case <-p.done: case <-p.done:
return return
case msg = <-p.hpSendQ: case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
case msg = <-p.sendQ: case msg = <-p.sendQ:
} }
} }
@ -168,6 +210,7 @@ func (p *TCPPeer) handleQueues() {
if err != nil { if err != nil {
break break
} }
p2pSkipCounter++
} }
p.Disconnect(err) p.Disconnect(err)
} }
@ -297,7 +340,7 @@ func (p *TCPPeer) RemoteAddr() net.Addr {
func (p *TCPPeer) PeerAddr() net.Addr { func (p *TCPPeer) PeerAddr() net.Addr {
remote := p.conn.RemoteAddr() remote := p.conn.RemoteAddr()
// The network can be non-tcp in unit tests. // The network can be non-tcp in unit tests.
if !p.Handshaked() || remote.Network() != "tcp" { if p.version == nil || remote.Network() != "tcp" {
return p.RemoteAddr() return p.RemoteAddr()
} }
host, _, err := net.SplitHostPort(remote.String()) host, _, err := net.SplitHostPort(remote.String())