diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 6452931ad..8f8bbd7d4 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -42,9 +42,6 @@ type Service interface { OnTransaction(tx *transaction.Transaction) // GetPayload returns Payload with specified hash if it is present in the local cache. GetPayload(h util.Uint256) *Payload - // OnNewBlock notifies consensus service that there is a new block in - // the chain (without explicitly passing it to the service). - OnNewBlock() } type service struct { @@ -62,7 +59,7 @@ type service struct { transactions chan *transaction.Transaction // blockEvents is used to pass a new block event to the consensus // process. - blockEvents chan struct{} + blockEvents chan *coreb.Block lastProposal []util.Uint256 wallet *wallet.Wallet } @@ -107,7 +104,7 @@ func NewService(cfg Config) (Service, error) { messages: make(chan Payload, 100), transactions: make(chan *transaction.Transaction, 100), - blockEvents: make(chan struct{}, 1), + blockEvents: make(chan *coreb.Block, 1), } if cfg.Wallet == nil { @@ -164,7 +161,7 @@ var ( func (s *service) Start() { s.dbft.Start() - + s.Chain.SubscribeForBlocks(s.blockEvents) go s.eventLoop() } @@ -204,11 +201,14 @@ func (s *service) eventLoop() { s.dbft.OnReceive(&msg) case tx := <-s.transactions: s.dbft.OnTransaction(tx) - case <-s.blockEvents: - s.log.Debug("new block in the chain", - zap.Uint32("dbft index", s.dbft.BlockIndex), - zap.Uint32("chain index", s.Chain.BlockHeight())) - s.dbft.InitializeConsensus(0) + case b := <-s.blockEvents: + // We also receive our own blocks here, so check for index. + if b.Index >= s.dbft.BlockIndex { + s.log.Debug("new block in the chain", + zap.Uint32("dbft index", s.dbft.BlockIndex), + zap.Uint32("chain index", s.Chain.BlockHeight())) + s.dbft.InitializeConsensus(0) + } } } } @@ -287,20 +287,6 @@ func (s *service) OnTransaction(tx *transaction.Transaction) { } } -// OnNewBlock notifies consensus process that there is a new block in the chain -// and dbft should probably be reinitialized. -func (s *service) OnNewBlock() { - if s.dbft != nil { - // If there is something in the queue already, the second - // consecutive event doesn't make much sense (reinitializing - // dbft twice doesn't improve it in any way). - select { - case s.blockEvents <- struct{}{}: - default: - } - } -} - // GetPayload returns payload stored in cache. func (s *service) GetPayload(h util.Uint256) *Payload { p := s.cache.Get(h) diff --git a/pkg/network/server.go b/pkg/network/server.go index 1ecc7782d..0031ec877 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -103,9 +103,7 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo transactions: make(chan *transaction.Transaction, 64), } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { - if s.consensusStarted.Load() { - s.consensus.OnNewBlock() - } else { + if !s.consensusStarted.Load() { s.tryStartConsensus() } s.relayBlock(b)