Merge pull request #2118 from nspcc-dev/neopt2

Networking improvements
Roman Khimov, 2021-08-10 13:29:40 +03:00, committed by GitHub
commit 0a2bbf3c04
3 changed files with 85 additions and 57 deletions


@@ -68,11 +68,15 @@ type (
 		chain             blockchainer.Blockchainer
 		bQueue            *blockQueue
 		consensus         consensus.Service
+		mempool           *mempool.Pool
 		notaryRequestPool *mempool.Pool
 		extensiblePool    *extpool.Pool
 		notaryFeer        NotaryFeer
 		notaryModule      *notary.Notary
+
+		txInLock sync.Mutex
+		txInMap  map[util.Uint256]struct{}
 		lock  sync.RWMutex
 		peers map[Peer]bool
@@ -136,8 +140,10 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
 		quit:           make(chan struct{}),
 		register:       make(chan Peer),
 		unregister:     make(chan peerDrop),
+		txInMap:        make(map[util.Uint256]struct{}),
 		peers:          make(map[Peer]bool),
 		syncReached:    atomic.NewBool(false),
+		mempool:        chain.GetMemPool(),
 		extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
 		log:            log,
 		transactions:   make(chan *transaction.Transaction, 64),
@@ -282,7 +288,7 @@ func (s *Server) Shutdown() {
 	s.transport.Close()
 	s.discovery.Close()
 	s.consensus.Shutdown()
-	for p := range s.Peers() {
+	for _, p := range s.getPeers(nil) {
 		p.Disconnect(errServerShutdown)
 	}
 	s.bQueue.discard()
@@ -425,7 +431,7 @@ func (s *Server) runProto() {
 		case <-pingTimer.C:
 			if s.chain.BlockHeight() == prevHeight {
 				// Get a copy of s.peers to avoid holding a lock while sending.
-				for peer := range s.Peers() {
+				for _, peer := range s.getPeers(nil) {
 					_ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.chain.BlockHeight(), s.id)))
 				}
 			}
@@ -483,15 +489,18 @@ func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
 	s.notaryRequestPool.UnsubscribeFromTransactions(ch)
 }
 
-// Peers returns the current list of peers connected to
-// the server.
-func (s *Server) Peers() map[Peer]bool {
+// getPeers returns current list of peers connected to the server filtered by
+// isOK function if it's given.
+func (s *Server) getPeers(isOK func(Peer) bool) []Peer {
 	s.lock.RLock()
 	defer s.lock.RUnlock()
-	peers := make(map[Peer]bool, len(s.peers))
-	for k, v := range s.peers {
-		peers[k] = v
+	peers := make([]Peer, 0, len(s.peers))
+	for k := range s.peers {
+		if isOK != nil && !isOK(k) {
+			continue
+		}
+		peers = append(peers, k)
 	}
 	return peers
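
Note: Peers() used to hand back a copy of the whole peer map; getPeers instead applies the caller's filter while the read lock is held and returns a slice, which callers can index and shuffle afterwards. A rough standalone sketch of that pattern, with illustrative names not taken from the tree:

	package main

	import (
		"fmt"
		"sync"
	)

	// peerSet is a hypothetical stand-in for the server's peer registry: a map
	// guarded by an RWMutex whose keys are copied out as a filtered slice.
	type peerSet struct {
		lock  sync.RWMutex
		peers map[string]bool
	}

	func (ps *peerSet) getPeers(isOK func(string) bool) []string {
		ps.lock.RLock()
		defer ps.lock.RUnlock()
		out := make([]string, 0, len(ps.peers))
		for p := range ps.peers {
			if isOK != nil && !isOK(p) {
				continue // skip peers rejected by the filter
			}
			out = append(out, p)
		}
		return out
	}

	func main() {
		ps := &peerSet{peers: map[string]bool{"a": true, "b": true, "c": true}}
		// The lock is released before the caller does anything slow with the result.
		fmt.Println(ps.getPeers(func(p string) bool { return p != "b" }))
	}
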
@@ -655,7 +664,7 @@ func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
 func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
 	reqHashes := make([]util.Uint256, 0)
 	var typExists = map[payload.InventoryType]func(util.Uint256) bool{
-		payload.TXType:    s.chain.HasTransaction,
+		payload.TXType:    s.mempool.ContainsKey,
 		payload.BlockType: s.chain.HasBlock,
 		payload.ExtensibleType: func(h util.Uint256) bool {
 			cp := s.extensiblePool.Get(h)
@@ -688,7 +697,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
 // handleMempoolCmd handles getmempool command.
 func (s *Server) handleMempoolCmd(p Peer) error {
-	txs := s.chain.GetMemPool().GetVerifiedTransactions()
+	txs := s.mempool.GetVerifiedTransactions()
 	hs := make([]util.Uint256, 0, payload.MaxHashesCount)
 	for i := range txs {
 		hs = append(hs, txs[i].Hash())
@@ -874,10 +883,21 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
 func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
 	// It's OK for it to fail for various reasons like tx already existing
 	// in the pool.
+	s.txInLock.Lock()
+	_, ok := s.txInMap[tx.Hash()]
+	if ok || s.mempool.ContainsKey(tx.Hash()) {
+		s.txInLock.Unlock()
+		return nil
+	}
+	s.txInMap[tx.Hash()] = struct{}{}
+	s.txInLock.Unlock()
 	if s.verifyAndPoolTX(tx) == nil {
 		s.consensus.OnTransaction(tx)
 		s.broadcastTX(tx, nil)
 	}
+	s.txInLock.Lock()
+	delete(s.txInMap, tx.Hash())
+	s.txInLock.Unlock()
 	return nil
 }
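
Note: txInMap/txInLock form an in-flight set: a transaction hash is registered before verification and removed afterwards, so the same transaction arriving from several peers at once is only verified once, and anything already in the mempool is dropped immediately. A minimal sketch of that pattern, assuming a hypothetical inFlight helper and key type:

	package main

	import (
		"fmt"
		"sync"
	)

	// inFlight reproduces the txInMap/txInLock idea: only one goroutine at a
	// time gets to process a given key.
	type inFlight struct {
		lock sync.Mutex
		m    map[string]struct{}
	}

	// tryAdd returns false if the key is already being processed.
	func (f *inFlight) tryAdd(key string) bool {
		f.lock.Lock()
		defer f.lock.Unlock()
		if _, ok := f.m[key]; ok {
			return false
		}
		f.m[key] = struct{}{}
		return true
	}

	func (f *inFlight) done(key string) {
		f.lock.Lock()
		delete(f.m, key)
		f.lock.Unlock()
	}

	func main() {
		f := &inFlight{m: make(map[string]struct{})}
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if !f.tryAdd("tx-hash") {
					return // duplicate delivery, drop it
				}
				defer f.done("tx-hash")
				fmt.Println("verifying tx-hash once per in-flight window")
			}()
		}
		wg.Wait()
	}
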
@@ -1124,54 +1144,49 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
 // passed, one is to send the message and the other is to filtrate peers (the
 // peer is considered invalid if it returns false).
 func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
+	var deadN, peerN, sentN int
+
 	// Get a copy of s.peers to avoid holding a lock while sending.
-	peers := s.Peers()
-	if len(peers) == 0 {
+	peers := s.getPeers(peerOK)
+	peerN = len(peers)
+	if peerN == 0 {
 		return
 	}
+	mrand.Shuffle(peerN, func(i, j int) {
+		peers[i], peers[j] = peers[j], peers[i]
+	})
 	pkt, err := msg.Bytes()
 	if err != nil {
 		return
 	}
-	success := make(map[Peer]bool, len(peers))
-	okCount := 0
-	sentCount := 0
-	for peer := range peers {
-		if peerOK != nil && !peerOK(peer) {
-			success[peer] = false
-			continue
-		}
-		okCount++
-		if err := send(peer, false, pkt); err != nil {
-			continue
-		}
-		if msg.Command == CMDGetAddr {
-			peer.AddGetAddrSent()
-		}
-		success[peer] = true
-		sentCount++
-	}
-	// Send to at least 2/3 of good peers.
-	if 3*sentCount >= 2*okCount {
-		return
-	}
-	// Perform blocking send now.
-	for peer := range peers {
-		if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) {
-			continue
-		}
-		if err := send(peer, true, pkt); err != nil {
-			continue
-		}
-		if msg.Command == CMDGetAddr {
-			peer.AddGetAddrSent()
-		}
-		sentCount++
-		if 3*sentCount >= 2*okCount {
-			return
-		}
+
+	// If true, this node isn't counted any more, either it's dead or we
+	// have already sent an Inv to it.
+	finished := make([]bool, peerN)
+
+	// Try non-blocking sends first and only block if have to.
+	for _, blocking := range []bool{false, true} {
+		for i, peer := range peers {
+			// Send to 2/3 of good peers.
+			if 3*sentN >= 2*(peerN-deadN) {
+				return
+			}
+			if finished[i] {
+				continue
+			}
+			err := send(peer, blocking, pkt)
+			switch err {
+			case nil:
+				if msg.Command == CMDGetAddr {
+					peer.AddGetAddrSent()
+				}
+				sentN++
+			case errBusy: // Can be retried.
+				continue
+			default:
+				deadN++
+			}
+			finished[i] = true
+		}
 	}
 }
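
Note: the rewritten loop shuffles the filtered peer list and then makes two passes: non-blocking sends first, blocking sends only for peers that reported errBusy, stopping once roughly 2/3 of the still-alive peers have the message (any other send error counts the peer as dead). A simplified sketch of that control flow, using placeholder peer and send types rather than the real ones:

	package main

	import (
		"errors"
		"fmt"
		mrand "math/rand"
	)

	var errBusy = errors.New("busy")

	// broadcast is an illustrative reduction of the two-pass send loop.
	func broadcast(peers []string, send func(peer string, blocking bool) error) {
		var deadN, sentN int
		peerN := len(peers)
		if peerN == 0 {
			return
		}
		mrand.Shuffle(peerN, func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })

		finished := make([]bool, peerN)
		for _, blocking := range []bool{false, true} {
			for i, p := range peers {
				if 3*sentN >= 2*(peerN-deadN) {
					return // enough peers have the message
				}
				if finished[i] {
					continue
				}
				switch err := send(p, blocking); err {
				case nil:
					sentN++
				case errBusy:
					continue // retry this peer in the blocking pass
				default:
					deadN++
				}
				finished[i] = true
			}
		}
	}

	func main() {
		peers := []string{"a", "b", "c", "d", "e", "f"}
		broadcast(peers, func(p string, blocking bool) error {
			if !blocking && p == "c" {
				return errBusy // pretend this peer's send queue is full
			}
			fmt.Println("sent to", p, "blocking:", blocking)
			return nil
		})
	}
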
@@ -1247,8 +1262,7 @@ func (s *Server) initStaleMemPools() {
 		threshold = cfg.ValidatorsCount * 2
 	}
-	mp := s.chain.GetMemPool()
-	mp.SetResendThreshold(uint32(threshold), s.broadcastTX)
+	s.mempool.SetResendThreshold(uint32(threshold), s.broadcastTX)
 	if s.chain.P2PSigExtensionsEnabled() {
 		s.notaryRequestPool.SetResendThreshold(uint32(threshold), s.broadcastP2PNotaryRequestPayload)
 	}


@@ -694,7 +694,7 @@ func TestInv(t *testing.T) {
 	})
 	t.Run("transaction", func(t *testing.T) {
 		tx := newDummyTx()
-		s.chain.(*fakechain.FakeChain).PutTx(tx)
+		require.NoError(t, s.chain.GetMemPool().Add(tx, s.chain))
 		hs := []util.Uint256{random.Uint256(), tx.Hash(), random.Uint256()}
 		s.testHandleMessage(t, p, CMDInv, &payload.Inventory{
 			Type: payload.TXType,


@@ -26,6 +26,7 @@ const (
 	requestQueueSize   = 32
 	p2pMsgQueueSize    = 16
 	hpRequestQueueSize = 4
+	incomingQueueSize  = 1 // Each message can be up to 32MB in size.
 )
 
 var (
@@ -57,6 +58,7 @@ type TCPPeer struct {
 	sendQ    chan []byte
 	p2pSendQ chan []byte
 	hpSendQ  chan []byte
+	incoming chan *Message
 
 	// track outstanding getaddr requests.
 	getAddrSent atomic.Int32
@@ -75,6 +77,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
 		sendQ:    make(chan []byte, requestQueueSize),
 		p2pSendQ: make(chan []byte, p2pMsgQueueSize),
 		hpSendQ:  make(chan []byte, hpRequestQueueSize),
+		incoming: make(chan *Message, incomingQueueSize),
 	}
 }
@@ -158,6 +161,7 @@ func (p *TCPPeer) handleConn() {
 	p.server.register <- p
 
 	go p.handleQueues()
+	go p.handleIncoming()
 	// When a new peer is connected we send out our version immediately.
 	err = p.SendVersion()
 	if err == nil {
@@ -172,14 +176,24 @@ func (p *TCPPeer) handleConn() {
 			} else if err != nil {
 				break
 			}
-			if err = p.server.handleMessage(p, msg); err != nil {
-				if p.Handshaked() {
-					err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
-				}
-				break
-			}
+			p.incoming <- msg
 		}
 	}
+	close(p.incoming)
+	p.Disconnect(err)
+}
+
+func (p *TCPPeer) handleIncoming() {
+	var err error
+	for msg := range p.incoming {
+		err = p.server.handleMessage(p, msg)
+		if err != nil {
+			if p.Handshaked() {
+				err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
+			}
			break
+		}
+	}
 	p.Disconnect(err)
 }
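
Note: on the peer side, decoding and handling are now decoupled: handleConn only reads and decodes messages and pushes them into a small buffered channel, while handleIncoming drains that channel in its own goroutine; with incomingQueueSize = 1 at most one decoded message (up to 32MB) is buffered per peer. A stripped-down sketch of that reader/worker split, with hypothetical message and handler types standing in for the real ones:

	package main

	import (
		"fmt"
		"sync"
	)

	type message struct{ id int }

	// peer mimics the split: one goroutine produces decoded messages into a
	// buffered channel, another consumes and handles them.
	type peer struct {
		incoming chan *message
		wg       sync.WaitGroup
	}

	func newPeer() *peer {
		// A buffer of 1 keeps per-peer memory bounded while still letting the
		// reader decode the next message as the previous one is handled.
		return &peer{incoming: make(chan *message, 1)}
	}

	func (p *peer) handleConn(msgs []*message) {
		for _, m := range msgs {
			p.incoming <- m // a network read + decode in the real code
		}
		close(p.incoming) // signals handleIncoming to stop
	}

	func (p *peer) handleIncoming(handle func(*message) error) {
		defer p.wg.Done()
		for m := range p.incoming {
			if err := handle(m); err != nil {
				return // the real code disconnects the peer here
			}
		}
	}

	func main() {
		p := newPeer()
		p.wg.Add(1)
		go p.handleIncoming(func(m *message) error {
			fmt.Println("handled message", m.id)
			return nil
		})
		p.handleConn([]*message{{1}, {2}, {3}})
		p.wg.Wait()
	}
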