diff --git a/cli/server/server.go b/cli/server/server.go
index f2670d599..d51b70159 100644
--- a/cli/server/server.go
+++ b/cli/server/server.go
@@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error {
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
 	serv.AddService(&rpcServer)
 
-	go serv.Start()
+	serv.Start()
 	if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
 		// Run RPC server in a separate routine. This is necessary to avoid a potential
 		// deadlock: Start() can write errors to errChan which is not yet read in the
diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go
index 0dfa5be13..5fec5e06a 100644
--- a/internal/testcli/executor.go
+++ b/internal/testcli/executor.go
@@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
 	})
 	require.NoError(t, err)
 	netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
-	go netSrv.Start()
+	netSrv.Start()
 	errCh := make(chan error, 2)
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
 	rpcServer.Start()
diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go
index c40e72b16..2eb9d6e15 100644
--- a/pkg/consensus/consensus.go
+++ b/pkg/consensus/consensus.go
@@ -281,7 +281,6 @@ func (s *service) Start() {
 		b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
 		s.lastTimestamp = b.Timestamp
 		s.dbft.Start(s.lastTimestamp * nsInMs)
-		s.Chain.SubscribeForBlocks(s.blockEvents)
 		go s.eventLoop()
 	}
 }
@@ -296,9 +295,18 @@ func (s *service) Shutdown() {
 			s.wallet.Close()
 		}
 	}
+	_ = s.log.Sync()
 }
 
 func (s *service) eventLoop() {
+	s.Chain.SubscribeForBlocks(s.blockEvents)
+
+	// Manually sync up with potentially missed fresh blocks that may have been added
+	// by the blockchain before the subscription.
+	b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
+	if b.Timestamp >= s.lastTimestamp {
+		s.handleChainBlock(b)
+	}
 events:
 	for {
 		select {
diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index 73f95b95c..aa1a0f324 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() {
 	close(bc.stopCh)
 	<-bc.runToExitCh
 	bc.addLock.Unlock()
+	_ = bc.log.Sync()
 }
 
 // AddBlock accepts successive block for the Blockchain, verifies it and
diff --git a/pkg/network/server.go b/pkg/network/server.go
index e000c18c5..5bc4b3c8f 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -124,12 +124,14 @@ type (
 		lastRequestedBlock atomic.Uint32
 		// lastRequestedHeader contains a height of the last requested header.
 		lastRequestedHeader atomic.Uint32
-
-		register   chan Peer
-		unregister chan peerDrop
-		handshake  chan Peer
-		quit       chan struct{}
-		relayFin   chan struct{}
+		register       chan Peer
+		unregister     chan peerDrop
+		handshake      chan Peer
+		quit           chan struct{}
+		relayFin       chan struct{}
+		runFin         chan struct{}
+		broadcastTxFin chan struct{}
+		runProtoFin    chan struct{}
 
 		transactions chan *transaction.Transaction
 
@@ -138,6 +140,11 @@ type (
 		stateSync StateSync
 
 		log *zap.Logger
+
+		// started is used to Start and Shutdown the server only once.
+		started atomic.Bool
+
+		txHandlerLoopWG sync.WaitGroup
 	}
 
 	peerDrop struct {
@@ -180,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
 		config:         chain.GetConfig().ProtocolConfiguration,
 		quit:           make(chan struct{}),
 		relayFin:       make(chan struct{}),
+		runFin:         make(chan struct{}),
+		broadcastTxFin: make(chan struct{}),
+		runProtoFin:    make(chan struct{}),
 		register:       make(chan Peer),
 		unregister:     make(chan peerDrop),
 		handshake:      make(chan Peer),
@@ -262,8 +272,12 @@ func (s *Server) ID() uint32 {
 }
 
 // Start will start the server and its underlying transport. Calling it twice
-// is an error.
+// is a no-op. The caller should wait for Start to finish for normal server operation.
 func (s *Server) Start() {
+	if !s.started.CompareAndSwap(false, true) {
+		s.log.Info("node server already started")
+		return
+	}
 	s.log.Info("node started",
 		zap.Uint32("blockHeight", s.chain.BlockHeight()),
 		zap.Uint32("headerHeight", s.chain.HeaderHeight()))
@@ -272,6 +286,7 @@ func (s *Server) Start() {
 	s.initStaleMemPools()
 
 	var txThreads = optimalNumOfThreads()
+	s.txHandlerLoopWG.Add(txThreads)
 	for i := 0; i < txThreads; i++ {
 		go s.txHandlerLoop()
 	}
@@ -285,12 +300,15 @@ func (s *Server) Start() {
 	setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
 	setNeoGoVersion(config.Version)
 	setSeverID(strconv.FormatUint(uint64(s.id), 10))
-	s.run()
+	go s.run()
 }
 
-// Shutdown disconnects all peers and stops listening. Calling it twice is an error,
-// once stopped the same intance of the Server can't be started again by calling Start.
+// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op;
+// once stopped, the same instance of the Server can't be started again by calling Start.
 func (s *Server) Shutdown() {
+	if !s.started.CompareAndSwap(true, false) {
+		return
+	}
 	s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
 	for _, tr := range s.transports {
 		tr.Close()
@@ -309,7 +327,13 @@ func (s *Server) Shutdown() {
 		s.notaryRequestPool.StopSubscriptions()
 	}
 	close(s.quit)
+	<-s.broadcastTxFin
+	<-s.runProtoFin
 	<-s.relayFin
+	<-s.runFin
+	s.txHandlerLoopWG.Wait()
+
+	_ = s.log.Sync()
 }
 
 // AddService allows to add a service to be started/stopped by Server.
@@ -423,6 +447,7 @@ func (s *Server) run() {
 		addrTimer = time.NewTimer(peerCheckTime)
 		peerTimer = time.NewTimer(s.ProtoTickInterval)
 	)
+	defer close(s.runFin)
 	defer addrTimer.Stop()
 	defer peerTimer.Stop()
 	go s.runProto()
@@ -521,6 +546,7 @@ func (s *Server) run() {
 
 // runProto is a goroutine that manages server-wide protocol events.
 func (s *Server) runProto() {
+	defer close(s.runProtoFin)
 	pingTimer := time.NewTimer(s.PingInterval)
 	for {
 		prevHeight := s.chain.BlockHeight()
@@ -1123,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
 }
 
 func (s *Server) txHandlerLoop() {
+	defer s.txHandlerLoopWG.Done()
 txloop:
 	for {
 		select {
@@ -1639,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
 		batchSize = 42
 	)
 
+	defer close(s.broadcastTxFin)
 	txs := make([]util.Uint256, 0, batchSize)
 	var timer *time.Timer
 
diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go
index 2ca95885f..c1b5d96dc 100644
--- a/pkg/network/server_test.go
+++ b/pkg/network/server_test.go
@@ -90,16 +90,18 @@ func TestServerStartAndShutdown(t *testing.T) {
 	t.Run("no consensus", func(t *testing.T) {
 		s := newTestServer(t, ServerConfig{})
 
-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p
 		require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
 
 		assert.True(t, s.transports[0].(*fakeTransp).started.Load())
+		require.True(t, s.started.Load())
 		assert.Nil(t, s.txCallback)
 
 		s.Shutdown()
 
+		require.False(t, s.started.Load())
 		require.True(t, s.transports[0].(*fakeTransp).closed.Load())
 		err, ok := p.droppedWith.Load().(error)
 		require.True(t, ok)
@@ -110,16 +112,39 @@ func TestServerStartAndShutdown(t *testing.T) {
 		cons := new(fakeConsensus)
 		s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
 
-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p
 
 		assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
+		require.True(t, s.started.Load())
 
 		s.Shutdown()
 
+		require.False(t, s.started.Load())
 		require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
 	})
+	t.Run("double start", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		startWithCleanup(t, s)
+
+		// Attempt to start the server again.
+		s.Start()
+
+		require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
+	})
+	t.Run("double shutdown", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		s.Start()
+		require.True(t, s.started.Load(), "server should be marked as started after Start call")
+		s.Shutdown()
+
+		require.False(t, s.started.Load(), "server should be marked as not started after Shutdown call")
+		// Attempt to shut down the server again.
+		s.Shutdown()
+		// Verify the server state remains unchanged and is still considered shut down.
+		require.False(t, s.started.Load(), "server should remain shut down after second Shutdown call")
+	})
 }
 
 func TestServerRegisterPeer(t *testing.T) {
@@ -312,7 +337,7 @@ func TestServerNotSendsVerack(t *testing.T) {
 	s.id = 1
 	finished := make(chan struct{})
 	go func() {
-		s.run()
+		go s.run()
 		close(finished)
 	}()
 	t.Cleanup(func() {
@@ -389,7 +414,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser
 }
 
 func startWithCleanup(t *testing.T, s *Server) {
-	go s.Start()
+	s.Start()
 	t.Cleanup(func() {
 		s.Shutdown()
 	})
diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go
index e0f12a1ad..039182bea 100644
--- a/pkg/services/metrics/metrics.go
+++ b/pkg/services/metrics/metrics.go
@@ -75,4 +75,5 @@ func (ms *Service) ShutDown() {
 			ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err))
 		}
 	}
+	_ = ms.log.Sync()
 }
diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go
index 7593c5a2a..205891c27 100644
--- a/pkg/services/notary/notary.go
+++ b/pkg/services/notary/notary.go
@@ -175,13 +175,13 @@ func (n *Notary) Start() {
 		return
 	}
 	n.Config.Log.Info("starting notary service")
-	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
-	n.mp.SubscribeForTransactions(n.reqCh)
 	go n.newTxCallbackLoop()
 	go n.mainLoop()
 }
 
 func (n *Notary) mainLoop() {
+	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
+	n.mp.SubscribeForTransactions(n.reqCh)
 mainloop:
 	for {
 		select {
@@ -228,6 +228,7 @@ func (n *Notary) Shutdown() {
 	close(n.stopCh)
 	<-n.done
 	n.wallet.Close()
+	_ = n.Config.Log.Sync()
 }
 
 // IsAuthorized returns whether Notary service currently is authorized to collect
diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go
index 8bce1bf95..b9612f4a0 100644
--- a/pkg/services/oracle/oracle.go
+++ b/pkg/services/oracle/oracle.go
@@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() {
 	o.ResponseHandler.Shutdown()
 	<-o.done
 	o.wallet.Close()
+	_ = o.Log.Sync()
 }
 
 // Start runs the oracle service in a separate goroutine.
diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go
index 5c4e2e9f6..87db5b440 100644
--- a/pkg/services/rpcsrv/server.go
+++ b/pkg/services/rpcsrv/server.go
@@ -482,6 +482,7 @@ func (s *Server) Shutdown() {
 
 	// Wait for handleSubEvents to finish.
 	<-s.subEventsToExitCh
+	_ = s.log.Sync()
 }
 
 // SetOracleHandler allows to update oracle handler used by the Server.
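Taken together, the network changes above implement one lifecycle idiom: Start and Shutdown race for an atomic.Bool via CompareAndSwap so each runs at most once, every background goroutine reports completion through a dedicated *Fin channel (or the txHandlerLoopWG wait group), and the logger is flushed with log.Sync as the final step. The following self-contained sketch distills that idiom; Service, loop and loopFin are illustrative names, not neo-go types:

// Minimal sketch of the once-only Start/Shutdown idiom used throughout this
// patch. All names (Service, loop, loopFin) are illustrative, not neo-go's.
package main

import (
	"fmt"
	"sync/atomic"
)

type Service struct {
	started atomic.Bool
	quit    chan struct{} // closed by Shutdown to stop the loop
	loopFin chan struct{} // closed by the loop once it has fully exited
}

func NewService() *Service {
	return &Service{
		quit:    make(chan struct{}),
		loopFin: make(chan struct{}),
	}
}

// Start is a no-op on the second and any subsequent call: only one caller
// can win the false->true CompareAndSwap.
func (s *Service) Start() {
	if !s.started.CompareAndSwap(false, true) {
		fmt.Println("already started")
		return
	}
	go s.loop()
}

// Shutdown mirrors Start: only the true->false winner proceeds, and it
// returns only after the loop goroutine has actually finished, which is
// what the runFin/broadcastTxFin/runProtoFin channels achieve above.
func (s *Service) Shutdown() {
	if !s.started.CompareAndSwap(true, false) {
		return
	}
	close(s.quit)
	<-s.loopFin
}

func (s *Service) loop() {
	defer close(s.loopFin)
	<-s.quit // real code would also select on its work channels here
}

func main() {
	s := NewService()
	s.Start()
	s.Start() // no-op, prints "already started"
	s.Shutdown()
	s.Shutdown() // no-op
}

Note that, as in the patch, a stopped Service can't usefully be restarted: quit stays closed, so a second Start would spawn a loop that exits immediately.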
diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go
index 857f3b2d1..d5a291903 100644
--- a/pkg/services/rpcsrv/subscription_test.go
+++ b/pkg/services/rpcsrv/subscription_test.go
@@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) {
 	defer chain.Close()
 	defer rpcSrv.Shutdown()
 
-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()
 	defer rpcSrv.coreServer.Shutdown()
 
 	for _, feed := range subFeeds {
@@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
 	}
 
 	chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()
 
 	defer chain.Close()
 	defer rpcSrv.Shutdown()
diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go
index a62e75892..857ecba9e 100644
--- a/pkg/services/stateroot/validators.go
+++ b/pkg/services/stateroot/validators.go
@@ -29,11 +29,11 @@ func (s *service) Start() {
 		return
 	}
 	s.log.Info("starting state validation service")
-	s.chain.SubscribeForBlocks(s.blockCh)
 	go s.run()
 }
 
 func (s *service) run() {
+	s.chain.SubscribeForBlocks(s.blockCh)
 runloop:
 	for {
 		select {
@@ -77,6 +77,7 @@ func (s *service) Shutdown() {
 	if s.wallet != nil {
 		s.wallet.Close()
 	}
+	_ = s.log.Sync()
 }
 
 func (s *service) signAndSend(r *state.MPTRoot) error {
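The second recurring change moves SubscribeForBlocks (and the mempool subscription in the notary) out of Start and into the event-loop goroutine itself: a subscription with no running reader lets the blockchain block on delivery, while subscribing from inside the consumer closes that window. The consensus service additionally re-checks the current block to catch anything persisted before its subscription took effect. A minimal sketch of both ideas, assuming simplified stand-in types (Chain, Block and service here are not the real neo-go types):

// Sketch of the subscribe-inside-the-loop pattern from the consensus, notary
// and stateroot changes above. All types here are simplified stand-ins.
package main

import (
	"fmt"
	"sync"
)

type Block struct {
	Index     uint32
	Timestamp uint64
}

type Chain struct {
	mu   sync.Mutex
	head *Block
	subs []chan *Block
}

func (c *Chain) SubscribeForBlocks(ch chan *Block) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subs = append(c.subs, ch)
}

func (c *Chain) CurrentBlock() *Block {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.head
}

// AddBlock delivers the block to every subscriber; with an unbuffered channel
// this blocks until the subscriber reads, which is exactly why subscribing
// before the reading goroutine runs is dangerous.
func (c *Chain) AddBlock(b *Block) {
	c.mu.Lock()
	c.head = b
	subs := append([]chan *Block(nil), c.subs...)
	c.mu.Unlock()
	for _, ch := range subs {
		ch <- b
	}
}

type service struct {
	chain         *Chain
	blockCh       chan *Block
	quit          chan struct{}
	runFin        chan struct{}
	lastTimestamp uint64
}

func (s *service) Start() {
	s.lastTimestamp = s.chain.CurrentBlock().Timestamp
	go s.run() // the loop, not Start, performs the subscription
}

func (s *service) run() {
	defer close(s.runFin)
	s.chain.SubscribeForBlocks(s.blockCh)
	// Catch up with a block that may have landed between Start and the
	// subscription above, mirroring the consensus eventLoop change.
	if b := s.chain.CurrentBlock(); b.Timestamp >= s.lastTimestamp {
		s.handleBlock(b)
	}
	for {
		select {
		case b := <-s.blockCh:
			s.handleBlock(b)
		case <-s.quit:
			return
		}
	}
}

func (s *service) handleBlock(b *Block) { fmt.Println("processed block", b.Index) }

func main() {
	c := &Chain{head: &Block{Index: 0, Timestamp: 1}}
	s := &service{chain: c, blockCh: make(chan *Block), quit: make(chan struct{}), runFin: make(chan struct{})}
	s.Start()
	c.AddBlock(&Block{Index: 1, Timestamp: 2}) // returns once the loop has read it
	close(s.quit)
	<-s.runFin
}

One caveat the sketch shares with the real change: the catch-up read can hand the handler a block it will also receive via the channel, so the handler (dbft's handleChainBlock in the consensus case) has to tolerate duplicates.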