forked from TrueCloudLab/neoneo-go
network: batch transactions on broadcast
This commit is contained in:
parent
ff551d2015
commit
0ef7a76e84
1 changed files with 68 additions and 2 deletions
|
@ -64,6 +64,8 @@ type (
|
||||||
unregister chan peerDrop
|
unregister chan peerDrop
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
|
||||||
|
transactions chan *transaction.Transaction
|
||||||
|
|
||||||
consensusStarted *atomic.Bool
|
consensusStarted *atomic.Bool
|
||||||
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
@ -97,6 +99,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
|
||||||
peers: make(map[Peer]bool),
|
peers: make(map[Peer]bool),
|
||||||
consensusStarted: atomic.NewBool(false),
|
consensusStarted: atomic.NewBool(false),
|
||||||
log: log,
|
log: log,
|
||||||
|
transactions: make(chan *transaction.Transaction, 64),
|
||||||
}
|
}
|
||||||
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
|
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
|
||||||
if s.consensusStarted.Load() {
|
if s.consensusStarted.Load() {
|
||||||
|
@ -174,6 +177,7 @@ func (s *Server) Start(errChan chan error) {
|
||||||
|
|
||||||
s.discovery.BackFill(s.Seeds...)
|
s.discovery.BackFill(s.Seeds...)
|
||||||
|
|
||||||
|
go s.broadcastTxLoop()
|
||||||
go s.bQueue.run()
|
go s.bQueue.run()
|
||||||
go s.transport.Accept()
|
go s.transport.Accept()
|
||||||
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
|
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
|
||||||
|
@ -602,7 +606,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
|
||||||
// in the pool.
|
// in the pool.
|
||||||
if s.verifyAndPoolTX(tx) == RelaySucceed {
|
if s.verifyAndPoolTX(tx) == RelaySucceed {
|
||||||
s.consensus.OnTransaction(tx)
|
s.consensus.OnTransaction(tx)
|
||||||
go s.broadcastTX(tx)
|
s.broadcastTX(tx)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -828,7 +832,14 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
|
||||||
|
|
||||||
// broadcastTX broadcasts an inventory message about new transaction.
|
// broadcastTX broadcasts an inventory message about new transaction.
|
||||||
func (s *Server) broadcastTX(t *transaction.Transaction) {
|
func (s *Server) broadcastTX(t *transaction.Transaction) {
|
||||||
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()}))
|
select {
|
||||||
|
case s.transactions <- t:
|
||||||
|
case <-s.quit:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) broadcastTxHashes(hs []util.Uint256) {
|
||||||
|
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, hs))
|
||||||
|
|
||||||
// We need to filter out non-relaying nodes, so plain broadcast
|
// We need to filter out non-relaying nodes, so plain broadcast
|
||||||
// functions don't fit here.
|
// functions don't fit here.
|
||||||
|
@ -836,3 +847,58 @@ func (s *Server) broadcastTX(t *transaction.Transaction) {
|
||||||
return p.Handshaked() && p.Version().Relay
|
return p.Handshaked() && p.Version().Relay
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// broadcastTxLoop is a loop for batching and sending
|
||||||
|
// transactions hashes in an INV payload.
|
||||||
|
func (s *Server) broadcastTxLoop() {
|
||||||
|
const (
|
||||||
|
batchTime = time.Millisecond * 50
|
||||||
|
batchSize = 32
|
||||||
|
)
|
||||||
|
|
||||||
|
txs := make([]util.Uint256, 0, batchSize)
|
||||||
|
var timer *time.Timer
|
||||||
|
|
||||||
|
timerCh := func() <-chan time.Time {
|
||||||
|
if timer == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return timer.C
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcast := func() {
|
||||||
|
s.broadcastTxHashes(txs)
|
||||||
|
txs = txs[:0]
|
||||||
|
if timer != nil {
|
||||||
|
timer.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.quit:
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.transactions:
|
||||||
|
default:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-timerCh():
|
||||||
|
if len(txs) > 0 {
|
||||||
|
broadcast()
|
||||||
|
}
|
||||||
|
case tx := <-s.transactions:
|
||||||
|
if len(txs) == 0 {
|
||||||
|
timer = time.NewTimer(batchTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
txs = append(txs, tx.Hash())
|
||||||
|
if len(txs) == batchSize {
|
||||||
|
broadcast()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue