Merge pull request #2566 from nspcc-dev/fix-service-shutdown
*: properly unsubscribe from Blockchain events
This commit is contained in:
commit
cc37de5331
4 changed files with 35 additions and 2 deletions
|
@ -286,6 +286,7 @@ events:
|
||||||
select {
|
select {
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
s.dbft.Timer.Stop()
|
s.dbft.Timer.Stop()
|
||||||
|
s.Chain.UnsubscribeFromBlocks(s.blockEvents)
|
||||||
break events
|
break events
|
||||||
case <-s.dbft.Timer.C():
|
case <-s.dbft.Timer.C():
|
||||||
hv := s.dbft.Timer.HV()
|
hv := s.dbft.Timer.HV()
|
||||||
|
@ -331,6 +332,15 @@ events:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
drainBlocksLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.blockEvents:
|
||||||
|
default:
|
||||||
|
break drainBlocksLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(s.blockEvents)
|
||||||
close(s.finished)
|
close(s.finished)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1344,11 +1344,12 @@ func (s *Server) broadcastHPMessage(msg *Message) {
|
||||||
func (s *Server) relayBlocksLoop() {
|
func (s *Server) relayBlocksLoop() {
|
||||||
ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
|
ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
|
||||||
s.chain.SubscribeForBlocks(ch)
|
s.chain.SubscribeForBlocks(ch)
|
||||||
|
mainloop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
s.chain.UnsubscribeFromBlocks(ch)
|
s.chain.UnsubscribeFromBlocks(ch)
|
||||||
return
|
break mainloop
|
||||||
case b := <-ch:
|
case b := <-ch:
|
||||||
msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
|
msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
|
||||||
// Filter out nodes that are more current (avoid spamming the network
|
// Filter out nodes that are more current (avoid spamming the network
|
||||||
|
@ -1359,6 +1360,15 @@ func (s *Server) relayBlocksLoop() {
|
||||||
s.extensiblePool.RemoveStale(b.Index)
|
s.extensiblePool.RemoveStale(b.Index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
drainBlocksLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
break drainBlocksLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
|
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
|
||||||
|
|
|
@ -167,12 +167,13 @@ func (n *Notary) Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notary) mainLoop() {
|
func (n *Notary) mainLoop() {
|
||||||
|
mainloop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-n.stopCh:
|
case <-n.stopCh:
|
||||||
n.mp.UnsubscribeFromTransactions(n.reqCh)
|
n.mp.UnsubscribeFromTransactions(n.reqCh)
|
||||||
n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh)
|
n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh)
|
||||||
return
|
break mainloop
|
||||||
case event := <-n.reqCh:
|
case event := <-n.reqCh:
|
||||||
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
|
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
|
@ -187,6 +188,17 @@ func (n *Notary) mainLoop() {
|
||||||
n.PostPersist()
|
n.PostPersist()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
drainLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-n.blocksCh:
|
||||||
|
case <-n.reqCh:
|
||||||
|
default:
|
||||||
|
break drainLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(n.blocksCh)
|
||||||
|
close(n.reqCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stops the Notary module.
|
// Shutdown stops the Notary module.
|
||||||
|
|
|
@ -55,6 +55,7 @@ drainloop:
|
||||||
break drainloop
|
break drainloop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
close(s.blockCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stops the service.
|
// Shutdown stops the service.
|
||||||
|
|
Loading…
Reference in a new issue