network: fix server shutdown by waiting for goroutines to finish

s.Shutdown() does not wait for all goroutines of the node server to
finish normally just because the server exits without dependent
goroutines awaiting. Which causes logs to attempt to write after the
test has ended. The consequence of this bug fix is that corresponding
tests are fixed.

Close #2973
Close #2974

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-02-21 12:15:13 +03:00
parent 5bb7c6b715
commit 5cf0c75744

View file

@ -124,12 +124,14 @@ type (
lastRequestedBlock atomic.Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic.Uint32
register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}
transactions chan *transaction.Transaction
@ -141,6 +143,8 @@ type (
// started used to Start and Shutdown server only once.
started atomic.Bool
txHandlerLoopWG sync.WaitGroup
}
peerDrop struct {
@ -183,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
@ -279,6 +286,7 @@ func (s *Server) Start() {
s.initStaleMemPools()
var txThreads = optimalNumOfThreads()
s.txHandlerLoopWG.Add(txThreads)
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
@ -319,7 +327,11 @@ func (s *Server) Shutdown() {
s.notaryRequestPool.StopSubscriptions()
}
close(s.quit)
<-s.broadcastTxFin
<-s.runProtoFin
<-s.relayFin
<-s.runFin
s.txHandlerLoopWG.Wait()
_ = s.log.Sync()
}
@ -435,6 +447,7 @@ func (s *Server) run() {
addrTimer = time.NewTimer(peerCheckTime)
peerTimer = time.NewTimer(s.ProtoTickInterval)
)
defer close(s.runFin)
defer addrTimer.Stop()
defer peerTimer.Stop()
go s.runProto()
@ -533,6 +546,7 @@ func (s *Server) run() {
// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
defer close(s.runProtoFin)
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
@ -1135,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
}
func (s *Server) txHandlerLoop() {
defer s.txHandlerLoopWG.Done()
txloop:
for {
select {
@ -1651,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
batchSize = 42
)
defer close(s.broadcastTxFin)
txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer