diff --git a/pkg/network/server.go b/pkg/network/server.go index 2c7c467c6..ecbf213f6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -85,8 +85,7 @@ type ( transactions chan *transaction.Transaction - consensusStarted *atomic.Bool - canHandleExtens *atomic.Bool + syncReached *atomic.Bool oracle *oracle.Oracle stateRoot stateroot.Service @@ -132,8 +131,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), - consensusStarted: atomic.NewBool(false), - canHandleExtens: atomic.NewBool(false), + syncReached: atomic.NewBool(false), extensiblePool: extpool.New(chain), log: log, transactions: make(chan *transaction.Transaction, 64), @@ -168,8 +166,8 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { - if !s.consensusStarted.Load() { - s.tryStartConsensus() + if !s.syncReached.Load() { + s.tryStartServices() } }) @@ -263,7 +261,7 @@ func (s *Server) Start(errChan chan error) { zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("headerHeight", s.chain.HeaderHeight())) - s.tryStartConsensus() + s.tryStartServices() s.initStaleMemPools() go s.broadcastTxLoop() @@ -289,9 +287,7 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() s.discovery.Close() - if s.consensusStarted.Load() { - s.consensus.Shutdown() - } + s.consensus.Shutdown() for p := range s.Peers() { p.Disconnect(errServerShutdown) } @@ -443,14 +439,14 @@ func (s *Server) runProto() { } } -func (s *Server) tryStartConsensus() { - if s.Wallet == nil || s.consensusStarted.Load() { +func (s *Server) tryStartServices() { + if s.syncReached.Load() { return } - if s.IsInSync() { - s.log.Info("node reached synchronized state, starting consensus") - if s.consensusStarted.CAS(false, true) { + if s.IsInSync() && s.syncReached.CAS(false, true) { + s.log.Info("node reached synchronized state, starting services") + if s.Wallet != nil { s.consensus.Start() } } @@ -811,11 +807,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleExtensibleCmd processes received extensible payload. func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { - if !s.canHandleExtens.Load() { - if !s.IsInSync() { - return nil - } - s.canHandleExtens.Store(true) + if !s.syncReached.Load() { + return nil } ok, err := s.extensiblePool.Add(e) if err != nil { @@ -1049,7 +1042,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } go peer.StartProtocol() - s.tryStartConsensus() + s.tryStartServices() default: return fmt.Errorf("received '%s' during handshake", msg.Command.String()) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 3f3e589ff..cf20c9a09 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -105,7 +105,7 @@ func TestServerStartAndShutdown(t *testing.T) { <-ch require.True(t, s.transport.(*fakeTransp).closed.Load()) - require.False(t, s.consensus.(*fakeConsensus).stopped.Load()) + require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) err, ok := p.droppedWith.Load().(error) require.True(t, ok) require.True(t, errors.Is(err, errServerShutdown)) @@ -403,6 +403,8 @@ func TestConsensus(t *testing.T) { atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) p := newLocalPeer(t, s) p.handshaked = true + s.register <- p + require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) newConsensusMessage := func(start, end uint32) *Message { pl := payload.NewExtensible()