consensus: prevent synchronization stalls

When CN is not up to date with the network is synchonizes blocks first and
only then starts consensus process. But while synchronizing it receives
consensus payloads and tries to process them even though messages reader
routine is not started yet. This leads to lots of goroutines waiting to send
their messages:

Jun 25 23:55:53 nodoka neo-go[32733]: goroutine 1639919 [chan send, 4 minutes]:
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/consensus.(*service).OnPayload(0xc0000ecb40, 0xc005bd7680)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/consensus/consensus.go:329 +0x31b
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleConsensusCmd(...)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:687
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleMessage(0xc0000ba160, 0x1053260, 0xc00507d170, 0xc005bdd560, 0x0, 0x0)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:806 +0xd58
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*TCPPeer).handleConn(0xc00507d170)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_peer.go:160 +0x294
Jun 25 23:55:53 nodoka neo-go[32733]: created by github.com/nspcc-dev/neo-go/pkg/network.(*TCPTransport).Dial
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_transport.go:38 +0x1ad
Jun 25 23:55:53 nodoka neo-go[32733]: goroutine 1639181 [chan send, 10 minutes]:
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/consensus.(*service).OnPayload(0xc0000ecb40, 0xc013bb6600)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/consensus/consensus.go:329 +0x31b
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleConsensusCmd(...)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:687
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleMessage(0xc0000ba160, 0x1053260, 0xc01361ee10, 0xc01342c780, 0x0, 0x0)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:806 +0xd58
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*TCPPeer).handleConn(0xc01361ee10)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_peer.go:160 +0x294
Jun 25 23:55:53 nodoka neo-go[32733]: created by github.com/nspcc-dev/neo-go/pkg/network.(*TCPTransport).Dial
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_transport.go:38 +0x1ad
Jun 25 23:55:53 nodoka neo-go[32733]: goroutine 39454 [chan send, 32 minutes]:
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/consensus.(*service).OnPayload(0xc0000ecb40, 0xc014fea680)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/consensus/consensus.go:329 +0x31b
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleConsensusCmd(...)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:687
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleMessage(0xc0000ba160, 0x1053260, 0xc0140b2ea0, 0xc014fe0ed0, 0x0, 0x0)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:806 +0xd58
Jun 25 23:55:53 nodoka neo-go[32733]: github.com/nspcc-dev/neo-go/pkg/network.(*TCPPeer).handleConn(0xc0140b2ea0)
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_peer.go:160 +0x294
Jun 25 23:55:53 nodoka neo-go[32733]: created by github.com/nspcc-dev/neo-go/pkg/network.(*TCPTransport).Dial
Jun 25 23:55:53 nodoka neo-go[32733]: #011/go/src/github.com/nspcc-dev/neo-go/pkg/network/tcp_transport.go:38 +0x1ad

Luckily it doesn't break synchronization completely as eventually connection
timers fire, the node breaks all connections, create new ones and these new
ones request blocks successfully until another consensus payload stalls them
too. In the end the node reaches synchronization, message processing loop
starts and releases all of these waiting goroutines, but it's better for us to
avoid this happening at all.

This also makes double-starting a no-op which is a nice property.
This commit is contained in:
Roman Khimov 2020-06-26 11:19:01 +03:00
parent 3a356aafef
commit a46c71f2de
2 changed files with 47 additions and 24 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -63,6 +64,9 @@ type service struct {
lastProposal []util.Uint256 lastProposal []util.Uint256
wallet *wallet.Wallet wallet *wallet.Wallet
network netmode.Magic network netmode.Magic
// started is a flag set with Start method that runs an event handling
// goroutine.
started *atomic.Bool
} }
// Config is a configuration for consensus services. // Config is a configuration for consensus services.
@ -104,6 +108,7 @@ func NewService(cfg Config) (Service, error) {
transactions: make(chan *transaction.Transaction, 100), transactions: make(chan *transaction.Transaction, 100),
blockEvents: make(chan *coreb.Block, 1), blockEvents: make(chan *coreb.Block, 1),
network: cfg.Chain.GetConfig().Magic, network: cfg.Chain.GetConfig().Magic,
started: atomic.NewBool(false),
} }
if cfg.Wallet == nil { if cfg.Wallet == nil {
@ -143,6 +148,7 @@ func NewService(cfg Config) (Service, error) {
dbft.WithNewCommit(func() payload.Commit { return new(commit) }), dbft.WithNewCommit(func() payload.Commit { return new(commit) }),
dbft.WithNewRecoveryRequest(func() payload.RecoveryRequest { return new(recoveryRequest) }), dbft.WithNewRecoveryRequest(func() payload.RecoveryRequest { return new(recoveryRequest) }),
dbft.WithNewRecoveryMessage(func() payload.RecoveryMessage { return new(recoveryMessage) }), dbft.WithNewRecoveryMessage(func() payload.RecoveryMessage { return new(recoveryMessage) }),
dbft.WithVerifyPrepareRequest(srv.verifyRequest),
) )
if srv.dbft == nil { if srv.dbft == nil {
@ -169,10 +175,12 @@ func (s *service) newPayload() payload.ConsensusPayload {
} }
func (s *service) Start() { func (s *service) Start() {
if s.started.CAS(false, true) {
s.dbft.Start() s.dbft.Start()
s.Chain.SubscribeForBlocks(s.blockEvents) s.Chain.SubscribeForBlocks(s.blockEvents)
go s.eventLoop() go s.eventLoop()
} }
}
func (s *service) eventLoop() { func (s *service) eventLoop() {
for { for {
@ -267,8 +275,8 @@ func (s *service) OnPayload(cp *Payload) {
s.Config.Broadcast(cp) s.Config.Broadcast(cp)
s.cache.Add(cp) s.cache.Add(cp)
if s.dbft == nil { if s.dbft == nil || !s.started.Load() {
log.Debug("dbft is nil") log.Debug("dbft is inactive or not started yet")
return return
} }
@ -280,13 +288,6 @@ func (s *service) OnPayload(cp *Payload) {
} }
} }
// we use switch here because other payloads could be possibly added in future
switch cp.Type() {
case payload.PrepareRequestType:
req := cp.GetPrepareRequest().(*prepareRequest)
s.lastProposal = req.transactionHashes
}
s.messages <- *cp s.messages <- *cp
} }
@ -347,6 +348,14 @@ func (s *service) verifyBlock(b block.Block) bool {
return true return true
} }
func (s *service) verifyRequest(p payload.ConsensusPayload) error {
req := p.GetPrepareRequest().(*prepareRequest)
// Save lastProposal for getVerified().
s.lastProposal = req.transactionHashes
return nil
}
func (s *service) processBlock(b block.Block) { func (s *service) processBlock(b block.Block) {
bb := &b.(*neoBlock).Block bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb)) bb.Script = *(s.getBlockWitness(bb))

View file

@ -2,6 +2,7 @@ package consensus
import ( import (
"testing" "testing"
"time"
"github.com/nspcc-dev/dbft/block" "github.com/nspcc-dev/dbft/block"
"github.com/nspcc-dev/dbft/payload" "github.com/nspcc-dev/dbft/payload"
@ -39,6 +40,7 @@ func TestNewService(t *testing.T) {
func TestService_GetVerified(t *testing.T) { func TestService_GetVerified(t *testing.T) {
srv := newTestService(t) srv := newTestService(t)
srv.dbft.Start()
var txs []*transaction.Transaction var txs []*transaction.Transaction
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0) tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
@ -52,22 +54,30 @@ func TestService_GetVerified(t *testing.T) {
hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}
// Everyone sends a message.
for i := 0; i < 4; i++ {
p := new(Payload) p := new(Payload)
p.message = &message{} p.message = &message{}
// One PrepareRequest and three ChangeViews.
if i == 1 {
p.SetType(payload.PrepareRequestType) p.SetType(payload.PrepareRequestType)
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
tx.Nonce = 999
p.SetPayload(&prepareRequest{transactionHashes: hashes}) p.SetPayload(&prepareRequest{transactionHashes: hashes})
p.SetValidatorIndex(1) } else {
p.SetType(payload.ChangeViewType)
p.SetPayload(&changeView{newViewNumber: 1, timestamp: uint32(time.Now().Unix())})
}
p.SetHeight(1)
p.SetValidatorIndex(uint16(i))
priv, _ := getTestValidator(1) priv, _ := getTestValidator(i)
require.NoError(t, p.Sign(priv)) require.NoError(t, p.Sign(priv))
srv.OnPayload(p) // Skip srv.OnPayload, because the service is not really started.
srv.dbft.OnReceive(p)
}
require.Equal(t, uint8(1), srv.dbft.ViewNumber)
require.Equal(t, hashes, srv.lastProposal) require.Equal(t, hashes, srv.lastProposal)
srv.dbft.ViewNumber = 1
t.Run("new transactions will be proposed in case of failure", func(t *testing.T) { t.Run("new transactions will be proposed in case of failure", func(t *testing.T) {
txx := srv.getVerifiedTx() txx := srv.getVerifiedTx()
require.Equal(t, 1, len(txx), "there is only 1 tx in mempool") require.Equal(t, 1, len(txx), "there is only 1 tx in mempool")
@ -157,6 +167,10 @@ func TestService_getTx(t *testing.T) {
func TestService_OnPayload(t *testing.T) { func TestService_OnPayload(t *testing.T) {
srv := newTestService(t) srv := newTestService(t)
// This test directly reads things from srv.messages that normally
// is read by internal goroutine started with Start(). So let's
// pretend we really did start already.
srv.started.Store(true)
priv, _ := getTestValidator(1) priv, _ := getTestValidator(1)
p := new(Payload) p := new(Payload)