Merge pull request #681 from nspcc-dev/feature/tx_batching

network: batch transactions on broadcast
This commit is contained in:
Roman Khimov 2020-03-02 15:54:19 +03:00 committed by GitHub
commit a2b9d85c80
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 5 deletions

View file

@ -260,9 +260,9 @@ func (mp *Pool) Remove(hash util.Uint256) {
// drop part of the mempool that is now invalid after the block acceptance. // drop part of the mempool that is now invalid after the block acceptance.
func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) {
mp.lock.Lock() mp.lock.Lock()
// We expect a lot of changes, so it's easier to allocate a new slice // We can reuse already allocated slice
// rather than move things in an old one. // because items are iterated one-by-one in increasing order.
newVerifiedTxes := make([]*item, 0, mp.capacity) newVerifiedTxes := mp.verifiedTxes[:0]
newInputs := mp.inputs[:0] newInputs := mp.inputs[:0]
newClaims := mp.claims[:0] newClaims := mp.claims[:0]
for _, itm := range mp.verifiedTxes { for _, itm := range mp.verifiedTxes {

View file

@ -64,6 +64,8 @@ type (
unregister chan peerDrop unregister chan peerDrop
quit chan struct{} quit chan struct{}
transactions chan *transaction.Transaction
consensusStarted *atomic.Bool consensusStarted *atomic.Bool
log *zap.Logger log *zap.Logger
@ -97,6 +99,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
consensusStarted: atomic.NewBool(false), consensusStarted: atomic.NewBool(false),
log: log, log: log,
transactions: make(chan *transaction.Transaction, 64),
} }
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
if s.consensusStarted.Load() { if s.consensusStarted.Load() {
@ -174,6 +177,7 @@ func (s *Server) Start(errChan chan error) {
s.discovery.BackFill(s.Seeds...) s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop()
go s.bQueue.run() go s.bQueue.run()
go s.transport.Accept() go s.transport.Accept()
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
@ -602,7 +606,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
// in the pool. // in the pool.
if s.verifyAndPoolTX(tx) == RelaySucceed { if s.verifyAndPoolTX(tx) == RelaySucceed {
s.consensus.OnTransaction(tx) s.consensus.OnTransaction(tx)
go s.broadcastTX(tx) s.broadcastTX(tx)
} }
return nil return nil
} }
@ -828,7 +832,14 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
// broadcastTX broadcasts an inventory message about new transaction. // broadcastTX broadcasts an inventory message about new transaction.
func (s *Server) broadcastTX(t *transaction.Transaction) { func (s *Server) broadcastTX(t *transaction.Transaction) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) select {
case s.transactions <- t:
case <-s.quit:
}
}
func (s *Server) broadcastTxHashes(hs []util.Uint256) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, hs))
// We need to filter out non-relaying nodes, so plain broadcast // We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here. // functions don't fit here.
@ -836,3 +847,58 @@ func (s *Server) broadcastTX(t *transaction.Transaction) {
return p.Handshaked() && p.Version().Relay return p.Handshaked() && p.Version().Relay
}) })
} }
// broadcastTxLoop is a loop for batching and sending
// transactions hashes in an INV payload.
func (s *Server) broadcastTxLoop() {
const (
batchTime = time.Millisecond * 50
batchSize = 32
)
txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer
timerCh := func() <-chan time.Time {
if timer == nil {
return nil
}
return timer.C
}
broadcast := func() {
s.broadcastTxHashes(txs)
txs = txs[:0]
if timer != nil {
timer.Stop()
}
}
for {
select {
case <-s.quit:
loop:
for {
select {
case <-s.transactions:
default:
break loop
}
}
return
case <-timerCh():
if len(txs) > 0 {
broadcast()
}
case tx := <-s.transactions:
if len(txs) == 0 {
timer = time.NewTimer(batchTime)
}
txs = append(txs, tx.Hash())
if len(txs) == batchSize {
broadcast()
}
}
}
}