From 8ab422da6609f2871a01de02de9802b6e7d6c300 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 28 Jun 2022 18:07:22 +0300 Subject: [PATCH] *: properly unsubscribe from Blockchain events --- pkg/consensus/consensus.go | 10 ++++++++++ pkg/network/server.go | 12 +++++++++++- pkg/services/notary/notary.go | 14 +++++++++++++- pkg/services/stateroot/validators.go | 1 + 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 8c09ba212..dfdb8539b 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -286,6 +286,7 @@ events: select { case <-s.quit: s.dbft.Timer.Stop() + s.Chain.UnsubscribeFromBlocks(s.blockEvents) break events case <-s.dbft.Timer.C(): hv := s.dbft.Timer.HV() @@ -331,6 +332,15 @@ events: default: } } +drainBlocksLoop: + for { + select { + case <-s.blockEvents: + default: + break drainBlocksLoop + } + } + close(s.blockEvents) close(s.finished) } diff --git a/pkg/network/server.go b/pkg/network/server.go index ed79ee54c..91e591f8d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1344,11 +1344,12 @@ func (s *Server) broadcastHPMessage(msg *Message) { func (s *Server) relayBlocksLoop() { ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays. s.chain.SubscribeForBlocks(ch) +mainloop: for { select { case <-s.quit: s.chain.UnsubscribeFromBlocks(ch) - return + break mainloop case b := <-ch: msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) // Filter out nodes that are more current (avoid spamming the network @@ -1359,6 +1360,15 @@ func (s *Server) relayBlocksLoop() { s.extensiblePool.RemoveStale(b.Index) } } +drainBlocksLoop: + for { + select { + case <-ch: + default: + break drainBlocksLoop + } + } + close(ch) } // verifyAndPoolTX verifies the TX and adds it to the local mempool. diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index cd50cecae..bc567055c 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -167,12 +167,13 @@ func (n *Notary) Start() { } func (n *Notary) mainLoop() { +mainloop: for { select { case <-n.stopCh: n.mp.UnsubscribeFromTransactions(n.reqCh) n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh) - return + break mainloop case event := <-n.reqCh: if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { switch event.Type { @@ -187,6 +188,17 @@ func (n *Notary) mainLoop() { n.PostPersist() } } +drainLoop: + for { + select { + case <-n.blocksCh: + case <-n.reqCh: + default: + break drainLoop + } + } + close(n.blocksCh) + close(n.reqCh) } // Shutdown stops the Notary module. diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index e762e6331..03ec9cf63 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -55,6 +55,7 @@ drainloop: break drainloop } } + close(s.blockCh) } // Shutdown stops the service.