diff --git a/go.mod b/go.mod index e5730fc1f..f808f6304 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-redis/redis v6.10.2+incompatible github.com/go-yaml/yaml v2.1.0+incompatible github.com/mr-tron/base58 v1.1.2 - github.com/nspcc-dev/dbft v0.0.0-20200130105505-02c208d154bf + github.com/nspcc-dev/dbft v0.0.0-20200203121303-549ecf2daaa1 github.com/nspcc-dev/rfc6979 v0.2.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.2.1 diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index acbfa49d6..96847d904 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -168,9 +168,39 @@ func (s *service) eventLoop() { s.log.Debug("timer fired", zap.Uint32("height", hv.Height), zap.Uint("view", uint(hv.View))) - s.dbft.OnTimeout(hv) + if s.Chain.BlockHeight() >= s.dbft.BlockIndex { + s.log.Debug("chain already advanced", + zap.Uint32("dbft index", s.dbft.BlockIndex), + zap.Uint32("chain index", s.Chain.BlockHeight())) + s.dbft.InitializeConsensus(0) + } else { + s.dbft.OnTimeout(hv) + } case msg := <-s.messages: - s.log.Debug("received message", zap.Uint16("from", msg.validatorIndex)) + fields := []zap.Field{ + zap.Uint16("from", msg.validatorIndex), + zap.Stringer("type", msg.Type()), + } + + if msg.Type() == payload.RecoveryMessageType { + rec := msg.GetRecoveryMessage().(*recoveryMessage) + if rec.preparationHash == nil { + req := rec.GetPrepareRequest(&msg, s.dbft.Validators, uint16(s.dbft.PrimaryIndex)) + if req != nil { + h := req.Hash() + rec.preparationHash = &h + } + } + + fields = append(fields, + zap.Int("#preparation", len(rec.preparationPayloads)), + zap.Int("#commit", len(rec.commitPayloads)), + zap.Int("#changeview", len(rec.changeViewPayloads)), + zap.Bool("#request", rec.prepareRequest != nil), + zap.Bool("#hash", rec.preparationHash != nil)) + } + + s.log.Debug("received message", fields...) s.dbft.OnReceive(&msg) case tx := <-s.transactions: s.dbft.OnTransaction(tx) @@ -212,7 +242,12 @@ func (s *service) getKeyPair(pubs []crypto.PublicKey) (int, crypto.PrivateKey, c // OnPayload handles Payload receive. func (s *service) OnPayload(cp *Payload) { - if !s.validatePayload(cp) || s.cache.Has(cp.Hash()) { + log := s.log.With(zap.Stringer("hash", cp.Hash()), zap.Stringer("type", cp.Type())) + if !s.validatePayload(cp) { + log.Debug("can't validate payload") + return + } else if s.cache.Has(cp.Hash()) { + log.Debug("payload is already in cache") return } @@ -220,6 +255,7 @@ func (s *service) OnPayload(cp *Payload) { s.cache.Add(cp) if s.dbft == nil { + log.Debug("dbft is nil") return }