From f8dc5ec44f5ed855ed78c82fe03479eb2da22c19 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Feb 2024 18:07:28 +0300 Subject: [PATCH 1/6] network: change server Start() behavior Previously, the caller had to run Start in a separate goroutine. Now the separate goroutine is created inside Start() itself. For normal server operation, the caller should wait for Start to finish. Also fix the TestTryInitStateSync test, which was exiting before its logging calls were made. Close #3112 Signed-off-by: Ekaterina Pavlova --- cli/server/server.go | 2 +- internal/testcli/executor.go | 2 +- pkg/network/server.go | 4 ++-- pkg/network/server_test.go | 8 ++++---- pkg/services/rpcsrv/subscription_test.go | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index f2670d599..d51b70159 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error { rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) serv.AddService(&rpcServer) - go serv.Start() + serv.Start() if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized { // Run RPC server in a separate routine. 
This is necessary to avoid a potential // deadlock: Start() can write errors to errChan which is not yet read in the diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go index 0dfa5be13..5fec5e06a 100644 --- a/internal/testcli/executor.go +++ b/internal/testcli/executor.go @@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch }) require.NoError(t, err) netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) - go netSrv.Start() + netSrv.Start() errCh := make(chan error, 2) rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) rpcServer.Start() diff --git a/pkg/network/server.go b/pkg/network/server.go index e000c18c5..0e919cbdd 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -262,7 +262,7 @@ func (s *Server) ID() uint32 { } // Start will start the server and its underlying transport. Calling it twice -// is an error. +// is an error. Caller should wait for Start to finish for normal server operation. func (s *Server) Start() { s.log.Info("node started", zap.Uint32("blockHeight", s.chain.BlockHeight()), @@ -285,7 +285,7 @@ func (s *Server) Start() { setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) setNeoGoVersion(config.Version) setSeverID(strconv.FormatUint(uint64(s.id), 10)) - s.run() + go s.run() } // Shutdown disconnects all peers and stops listening. 
Calling it twice is an error, diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 2ca95885f..7dbce881d 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -90,7 +90,7 @@ func TestServerStartAndShutdown(t *testing.T) { t.Run("no consensus", func(t *testing.T) { s := newTestServer(t, ServerConfig{}) - go s.Start() + s.Start() p := newLocalPeer(t, s) s.register <- p require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) @@ -110,7 +110,7 @@ func TestServerStartAndShutdown(t *testing.T) { cons := new(fakeConsensus) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) - go s.Start() + s.Start() p := newLocalPeer(t, s) s.register <- p @@ -312,7 +312,7 @@ func TestServerNotSendsVerack(t *testing.T) { s.id = 1 finished := make(chan struct{}) go func() { - s.run() + go s.run() close(finished) }() t.Cleanup(func() { @@ -389,7 +389,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser } func startWithCleanup(t *testing.T, s *Server) { - go s.Start() + s.Start() t.Cleanup(func() { s.Shutdown() }) diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 857f3b2d1..d5a291903 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) { defer chain.Close() defer rpcSrv.Shutdown() - go rpcSrv.coreServer.Start() + rpcSrv.coreServer.Start() defer rpcSrv.coreServer.Shutdown() for _, feed := range subFeeds { @@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { } chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) - go rpcSrv.coreServer.Start() + rpcSrv.coreServer.Start() defer chain.Close() defer rpcSrv.Shutdown() From 4715e523e09c4ca49c6c8c538165ad74914b7693 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 22 Feb 2024 16:50:58 +0300 Subject: [PATCH 2/6] 
services: move blockchain/mempool subscriptions to separate routine Start of some services is bound to blockchain subscriptions, and thus, can't be run before the blockchain notifications dispatcher. Signed-off-by: Ekaterina Pavlova --- pkg/consensus/consensus.go | 9 ++++++++- pkg/services/notary/notary.go | 4 ++-- pkg/services/stateroot/validators.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c40e72b16..47aab9c5a 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -281,7 +281,6 @@ func (s *service) Start() { b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! s.lastTimestamp = b.Timestamp s.dbft.Start(s.lastTimestamp * nsInMs) - s.Chain.SubscribeForBlocks(s.blockEvents) go s.eventLoop() } } @@ -299,6 +298,14 @@ func (s *service) Shutdown() { } func (s *service) eventLoop() { + s.Chain.SubscribeForBlocks(s.blockEvents) + + // Manually sync up with potentially missed fresh blocks that may be added by blockchain + // before the subscription. + b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! 
+ if b.Timestamp >= s.lastTimestamp { + s.handleChainBlock(b) + } events: for { select { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 7593c5a2a..f824e2ff1 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -175,13 +175,13 @@ func (n *Notary) Start() { return } n.Config.Log.Info("starting notary service") - n.Config.Chain.SubscribeForBlocks(n.blocksCh) - n.mp.SubscribeForTransactions(n.reqCh) go n.newTxCallbackLoop() go n.mainLoop() } func (n *Notary) mainLoop() { + n.Config.Chain.SubscribeForBlocks(n.blocksCh) + n.mp.SubscribeForTransactions(n.reqCh) mainloop: for { select { diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index a62e75892..afaad8617 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -29,11 +29,11 @@ func (s *service) Start() { return } s.log.Info("starting state validation service") - s.chain.SubscribeForBlocks(s.blockCh) go s.run() } func (s *service) run() { + s.chain.SubscribeForBlocks(s.blockCh) runloop: for { select { From 775c56e87e742a53f5d117601346c691ac2a4b08 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Sun, 18 Feb 2024 15:27:52 +0300 Subject: [PATCH 3/6] network: ensure server is started and shut down only once Use started atomic.Bool field to ensure that the node server shutdown procedure is executed only once. Prevent the following panic caused by server double-shutdown in testing code: ``` --- FAIL: TestServerRegisterPeer (0 .06s) panic: closed twice goroutine 60 [running]: testing.tRunner.func1.2({0x104c40b20, 0x104d0ec90}) /opt/homebrew/opt/go/libexec/src/testing/testing.go:1545 +0x1c8 testing.tRunner.func1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1548 +0x360 panic({0x104c40b20?, 0x104d0ec90?}) /opt/homebrew/opt/go/libexec/src/runtime/panic.go:914 +0x218 github.com/nspcc-dev/neo-go/pkg/network.(*fakeTransp).Close (0x14000159e08?) 
/Users/ekaterinapavlova/Workplace/neo-go/pkg/network /discovery_test.go:83 +0x54 github.com/nspcc-dev/neo-go/pkg/network.(*Server).Shutdown (0x14000343400) /Users/ekaterinapavlova/Workplace/neo-go/pkg/network/server.go:299 +0x104 github.com/nspcc-dev/neo-go/pkg/network.startWithCleanup.func1() /Users/ekaterinapavlova/Workplace/neo-go/pkg/network/server_test .go:408 +0x20 testing.(*common).Cleanup.func1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1169 +0x110 testing.(*common).runCleanup(0x1400032c340, 0x14000159d80?) /opt/homebrew/opt/go/libexec/src/testing/testing.go:1347 +0xd8 testing.tRunner.func2() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1589 +0x2c testing.tRunner(0x1400032c340, 0x104d0c5d0) /opt/homebrew/opt/go/libexec/src/testing/testing.go:1601 +0x114 created by testing.(*T).Run in goroutine 1 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1648 +0x33c ``` Signed-off-by: Ekaterina Pavlova --- pkg/network/server.go | 14 ++++++++++++-- pkg/network/server_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 0e919cbdd..b97cf2dd1 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -138,6 +138,9 @@ type ( stateSync StateSync log *zap.Logger + + // started used to Start and Shutdown server only once. + started atomic.Bool } peerDrop struct { @@ -262,8 +265,12 @@ func (s *Server) ID() uint32 { } // Start will start the server and its underlying transport. Calling it twice -// is an error. Caller should wait for Start to finish for normal server operation. +// is a no-op. Caller should wait for Start to finish for normal server operation. 
func (s *Server) Start() { + if !s.started.CompareAndSwap(false, true) { + s.log.Info("node server already started") + return + } s.log.Info("node started", zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("headerHeight", s.chain.HeaderHeight())) @@ -288,9 +295,12 @@ func (s *Server) Start() { go s.run() } -// Shutdown disconnects all peers and stops listening. Calling it twice is an error, +// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op, // once stopped the same intance of the Server can't be started again by calling Start. func (s *Server) Shutdown() { + if !s.started.CompareAndSwap(true, false) { + return + } s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) for _, tr := range s.transports { tr.Close() diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 7dbce881d..c1b5d96dc 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -96,10 +96,12 @@ func TestServerStartAndShutdown(t *testing.T) { require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) assert.True(t, s.transports[0].(*fakeTransp).started.Load()) + require.True(t, s.started.Load()) assert.Nil(t, s.txCallback) s.Shutdown() + require.False(t, s.started.Load()) require.True(t, s.transports[0].(*fakeTransp).closed.Load()) err, ok := p.droppedWith.Load().(error) require.True(t, ok) @@ -115,11 +117,34 @@ func TestServerStartAndShutdown(t *testing.T) { s.register <- p assert.True(t, s.services["fake"].(*fakeConsensus).started.Load()) + require.True(t, s.started.Load()) s.Shutdown() + require.False(t, s.started.Load()) require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load()) }) + t.Run("double start", func(t *testing.T) { + s := newTestServer(t, ServerConfig{}) + startWithCleanup(t, s) + + // Attempt to start the server again. 
+ s.Start() + + require.True(t, s.started.Load(), "server should still be marked as started after second Start call") + }) + t.Run("double shutdown", func(t *testing.T) { + s := newTestServer(t, ServerConfig{}) + s.Start() + require.True(t, s.started.Load(), "server should still be marked as started after second Start call") + s.Shutdown() + + require.False(t, s.started.Load(), "server should be marked as not started after second Shutdown call") + // Attempt to shutdown the server again. + s.Shutdown() + // Verify the server state remains unchanged and is still considered shutdown. + require.False(t, s.started.Load(), "server should remain shutdown after second call") + }) } func TestServerRegisterPeer(t *testing.T) { From 9b540770cd333b06b81e4a181b172983566db6ad Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Feb 2024 21:14:12 +0300 Subject: [PATCH 4/6] network: fix typo Signed-off-by: Ekaterina Pavlova --- pkg/network/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index b97cf2dd1..814d9e6f5 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -296,7 +296,7 @@ func (s *Server) Start() { } // Shutdown disconnects all peers and stops listening. Calling it twice is a no-op, -// once stopped the same intance of the Server can't be started again by calling Start. +// once stopped the same instance of the Server can't be started again by calling Start. func (s *Server) Shutdown() { if !s.started.CompareAndSwap(true, false) { return From 5bb7c6b715daf6dc032e9900c87da6a213ccb456 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Sun, 18 Feb 2024 15:29:04 +0300 Subject: [PATCH 5/6] services: update logs flush after services shutdown Added sync logs for every service separately to provide the ability to have a custom logger for each service. 
This commit makes the code follow the zap usage rules: `Sync calls the underlying Core's Sync method, flushing any buffered log entries. Applications should take care to call Sync before exiting.` Signed-off-by: Ekaterina Pavlova --- pkg/consensus/consensus.go | 1 + pkg/core/blockchain.go | 1 + pkg/network/server.go | 2 ++ pkg/services/metrics/metrics.go | 1 + pkg/services/notary/notary.go | 1 + pkg/services/oracle/oracle.go | 1 + pkg/services/rpcsrv/server.go | 1 + pkg/services/stateroot/validators.go | 1 + 8 files changed, 9 insertions(+) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 47aab9c5a..2eb9d6e15 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -295,6 +295,7 @@ func (s *service) Shutdown() { s.wallet.Close() } } + _ = s.log.Sync() } func (s *service) eventLoop() { diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 73f95b95c..aa1a0f324 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() { close(bc.stopCh) <-bc.runToExitCh bc.addLock.Unlock() + _ = bc.log.Sync() } // AddBlock accepts successive block for the Blockchain, verifies it and diff --git a/pkg/network/server.go b/pkg/network/server.go index 814d9e6f5..6132e69ff 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -320,6 +320,8 @@ func (s *Server) Shutdown() { } close(s.quit) <-s.relayFin + + _ = s.log.Sync() } // AddService allows to add a service to be started/stopped by Server. 
diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go index e0f12a1ad..039182bea 100644 --- a/pkg/services/metrics/metrics.go +++ b/pkg/services/metrics/metrics.go @@ -75,4 +75,5 @@ func (ms *Service) ShutDown() { ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err)) } } + _ = ms.log.Sync() } diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index f824e2ff1..205891c27 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -228,6 +228,7 @@ func (n *Notary) Shutdown() { close(n.stopCh) <-n.done n.wallet.Close() + _ = n.Config.Log.Sync() } // IsAuthorized returns whether Notary service currently is authorized to collect diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 8bce1bf95..b9612f4a0 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() { o.ResponseHandler.Shutdown() <-o.done o.wallet.Close() + _ = o.Log.Sync() } // Start runs the oracle service in a separate goroutine. diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 5c4e2e9f6..87db5b440 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -482,6 +482,7 @@ func (s *Server) Shutdown() { // Wait for handleSubEvents to finish. <-s.subEventsToExitCh + _ = s.log.Sync() } // SetOracleHandler allows to update oracle handler used by the Server. 
diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index afaad8617..857ecba9e 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -77,6 +77,7 @@ func (s *service) Shutdown() { if s.wallet != nil { s.wallet.Close() } + _ = s.log.Sync() } func (s *service) signAndSend(r *state.MPTRoot) error { From 5cf0c757448b3e9556e41dad8f7f4082b9e3c5a3 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Feb 2024 12:15:13 +0300 Subject: [PATCH 6/6] network: fix server shutdown by waiting for goroutines to finish s.Shutdown() did not wait for all goroutines of the node server to finish normally: the server exited without awaiting its dependent goroutines. As a result, those goroutines could attempt to write logs after the test had ended. With this fix, the corresponding tests are fixed as well. Close #2973 Close #2974 Signed-off-by: Ekaterina Pavlova --- pkg/network/server.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 6132e69ff..5bc4b3c8f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -124,12 +124,14 @@ type ( lastRequestedBlock atomic.Uint32 // lastRequestedHeader contains a height of the last requested header. lastRequestedHeader atomic.Uint32 - - register chan Peer - unregister chan peerDrop - handshake chan Peer - quit chan struct{} - relayFin chan struct{} + register chan Peer + unregister chan peerDrop + handshake chan Peer + quit chan struct{} + relayFin chan struct{} + runFin chan struct{} + broadcastTxFin chan struct{} + runProtoFin chan struct{} transactions chan *transaction.Transaction @@ -141,6 +143,8 @@ type ( // started used to Start and Shutdown server only once. 
started atomic.Bool + + txHandlerLoopWG sync.WaitGroup } peerDrop struct { @@ -183,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy config: chain.GetConfig().ProtocolConfiguration, quit: make(chan struct{}), relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), handshake: make(chan Peer), @@ -279,6 +286,7 @@ func (s *Server) Start() { s.initStaleMemPools() var txThreads = optimalNumOfThreads() + s.txHandlerLoopWG.Add(txThreads) for i := 0; i < txThreads; i++ { go s.txHandlerLoop() } @@ -319,7 +327,11 @@ func (s *Server) Shutdown() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) + <-s.broadcastTxFin + <-s.runProtoFin <-s.relayFin + <-s.runFin + s.txHandlerLoopWG.Wait() _ = s.log.Sync() } @@ -435,6 +447,7 @@ func (s *Server) run() { addrTimer = time.NewTimer(peerCheckTime) peerTimer = time.NewTimer(s.ProtoTickInterval) ) + defer close(s.runFin) defer addrTimer.Stop() defer peerTimer.Stop() go s.runProto() @@ -533,6 +546,7 @@ func (s *Server) run() { // runProto is a goroutine that manages server-wide protocol events. func (s *Server) runProto() { + defer close(s.runProtoFin) pingTimer := time.NewTimer(s.PingInterval) for { prevHeight := s.chain.BlockHeight() @@ -1135,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } func (s *Server) txHandlerLoop() { + defer s.txHandlerLoopWG.Done() txloop: for { select { @@ -1651,6 +1666,7 @@ func (s *Server) broadcastTxLoop() { batchSize = 42 ) + defer close(s.broadcastTxFin) txs := make([]util.Uint256, 0, batchSize) var timer *time.Timer