diff --git a/pkg/network/server.go b/pkg/network/server.go index 79bf78ec2..86600405d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -64,6 +64,8 @@ type ( unregister chan peerDrop quit chan struct{} + transactions chan *transaction.Transaction + consensusStarted *atomic.Bool log *zap.Logger @@ -97,6 +99,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* peers: make(map[Peer]bool), consensusStarted: atomic.NewBool(false), log: log, + transactions: make(chan *transaction.Transaction, 64), } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { if s.consensusStarted.Load() { @@ -174,6 +177,7 @@ func (s *Server) Start(errChan chan error) { s.discovery.BackFill(s.Seeds...) + go s.broadcastTxLoop() go s.bQueue.run() go s.transport.Accept() setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) @@ -602,7 +606,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { // in the pool. if s.verifyAndPoolTX(tx) == RelaySucceed { s.consensus.OnTransaction(tx) - go s.broadcastTX(tx) + s.broadcastTX(tx) } return nil } @@ -828,7 +832,14 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { // broadcastTX broadcasts an inventory message about new transaction. func (s *Server) broadcastTX(t *transaction.Transaction) { - msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) + select { + case s.transactions <- t: + case <-s.quit: + } +} + +func (s *Server) broadcastTxHashes(hs []util.Uint256) { + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, hs)) // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. @@ -836,3 +847,58 @@ func (s *Server) broadcastTX(t *transaction.Transaction) { return p.Handshaked() && p.Version().Relay }) } + +// broadcastTxLoop is a loop for batching and sending +// transactions hashes in an INV payload. +func (s *Server) broadcastTxLoop() { + const ( + batchTime = time.Millisecond * 50 + batchSize = 32 + ) + + txs := make([]util.Uint256, 0, batchSize) + var timer *time.Timer + + timerCh := func() <-chan time.Time { + if timer == nil { + return nil + } + return timer.C + } + + broadcast := func() { + s.broadcastTxHashes(txs) + txs = txs[:0] + if timer != nil { + timer.Stop() + } + } + + for { + select { + case <-s.quit: + loop: + for { + select { + case <-s.transactions: + default: + break loop + } + } + return + case <-timerCh(): + if len(txs) > 0 { + broadcast() + } + case tx := <-s.transactions: + if len(txs) == 0 { + timer = time.NewTimer(batchTime) + } + + txs = append(txs, tx.Hash()) + if len(txs) == batchSize { + broadcast() + } + } + } +}