diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 23c28ae88..b7722501d 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -42,6 +42,9 @@ type Service interface { OnTransaction(tx *transaction.Transaction) // GetPayload returns Payload with specified hash if it is present in the local cache. GetPayload(h util.Uint256) *Payload + // OnNewBlock notifies consensus service that there is a new block in + // the chain (without explicitly passing it to the service). + OnNewBlock() } type service struct { @@ -57,6 +60,9 @@ type service struct { // everything in single thread. messages chan Payload transactions chan *transaction.Transaction + // blockEvents is used to pass a new block event to the consensus + // process. + blockEvents chan struct{} lastProposal []util.Uint256 wallet *wallet.Wallet } @@ -101,6 +107,7 @@ func NewService(cfg Config) (Service, error) { messages: make(chan Payload, 100), transactions: make(chan *transaction.Transaction, 100), + blockEvents: make(chan struct{}, 1), } if cfg.Wallet == nil { @@ -168,14 +175,7 @@ func (s *service) eventLoop() { s.log.Debug("timer fired", zap.Uint32("height", hv.Height), zap.Uint("view", uint(hv.View))) - if s.Chain.BlockHeight() >= s.dbft.BlockIndex { - s.log.Debug("chain already advanced", - zap.Uint32("dbft index", s.dbft.BlockIndex), - zap.Uint32("chain index", s.Chain.BlockHeight())) - s.dbft.InitializeConsensus(0) - } else { - s.dbft.OnTimeout(hv) - } + s.dbft.OnTimeout(hv) case msg := <-s.messages: fields := []zap.Field{ zap.Uint16("from", msg.validatorIndex), @@ -204,6 +204,11 @@ func (s *service) eventLoop() { s.dbft.OnReceive(&msg) case tx := <-s.transactions: s.dbft.OnTransaction(tx) + case <-s.blockEvents: + s.log.Debug("new block in the chain", + zap.Uint32("dbft index", s.dbft.BlockIndex), + zap.Uint32("chain index", s.Chain.BlockHeight())) + s.dbft.InitializeConsensus(0) } } } @@ -276,6 +281,20 @@ func (s *service) OnTransaction(tx *transaction.Transaction) { } } +// OnNewBlock notifies consensus process that there is a new block in the chain +// and dbft should probably be reinitialized. +func (s *service) OnNewBlock() { + if s.dbft != nil { + // If there is something in the queue already, the second + // consecutive event doesn't make much sense (reinitializing + // dbft twice doesn't improve it in any way). + select { + case s.blockEvents <- struct{}{}: + default: + } + } +} + // GetPayload returns payload stored in cache. func (s *service) GetPayload(h util.Uint256) *Payload { p := s.cache.Get(h) diff --git a/pkg/network/server.go b/pkg/network/server.go index f238df365..0c8bdee4b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -99,7 +99,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* log: log, } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { - s.tryStartConsensus() + if s.consensusStarted.Load() { + s.consensus.OnNewBlock() + } else { + s.tryStartConsensus() + } s.relayBlock(b) })