Merge pull request #641 from nspcc-dev/consensus-update
Consensus update
This commit is contained in:
commit
70c22ebc7b
2 changed files with 40 additions and 4 deletions
2
go.mod
2
go.mod
|
@ -7,7 +7,7 @@ require (
|
||||||
github.com/go-redis/redis v6.10.2+incompatible
|
github.com/go-redis/redis v6.10.2+incompatible
|
||||||
github.com/go-yaml/yaml v2.1.0+incompatible
|
github.com/go-yaml/yaml v2.1.0+incompatible
|
||||||
github.com/mr-tron/base58 v1.1.2
|
github.com/mr-tron/base58 v1.1.2
|
||||||
github.com/nspcc-dev/dbft v0.0.0-20200130105505-02c208d154bf
|
github.com/nspcc-dev/dbft v0.0.0-20200203121303-549ecf2daaa1
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.0
|
github.com/nspcc-dev/rfc6979 v0.2.0
|
||||||
github.com/pkg/errors v0.8.1
|
github.com/pkg/errors v0.8.1
|
||||||
github.com/prometheus/client_golang v1.2.1
|
github.com/prometheus/client_golang v1.2.1
|
||||||
|
|
|
@ -168,9 +168,39 @@ func (s *service) eventLoop() {
|
||||||
s.log.Debug("timer fired",
|
s.log.Debug("timer fired",
|
||||||
zap.Uint32("height", hv.Height),
|
zap.Uint32("height", hv.Height),
|
||||||
zap.Uint("view", uint(hv.View)))
|
zap.Uint("view", uint(hv.View)))
|
||||||
s.dbft.OnTimeout(hv)
|
if s.Chain.BlockHeight() >= s.dbft.BlockIndex {
|
||||||
|
s.log.Debug("chain already advanced",
|
||||||
|
zap.Uint32("dbft index", s.dbft.BlockIndex),
|
||||||
|
zap.Uint32("chain index", s.Chain.BlockHeight()))
|
||||||
|
s.dbft.InitializeConsensus(0)
|
||||||
|
} else {
|
||||||
|
s.dbft.OnTimeout(hv)
|
||||||
|
}
|
||||||
case msg := <-s.messages:
|
case msg := <-s.messages:
|
||||||
s.log.Debug("received message", zap.Uint16("from", msg.validatorIndex))
|
fields := []zap.Field{
|
||||||
|
zap.Uint16("from", msg.validatorIndex),
|
||||||
|
zap.Stringer("type", msg.Type()),
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.Type() == payload.RecoveryMessageType {
|
||||||
|
rec := msg.GetRecoveryMessage().(*recoveryMessage)
|
||||||
|
if rec.preparationHash == nil {
|
||||||
|
req := rec.GetPrepareRequest(&msg, s.dbft.Validators, uint16(s.dbft.PrimaryIndex))
|
||||||
|
if req != nil {
|
||||||
|
h := req.Hash()
|
||||||
|
rec.preparationHash = &h
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fields = append(fields,
|
||||||
|
zap.Int("#preparation", len(rec.preparationPayloads)),
|
||||||
|
zap.Int("#commit", len(rec.commitPayloads)),
|
||||||
|
zap.Int("#changeview", len(rec.changeViewPayloads)),
|
||||||
|
zap.Bool("#request", rec.prepareRequest != nil),
|
||||||
|
zap.Bool("#hash", rec.preparationHash != nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Debug("received message", fields...)
|
||||||
s.dbft.OnReceive(&msg)
|
s.dbft.OnReceive(&msg)
|
||||||
case tx := <-s.transactions:
|
case tx := <-s.transactions:
|
||||||
s.dbft.OnTransaction(tx)
|
s.dbft.OnTransaction(tx)
|
||||||
|
@ -212,7 +242,12 @@ func (s *service) getKeyPair(pubs []crypto.PublicKey) (int, crypto.PrivateKey, c
|
||||||
|
|
||||||
// OnPayload handles Payload receive.
|
// OnPayload handles Payload receive.
|
||||||
func (s *service) OnPayload(cp *Payload) {
|
func (s *service) OnPayload(cp *Payload) {
|
||||||
if !s.validatePayload(cp) || s.cache.Has(cp.Hash()) {
|
log := s.log.With(zap.Stringer("hash", cp.Hash()), zap.Stringer("type", cp.Type()))
|
||||||
|
if !s.validatePayload(cp) {
|
||||||
|
log.Debug("can't validate payload")
|
||||||
|
return
|
||||||
|
} else if s.cache.Has(cp.Hash()) {
|
||||||
|
log.Debug("payload is already in cache")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,6 +255,7 @@ func (s *service) OnPayload(cp *Payload) {
|
||||||
s.cache.Add(cp)
|
s.cache.Add(cp)
|
||||||
|
|
||||||
if s.dbft == nil {
|
if s.dbft == nil {
|
||||||
|
log.Debug("dbft is nil")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue