diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c1dff3105..76fd67428 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -129,10 +129,6 @@ func NewService(cfg Config) (Service, error) { finished: make(chan struct{}), } - if cfg.Wallet == nil { - return srv, nil - } - var err error if srv.wallet, err = wallet.NewWalletFromFile(cfg.Wallet.Path); err != nil { diff --git a/pkg/core/blockchainer/services/oracle.go b/pkg/core/blockchainer/services/oracle.go index 102ea390b..426e03967 100644 --- a/pkg/core/blockchainer/services/oracle.go +++ b/pkg/core/blockchainer/services/oracle.go @@ -16,8 +16,8 @@ type Oracle interface { UpdateOracleNodes(keys.PublicKeys) // UpdateNativeContract updates oracle contract native script and hash. UpdateNativeContract([]byte, []byte, util.Uint160, int) - // Run runs oracle module. Must be invoked in a separate goroutine. - Run() + // Start runs oracle module. + Start() // Shutdown shutdowns oracle module. Shutdown() } diff --git a/pkg/core/notary_test.go b/pkg/core/notary_test.go index 67b5bbd3d..f7eda2ef3 100644 --- a/pkg/core/notary_test.go +++ b/pkg/core/notary_test.go @@ -140,9 +140,9 @@ func TestNotary(t *testing.T) { }) mp1.RunSubscriptions() - go ntr1.Run() + ntr1.Start() t.Cleanup(func() { - ntr1.Stop() + ntr1.Shutdown() mp1.StopSubscriptions() }) diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go index 6461d08bf..7f21650d2 100644 --- a/pkg/core/oracle_test.go +++ b/pkg/core/oracle_test.go @@ -305,7 +305,7 @@ func TestOracleFull(t *testing.T) { require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs)) go bc.Run() - go orc.Run() + orc.Start() t.Cleanup(orc.Shutdown) bc.setNodesByRole(t, true, noderoles.Oracle, keys.PublicKeys{acc.PrivateKey().PublicKey()}) @@ -351,7 +351,7 @@ func TestNotYetRunningOracle(t *testing.T) { ids = []uint64{3} orc.RemoveRequests(ids) // 3 removed from pending -> 2, 4 in pending. - go orc.Run() + orc.Start() t.Cleanup(orc.Shutdown) require.Eventually(t, func() bool { return mp.Count() == 2 }, diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index cb66c078e..014d250b6 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -199,7 +199,7 @@ func TestStateRootFull(t *testing.T) { lastValidated.Store(ep) }) require.NoError(t, err) - srv.Run() + srv.Start() t.Cleanup(srv.Shutdown) bc.setNodesByRole(t, true, noderoles.StateValidator, pubs) diff --git a/pkg/network/server.go b/pkg/network/server.go index ddca67d2f..1fa88eda7 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -52,6 +52,12 @@ var ( ) type ( + // Service is a service abstraction (oracle, state root, consensus, etc). + Service interface { + Start() + Shutdown() + } + // Server represents the local Node in the network. Its transport could // be of any kind. Server struct { @@ -77,6 +83,7 @@ type ( extensiblePool *extpool.Pool notaryFeer NotaryFeer notaryModule *notary.Notary + services []Service txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -179,6 +186,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, fmt.Errorf("failed to create Notary module: %w", err) } s.notaryModule = n + s.services = append(s.services, n) chain.SetNotary(n) } } else if config.P2PNotaryCfg.Enabled { @@ -197,6 +205,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) } s.stateRoot = sr + s.services = append(s.services, sr) sSync := chain.GetStateSyncModule() s.stateSync = sSync @@ -221,25 +230,29 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } }) s.oracle = orc + s.services = append(s.services, orc) chain.SetOracle(orc) } - srv, err := newConsensus(consensus.Config{ - Logger: log, - Broadcast: s.handleNewPayload, - Chain: chain, - ProtocolConfiguration: chain.GetConfig(), - RequestTx: s.requestTx, - Wallet: config.Wallet, + if config.Wallet != nil { + srv, err := newConsensus(consensus.Config{ + Logger: log, + Broadcast: s.handleNewPayload, + Chain: chain, + ProtocolConfiguration: chain.GetConfig(), + RequestTx: s.requestTx, + Wallet: config.Wallet, - TimePerBlock: config.TimePerBlock, - }) - if err != nil { - return nil, err + TimePerBlock: config.TimePerBlock, + }) + if err != nil { + return nil, err + } + + s.consensus = srv + s.services = append(s.services, srv) } - s.consensus = srv - if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -299,20 +312,13 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() s.discovery.Close() - s.consensus.Shutdown() for _, p := range s.getPeers(nil) { p.Disconnect(errServerShutdown) } s.bQueue.discard() s.bSyncQueue.discard() - if s.StateRootCfg.Enabled { - s.stateRoot.Shutdown() - } - if s.oracle != nil { - s.oracle.Shutdown() - } - if s.notaryModule != nil { - s.notaryModule.Stop() + for _, svc := range s.services { + svc.Shutdown() } if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() @@ -460,20 +466,11 @@ func (s *Server) tryStartServices() { if s.IsInSync() && s.syncReached.CAS(false, true) { s.log.Info("node reached synchronized state, starting services") - if s.Wallet != nil { - s.consensus.Start() - } - if s.StateRootCfg.Enabled { - s.stateRoot.Run() - } - if s.oracle != nil { - go s.oracle.Run() - } if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. } - if s.notaryModule != nil { - go s.notaryModule.Run() + for _, svc := range s.services { + svc.Start() } } } @@ -976,7 +973,9 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { } switch e.Category { case consensus.Category: - s.consensus.OnPayload(e) + if s.consensus != nil { + s.consensus.OnPayload(e) + } case stateroot.Category: err := s.stateRoot.OnPayload(e) if err != nil { @@ -1009,7 +1008,9 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() if s.verifyAndPoolTX(tx) == nil { - s.consensus.OnTransaction(tx) + if s.consensus != nil { + s.consensus.OnTransaction(tx) + } s.broadcastTX(tx, nil) } s.txInLock.Lock() diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 6cc83d3fc..872501484 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -78,7 +78,7 @@ func TestNewServer(t *testing.T) { }) t.Run("consensus error is not dropped", func(t *testing.T) { errConsensus := errors.New("can't create consensus") - _, err = newServerFromConstructors(ServerConfig{MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp, + _, err = newServerFromConstructors(ServerConfig{Wallet: new(config.Wallet), MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp, func(consensus.Config) (consensus.Service, error) { return nil, errConsensus }, newTestDiscovery) require.True(t, errors.Is(err, errConsensus), "got: %#v", err) @@ -104,13 +104,12 @@ func TestServerStartAndShutdown(t *testing.T) { require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) assert.True(t, s.transport.(*fakeTransp).started.Load()) - assert.False(t, s.consensus.(*fakeConsensus).started.Load()) + assert.Nil(t, s.consensus) s.Shutdown() <-ch require.True(t, s.transport.(*fakeTransp).closed.Load()) - require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) err, ok := p.droppedWith.Load().(error) require.True(t, ok) require.True(t, errors.Is(err, errServerShutdown)) @@ -416,7 +415,8 @@ func TestBlock(t *testing.T) { } func TestConsensus(t *testing.T) { - s := startTestServer(t) + s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + startWithCleanup(t, s) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) p := newLocalPeer(t, s) @@ -465,7 +465,8 @@ func TestConsensus(t *testing.T) { } func TestTransaction(t *testing.T) { - s := startTestServer(t) + s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + startWithCleanup(t, s) t.Run("good", func(t *testing.T) { tx := newDummyTx() diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index a0bc20e0b..b9cc1bcdd 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -143,12 +143,16 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu }, nil } -// Run runs Notary module and should be called in a separate goroutine. -func (n *Notary) Run() { +// Start runs Notary module in a separate goroutine. +func (n *Notary) Start() { n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) go n.newTxCallbackLoop() + go n.mainLoop() +} + +func (n *Notary) mainLoop() { for { select { case <-n.stopCh: @@ -171,8 +175,8 @@ func (n *Notary) Run() { } } -// Stop shutdowns Notary module. -func (n *Notary) Stop() { +// Shutdown stops Notary module. +func (n *Notary) Shutdown() { close(n.stopCh) } diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index a9fcad05d..d0b37b326 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -170,15 +170,18 @@ func (o *Oracle) Shutdown() { o.getBroadcaster().Shutdown() } -// Run runs must be executed in a separate goroutine. -func (o *Oracle) Run() { +// Start runs the oracle service in a separate goroutine. +func (o *Oracle) Start() { o.respMtx.Lock() if o.running { o.respMtx.Unlock() return } o.Log.Info("starting oracle service") + go o.start() +} +func (o *Oracle) start() { o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it. o.pending = nil o.running = true diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 6716d4b68..375695dab 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -25,7 +25,7 @@ type ( OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot - Run() + Start() Shutdown() } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index e36ba9bd5..8c934590f 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -17,8 +17,8 @@ const ( firstVoteResendDelay = 3 * time.Second ) -// Run runs service instance in a separate goroutine. -func (s *service) Run() { +// Start runs service instance in a separate goroutine. +func (s *service) Start() { s.log.Info("starting state validation service") s.chain.SubscribeForBlocks(s.blockCh) go s.run()