consensus/network: reinit dbft after block addition

Don't stall on some height if everyone else have moved up already. Fix #673.
This commit is contained in:
Roman Khimov 2020-02-17 16:20:04 +03:00
parent c5d54e9992
commit 37c48b00b4
2 changed files with 32 additions and 9 deletions

View file

@ -42,6 +42,9 @@ type Service interface {
OnTransaction(tx *transaction.Transaction) OnTransaction(tx *transaction.Transaction)
// GetPayload returns Payload with specified hash if it is present in the local cache. // GetPayload returns Payload with specified hash if it is present in the local cache.
GetPayload(h util.Uint256) *Payload GetPayload(h util.Uint256) *Payload
// OnNewBlock notifies consensus service that there is a new block in
// the chain (without explicitly passing it to the service).
OnNewBlock()
} }
type service struct { type service struct {
@ -57,6 +60,9 @@ type service struct {
// everything in single thread. // everything in single thread.
messages chan Payload messages chan Payload
transactions chan *transaction.Transaction transactions chan *transaction.Transaction
// blockEvents is used to pass a new block event to the consensus
// process.
blockEvents chan struct{}
lastProposal []util.Uint256 lastProposal []util.Uint256
wallet *wallet.Wallet wallet *wallet.Wallet
} }
@ -101,6 +107,7 @@ func NewService(cfg Config) (Service, error) {
messages: make(chan Payload, 100), messages: make(chan Payload, 100),
transactions: make(chan *transaction.Transaction, 100), transactions: make(chan *transaction.Transaction, 100),
blockEvents: make(chan struct{}, 1),
} }
if cfg.Wallet == nil { if cfg.Wallet == nil {
@ -168,14 +175,7 @@ func (s *service) eventLoop() {
s.log.Debug("timer fired", s.log.Debug("timer fired",
zap.Uint32("height", hv.Height), zap.Uint32("height", hv.Height),
zap.Uint("view", uint(hv.View))) zap.Uint("view", uint(hv.View)))
if s.Chain.BlockHeight() >= s.dbft.BlockIndex { s.dbft.OnTimeout(hv)
s.log.Debug("chain already advanced",
zap.Uint32("dbft index", s.dbft.BlockIndex),
zap.Uint32("chain index", s.Chain.BlockHeight()))
s.dbft.InitializeConsensus(0)
} else {
s.dbft.OnTimeout(hv)
}
case msg := <-s.messages: case msg := <-s.messages:
fields := []zap.Field{ fields := []zap.Field{
zap.Uint16("from", msg.validatorIndex), zap.Uint16("from", msg.validatorIndex),
@ -204,6 +204,11 @@ func (s *service) eventLoop() {
s.dbft.OnReceive(&msg) s.dbft.OnReceive(&msg)
case tx := <-s.transactions: case tx := <-s.transactions:
s.dbft.OnTransaction(tx) s.dbft.OnTransaction(tx)
case <-s.blockEvents:
s.log.Debug("new block in the chain",
zap.Uint32("dbft index", s.dbft.BlockIndex),
zap.Uint32("chain index", s.Chain.BlockHeight()))
s.dbft.InitializeConsensus(0)
} }
} }
} }
@ -276,6 +281,20 @@ func (s *service) OnTransaction(tx *transaction.Transaction) {
} }
} }
// OnNewBlock notifies consensus process that there is a new block in the chain
// and dbft should probably be reinitialized.
func (s *service) OnNewBlock() {
if s.dbft != nil {
// If there is something in the queue already, the second
// consecutive event doesn't make much sense (reinitializing
// dbft twice doesn't improve it in any way).
select {
case s.blockEvents <- struct{}{}:
default:
}
}
}
// GetPayload returns payload stored in cache. // GetPayload returns payload stored in cache.
func (s *service) GetPayload(h util.Uint256) *Payload { func (s *service) GetPayload(h util.Uint256) *Payload {
p := s.cache.Get(h) p := s.cache.Get(h)

View file

@ -99,7 +99,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
log: log, log: log,
} }
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
s.tryStartConsensus() if s.consensusStarted.Load() {
s.consensus.OnNewBlock()
} else {
s.tryStartConsensus()
}
s.relayBlock(b) s.relayBlock(b)
}) })