diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 2ced9fca8..b6cb8cc07 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -244,6 +244,7 @@ func (s *service) newPrepareRequest() payload.PrepareRequest { func (s *service) Start() { if s.started.CAS(false, true) { + s.log.Info("starting consensus service") s.dbft.Start() s.Chain.SubscribeForBlocks(s.blockEvents) go s.eventLoop() @@ -252,8 +253,10 @@ func (s *service) Start() { // Shutdown implements Service interface. func (s *service) Shutdown() { - close(s.quit) - <-s.finished + if s.started.Load() { + close(s.quit) + <-s.finished + } } func (s *service) eventLoop() { diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index 2277fdfac..8d70ed45e 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -83,7 +83,7 @@ func TestStateRoot(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil) require.NoError(t, err) require.EqualValues(t, 0, srv.CurrentValidatedHeight()) r, err := srv.GetStateRoot(bc.BlockHeight()) @@ -153,7 +153,7 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil) require.NoError(t, err) r, err := srv.GetStateRoot(2) require.NoError(t, err) @@ -199,17 +199,16 @@ func TestStateRootFull(t *testing.T) { h, pubs, accs := newMajorityMultisigWithGAS(t, 2) w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two") cfg := createStateRootConfig(w.Path(), "two") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) - require.NoError(t, err) - srv.Run() - t.Cleanup(srv.Shutdown) var lastValidated atomic.Value var lastHeight atomic.Uint32 - srv.SetRelayCallback(func(ep *payload.Extensible) { + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, func(ep *payload.Extensible) { lastHeight.Store(ep.ValidBlockStart) lastValidated.Store(ep) }) + require.NoError(t, err) + srv.Run() + t.Cleanup(srv.Shutdown) bc.setNodesByRole(t, true, noderoles.StateValidator, pubs) transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) diff --git a/pkg/network/server.go b/pkg/network/server.go index f53c4c97c..d384fdbcc 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -85,8 +85,7 @@ type ( transactions chan *transaction.Transaction - consensusStarted *atomic.Bool - canHandleExtens *atomic.Bool + syncReached *atomic.Bool oracle *oracle.Oracle stateRoot stateroot.Service @@ -132,8 +131,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), - consensusStarted: atomic.NewBool(false), - canHandleExtens: atomic.NewBool(false), + syncReached: atomic.NewBool(false), extensiblePool: extpool.New(chain), log: log, transactions: make(chan *transaction.Transaction, 64), @@ -168,8 +166,8 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { - if !s.consensusStarted.Load() { - s.tryStartConsensus() + if !s.syncReached.Load() { + s.tryStartServices() } }) @@ -177,7 +175,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") } - sr, err := stateroot.New(config.StateRootCfg, s.log, chain) + sr, err := stateroot.New(config.StateRootCfg, s.log, chain, s.handleNewPayload) if err != nil { return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) } @@ -221,10 +219,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai s.consensus = srv - if config.StateRootCfg.Enabled { - s.stateRoot.SetRelayCallback(s.handleNewPayload) - } - if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -267,20 +261,10 @@ func (s *Server) Start(errChan chan error) { zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("headerHeight", s.chain.HeaderHeight())) - s.tryStartConsensus() + s.tryStartServices() s.initStaleMemPools() go s.broadcastTxLoop() - if s.oracle != nil { - go s.oracle.Run() - } - if s.notaryModule != nil { - s.notaryRequestPool.RunSubscriptions() - go s.notaryModule.Run() - } - if s.StateRootCfg.Enabled { - s.stateRoot.Run() - } go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() @@ -293,9 +277,7 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() s.discovery.Close() - if s.consensusStarted.Load() { - s.consensus.Shutdown() - } + s.consensus.Shutdown() for p := range s.Peers() { p.Disconnect(errServerShutdown) } @@ -447,16 +429,26 @@ func (s *Server) runProto() { } } -func (s *Server) tryStartConsensus() { - if s.Wallet == nil || s.consensusStarted.Load() { +func (s *Server) tryStartServices() { + if s.syncReached.Load() { return } - if s.IsInSync() { - s.log.Info("node reached synchronized state, starting consensus") - if s.consensusStarted.CAS(false, true) { + if s.IsInSync() && s.syncReached.CAS(false, true) { + s.log.Info("node reached synchronized state, starting services") + if s.Wallet != nil { s.consensus.Start() } + if s.StateRootCfg.Enabled { + s.stateRoot.Run() + } + if s.oracle != nil { + go s.oracle.Run() + } + if s.notaryModule != nil { + s.notaryRequestPool.RunSubscriptions() + go s.notaryModule.Run() + } } } @@ -815,11 +807,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleExtensibleCmd processes received extensible payload. func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { - if !s.canHandleExtens.Load() { - if !s.IsInSync() { - return nil - } - s.canHandleExtens.Store(true) + if !s.syncReached.Load() { + return nil } ok, err := s.extensiblePool.Add(e) if err != nil { @@ -1053,7 +1042,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } go peer.StartProtocol() - s.tryStartConsensus() + s.tryStartServices() default: return fmt.Errorf("received '%s' during handshake", msg.Command.String()) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 3f3e589ff..cf20c9a09 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -105,7 +105,7 @@ func TestServerStartAndShutdown(t *testing.T) { <-ch require.True(t, s.transport.(*fakeTransp).closed.Load()) - require.False(t, s.consensus.(*fakeConsensus).stopped.Load()) + require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) err, ok := p.droppedWith.Load().(error) require.True(t, ok) require.True(t, errors.Is(err, errServerShutdown)) @@ -403,6 +403,8 @@ func TestConsensus(t *testing.T) { atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) p := newLocalPeer(t, s) p.handshaked = true + s.register <- p + require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) newConsensusMessage := func(start, end uint32) *Message { pl := payload.NewExtensible() diff --git a/pkg/services/helpers/rpcbroadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go index 95cbcffa9..3ee04afdf 100644 --- a/pkg/services/helpers/rpcbroadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -37,7 +37,7 @@ func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout ti func (c *RPCClient) run() { // We ignore error as not every node can be available on startup. - c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{ + c.client, _ = client.New(context.Background(), c.addr, client.Options{ DialTimeout: c.sendTimeout, RequestTimeout: c.sendTimeout, }) @@ -48,7 +48,7 @@ func (c *RPCClient) run() { case ps := <-c.responses: if c.client == nil { var err error - c.client, err = client.New(context.Background(), "http://"+c.addr, client.Options{ + c.client, err = client.New(context.Background(), c.addr, client.Options{ DialTimeout: c.sendTimeout, RequestTimeout: c.sendTimeout, }) diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 66d2eb5eb..a2f5d7eb5 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -117,6 +117,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu // Run runs Notary module and should be called in a separate goroutine. func (n *Notary) Run() { + n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) for { diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 5a6fe8e2c..8cfb3b424 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -162,6 +162,7 @@ func (o *Oracle) Shutdown() { // Run runs must be executed in a separate goroutine. func (o *Oracle) Run() { + o.Log.Info("starting oracle service") for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { go o.runRequestWorker() } diff --git a/pkg/services/stateroot/network.go b/pkg/services/stateroot/network.go index af8e6fceb..9539eab04 100644 --- a/pkg/services/stateroot/network.go +++ b/pkg/services/stateroot/network.go @@ -94,18 +94,5 @@ func (s *service) sendValidatedRoot(r *state.MPTRoot, priv *keys.PrivateKey) { buf := io.NewBufBinWriter() emit.Bytes(buf.BinWriter, sig) ep.Witness.InvocationScript = buf.Bytes() - s.getRelayCallback()(ep) -} - -func (s *service) getRelayCallback() RelayCallback { - s.cbMtx.RLock() - defer s.cbMtx.RUnlock() - return s.onValidatedRoot -} - -// SetRelayCallback sets callback to pool and broadcast tx. -func (s *service) SetRelayCallback(cb RelayCallback) { - s.cbMtx.Lock() - defer s.cbMtx.Unlock() - s.onValidatedRoot = cb + s.onValidatedRoot(ep) } diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index ba3a35998..694d9fd99 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -24,7 +24,6 @@ type ( OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot - SetRelayCallback(RelayCallback) Run() Shutdown() } @@ -46,7 +45,6 @@ type ( srMtx sync.Mutex incompleteRoots map[uint32]*incompleteRoot - cbMtx sync.RWMutex onValidatedRoot RelayCallback blockCh chan *block.Block done chan struct{} @@ -59,7 +57,7 @@ const ( ) // New returns new state root service instance using underlying module. -func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (Service, error) { +func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) { s := &service{ StateRoot: bc.GetStateModule(), Network: bc.GetConfig().Magic, @@ -68,6 +66,7 @@ func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (S incompleteRoots: make(map[uint32]*incompleteRoot), blockCh: make(chan *block.Block), done: make(chan struct{}), + onValidatedRoot: cb, } s.MainCfg = cfg diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 3a56041c4..ed0895ae0 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -12,11 +12,13 @@ import ( // Run runs service instance in a separate goroutine. func (s *service) Run() { + s.log.Info("starting state validation service") s.chain.SubscribeForBlocks(s.blockCh) go s.run() } func (s *service) run() { +runloop: for { select { case b := <-s.blockCh: @@ -27,13 +29,22 @@ func (s *service) run() { s.log.Error("can't sign or send state root", zap.Error(err)) } case <-s.done: - return + break runloop + } + } +drainloop: + for { + select { + case <-s.blockCh: + default: + break drainloop } } } // Shutdown stops the service. func (s *service) Shutdown() { + s.chain.UnsubscribeFromBlocks(s.blockCh) close(s.done) } @@ -81,7 +92,7 @@ func (s *service) signAndSend(r *state.MPTRoot) error { buf := io.NewBufBinWriter() emit.Bytes(buf.BinWriter, sig) e.Witness.InvocationScript = buf.Bytes() - s.getRelayCallback()(e) + s.onValidatedRoot(e) return nil }