Merge pull request #532 from nspcc-dev/fix/peer_count

Consensus fixes.
This commit is contained in:
Roman Khimov 2019-12-02 18:08:36 +03:00 committed by GitHub
commit abc0ec33bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 1 deletions

View file

@ -136,11 +136,18 @@ func (m *recoveryMessage) AddPayload(p payload.ConsensusPayload) {
switch p.Type() { switch p.Type() {
case payload.PrepareRequestType: case payload.PrepareRequestType:
m.prepareRequest = p.GetPrepareRequest().(*prepareRequest) m.prepareRequest = p.GetPrepareRequest().(*prepareRequest)
h := p.Hash()
m.preparationHash = &h
case payload.PrepareResponseType: case payload.PrepareResponseType:
m.preparationPayloads = append(m.preparationPayloads, &preparationCompact{ m.preparationPayloads = append(m.preparationPayloads, &preparationCompact{
ValidatorIndex: p.ValidatorIndex(), ValidatorIndex: p.ValidatorIndex(),
InvocationScript: p.(*Payload).Witness.InvocationScript, InvocationScript: p.(*Payload).Witness.InvocationScript,
}) })
if m.preparationHash == nil {
h := p.GetPrepareResponse().PreparationHash()
m.preparationHash = &h
}
case payload.ChangeViewType: case payload.ChangeViewType:
m.changeViewPayloads = append(m.changeViewPayloads, &changeViewCompact{ m.changeViewPayloads = append(m.changeViewPayloads, &changeViewCompact{
ValidatorIndex: p.ValidatorIndex(), ValidatorIndex: p.ValidatorIndex(),

View file

@ -0,0 +1,80 @@
package consensus
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/nspcc-dev/dbft/payload"
"github.com/stretchr/testify/require"
)
func TestRecoveryMessage_Setters(t *testing.T) {
r := &recoveryMessage{}
p := new(Payload)
p.SetType(payload.RecoveryMessageType)
p.SetPayload(r)
req := &prepareRequest{
timestamp: 87,
nonce: 321,
transactionHashes: []util.Uint256{{1}},
minerTx: *newMinerTx(123),
nextConsensus: util.Uint160{1, 2},
}
p1 := new(Payload)
p1.SetType(payload.PrepareRequestType)
p1.SetPayload(req)
t.Run("prepare response is added", func(t *testing.T) {
p2 := new(Payload)
p2.SetType(payload.PrepareResponseType)
p2.SetPayload(&prepareResponse{
preparationHash: p1.Hash(),
})
r.AddPayload(p2)
require.NotNil(t, r.PreparationHash())
require.Equal(t, p1.Hash(), *r.PreparationHash())
ps := r.GetPrepareResponses(p)
require.Len(t, ps, 1)
require.Equal(t, p2, ps[0])
})
t.Run("prepare request is added", func(t *testing.T) {
pr := r.GetPrepareRequest(p)
require.Nil(t, pr)
r.AddPayload(p1)
pr = r.GetPrepareRequest(p)
require.NotNil(t, pr)
require.Equal(t, p1, pr)
})
t.Run("change view is added", func(t *testing.T) {
p3 := new(Payload)
p3.SetType(payload.ChangeViewType)
p3.SetPayload(&changeView{
newViewNumber: 1,
timestamp: 12345,
})
r.AddPayload(p3)
ps := r.GetChangeViews(p)
require.Len(t, ps, 1)
require.Equal(t, p3, ps[0])
})
t.Run("commit is added", func(t *testing.T) {
p4 := new(Payload)
p4.SetType(payload.CommitType)
p4.SetPayload(randomMessage(t, commitType))
r.AddPayload(p4)
ps := r.GetCommits(p)
require.Len(t, ps, 1)
require.Equal(t, p4, ps[0])
})
}

View file

@ -253,7 +253,7 @@ func (s *Server) tryStartConsensus() {
return return
} }
if s.PeerCount() >= s.MinPeers { if s.HandshakedPeersCount() >= s.MinPeers {
log.Info("minimum amount of peers were connected to") log.Info("minimum amount of peers were connected to")
if s.connected.CAS(false, true) { if s.connected.CAS(false, true) {
s.consensus.Start() s.consensus.Start()
@ -282,6 +282,23 @@ func (s *Server) PeerCount() int {
return len(s.peers) return len(s.peers)
} }
// HandshakedPeersCount returns the number of connected peers
// which have already performed handshake.
func (s *Server) HandshakedPeersCount() int {
s.lock.RLock()
defer s.lock.RUnlock()
var count int
for p := range s.peers {
if p.Handshaked() {
count++
}
}
return count
}
// startProtocol starts a long running background loop that interacts // startProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer. // every ProtoTickInterval with the peer.
func (s *Server) startProtocol(p Peer) { func (s *Server) startProtocol(p Peer) {