From 0ef7a76e8401a58637610dd6b6b2736f1ce40bcd Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 18 Feb 2020 18:11:55 +0300 Subject: [PATCH 1/2] network: batch transactions on broadcast --- pkg/network/server.go | 70 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 79bf78ec2..86600405d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -64,6 +64,8 @@ type ( unregister chan peerDrop quit chan struct{} + transactions chan *transaction.Transaction + consensusStarted *atomic.Bool log *zap.Logger @@ -97,6 +99,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* peers: make(map[Peer]bool), consensusStarted: atomic.NewBool(false), log: log, + transactions: make(chan *transaction.Transaction, 64), } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { if s.consensusStarted.Load() { @@ -174,6 +177,7 @@ func (s *Server) Start(errChan chan error) { s.discovery.BackFill(s.Seeds...) + go s.broadcastTxLoop() go s.bQueue.run() go s.transport.Accept() setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) @@ -602,7 +606,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { // in the pool. if s.verifyAndPoolTX(tx) == RelaySucceed { s.consensus.OnTransaction(tx) - go s.broadcastTX(tx) + s.broadcastTX(tx) } return nil } @@ -828,7 +832,14 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { // broadcastTX broadcasts an inventory message about new transaction. func (s *Server) broadcastTX(t *transaction.Transaction) { - msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) + select { + case s.transactions <- t: + case <-s.quit: + } +} + +func (s *Server) broadcastTxHashes(hs []util.Uint256) { + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, hs)) // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. @@ -836,3 +847,58 @@ func (s *Server) broadcastTX(t *transaction.Transaction) { return p.Handshaked() && p.Version().Relay }) } + +// broadcastTxLoop is a loop for batching and sending +// transactions hashes in an INV payload. +func (s *Server) broadcastTxLoop() { + const ( + batchTime = time.Millisecond * 50 + batchSize = 32 + ) + + txs := make([]util.Uint256, 0, batchSize) + var timer *time.Timer + + timerCh := func() <-chan time.Time { + if timer == nil { + return nil + } + return timer.C + } + + broadcast := func() { + s.broadcastTxHashes(txs) + txs = txs[:0] + if timer != nil { + timer.Stop() + } + } + + for { + select { + case <-s.quit: + loop: + for { + select { + case <-s.transactions: + default: + break loop + } + } + return + case <-timerCh(): + if len(txs) > 0 { + broadcast() + } + case tx := <-s.transactions: + if len(txs) == 0 { + timer = time.NewTimer(batchTime) + } + + txs = append(txs, tx.Hash()) + if len(txs) == batchSize { + broadcast() + } + } + } +} From cbf26bac8329d590d88ee20bd7d00b8dd1d3aa8f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 19 Feb 2020 16:10:40 +0300 Subject: [PATCH 2/2] mempool: do not allocate new slice for verified transactions Because transactions a iterated in an increasing order, we can filter slice in-place. --- pkg/core/mempool/mem_pool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index e0371fe2a..09b30bc29 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -260,9 +260,9 @@ func (mp *Pool) Remove(hash util.Uint256) { // drop part of the mempool that is now invalid after the block acceptance. func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { mp.lock.Lock() - // We expect a lot of changes, so it's easier to allocate a new slice - // rather than move things in an old one. - newVerifiedTxes := make([]*item, 0, mp.capacity) + // We can reuse already allocated slice + // because items are iterated one-by-one in increasing order. + newVerifiedTxes := mp.verifiedTxes[:0] newInputs := mp.inputs[:0] newClaims := mp.claims[:0] for _, itm := range mp.verifiedTxes {