*: properly unsubscribe from Blockchain events

This commit is contained in:
Anna Shaleva 2022-06-28 18:07:22 +03:00
parent a748298564
commit 8ab422da66
4 changed files with 35 additions and 2 deletions

View file

@ -286,6 +286,7 @@ events:
select { select {
case <-s.quit: case <-s.quit:
s.dbft.Timer.Stop() s.dbft.Timer.Stop()
s.Chain.UnsubscribeFromBlocks(s.blockEvents)
break events break events
case <-s.dbft.Timer.C(): case <-s.dbft.Timer.C():
hv := s.dbft.Timer.HV() hv := s.dbft.Timer.HV()
@ -331,6 +332,15 @@ events:
default: default:
} }
} }
drainBlocksLoop:
for {
select {
case <-s.blockEvents:
default:
break drainBlocksLoop
}
}
close(s.blockEvents)
close(s.finished) close(s.finished)
} }

View file

@ -1344,11 +1344,12 @@ func (s *Server) broadcastHPMessage(msg *Message) {
func (s *Server) relayBlocksLoop() { func (s *Server) relayBlocksLoop() {
ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays. ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
s.chain.SubscribeForBlocks(ch) s.chain.SubscribeForBlocks(ch)
mainloop:
for { for {
select { select {
case <-s.quit: case <-s.quit:
s.chain.UnsubscribeFromBlocks(ch) s.chain.UnsubscribeFromBlocks(ch)
return break mainloop
case b := <-ch: case b := <-ch:
msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
// Filter out nodes that are more current (avoid spamming the network // Filter out nodes that are more current (avoid spamming the network
@ -1359,6 +1360,15 @@ func (s *Server) relayBlocksLoop() {
s.extensiblePool.RemoveStale(b.Index) s.extensiblePool.RemoveStale(b.Index)
} }
} }
drainBlocksLoop:
for {
select {
case <-ch:
default:
break drainBlocksLoop
}
}
close(ch)
} }
// verifyAndPoolTX verifies the TX and adds it to the local mempool. // verifyAndPoolTX verifies the TX and adds it to the local mempool.

View file

@ -167,12 +167,13 @@ func (n *Notary) Start() {
} }
func (n *Notary) mainLoop() { func (n *Notary) mainLoop() {
mainloop:
for { for {
select { select {
case <-n.stopCh: case <-n.stopCh:
n.mp.UnsubscribeFromTransactions(n.reqCh) n.mp.UnsubscribeFromTransactions(n.reqCh)
n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh) n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh)
return break mainloop
case event := <-n.reqCh: case event := <-n.reqCh:
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
switch event.Type { switch event.Type {
@ -187,6 +188,17 @@ func (n *Notary) mainLoop() {
n.PostPersist() n.PostPersist()
} }
} }
drainLoop:
for {
select {
case <-n.blocksCh:
case <-n.reqCh:
default:
break drainLoop
}
}
close(n.blocksCh)
close(n.reqCh)
} }
// Shutdown stops the Notary module. // Shutdown stops the Notary module.

View file

@ -55,6 +55,7 @@ drainloop:
break drainloop break drainloop
} }
} }
close(s.blockCh)
} }
// Shutdown stops the service. // Shutdown stops the service.