Merge pull request #3307 from nspcc-dev/test-register-peers

services: fix logging data race after shutdown
Roman Khimov 2024-02-27 17:09:08 +03:00 committed by GitHub
commit cc38221d77
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 89 additions and 22 deletions
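
The services touched below share one problem: their background goroutines could still be writing to the logger after Shutdown() had returned, racing with the final log flush (and, in tests, with loggers bound to test cases that had already finished). A minimal sketch of that problematic shape, with made-up names (racyService) and no neo-go internals:

package main

import (
    "time"

    "go.uber.org/zap"
)

// racyService is an illustrative service, not one of the services in the diff.
type racyService struct {
    log  *zap.Logger
    quit chan struct{}
}

func (s *racyService) Start() {
    go func() {
        for {
            select {
            case <-s.quit:
                return
            default:
                // This write can still happen after Shutdown has returned.
                s.log.Info("still working")
                time.Sleep(5 * time.Millisecond)
            }
        }
    }()
}

// Shutdown signals the goroutine but does not wait for it, so the caller has
// no way to know when the last log line has been written before Sync runs.
func (s *racyService) Shutdown() {
    close(s.quit)
    _ = s.log.Sync()
}

func main() {
    log, _ := zap.NewDevelopment()
    s := &racyService{log: log, quit: make(chan struct{})}
    s.Start()
    s.Shutdown()
    // Give the stray goroutine a chance to demonstrate the late write.
    time.Sleep(20 * time.Millisecond)
}

The diff closes this window in two ways: Shutdown() now waits for every goroutine to signal completion before calling log.Sync(), and block/mempool subscriptions are made inside the goroutines that consume them rather than in Start().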

View file

@@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error {
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
 	serv.AddService(&rpcServer)
-	go serv.Start()
+	serv.Start()
 	if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
 		// Run RPC server in a separate routine. This is necessary to avoid a potential
 		// deadlock: Start() can write errors to errChan which is not yet read in the

View file

@@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
 	})
 	require.NoError(t, err)
 	netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
-	go netSrv.Start()
+	netSrv.Start()
 	errCh := make(chan error, 2)
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
 	rpcServer.Start()

View file

@@ -281,7 +281,6 @@ func (s *service) Start() {
 		b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
 		s.lastTimestamp = b.Timestamp
 		s.dbft.Start(s.lastTimestamp * nsInMs)
-		s.Chain.SubscribeForBlocks(s.blockEvents)
 		go s.eventLoop()
 	}
 }
@@ -296,9 +295,18 @@ func (s *service) Shutdown() {
 			s.wallet.Close()
 		}
 	}
+	_ = s.log.Sync()
 }

 func (s *service) eventLoop() {
+	s.Chain.SubscribeForBlocks(s.blockEvents)
+
+	// Manually sync up with potentially missed fresh blocks that may be added by blockchain
+	// before the subscription.
+	b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
+	if b.Timestamp >= s.lastTimestamp {
+		s.handleChainBlock(b)
+	}
 events:
 	for {
 		select {
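
The consensus hunks above move the block subscription from Start() into eventLoop() and then manually catch up on a block that may have been appended before the subscription took effect. A minimal sketch of that subscribe-then-catch-up pattern, using made-up types (Block, fakeChain) rather than the real consensus service:

package main

import "fmt"

// Block is a stand-in for the chain's block type; only the fields the sketch
// needs are present.
type Block struct {
    Index     uint32
    Timestamp uint64
}

// fakeChain stands in for the Ledger the real service talks to.
type fakeChain struct {
    current *Block
    subs    []chan<- *Block
}

func (c *fakeChain) SubscribeForBlocks(ch chan<- *Block) { c.subs = append(c.subs, ch) }
func (c *fakeChain) CurrentBlock() *Block                { return c.current }

type service struct {
    chain         *fakeChain
    blockEvents   chan *Block
    lastTimestamp uint64
    done          chan struct{}
}

// Start only records the state it saw and spawns the loop; subscribing is left
// to eventLoop, so nothing needs to be undone for a half-started service.
func (s *service) Start() {
    s.lastTimestamp = s.chain.CurrentBlock().Timestamp
    go s.eventLoop()
}

func (s *service) eventLoop() {
    defer close(s.done)
    s.chain.SubscribeForBlocks(s.blockEvents)
    // Catch up with a block that may have been added between Start and the
    // subscription above; the timestamp check mirrors the diff's condition.
    if b := s.chain.CurrentBlock(); b.Timestamp >= s.lastTimestamp {
        s.handleBlock(b)
    }
    for b := range s.blockEvents {
        s.handleBlock(b)
    }
}

func (s *service) handleBlock(b *Block) { fmt.Println("processed block", b.Index) }

func main() {
    chain := &fakeChain{current: &Block{Index: 1, Timestamp: 100}}
    s := &service{chain: chain, blockEvents: make(chan *Block), done: make(chan struct{})}
    s.Start()
    close(s.blockEvents) // no more events in this sketch; lets eventLoop return
    <-s.done
}

The timestamp comparison mirrors the diff: a current block whose timestamp is not older than the one recorded in Start() has not been seen by the loop yet, so it is handled immediately.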

View file

@@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() {
 	close(bc.stopCh)
 	<-bc.runToExitCh
 	bc.addLock.Unlock()
+	_ = bc.log.Sync()
 }

 // AddBlock accepts successive block for the Blockchain, verifies it and

View file

@@ -124,12 +124,14 @@ type (
 		lastRequestedBlock atomic.Uint32
 		// lastRequestedHeader contains a height of the last requested header.
 		lastRequestedHeader atomic.Uint32
 		register       chan Peer
 		unregister     chan peerDrop
 		handshake      chan Peer
 		quit           chan struct{}
 		relayFin       chan struct{}
+		runFin         chan struct{}
+		broadcastTxFin chan struct{}
+		runProtoFin    chan struct{}
 		transactions chan *transaction.Transaction
@@ -138,6 +140,11 @@ type (
 		stateSync StateSync

 		log *zap.Logger
+
+		// started used to Start and Shutdown server only once.
+		started atomic.Bool
+
+		txHandlerLoopWG sync.WaitGroup
 	}

 	peerDrop struct {
@@ -180,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
 		config:         chain.GetConfig().ProtocolConfiguration,
 		quit:           make(chan struct{}),
 		relayFin:       make(chan struct{}),
+		runFin:         make(chan struct{}),
+		broadcastTxFin: make(chan struct{}),
+		runProtoFin:    make(chan struct{}),
 		register:       make(chan Peer),
 		unregister:     make(chan peerDrop),
 		handshake:      make(chan Peer),
@@ -262,8 +272,12 @@ func (s *Server) ID() uint32 {
 }

 // Start will start the server and its underlying transport. Calling it twice
-// is an error.
+// is a no-op. Caller should wait for Start to finish for normal server operation.
 func (s *Server) Start() {
+	if !s.started.CompareAndSwap(false, true) {
+		s.log.Info("node server already started")
+		return
+	}
 	s.log.Info("node started",
 		zap.Uint32("blockHeight", s.chain.BlockHeight()),
 		zap.Uint32("headerHeight", s.chain.HeaderHeight()))
@@ -272,6 +286,7 @@ func (s *Server) Start() {
 	s.initStaleMemPools()

 	var txThreads = optimalNumOfThreads()
+	s.txHandlerLoopWG.Add(txThreads)
 	for i := 0; i < txThreads; i++ {
 		go s.txHandlerLoop()
 	}
@@ -285,12 +300,15 @@ func (s *Server) Start() {
 	setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
 	setNeoGoVersion(config.Version)
 	setSeverID(strconv.FormatUint(uint64(s.id), 10))
-	s.run()
+	go s.run()
 }

-// Shutdown disconnects all peers and stops listening. Calling it twice is an error,
-// once stopped the same intance of the Server can't be started again by calling Start.
+// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op,
+// once stopped the same instance of the Server can't be started again by calling Start.
 func (s *Server) Shutdown() {
+	if !s.started.CompareAndSwap(true, false) {
+		return
+	}
 	s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
 	for _, tr := range s.transports {
 		tr.Close()
@@ -309,7 +327,13 @@ func (s *Server) Shutdown() {
 		s.notaryRequestPool.StopSubscriptions()
 	}
 	close(s.quit)
+	<-s.broadcastTxFin
+	<-s.runProtoFin
 	<-s.relayFin
+	<-s.runFin
+	s.txHandlerLoopWG.Wait()
+
+	_ = s.log.Sync()
 }

 // AddService allows to add a service to be started/stopped by Server.
@@ -423,6 +447,7 @@ func (s *Server) run() {
 		addrTimer = time.NewTimer(peerCheckTime)
 		peerTimer = time.NewTimer(s.ProtoTickInterval)
 	)
+	defer close(s.runFin)
 	defer addrTimer.Stop()
 	defer peerTimer.Stop()
 	go s.runProto()
@@ -521,6 +546,7 @@ func (s *Server) run() {

 // runProto is a goroutine that manages server-wide protocol events.
 func (s *Server) runProto() {
+	defer close(s.runProtoFin)
 	pingTimer := time.NewTimer(s.PingInterval)
 	for {
 		prevHeight := s.chain.BlockHeight()
@@ -1123,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
 }

 func (s *Server) txHandlerLoop() {
+	defer s.txHandlerLoopWG.Done()
 txloop:
 	for {
 		select {
@@ -1639,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
 		batchSize = 42
 	)
+	defer close(s.broadcastTxFin)

 	txs := make([]util.Uint256, 0, batchSize)
 	var timer *time.Timer
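
Taken together, the network/server.go hunks give the Server a start-once/stop-once lifecycle: started gates both Start() and Shutdown(), every long-lived goroutine signals completion through a *Fin channel or txHandlerLoopWG, and Shutdown() flushes the logger only after all of them have exited. A condensed sketch of the same pattern; server, workerLoop and the single runFin channel are illustrative stand-ins, not the real fields:

package main

import (
    "sync"
    "sync/atomic"
    "time"

    "go.uber.org/zap"
)

type server struct {
    log     *zap.Logger
    started atomic.Bool

    quit   chan struct{}
    runFin chan struct{}
    wg     sync.WaitGroup
}

func newServer(log *zap.Logger) *server {
    return &server{
        log:    log,
        quit:   make(chan struct{}),
        runFin: make(chan struct{}),
    }
}

// Start is non-blocking and idempotent: a second call only logs and returns.
func (s *server) Start() {
    if !s.started.CompareAndSwap(false, true) {
        s.log.Info("server already started")
        return
    }
    s.wg.Add(1)
    go s.workerLoop()
    go s.run()
}

func (s *server) run() {
    defer close(s.runFin) // tells Shutdown this goroutine is done logging
    <-s.quit
    s.log.Info("run loop exiting")
}

func (s *server) workerLoop() {
    defer s.wg.Done()
    <-s.quit
    s.log.Info("worker exiting")
}

// Shutdown is idempotent too; it waits for every goroutine that may still log
// before flushing the logger, which is what removes the post-shutdown race.
func (s *server) Shutdown() {
    if !s.started.CompareAndSwap(true, false) {
        return
    }
    close(s.quit)
    <-s.runFin
    s.wg.Wait()
    _ = s.log.Sync()
}

func main() {
    log, _ := zap.NewDevelopment()
    s := newServer(log)
    s.Start()
    s.Start() // no-op
    time.Sleep(10 * time.Millisecond)
    s.Shutdown()
    s.Shutdown() // no-op
}

Because Start() now returns right after spawning run(), callers such as the CLI and the tests above no longer need their own "go s.Start()" wrapper, which is why those call sites change in the other files of this diff.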

View file

@@ -90,16 +90,18 @@ func TestServerStartAndShutdown(t *testing.T) {
 	t.Run("no consensus", func(t *testing.T) {
 		s := newTestServer(t, ServerConfig{})

-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p
 		require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)

 		assert.True(t, s.transports[0].(*fakeTransp).started.Load())
+		require.True(t, s.started.Load())
 		assert.Nil(t, s.txCallback)

 		s.Shutdown()

+		require.False(t, s.started.Load())
 		require.True(t, s.transports[0].(*fakeTransp).closed.Load())
 		err, ok := p.droppedWith.Load().(error)
 		require.True(t, ok)
@@ -110,16 +112,39 @@ func TestServerStartAndShutdown(t *testing.T) {
 		cons := new(fakeConsensus)
 		s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)

-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p

 		assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
+		require.True(t, s.started.Load())

 		s.Shutdown()

+		require.False(t, s.started.Load())
 		require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
 	})
+	t.Run("double start", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		startWithCleanup(t, s)
+
+		// Attempt to start the server again.
+		s.Start()
+
+		require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
+	})
+	t.Run("double shutdown", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		s.Start()
+		require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
+
+		s.Shutdown()
+		require.False(t, s.started.Load(), "server should be marked as not started after second Shutdown call")
+
+		// Attempt to shutdown the server again.
+		s.Shutdown()
+
+		// Verify the server state remains unchanged and is still considered shutdown.
+		require.False(t, s.started.Load(), "server should remain shutdown after second call")
+	})
 }

 func TestServerRegisterPeer(t *testing.T) {
@@ -312,7 +337,7 @@ func TestServerNotSendsVerack(t *testing.T) {
 	s.id = 1
 	finished := make(chan struct{})
 	go func() {
-		s.run()
+		go s.run()
 		close(finished)
 	}()
 	t.Cleanup(func() {
@@ -389,7 +414,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser
 }

 func startWithCleanup(t *testing.T, s *Server) {
-	go s.Start()
+	s.Start()
 	t.Cleanup(func() {
 		s.Shutdown()
 	})

View file

@@ -75,4 +75,5 @@ func (ms *Service) ShutDown() {
 			ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err))
 		}
 	}
+	_ = ms.log.Sync()
 }

View file

@@ -175,13 +175,13 @@ func (n *Notary) Start() {
 		return
 	}
 	n.Config.Log.Info("starting notary service")
-	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
-	n.mp.SubscribeForTransactions(n.reqCh)
 	go n.newTxCallbackLoop()
 	go n.mainLoop()
 }

 func (n *Notary) mainLoop() {
+	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
+	n.mp.SubscribeForTransactions(n.reqCh)
 mainloop:
 	for {
 		select {
@@ -228,6 +228,7 @@ func (n *Notary) Shutdown() {
 	close(n.stopCh)
 	<-n.done
 	n.wallet.Close()
+	_ = n.Config.Log.Sync()
 }

 // IsAuthorized returns whether Notary service currently is authorized to collect
// IsAuthorized returns whether Notary service currently is authorized to collect // IsAuthorized returns whether Notary service currently is authorized to collect

View file

@@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() {
 	o.ResponseHandler.Shutdown()
 	<-o.done
 	o.wallet.Close()
+	_ = o.Log.Sync()
 }

 // Start runs the oracle service in a separate goroutine.

View file

@@ -482,6 +482,7 @@ func (s *Server) Shutdown() {
 	// Wait for handleSubEvents to finish.
 	<-s.subEventsToExitCh

+	_ = s.log.Sync()
 }

 // SetOracleHandler allows to update oracle handler used by the Server.

View file

@@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) {
 	defer chain.Close()
 	defer rpcSrv.Shutdown()

-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()
 	defer rpcSrv.coreServer.Shutdown()

 	for _, feed := range subFeeds {
@@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
 	}

 	chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()

 	defer chain.Close()
 	defer rpcSrv.Shutdown()

View file

@@ -29,11 +29,11 @@ func (s *service) Start() {
 		return
 	}
 	s.log.Info("starting state validation service")
-	s.chain.SubscribeForBlocks(s.blockCh)
 	go s.run()
 }

 func (s *service) run() {
+	s.chain.SubscribeForBlocks(s.blockCh)
 runloop:
 	for {
 		select {
@@ -77,6 +77,7 @@ func (s *service) Shutdown() {
 	if s.wallet != nil {
 		s.wallet.Close()
 	}
+	_ = s.log.Sync()
 }

 func (s *service) signAndSend(r *state.MPTRoot) error {