services: move blockchain/mempool subscriptions to separate routine

Start of some services is bound to blockchain subscriptions, and thus,
can't be run before the blockchain notifications dispatcher.

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-02-22 16:50:58 +03:00
parent f8dc5ec44f
commit 4715e523e0
3 changed files with 11 additions and 4 deletions

View file

@ -281,7 +281,6 @@ func (s *service) Start() {
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
s.lastTimestamp = b.Timestamp s.lastTimestamp = b.Timestamp
s.dbft.Start(s.lastTimestamp * nsInMs) s.dbft.Start(s.lastTimestamp * nsInMs)
s.Chain.SubscribeForBlocks(s.blockEvents)
go s.eventLoop() go s.eventLoop()
} }
} }
@ -299,6 +298,14 @@ func (s *service) Shutdown() {
} }
func (s *service) eventLoop() { func (s *service) eventLoop() {
s.Chain.SubscribeForBlocks(s.blockEvents)
// Manually sync up with potentially missed fresh blocks that may be added by blockchain
// before the subscription.
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
if b.Timestamp >= s.lastTimestamp {
s.handleChainBlock(b)
}
events: events:
for { for {
select { select {

View file

@ -175,13 +175,13 @@ func (n *Notary) Start() {
return return
} }
n.Config.Log.Info("starting notary service") n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
go n.newTxCallbackLoop() go n.newTxCallbackLoop()
go n.mainLoop() go n.mainLoop()
} }
func (n *Notary) mainLoop() { func (n *Notary) mainLoop() {
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
mainloop: mainloop:
for { for {
select { select {

View file

@ -29,11 +29,11 @@ func (s *service) Start() {
return return
} }
s.log.Info("starting state validation service") s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh)
go s.run() go s.run()
} }
func (s *service) run() { func (s *service) run() {
s.chain.SubscribeForBlocks(s.blockCh)
runloop: runloop:
for { for {
select { select {