diff --git a/pkg/network/server.go b/pkg/network/server.go index 6132e69ff..5bc4b3c8f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -124,12 +124,14 @@ type ( lastRequestedBlock atomic.Uint32 // lastRequestedHeader contains a height of the last requested header. lastRequestedHeader atomic.Uint32 - - register chan Peer - unregister chan peerDrop - handshake chan Peer - quit chan struct{} - relayFin chan struct{} + register chan Peer + unregister chan peerDrop + handshake chan Peer + quit chan struct{} + relayFin chan struct{} + runFin chan struct{} + broadcastTxFin chan struct{} + runProtoFin chan struct{} transactions chan *transaction.Transaction @@ -141,6 +143,8 @@ type ( // started used to Start and Shutdown server only once. started atomic.Bool + + txHandlerLoopWG sync.WaitGroup } peerDrop struct { @@ -183,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy config: chain.GetConfig().ProtocolConfiguration, quit: make(chan struct{}), relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), handshake: make(chan Peer), @@ -279,6 +286,7 @@ func (s *Server) Start() { s.initStaleMemPools() var txThreads = optimalNumOfThreads() + s.txHandlerLoopWG.Add(txThreads) for i := 0; i < txThreads; i++ { go s.txHandlerLoop() } @@ -319,7 +327,11 @@ func (s *Server) Shutdown() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) + <-s.broadcastTxFin + <-s.runProtoFin <-s.relayFin + <-s.runFin + s.txHandlerLoopWG.Wait() _ = s.log.Sync() } @@ -435,6 +447,7 @@ func (s *Server) run() { addrTimer = time.NewTimer(peerCheckTime) peerTimer = time.NewTimer(s.ProtoTickInterval) ) + defer close(s.runFin) defer addrTimer.Stop() defer peerTimer.Stop() go s.runProto() @@ -533,6 +546,7 @@ func (s *Server) run() { // runProto is a goroutine that manages server-wide protocol events. func (s *Server) runProto() { + defer close(s.runProtoFin) pingTimer := time.NewTimer(s.PingInterval) for { prevHeight := s.chain.BlockHeight() @@ -1135,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } func (s *Server) txHandlerLoop() { + defer s.txHandlerLoopWG.Done() txloop: for { select { @@ -1651,6 +1666,7 @@ func (s *Server) broadcastTxLoop() { batchSize = 42 ) + defer close(s.broadcastTxFin) txs := make([]util.Uint256, 0, batchSize) var timer *time.Timer