network: replace consensusStarted/canHandleExtens with syncReached flag

They're essentially the same.
This commit is contained in:
Roman Khimov 2021-04-02 12:55:56 +03:00
parent a954821b98
commit 690a1db589
2 changed files with 17 additions and 22 deletions

View file

@ -85,8 +85,7 @@ type (
transactions chan *transaction.Transaction transactions chan *transaction.Transaction
consensusStarted *atomic.Bool syncReached *atomic.Bool
canHandleExtens *atomic.Bool
oracle *oracle.Oracle oracle *oracle.Oracle
stateRoot stateroot.Service stateRoot stateroot.Service
@ -132,8 +131,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
consensusStarted: atomic.NewBool(false), syncReached: atomic.NewBool(false),
canHandleExtens: atomic.NewBool(false),
extensiblePool: extpool.New(chain), extensiblePool: extpool.New(chain),
log: log, log: log,
transactions: make(chan *transaction.Transaction, 64), transactions: make(chan *transaction.Transaction, 64),
@ -168,8 +166,8 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable")
} }
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
if !s.consensusStarted.Load() { if !s.syncReached.Load() {
s.tryStartConsensus() s.tryStartServices()
} }
}) })
@ -263,7 +261,7 @@ func (s *Server) Start(errChan chan error) {
zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("blockHeight", s.chain.BlockHeight()),
zap.Uint32("headerHeight", s.chain.HeaderHeight())) zap.Uint32("headerHeight", s.chain.HeaderHeight()))
s.tryStartConsensus() s.tryStartServices()
s.initStaleMemPools() s.initStaleMemPools()
go s.broadcastTxLoop() go s.broadcastTxLoop()
@ -289,9 +287,7 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close() s.transport.Close()
s.discovery.Close() s.discovery.Close()
if s.consensusStarted.Load() {
s.consensus.Shutdown() s.consensus.Shutdown()
}
for p := range s.Peers() { for p := range s.Peers() {
p.Disconnect(errServerShutdown) p.Disconnect(errServerShutdown)
} }
@ -443,14 +439,14 @@ func (s *Server) runProto() {
} }
} }
func (s *Server) tryStartConsensus() { func (s *Server) tryStartServices() {
if s.Wallet == nil || s.consensusStarted.Load() { if s.syncReached.Load() {
return return
} }
if s.IsInSync() { if s.IsInSync() && s.syncReached.CAS(false, true) {
s.log.Info("node reached synchronized state, starting consensus") s.log.Info("node reached synchronized state, starting services")
if s.consensusStarted.CAS(false, true) { if s.Wallet != nil {
s.consensus.Start() s.consensus.Start()
} }
} }
@ -811,12 +807,9 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
// handleExtensibleCmd processes received extensible payload. // handleExtensibleCmd processes received extensible payload.
func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
if !s.canHandleExtens.Load() { if !s.syncReached.Load() {
if !s.IsInSync() {
return nil return nil
} }
s.canHandleExtens.Store(true)
}
ok, err := s.extensiblePool.Add(e) ok, err := s.extensiblePool.Add(e)
if err != nil { if err != nil {
return err return err
@ -1049,7 +1042,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
} }
go peer.StartProtocol() go peer.StartProtocol()
s.tryStartConsensus() s.tryStartServices()
default: default:
return fmt.Errorf("received '%s' during handshake", msg.Command.String()) return fmt.Errorf("received '%s' during handshake", msg.Command.String())
} }

View file

@ -105,7 +105,7 @@ func TestServerStartAndShutdown(t *testing.T) {
<-ch <-ch
require.True(t, s.transport.(*fakeTransp).closed.Load()) require.True(t, s.transport.(*fakeTransp).closed.Load())
require.False(t, s.consensus.(*fakeConsensus).stopped.Load()) require.True(t, s.consensus.(*fakeConsensus).stopped.Load())
err, ok := p.droppedWith.Load().(error) err, ok := p.droppedWith.Load().(error)
require.True(t, ok) require.True(t, ok)
require.True(t, errors.Is(err, errServerShutdown)) require.True(t, errors.Is(err, errServerShutdown))
@ -403,6 +403,8 @@ func TestConsensus(t *testing.T) {
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
p := newLocalPeer(t, s) p := newLocalPeer(t, s)
p.handshaked = true p.handshaked = true
s.register <- p
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
newConsensusMessage := func(start, end uint32) *Message { newConsensusMessage := func(start, end uint32) *Message {
pl := payload.NewExtensible() pl := payload.NewExtensible()